// xref: /6.6.0/kv_engine/engines/ep/src/dcp/producer.h (revision 166a75af)
/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
/*
 *     Copyright 2013 Couchbase, Inc
 *
 *   Licensed under the Apache License, Version 2.0 (the "License");
 *   you may not use this file except in compliance with the License.
 *   You may obtain a copy of the License at
 *
 *       http://www.apache.org/licenses/LICENSE-2.0
 *
 *   Unless required by applicable law or agreed to in writing, software
 *   distributed under the License is distributed on an "AS IS" BASIS,
 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *   See the License for the specific language governing permissions and
 *   limitations under the License.
 */
17#pragma once
18
19#include "atomic_shared_ptr.h"
20#include "atomic_unordered_map.h"
21#include "connhandler.h"
22#include "dcp/dcp-types.h"
23#include "dcp/stream_container.h"
24#include "ep_engine.h"
25#include "monotonic.h"
26#include "vb_ready_queue.h"
27
28#include <folly/AtomicHashMap.h>
29#include <folly/CachelinePadded.h>
30#include <folly/SharedMutex.h>
31
// Forward declarations: these types are only named through pointers,
// references or smart pointers in the declarations that follow.
class BackfillManager;
class CheckpointCursor;
class DcpResponse;
class MutationResponse;
class VBucket;
37
38class DcpProducer : public ConnHandler,
39                    public std::enable_shared_from_this<DcpProducer> {
40public:
41
42    /**
43     * Construct a DCP Producer
44     *
45     * @param e The engine.
46     * @param cookie Cookie of the connection creating the producer.
47     * @param n A name chosen by the client.
48     * @param flags The DCP_OPEN flags (as per mcbp).
49     * @param startTask If true an internal checkpoint task is created and
50     *        started. Test code may wish to defer or manually handle the task
51     *        creation.
52     */
53    DcpProducer(EventuallyPersistentEngine& e,
54                const void* cookie,
55                const std::string& n,
56                uint32_t flags,
57                bool startTask);
58
59    virtual ~DcpProducer();
60
61    /**
62     * Clears active stream checkpoint processor task's queue, resets its
63     * shared reference to the producer and cancels the task.
64     */
65    void cancelCheckpointCreatorTask();
66
67    ENGINE_ERROR_CODE streamRequest(
68            uint32_t flags,
69            uint32_t opaque,
70            Vbid vbucket,
71            uint64_t start_seqno,
72            uint64_t end_seqno,
73            uint64_t vbucket_uuid,
74            uint64_t last_seqno,
75            uint64_t next_seqno,
76            uint64_t* rollback_seqno,
77            dcp_add_failover_log callback,
78            boost::optional<cb::const_char_buffer> json) override;
79
80    ENGINE_ERROR_CODE step(struct dcp_message_producers* producers) override;
81
82    ENGINE_ERROR_CODE bufferAcknowledgement(uint32_t opaque,
83                                            Vbid vbucket,
84                                            uint32_t buffer_bytes) override;
85
86    ENGINE_ERROR_CODE control(uint32_t opaque,
87                              cb::const_char_buffer key,
88                              cb::const_char_buffer value) override;
89
90    ENGINE_ERROR_CODE seqno_acknowledged(uint32_t opaque,
91                                         Vbid vbucket,
92                                         uint64_t prepared_seqno) override;
93
94    /**
95     * Sub-classes must implement a method that processes a response
96     * to a request initiated by itself.
97     *
98     * @param resp A mcbp response message to process.
99     * @returns true/false which will be converted to SUCCESS/DISCONNECT by the
100     *          engine.
101     */
102    bool handleResponse(const protocol_binary_response_header* resp) override;
103
104    void addStats(const AddStatFn& add_stat, const void* c) override;
105
106    void addTakeoverStats(const AddStatFn& add_stat,
107                          const void* c,
108                          const VBucket& vb);
109
110    void aggregateQueueStats(ConnCounter& aggregator) override;
111
112    /**
113     * ALERT: Do NOT call this function while holding ConnMap::connLock.
114     * The call may acquire the VBucket::stateLock and raise a potential
115     * deadlock by lock-inversion with KVBucket::setVBucketState (where we
116     * aquire the two locks in opposite order).
117     */
118    void setDisconnect() override;
119
120    void notifySeqnoAvailable(Vbid vbucket,
121                              uint64_t seqno,
122                              SyncWriteOperation syncWriteOnly);
123
124    /**
125     * @param vbucket
126     * @param state The current vbstate
127     * @param vbstateLock (optional) Exclusive lock to vbstate
128     */
129    void closeStreamDueToVbStateChange(
130            Vbid vbucket,
131            vbucket_state_t state,
132            boost::optional<folly::SharedMutex::WriteHolder&> vbstateLock);
133
134    void closeStreamDueToRollback(Vbid vbucket);
135
136    /**
137     * This function handles a stream that is detected as slow by the checkpoint
138     * remover. Currently we handle the slow stream by switching from in-memory
139     * to backfilling.
140     *
141     * @param vbid vbucket the checkpoint-remover is processing
142     * @param cursor the cursor registered in the checkpoint manager which is
143     *        slow.
144     * @return true if the cursor was removed from the checkpoint manager
145     */
146    bool handleSlowStream(Vbid vbid, const CheckpointCursor* cursor);
147
148    void closeAllStreams();
149
150    const char *getType() const override;
151
152    void clearQueues();
153
154    size_t getBackfillQueueSize();
155
156    size_t getItemsSent();
157
158    size_t getTotalBytesSent();
159
160    size_t getTotalUncompressedDataSize();
161
162    std::vector<Vbid> getVBVector();
163
164    /**
165     * Close the stream for given vbucket stream
166     *
167     * @param vbucket the if for the vbucket to close
168     * @return ENGINE_SUCCESS upon a successful close
169     *         ENGINE_NOT_MY_VBUCKET the vbucket stream doesn't exist
170     */
171    ENGINE_ERROR_CODE closeStream(uint32_t opaque,
172                                  Vbid vbucket,
173                                  cb::mcbp::DcpStreamId sid = {}) override;
174
175    void notifyStreamReady(Vbid vbucket);
176
177    void notifyBackfillManager();
178    bool recordBackfillManagerBytesRead(size_t bytes, bool force);
179    void recordBackfillManagerBytesSent(size_t bytes);
180    virtual bool scheduleBackfillManager(VBucket& vb,
181                                         std::shared_ptr<ActiveStream> s,
182                                         uint64_t start,
183                                         uint64_t end);
184
185    bool isExtMetaDataEnabled () {
186        return enableExtMetaData;
187    }
188
189    bool isCompressionEnabled() {
190        if (forceValueCompression ||
191            engine_.isDatatypeSupported(getCookie(), PROTOCOL_BINARY_DATATYPE_SNAPPY)) {
192            return true;
193        }
194
195        return false;
196    }
197
198    bool isForceValueCompressionEnabled() {
199        return forceValueCompression.load();
200    }
201
202    bool isSnappyEnabled() {
203        return engine_.isDatatypeSupported(getCookie(),
204                                           PROTOCOL_BINARY_DATATYPE_SNAPPY);
205    }
206
207    bool isCursorDroppingEnabled() const {
208        return supportsCursorDropping.load();
209    }
210
211    /**
212     * Notifies the front-end synchronously on this thread that this paused
213     * connection should be re-considered for work.
214     */
215    void immediatelyNotify();
216
217    /**
218     * Schedule a notification to the front-end on a background thread for
219     * the ConnNotifier to pick that notifies this paused connection should
220     * be re-considered for work.
221     */
222    void scheduleNotify();
223
224    void setLastReceiveTime(const rel_time_t time) {
225        lastReceiveTime = time;
226    }
227
228    bool isDCPExpiryEnabled() const {
229        return enableExpiryOpcode;
230    }
231
232    /**
233     * Tracks the amount of outstanding sent data for a Dcp Producer, alongside
234     * how many bytes have been acknowledged by the peer connection.
235     *
236     * When the buffer becomes full (outstanding >= limit), the producer is
237     * paused. Similarly when data is subsequently acknowledged and outstanding
238     * < limit; the producer is un-paused.
239     */
240    class BufferLog {
241    public:
242
243        /*
244            BufferLog has 3 states.
245            Disabled - Flow-control is not in-use.
246             This is indicated by setting the size to 0 (i.e. setBufferSize(0)).
247
248            SpaceAvailable - There is *some* space available. You can always
249             insert n-bytes even if there's n-1 bytes spare.
250
251            Full - inserts have taken the number of bytes available equal or
252             over the buffer size.
253        */
254        enum State {
255            Disabled,
256            Full,
257            SpaceAvailable
258        };
259
260        BufferLog(DcpProducer& p)
261            : producer(p), maxBytes(0), bytesOutstanding(0), ackedBytes(0) {
262        }
263
264        /**
265         * Change the buffer size to the specified value. A maximum of zero
266         * disables buffering.
267         */
268        void setBufferSize(size_t maxBytes);
269
270        void addStats(const AddStatFn& add_stat, const void* c);
271
272        /**
273         * Insert N bytes into the buffer.
274         *
275         * @return false if the log is full, true if the bytes fit or if the
276         * buffer log is disabled. The outstanding bytes are increased.
277         */
278        bool insert(size_t bytes);
279
280        /**
281         * Acknowledge the bytes and unpause the producer if full.
282         * The outstanding bytes are decreased.
283         */
284        void acknowledge(size_t bytes);
285
286        /**
287         * Pause the producer if full.
288         * @return true if the producer was paused; else false.
289         */
290        bool pauseIfFull();
291
292        /// Unpause the producer if there's space (or disabled).
293        void unpauseIfSpaceAvailable();
294
295        size_t getBytesOutstanding() const {
296            return bytesOutstanding;
297        }
298
299    private:
300        bool isEnabled_UNLOCKED() {
301            return maxBytes != 0;
302        }
303
304        bool isFull_UNLOCKED() {
305            return bytesOutstanding >= maxBytes;
306        }
307
308        void release_UNLOCKED(size_t bytes);
309
310        State getState_UNLOCKED();
311
312        folly::SharedMutex logLock;
313        DcpProducer& producer;
314
315        /// Capacity of the buffer - maximum number of bytes which can be
316        /// outstanding before the buffer is considered full.
317        size_t maxBytes;
318
319        /// Number of bytes currently outstanding (in the buffer). Incremented
320        /// upon insert(); and then decremented by acknowledge().
321        cb::NonNegativeCounter<size_t> bytesOutstanding;
322
323        /// Total number of bytes acknowledeged. Should be non-decreasing in
324        /// normal usage; but can be reset to zero when buffer size changes.
325        Monotonic<size_t> ackedBytes;
326    };
327
328    /*
329        Insert bytes into this producer's buffer log.
330
331        If the log is disabled or the insert was successful returns true.
332        Else return false.
333    */
334    bool bufferLogInsert(size_t bytes);
335
336    /*
337        Schedules active stream checkpoint processor task
338        for given stream.
339    */
340    void scheduleCheckpointProcessorTask(std::shared_ptr<ActiveStream> s);
341
342    /** Searches the streams map for a stream for vbucket ID. Returns the
343     *  found stream, or an empty pointer if none found.
344     */
345    std::shared_ptr<StreamContainer<std::shared_ptr<Stream>>> findStreams(
346            Vbid vbid);
347
348    std::string getConsumerName() const;
349
350    void setIdleTimeout(size_t newValue);
351
352    void setBlacklistFtsConnectionLogs(bool newValue);
353
354    // FTS connection name prefix used to enable/disable logging.
355    static const std::string ftsConnectionName;
356
357protected:
358    /** We may disconnect if noop messages are enabled and the last time we
359     *  received any message (including a noop) exceeds the dcpTimeout.
360     *  Returns ENGINE_DISCONNECT if noop messages are enabled and the timeout
361     *  is exceeded.
362     *  Returns ENGINE_FAILED if noop messages are disabled, or if the timeout
363     *  is not exceeded.  In this case continue without disconnecting.
364     */
365    ENGINE_ERROR_CODE maybeDisconnect();
366
367    /** We may send a noop if a noop acknowledgement is not pending and
368     *  we have exceeded the dcpNoopTxInterval since we last sent a noop.
369     *  Returns ENGINE_SUCCESS if a noop was sent.
370     *  Returns ENGINE_FAILED if a noop is not required to be sent.
371     *  This occurs if noop messages are disabled, or because we have already
372     *  sent a noop and we are awaiting a receive, or because the time interval
373     *  has not passed.
374     */
375    ENGINE_ERROR_CODE maybeSendNoop(struct dcp_message_producers* producers);
376
377    /**
378     * Create the ActiveStreamCheckpointProcessorTask and assign to
379     * checkpointCreatorTask
380     */
381    void createCheckpointProcessorTask();
382
383    /**
384     * Schedule the checkpointCreatorTask on the ExecutorPool
385     */
386    void scheduleCheckpointProcessorTask();
387
388    struct {
389        rel_time_t sendTime;
390        uint32_t opaque;
391
392        /// How often are DCP noop messages transmitted?
393        std::chrono::seconds dcpNoopTxInterval;
394
395        /**
396         * True if a DCP NOOP request has been sent and we are waiting for a
397         * response.
398         */
399        cb::RelaxedAtomic<bool> pendingRecv;
400        cb::RelaxedAtomic<bool> enabled;
401    } noopCtx;
402
403    /// Timestamp of when we last recieved a message from our peer.
404    cb::RelaxedAtomic<rel_time_t> lastReceiveTime;
405
406    std::unique_ptr<DcpResponse> getNextItem();
407
408    size_t getItemsRemaining();
409
410    /**
411     * Map the end_stream_status_t to one the client can understand.
412     * Maps END_STREAM_FILTER_EMPTY to END_STREAM_OK if the client does not
413     * understands collections
414     * @param cookie client cookie
415     * @param status the status to map
416     * @param a status safe for the client
417     */
418    end_stream_status_t mapEndStreamStatus(const void* cookie,
419                                           end_stream_status_t status) const;
420
421    /*
422     * deletionV1OrV2 unifies the code where a choice is made between triggering
423     * a deletion using version 1 or version 2.
424     */
425    ENGINE_ERROR_CODE deletionV1OrV2(IncludeDeleteTime includeDeleteTime,
426                                     MutationResponse& mutationResponse,
427                                     dcp_message_producers* producers,
428                                     std::unique_ptr<Item> itmCpy,
429                                     ENGINE_ERROR_CODE ret,
430                                     cb::mcbp::DcpStreamId sid);
431
432    /**
433     * Set the dead-status of the specified stream associated with the specified
434     * vbucket.
435     *
436     * @param vbid
437     * @param sid The stream id
438     * @param status The stream end status
439     * @param vbstateLock (optional) Exclusive lock to vbstate
440     * @return true if the operation succeeds, false otherwise
441     */
442    bool setStreamDeadStatus(
443            Vbid vbid,
444            cb::mcbp::DcpStreamId sid,
445            end_stream_status_t status,
446            boost::optional<folly::SharedMutex::WriteHolder&> vbstateLock = {});
447
448    /**
449     * Return the hotness value to use for this item in a DCP message.
450     * @param item Item to be sent.
451     * @return Hotness value.
452     */
453    uint8_t encodeItemHotness(const Item& item) const;
454
455    /**
456     * Convert a unique_ptr<Item> to cb::unique_item_ptr, to transfer ownership
457     * of an Item over the DCP interface.
458     */
459    cb::unique_item_ptr toUniqueItemPtr(std::unique_ptr<Item>&& item) const;
460
461    /**
462     * The StreamContainer stores the Stream via a shared_ptr, this is because
463     * we have multi-threaded access to the DcpProducer and the possibility
464     * that a stream maybe removed from the container whilst a thread is still
465     * working on the stream, e.g. closeStream and addStats occurring
466     * concurrently.
467     */
468    using ContainerElement = std::shared_ptr<Stream>;
469
470    /**
471     * The StreamsMap maps from vbid to the StreamContainer, which is stored
472     * via a shared_ptr. This allows multiple threads to obtain the
473     * StreamContainer and for safe destruction to occur.
474     */
475    using StreamMapValue = std::shared_ptr<StreamContainer<ContainerElement>>;
476    using StreamsMap = folly::AtomicHashMap<uint16_t, StreamMapValue>;
477
478    /**
479     * Attempt to update the map of vb to stream(s) with the new stream
480     *
481     * @throws logic_error if the update is not possible.
482     */
483    void updateStreamsMap(Vbid vbid,
484                          cb::mcbp::DcpStreamId sid,
485                          std::shared_ptr<Stream>& stream);
486
487    /**
488     * Attempt to locate a stream associated with the vbucket/stream-id and
489     * return it (this function is dedicated to the closeStream path)
490     * The function returns a pair because in the case the shared_ptr is null
491     * it could be because 1) the vbucket has no streams or 2) the vbucket
492     * has streams, but none that matched the given sid. If the vbucket does
493     * have streams, the pair.second will be true.
494     * The function is invoked in two places in the case that closeStream wants
495     * to send a CLOSE_STREAM message we leave the stream in the map... if a
496     * new stream is created, we would replace that stream... however with
497     * stream-ID enabled, it's possible a new stream is created with a new-ID
498     * leaking the dead stream, hence a second path now frees the dead stream
499     * by using this function and forcing erasure from the map.
500     *
501     * @param vbucket look for a stream associated with this vbucket
502     * @param sid and with a stream-ID matching sid
503     * @param eraseFromMapIfFound remove the shared_ptr from the streamsMap
504     * @return a pair the shared_ptr (can be null if not found) and a bool which
505     *  allows the caller to determine if the vbucket or sid caused not found
506     */
507    std::pair<std::shared_ptr<Stream>, bool> closeStreamInner(
508            Vbid vbucket, cb::mcbp::DcpStreamId sid, bool eraseFromMapIfFound);
509
510    /**
511     * Applies the given function object to every mapped value and returns from
512     * f some other value only if f returns a value that evaluates to true
513     * (bool operator)
514     *
515     * The function should take a value_type reference as a parameter and return
516     * some-type by value. some-type must be a type which supports operator bool
517     * e.g. std::shared_ptr. As each map element is evaluated, the iteration
518     * will stop when f returns a value which 'if (value)' evaluates to true,
519     * the value is then returned.
520     * If every element is visited and nothing evaluated to true, then a default
521     * initialised some-type is returned.
522     *
523     * @param key Key value to lookup
524     * @param f Function object to be applied
525     * @returns The value found by f or a default initialised value
526     */
527    template <class UnaryFunction>
528    auto find_if2(UnaryFunction f) {
529        using UnaryFunctionRval = decltype(f(*streams.find({})));
530        for (auto& kv : streams) {
531            auto rv = f(kv);
532            if (rv) {
533                return rv;
534            }
535        }
536        return UnaryFunctionRval{};
537    }
538
539    // stash response for retry if E2BIG was hit
540    std::unique_ptr<DcpResponse> rejectResp;
541
542    const bool notifyOnly;
543
544    cb::RelaxedAtomic<bool> enableExtMetaData;
545    cb::RelaxedAtomic<bool> forceValueCompression;
546    cb::RelaxedAtomic<bool> supportsCursorDropping;
547    cb::RelaxedAtomic<bool> sendStreamEndOnClientStreamClose;
548    cb::RelaxedAtomic<bool> consumerSupportsHifiMfu;
549    cb::RelaxedAtomic<bool> enableExpiryOpcode;
550
551    // SyncReplication: Producer needs to know the Consumer name to identify
552    // the source of received SeqnoAck messages.
553    std::string consumerName;
554
555    /// Timestamp of when we last transmitted a message to our peer.
556    cb::RelaxedAtomic<rel_time_t> lastSendTime;
557    BufferLog log;
558
559    // backfill manager object is owned by this class, but use an
560    // AtomicSharedPtr as the lifetime of the manager is shared between the
561    // producer (this class) and BackfillManagerTask (which has a
562    // weak_ptr) to this, and because different threads may attempt to access
563    // the shared_ptr - for example:
564    // - Bucket deletion thread may attempt to reset() the shared_ptr when
565    //   shutting down DCP connections
566    // - A frontend thread may also attempt to reset() the shared_ptr when
567    //   a connection is disconnected.
568    cb::AtomicSharedPtr<BackfillManager> backfillMgr;
569
570    VBReadyQueue ready;
571
572    /**
573     * Folly's AtomicHashMap offers great performance if you know the maximum
574     * size of the map up front and don't care about freeing memory when you
575     * call erase on an element.
576     */
577    const static int streamsMapSize = 512;
578
579    /**
580     * folly::AtomicHashMap of uint16_t (Vbid underlying type) to
581     * StreamContainer.
582     *
583     * We will create elements in the map as and when we need them. One caveat
584     * of Folly's AtomicHashMap is that you don't free memory when you call
585     * erase. Given that we don't gain anything from call erase, other than a
586     * boat load of locking issues, we will never call erase on streams.
587     * Instead, we will simply rely on the locks provided by the
588     * StreamContainer/ContainerElement and will just empty the StreamContainer
589     * in place of calling erase. We'll clear up any memory allocated when we
590     * destruct the DcpProducer.
591     */
592    StreamsMap streams;
593    std::atomic<size_t> itemsSent;
594    std::atomic<size_t> totalBytesSent;
595    std::atomic<size_t> totalUncompressedDataSize;
596
597    /// Guards access to checkpointCreatorTask, so multiple threads can
598    /// safely access  checkpointCreatorTask shared ptr.
599    struct CheckpointCreator {
600        mutable std::mutex mutex;
601        ExTask task;
602    };
603
604    // MB-30488: padding to keep mutex from sharing cachelines with
605    // unrelated data
606    folly::CachelinePadded<CheckpointCreator> checkpointCreator;
607
608    static const std::chrono::seconds defaultDcpNoopTxInterval;
609
610    // Indicates whether the active streams belonging to the DcpProducer should
611    // send the value in the response.
612    const IncludeValue includeValue;
613    // Indicates whether the active streams belonging to the DcpProducer should
614    // send the xattrs, (if any exist), in the response.
615    const IncludeXattrs includeXattrs;
616
617    /**
618     * Indicates whether the active streams belonging to the DcpProducer should
619     * send the tombstone creation time, (if any exist), in the delete messages.
620     */
621    IncludeDeleteTime includeDeleteTime;
622
623    /* Indicates if the 'checkpoint processor task' should be created.
624       NOTE: We always create the checkpoint processor task during regular
625             operation. This flag is used for unit testing only */
626    const bool createChkPtProcessorTsk;
627
628    /**
629     * Does the producer allow the client to create more than one active stream
630     * per vbucket (client must enable this feature)
631     */
632    MultipleStreamRequests multipleStreamRequests{MultipleStreamRequests::No};
633
634    /**
635     * Lock that prevent concurrent execution of closeAllStreams function. This
636     * is required as we have to atomically set our streams to dead (requires
637     * the backfillMgr) and reset the backfillMgr (to prevent lingering cyclic
638     * references).
639     */
640    std::mutex closeAllStreamsLock;
641
642    // MB-37702: Test hook set via mock class.
643    std::function<void()> closeAllStreamsHook;
644
645    // MB-38521: Test hook set via mock class.
646    std::function<void()> closeAllStreamsPreLockHook;
647    std::function<void()> closeAllStreamsPostLockHook;
648
649    // MB-37827: Test hook set via mock class.
650    std::function<void()> seqnoAckHook;
651};
652