xref: /5.5.2/kv_engine/engines/ep/src/dcp/stream.h (revision 075614a6)
/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
/*
 *     Copyright 2013 Couchbase, Inc
 *
 *   Licensed under the Apache License, Version 2.0 (the "License");
 *   you may not use this file except in compliance with the License.
 *   You may obtain a copy of the License at
 *
 *       http://www.apache.org/licenses/LICENSE-2.0
 *
 *   Unless required by applicable law or agreed to in writing, software
 *   distributed under the License is distributed on an "AS IS" BASIS,
 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *   See the License for the specific language governing permissions and
 *   limitations under the License.
 */
17
18#ifndef SRC_DCP_STREAM_H_
19#define SRC_DCP_STREAM_H_ 1
20
#include "config.h"

#include "collections/vbucket_filter.h"
#include "dcp/dcp-types.h"
#include "dcp/producer.h"
#include "ep_engine.h"
#include "ext_meta_parser.h"
#include "response.h"
#include "vbucket.h"

#include <atomic>
#include <climits>
#include <memory>
#include <queue>
34
// Forward declarations of types that this header only names in declarations.
class EventuallyPersistentEngine;
class MutationResponse;
class SetVBucketState;
class SnapshotMarker;
class DcpResponse;
40
/// Identifies where a backfilled item was read from; passed to
/// ActiveStream::backfillReceived().
enum backfill_source_t {
    /// The item was read from memory.
    BACKFILL_FROM_MEMORY,
    /// The item was read from disk.
    BACKFILL_FROM_DISK
};
45
46class Stream {
47public:
48
49    enum class Type {
50        Active,
51        Notifier,
52        Passive
53    };
54
55    enum class Snapshot {
56           None,
57           Disk,
58           Memory
59    };
60
61    Stream(const std::string &name,
62           uint32_t flags,
63           uint32_t opaque,
64           uint16_t vb,
65           uint64_t start_seqno,
66           uint64_t end_seqno,
67           uint64_t vb_uuid,
68           uint64_t snap_start_seqno,
69           uint64_t snap_end_seqno,
70           Type type);
71
72    virtual ~Stream();
73
74    uint32_t getFlags() { return flags_; }
75
76    uint16_t getVBucket() { return vb_; }
77
78    uint32_t getOpaque() { return opaque_; }
79
80    uint64_t getStartSeqno() { return start_seqno_; }
81
82    uint64_t getEndSeqno() { return end_seqno_; }
83
84    uint64_t getVBucketUUID() { return vb_uuid_; }
85
86    uint64_t getSnapStartSeqno() { return snap_start_seqno_; }
87
88    uint64_t getSnapEndSeqno() { return snap_end_seqno_; }
89
90    virtual void addStats(ADD_STAT add_stat, const void *c);
91
92    virtual std::unique_ptr<DcpResponse> next() = 0;
93
94    virtual uint32_t setDead(end_stream_status_t status) = 0;
95
96    virtual void notifySeqnoAvailable(uint64_t seqno) {}
97
98    const std::string& getName() {
99        return name_;
100    }
101
102    virtual void setActive() {
103        // Stream defaults to do nothing
104    }
105
106    Type getType() { return type_; }
107
108    /// @returns true if the stream type is Active
109    bool isTypeActive() const;
110
111    /// @returns true if state_ is not Dead
112    bool isActive() const;
113
114    /// @Returns true if state_ is Backfilling
115    bool isBackfilling() const;
116
117    /// @Returns true if state_ is InMemory
118    bool isInMemory() const;
119
120    /// @Returns true if state_ is Pending
121    bool isPending() const;
122
123    /// @Returns true if state_ is TakeoverSend
124    bool isTakeoverSend() const;
125
126    /// @Returns true if state_ is TakeoverWait
127    bool isTakeoverWait() const;
128
129    void clear() {
130        LockHolder lh(streamMutex);
131        clear_UNLOCKED();
132    }
133
134protected:
135
136    // The StreamState is protected as it needs to be accessed by sub-classes
137    enum class StreamState {
138          Pending,
139          Backfilling,
140          InMemory,
141          TakeoverSend,
142          TakeoverWait,
143          Reading,
144          Dead
145      };
146
147    static const std::string to_string(Stream::StreamState type);
148
149    StreamState getState() const { return state_; }
150
151    void clear_UNLOCKED();
152
153    /* To be called after getting streamMutex lock */
154    void pushToReadyQ(std::unique_ptr<DcpResponse> resp);
155
156    /* To be called after getting streamMutex lock */
157    std::unique_ptr<DcpResponse> popFromReadyQ(void);
158
159    uint64_t getReadyQueueMemory(void);
160
161    /**
162     * Uses the associated connection logger to log the message if the
163     * connection is alive else uses a default logger
164     *
165     * @param severity Desired logging level
166     * @param fmt Format string
167     * @param ... Variable list of params as per the fmt
168     */
169    virtual void log(EXTENSION_LOG_LEVEL severity,
170                     const char* fmt,
171                     ...) const = 0;
172
173    const std::string &name_;
174    uint32_t flags_;
175    uint32_t opaque_;
176    uint16_t vb_;
177    uint64_t start_seqno_;
178    uint64_t end_seqno_;
179    uint64_t vb_uuid_;
180    uint64_t snap_start_seqno_;
181    uint64_t snap_end_seqno_;
182    std::atomic<StreamState> state_;
183    Type type_;
184
185    std::atomic<bool> itemsReady;
186    std::mutex streamMutex;
187
188    /**
189     * Ordered queue of DcpResponses to be sent on the stream.
190     * Elements are added to this queue by reading from disk/memory etc, and
191     * are removed when sending over the network to our peer.
192     * The readyQ owns the elements in it.
193     */
194    std::queue<std::unique_ptr<DcpResponse>> readyQ;
195
196    // Number of items in the readyQ that are not meta items. Used for
197    // calculating getItemsRemaining(). Atomic so it can be safely read by
198    // getItemsRemaining() without acquiring streamMutex.
199    std::atomic<size_t> readyQ_non_meta_items;
200
201    const static uint64_t dcpMaxSeqno;
202
203private:
204    /* readyQueueMemory tracks the memory occupied by elements
205     * in the readyQ.  It is an atomic because otherwise
206       getReadyQueueMemory would need to acquire streamMutex.
207     */
208    std::atomic <uint64_t> readyQueueMemory;
209};
210
211const char* to_string(Stream::Snapshot type);
212const std::string to_string(Stream::Type type);
213
214class ActiveStream : public Stream,
215                     public std::enable_shared_from_this<ActiveStream> {
216public:
217    ActiveStream(EventuallyPersistentEngine* e,
218                 std::shared_ptr<DcpProducer> p,
219                 const std::string& name,
220                 uint32_t flags,
221                 uint32_t opaque,
222                 VBucket& vbucket,
223                 uint64_t st_seqno,
224                 uint64_t en_seqno,
225                 uint64_t vb_uuid,
226                 uint64_t snap_start_seqno,
227                 uint64_t snap_end_seqno,
228                 IncludeValue includeVal,
229                 IncludeXattrs includeXattrs,
230                 IncludeDeleteTime includeDeleteTime,
231                 const Collections::Filter& filter,
232                 const Collections::VB::Manifest& manifest);
233
234    virtual ~ActiveStream();
235
236    std::unique_ptr<DcpResponse> next() override;
237
238    void setActive() override {
239        LockHolder lh(streamMutex);
240        if (isPending()) {
241            transitionState(StreamState::Backfilling);
242        }
243    }
244
245    uint32_t setDead(end_stream_status_t status) override;
246
247    void notifySeqnoAvailable(uint64_t seqno) override;
248
249    void snapshotMarkerAckReceived();
250
251    void setVBucketStateAckRecieved();
252
253    void incrBackfillRemaining(size_t by) {
254        backfillRemaining.fetch_add(by, std::memory_order_relaxed);
255    }
256
257    void markDiskSnapshot(uint64_t startSeqno, uint64_t endSeqno);
258
259    bool backfillReceived(std::unique_ptr<Item> itm,
260                          backfill_source_t backfill_source,
261                          bool force);
262
263    void completeBackfill();
264
265    bool isCompressionEnabled();
266
267    bool isForceValueCompressionEnabled() const {
268        return forceValueCompression == ForceValueCompression::Yes;
269    }
270
271    bool isSnappyEnabled() const {
272        return snappyEnabled == SnappyEnabled::Yes;
273    }
274
275    void addStats(ADD_STAT add_stat, const void* c) override;
276
277    void addTakeoverStats(ADD_STAT add_stat, const void *c, const VBucket& vb);
278
279    /* Returns a count of how many items are outstanding to be sent for this
280     * stream's vBucket.
281     */
282    size_t getItemsRemaining();
283
284    uint64_t getLastReadSeqno() const;
285
286    uint64_t getLastSentSeqno() const;
287
288    void log(EXTENSION_LOG_LEVEL severity, const char* fmt, ...) const override;
289
290    // Runs on ActiveStreamCheckpointProcessorTask
291    void nextCheckpointItemTask();
292
293    /**
294     * Function to handle a slow stream that is supposedly hogging memory in
295     * checkpoint mgr. Currently we handle the slow stream by switching from
296     * in-memory to backfilling
297     *
298     * @return true if cursor is dropped; else false
299     */
300    bool handleSlowStream();
301
302    /// @return true if both includeValue and includeXattributes are set to No,
303    /// otherwise return false.
304    bool isKeyOnly() const {
305        return (includeValue == IncludeValue::No) &&
306               (includeXattributes == IncludeXattrs::No);
307    }
308
309    /// @returns a copy of the current collections separator.
310    std::string getCurrentSeparator() const {
311        return currentSeparator;
312    }
313
314    /// @return a const reference to the streams cursor name
315    const std::string& getCursorName() const {
316        return cursorName;
317    }
318
319protected:
320    /**
321     * @param vb reference to the associated vbucket
322     *
323     * @return the outstanding items for the stream's checkpoint cursor.
324     */
325    virtual std::vector<queued_item> getOutstandingItems(VBucket& vb);
326
327    // Given a set of queued items, create mutation responses for each item,
328    // and pass onto the producer associated with this stream.
329    void processItems(std::vector<queued_item>& items,
330                      const LockHolder& streamMutex);
331
332    bool nextCheckpointItem();
333
334    std::unique_ptr<DcpResponse> nextQueuedItem();
335
336    /**
337     * @return a DcpResponse to represent the item. This will be either a
338     *         MutationResponse or SystemEventProducerMessage.
339     */
340    std::unique_ptr<DcpResponse> makeResponseFromItem(const queued_item& item);
341
342    /* The transitionState function is protected (as opposed to private) for
343     * testing purposes.
344     */
345    void transitionState(StreamState newState);
346
347    /**
348     * Check to see if the response is a SystemEvent and if so, apply any
349     * actions to the stream.
350     *
351     * @param response A DcpResponse that is about to be sent to a client
352     */
353    void processSystemEvent(DcpResponse* response);
354
355    /**
356     * Registers a cursor with a given CheckpointManager.
357     * The result of calling the function is that it sets the pendingBackfill
358     * flag, if another backfill is required.  It also sets the curChkSeqno to
359     * be at the position the new cursor is registered.
360     *
361     * @param chkptmgr  The CheckpointManager the cursor will be registered to.
362     * @param lastProcessedSeqno  The last processed seqno.
363     */
364    virtual void registerCursor(CheckpointManager& chkptmgr,
365                                uint64_t lastProcessedSeqno);
366
367    /**
368     * Unlocked variant of nextCheckpointItemTask caller must obtain
369     * streamMutex and pass a reference to it
370     * @param streamMutex reference to lockholder
371     */
372    void nextCheckpointItemTask(const LockHolder& streamMutex);
373
374    /* Indicates that a backfill has been scheduled and has not yet completed.
375     * Is protected (as opposed to private) for testing purposes.
376     */
377    std::atomic<bool> isBackfillTaskRunning;
378
379    /* Indicates if another backfill must be scheduled following the completion
380     * of current running backfill.  Guarded by streamMutex.
381     * Is protected (as opposed to private) for testing purposes.
382     */
383    bool pendingBackfill;
384
385    //! Stats to track items read and sent from the backfill phase
386    struct {
387        std::atomic<size_t> memory;
388        std::atomic<size_t> disk;
389        std::atomic<size_t> sent;
390    } backfillItems;
391
392    /* The last sequence number queued from disk or memory and is
393       snapshotted and put onto readyQ */
394    AtomicMonotonic<uint64_t, ThrowExceptionPolicy> lastReadSeqno;
395
396    /* backfillRemaining is a stat recording the amount of
397     * items remaining to be read from disk.  It is an atomic
398     * because otherwise the function incrBackfillRemaining
399     * must acquire the streamMutex lock.
400     */
401    std::atomic<size_t> backfillRemaining;
402
403    std::unique_ptr<DcpResponse> backfillPhase(std::lock_guard<std::mutex>& lh);
404
405private:
406    std::unique_ptr<DcpResponse> next(std::lock_guard<std::mutex>& lh);
407
408    std::unique_ptr<DcpResponse> inMemoryPhase();
409
410    std::unique_ptr<DcpResponse> takeoverSendPhase();
411
412    std::unique_ptr<DcpResponse> takeoverWaitPhase();
413
414    std::unique_ptr<DcpResponse> deadPhase();
415
416    void snapshot(std::deque<std::unique_ptr<DcpResponse>>& snapshot,
417                  bool mark);
418
419    void endStream(end_stream_status_t reason);
420
421    /* reschedule = FALSE ==> First backfill on the stream
422     * reschedule = TRUE ==> Schedules another backfill on the stream that has
423     *                       finished backfilling once and still in
424     *                       STREAM_BACKFILLING state or in STREAM_IN_MEMORY
425     *                       state.
426     * Note: Expects the streamMutex to be acquired when called
427     */
428    void scheduleBackfill_UNLOCKED(bool reschedule);
429
430    std::string getEndStreamStatusStr(end_stream_status_t status);
431
432    bool isCurrentSnapshotCompleted() const;
433
434    /**
435     * Drop the cursor registered with the checkpoint manager. Used during
436     * cursor dropping. Upon failure to drop the cursor, puts stream to
437     * dead state and notifies the producer connection
438     * Note: Expects the streamMutex to be acquired when called
439     *
440     * @return true if cursor is dropped; else false
441     */
442    bool dropCheckpointCursor_UNLOCKED();
443
444    /**
445     * Notifies the producer connection that the stream has items ready to be
446     * pick up.
447     *
448     * @param force Indiciates if the function should notify the connection
449     *              irrespective of whether the connection already knows that
450     *              the items are ready to be picked up. Default is 'false'
451     */
452    void notifyStreamReady(bool force = false);
453
454    /**
455     * Helper function that tries to takes the ownership of the vbucket
456     * (temporarily) and then removes the checkpoint cursor held by the stream.
457     */
458    void removeCheckpointCursor();
459
460    /**
461     * Decides what log level must be used for (active) stream state
462     * transitions
463     *
464     * @param currState current state of the stream
465     * @param newState new state of the stream
466     *
467     * @return log level
468     */
469    EXTENSION_LOG_LEVEL getTransitionStateLogLevel(StreamState currState,
470                                                   StreamState newState);
471
472    /* The last sequence number queued from memory, but is yet to be
473       snapshotted and put onto readyQ */
474    std::atomic<uint64_t> lastReadSeqnoUnSnapshotted;
475
476    //! The last sequence number sent to the network layer
477    std::atomic<uint64_t> lastSentSeqno;
478
479    //! The last known seqno pointed to by the checkpoint cursor
480    std::atomic<uint64_t> curChkSeqno;
481
482    //! The current vbucket state to send in the takeover stream
483    vbucket_state_t takeoverState;
484
485    //! The amount of items that have been sent during the memory phase
486    std::atomic<size_t> itemsFromMemoryPhase;
487
488    //! Whether or not this is the first snapshot marker sent
489    bool firstMarkerSent;
490
491    std::atomic<int> waitForSnapshot;
492
493    EventuallyPersistentEngine* engine;
494    std::weak_ptr<DcpProducer> producerPtr;
495
496    struct {
497        std::atomic<size_t> bytes;
498        std::atomic<size_t> items;
499    } bufferedBackfill;
500
501    std::atomic<rel_time_t> takeoverStart;
502    size_t takeoverSendMaxTime;
503
504    //! Last snapshot end seqno sent to the DCP client
505    std::atomic<uint64_t> lastSentSnapEndSeqno;
506
507    /* Flag used by checkpointCreatorTask that is set before all items are
508       extracted for given checkpoint cursor, and is unset after all retrieved
509       items are added to the readyQ */
510    std::atomic<bool> chkptItemsExtractionInProgress;
511
512    // Whether the responses sent using this stream should contain the value
513    IncludeValue includeValue;
514    // Whether the responses sent using the stream should contain the xattrs
515    // (if any exist)
516    IncludeXattrs includeXattributes;
517
518    // Will the stream send dcp deletions with delete-times?
519    IncludeDeleteTime includeDeleteTime;
520
521    /// Is Snappy compression supported on this connection?
522    SnappyEnabled snappyEnabled;
523
524    /// Should items be forcefully compressed on this stream?
525    ForceValueCompression forceValueCompression;
526
527    /**
528     * A copy of the collections separator so we can generate MutationResponse
529     * instances that embed the collection/document-name data so we can
530     * replicate that collection information (as a length).
531     *
532     * As checkpoints/backfills are processed, we will monitor for
533     * CollectionsSeparatorChanged events and update the copy accordingly.
534     */
535    std::string currentSeparator;
536
537    /**
538     * The filter the stream will use to decide which keys should be transmitted
539     */
540    Collections::VB::Filter filter;
541
542    /**
543     * The name which uniquely identifies this stream's checkpoint cursor
544     */
545    std::string cursorName;
546
547    /// True if cursorName is registered in CheckpointManager.
548    std::atomic<bool> cursorRegistered{false};
549
550    /**
551     * To ensure each stream gets a unique cursorName, we maintain a 'uid'
552     * which is really just an incrementing uint64
553     */
554    static std::atomic<uint64_t> cursorUID;
555};
556
557
558class ActiveStreamCheckpointProcessorTask : public GlobalTask {
559public:
560    ActiveStreamCheckpointProcessorTask(EventuallyPersistentEngine& e,
561                                        std::shared_ptr<DcpProducer> p);
562
563    std::string getDescription() {
564        return description;
565    }
566
567    std::chrono::microseconds maxExpectedDuration() {
568        // Empirical evidence suggests this task runs under 100ms 99.9999% of
569        // the time.
570        return std::chrono::milliseconds(100);
571    }
572
573    bool run();
574    void schedule(std::shared_ptr<ActiveStream> stream);
575    void wakeup();
576
577    /* Clears the queues and resets the producer reference */
578    void cancelTask();
579
580    /* Returns the number of unique streams waiting to be processed */
581    size_t queueSize() {
582        LockHolder lh(workQueueLock);
583        return queue.size();
584    }
585
586    /// Outputs statistics related to this task via the given callback.
587    void addStats(const std::string& name,
588                  ADD_STAT add_stat,
589                  const void* c) const;
590
591private:
592    std::shared_ptr<ActiveStream> queuePop() {
593        uint16_t vbid = 0;
594        {
595            LockHolder lh(workQueueLock);
596            if (queue.empty()) {
597                return nullptr;
598            }
599            vbid = queue.front();
600            queue.pop();
601            queuedVbuckets.erase(vbid);
602        }
603
604        /* findStream acquires DcpProducer::streamsMutex, hence called
605           without acquiring workQueueLock */
606        auto producer = producerPtr.lock();
607        if (producer) {
608            return dynamic_pointer_cast<ActiveStream>(
609                    producer->findStream(vbid));
610        }
611        return nullptr;
612    }
613
614    bool queueEmpty() {
615        LockHolder lh(workQueueLock);
616        return queue.empty();
617    }
618
619    void pushUnique(uint16_t vbid) {
620        LockHolder lh(workQueueLock);
621        if (queuedVbuckets.count(vbid) == 0) {
622            queue.push(vbid);
623            queuedVbuckets.insert(vbid);
624        }
625    }
626
627    /// Human-readable description of this task.
628    const std::string description;
629
630    /// Guards queue && queuedVbuckets
631    mutable std::mutex workQueueLock;
632
633    /**
634     * Maintain a queue of unique vbucket ids for which stream should be
635     * processed.
636     * There's no need to have the same stream in the queue more than once
637     *
638     * The streams are kept in the 'streams map' of the producer object. We
639     * should not hold a shared reference (even a weak ref) to the stream object
640     * here because 'streams map' is the actual owner. If we hold a weak ref
641     * here and the streams map replaces the stream for the vbucket id with a
642     * new one, then we would end up not updating it here as we append to the
643     * queue only if there is no entry for the vbucket in the queue.
644     */
645    std::queue<VBucket::id_type> queue;
646    std::unordered_set<VBucket::id_type> queuedVbuckets;
647
648    std::atomic<bool> notified;
649    const size_t iterationsBeforeYield;
650
651    const std::weak_ptr<DcpProducer> producerPtr;
652};
653
654class NotifierStream : public Stream {
655public:
656    NotifierStream(EventuallyPersistentEngine* e,
657                   std::shared_ptr<DcpProducer> producer,
658                   const std::string& name,
659                   uint32_t flags,
660                   uint32_t opaque,
661                   uint16_t vb,
662                   uint64_t start_seqno,
663                   uint64_t end_seqno,
664                   uint64_t vb_uuid,
665                   uint64_t snap_start_seqno,
666                   uint64_t snap_end_seqno);
667
668    std::unique_ptr<DcpResponse> next() override;
669
670    uint32_t setDead(end_stream_status_t status) override;
671
672    void notifySeqnoAvailable(uint64_t seqno) override;
673
674    void addStats(ADD_STAT add_stat, const void* c) override;
675
676private:
677
678    void transitionState(StreamState newState);
679
680    void log(EXTENSION_LOG_LEVEL severity, const char* fmt, ...) const override;
681
682    /**
683     * Notifies the producer connection that the stream has items ready to be
684     * pick up.
685     */
686    void notifyStreamReady();
687
688    std::weak_ptr<DcpProducer> producerPtr;
689};
690
691class PassiveStream : public Stream {
692public:
693    PassiveStream(EventuallyPersistentEngine* e,
694                  std::shared_ptr<DcpConsumer> consumer,
695                  const std::string& name,
696                  uint32_t flags,
697                  uint32_t opaque,
698                  uint16_t vb,
699                  uint64_t start_seqno,
700                  uint64_t end_seqno,
701                  uint64_t vb_uuid,
702                  uint64_t snap_start_seqno,
703                  uint64_t snap_end_seqno,
704                  uint64_t vb_high_seqno);
705
706    virtual ~PassiveStream();
707
708    process_items_error_t processBufferedMessages(uint32_t &processed_bytes,
709                                                  size_t batchSize);
710
711    std::unique_ptr<DcpResponse> next() override;
712
713    uint32_t setDead(end_stream_status_t status) override;
714
715    /**
716     * Place a StreamRequest message into the readyQueue, requesting a DCP
717     * stream for the given UUID.
718     *
719     * @params vb_uuid The UUID to use in the StreamRequest.
720     */
721    void streamRequest(uint64_t vb_uuid);
722
723    void acceptStream(uint16_t status, uint32_t add_opaque);
724
725    void reconnectStream(VBucketPtr &vb, uint32_t new_opaque,
726                         uint64_t start_seqno);
727
728    /*
729     * Calls the appropriate function to process the message.
730     *
731     * @params response The dcp message that needs to be processed.
732     * @returns the error code from processing the message.
733     */
734    virtual ENGINE_ERROR_CODE messageReceived(
735            std::unique_ptr<DcpResponse> response);
736
737    void addStats(ADD_STAT add_stat, const void* c) override;
738
739    static const size_t batchSize;
740
741protected:
742
743    bool transitionState(StreamState newState);
744
745    virtual ENGINE_ERROR_CODE processMutation(MutationResponse* mutation);
746
747    ENGINE_ERROR_CODE processDeletion(MutationResponse* deletion);
748
749    /**
750     * Handle DCP system events against this stream.
751     *
752     * @param event The system-event to process against the stream.
753     */
754    ENGINE_ERROR_CODE processSystemEvent(const SystemEventMessage& event);
755
756    /**
757     * Process a create collection event, creating the collection on vb
758     *
759     * @param vb Vbucket onto which the collection is created.
760     * @param event The collection system event creating the collection.
761     */
762    ENGINE_ERROR_CODE processCreateCollection(
763            VBucket& vb, const CreateOrDeleteCollectionEvent& event);
764
765    /**
766     * Process a begin delete collection event.
767     *
768     * @param vb Vbucket which we apply the delete on.
769     * @param event The collection system event deleting the collection.
770     */
771    ENGINE_ERROR_CODE processBeginDeleteCollection(
772            VBucket& vb, const CreateOrDeleteCollectionEvent& event);
773
774    /**
775     * Process a collections change separator event.
776     *
777     * @param vb Vbucket which we apply the delete on.
778     * @param event The collection system event changing the separator.
779     */
780    ENGINE_ERROR_CODE processSeparatorChanged(
781            VBucket& vb, const ChangeSeparatorCollectionEvent& event);
782
783    void handleSnapshotEnd(VBucketPtr& vb, uint64_t byseqno);
784
785    virtual void processMarker(SnapshotMarker* marker);
786
787    void processSetVBucketState(SetVBucketState* state);
788
789    uint32_t clearBuffer_UNLOCKED();
790
791    std::string getEndStreamStatusStr(end_stream_status_t status);
792
793    /**
794     * Push a StreamRequest into the readyQueue. The StreamRequest is initiaised
795     * from the object's state except for the uuid.
796     * This function assumes the caller is holding streamMutex.
797     *
798     * @params vb_uuid The VB UUID to use in the StreamRequest.
799     */
800    void streamRequest_UNLOCKED(uint64_t vb_uuid);
801
802    void log(EXTENSION_LOG_LEVEL severity, const char* fmt, ...) const override;
803
804    /**
805     * Notifies the consumer connection that the stream has items ready to be
806     * pick up.
807     */
808    void notifyStreamReady();
809
810    EventuallyPersistentEngine* engine;
811    std::weak_ptr<DcpConsumer> consumerPtr;
812
813    std::atomic<uint64_t> last_seqno;
814
815    std::atomic<uint64_t> cur_snapshot_start;
816    std::atomic<uint64_t> cur_snapshot_end;
817    std::atomic<Snapshot> cur_snapshot_type;
818    bool cur_snapshot_ack;
819
820    struct Buffer {
821        Buffer() : bytes(0) {}
822
823        bool empty() const {
824            LockHolder lh(bufMutex);
825            return messages.empty();
826        }
827
828        void push(std::unique_ptr<DcpResponse> message) {
829            std::lock_guard<std::mutex> lg(bufMutex);
830            bytes += message->getMessageSize();
831            messages.push_back(std::move(message));
832        }
833
834        /*
835         * Caller must of locked bufMutex and pass as lh (not asserted)
836         */
837        std::unique_ptr<DcpResponse> pop_front(std::unique_lock<std::mutex>& lh) {
838            std::unique_ptr<DcpResponse> rval(std::move(messages.front()));
839            messages.pop_front();
840            bytes -= rval->getMessageSize();
841            return rval;
842        }
843
844        /*
845         * Caller must of locked bufMutex and pass as lh (not asserted)
846         */
847        void push_front(std::unique_ptr<DcpResponse> message,
848                        std::unique_lock<std::mutex>& lh) {
849            bytes += message->getMessageSize();
850            messages.push_front(std::move(message));
851        }
852
853        size_t bytes;
854        /* Lock ordering w.r.t to streamMutex:
855           First acquire bufMutex and then streamMutex */
856        mutable std::mutex bufMutex;
857        std::deque<std::unique_ptr<DcpResponse> > messages;
858    } buffer;
859};
860
861#endif  // SRC_DCP_STREAM_H_
862