/* xref: /4.6.4/ep-engine/src/dcp/stream.h (revision 2513928d) */
/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
/*
 *     Copyright 2013 Couchbase, Inc
 *
 *   Licensed under the Apache License, Version 2.0 (the "License");
 *   you may not use this file except in compliance with the License.
 *   You may obtain a copy of the License at
 *
 *       http://www.apache.org/licenses/LICENSE-2.0
 *
 *   Unless required by applicable law or agreed to in writing, software
 *   distributed under the License is distributed on an "AS IS" BASIS,
 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *   See the License for the specific language governing permissions and
 *   limitations under the License.
 */
17
18#ifndef SRC_DCP_STREAM_H_
19#define SRC_DCP_STREAM_H_ 1
20
21#include "config.h"
22
23#include "ep_engine.h"
24#include "ext_meta_parser.h"
25#include "dcp/dcp-types.h"
26#include "dcp/producer.h"
27#include "response.h"
28#include "vbucket.h"
29
30#include <atomic>
31#include <climits>
32#include <queue>
33
34class EventuallyPersistentEngine;
35class MutationResponse;
36class SetVBucketState;
37class SnapshotMarker;
38class DcpResponse;
39
/**
 * States a DCP stream can be in.
 *
 * A stream in STREAM_DEAD is no longer active (see Stream::isActive()).
 * ActiveStream::setActive() moves a STREAM_PENDING stream to
 * STREAM_BACKFILLING.
 */
enum stream_state_t {
    STREAM_PENDING = 0,
    STREAM_BACKFILLING = 1,
    STREAM_IN_MEMORY = 2,
    STREAM_TAKEOVER_SEND = 3,
    STREAM_TAKEOVER_WAIT = 4,
    STREAM_READING = 5,
    STREAM_DEAD = 6
};
49
/**
 * Reason a stream was ended. The numeric values are spelled out because
 * this enum is part of the externally-visible API.
 */
enum end_stream_status_t {
    //! Every item in the requested range was streamed.
    END_STREAM_OK = 0,
    //! Ended early in response to a close stream message.
    END_STREAM_CLOSED = 1,
    //! Ended early because the vbucket changed state.
    END_STREAM_STATE = 2,
    //! Ended early because the connection went away.
    END_STREAM_DISCONNECTED = 3,
    //! Ended early for being too slow. Not currently produced, but kept
    //! because it is part of the externally-visible API.
    END_STREAM_SLOW = 4
};
63
/// Kind of stream, as reported by Stream::getType().
enum stream_type_t {
    STREAM_ACTIVE = 0,
    STREAM_NOTIFIER = 1,
    STREAM_PASSIVE = 2
};
69
/// Origin of the snapshot currently being received (see
/// PassiveStream::cur_snapshot_type); `none` when there is no snapshot.
enum snapshot_type_t {
    none = 0,
    disk = 1,
    memory = 2
};
75
/// Outcome of PassiveStream::processBufferedMessages().
enum process_items_error_t {
    all_processed = 0,
    more_to_process = 1,
    cannot_process = 2
};
81
/// Where a backfilled item came from (argument to
/// ActiveStream::backfillReceived()).
enum backfill_source_t {
    BACKFILL_FROM_MEMORY = 0,
    BACKFILL_FROM_DISK = 1
};
86
87class Stream : public RCValue {
88public:
89    Stream(const std::string &name, uint32_t flags, uint32_t opaque,
90           uint16_t vb, uint64_t start_seqno, uint64_t end_seqno,
91           uint64_t vb_uuid, uint64_t snap_start_seqno,
92           uint64_t snap_end_seqno);
93
94    virtual ~Stream();
95
96    uint32_t getFlags() { return flags_; }
97
98    uint16_t getVBucket() { return vb_; }
99
100    uint32_t getOpaque() { return opaque_; }
101
102    uint64_t getStartSeqno() { return start_seqno_; }
103
104    uint64_t getEndSeqno() { return end_seqno_; }
105
106    uint64_t getVBucketUUID() { return vb_uuid_; }
107
108    uint64_t getSnapStartSeqno() { return snap_start_seqno_; }
109
110    uint64_t getSnapEndSeqno() { return snap_end_seqno_; }
111
112    stream_state_t getState() { return state_; }
113
114    stream_type_t getType() { return type_; }
115
116    virtual void addStats(ADD_STAT add_stat, const void *c);
117
118    virtual DcpResponse* next() = 0;
119
120    virtual uint32_t setDead(end_stream_status_t status) = 0;
121
122    virtual void notifySeqnoAvailable(uint64_t seqno) {}
123
124    const std::string& getName() {
125        return name_;
126    }
127
128    virtual void setActive() {
129        // Stream defaults to do nothing
130    }
131
132    bool isActive() {
133        return state_ != STREAM_DEAD;
134    }
135
136    /// @Returns true if state_ is Backfilling
137    bool isBackfilling() const;
138
139    /// @Returns true if state_ is InMemory
140    bool isInMemory() const;
141
142    void clear() {
143        LockHolder lh(streamMutex);
144        clear_UNLOCKED();
145    }
146
147    /// Return a string describing the given stream state.
148    static const char* stateName(stream_state_t st);
149
150protected:
151
152    void clear_UNLOCKED();
153
154    /* To be called after getting streamMutex lock */
155    void pushToReadyQ(DcpResponse* resp);
156
157    /* To be called after getting streamMutex lock */
158    void popFromReadyQ(void);
159
160    uint64_t getReadyQueueMemory(void);
161
162    const std::string &name_;
163    uint32_t flags_;
164    uint32_t opaque_;
165    uint16_t vb_;
166    uint64_t start_seqno_;
167    uint64_t end_seqno_;
168    uint64_t vb_uuid_;
169    uint64_t snap_start_seqno_;
170    uint64_t snap_end_seqno_;
171    AtomicValue<stream_state_t> state_;
172    stream_type_t type_;
173
174    AtomicValue<bool> itemsReady;
175    Mutex streamMutex;
176    std::queue<DcpResponse*> readyQ;
177
178    // Number of items in the readyQ that are not meta items. Used for
179    // calculating getItemsRemaining(). Atomic so it can be safely read by
180    // getItemsRemaining() without acquiring streamMutex.
181    AtomicValue<size_t> readyQ_non_meta_items;
182
183    const static uint64_t dcpMaxSeqno;
184
185private:
186    /* readyQueueMemory tracks the memory occupied by elements
187     * in the readyQ.  It is an atomic because otherwise
188       getReadyQueueMemory would need to acquire streamMutex.
189     */
190    AtomicValue <uint64_t> readyQueueMemory;
191};
192
193
194class ActiveStreamCheckpointProcessorTask;
195
196class ActiveStream : public Stream {
197public:
198    ActiveStream(EventuallyPersistentEngine* e, dcp_producer_t p,
199                 const std::string &name, uint32_t flags, uint32_t opaque,
200                 uint16_t vb, uint64_t st_seqno, uint64_t en_seqno,
201                 uint64_t vb_uuid, uint64_t snap_start_seqno,
202                 uint64_t snap_end_seqno);
203
204    ~ActiveStream();
205
206    DcpResponse* next();
207
208    void setActive() {
209        LockHolder lh(streamMutex);
210        if (state_ == STREAM_PENDING) {
211            transitionState(STREAM_BACKFILLING);
212        }
213    }
214
215    uint32_t setDead(end_stream_status_t status);
216
217    void notifySeqnoAvailable(uint64_t seqno);
218
219    void snapshotMarkerAckReceived();
220
221    void setVBucketStateAckRecieved();
222
223    void incrBackfillRemaining(size_t by) {
224        backfillRemaining.fetch_add(by, std::memory_order_relaxed);
225    }
226
227    void markDiskSnapshot(uint64_t startSeqno, uint64_t endSeqno);
228
229    bool backfillReceived(Item* itm, backfill_source_t backfill_source);
230
231    void completeBackfill();
232
233    bool isCompressionEnabled();
234
235    void addStats(ADD_STAT add_stat, const void *c);
236
237    void addTakeoverStats(ADD_STAT add_stat, const void *c);
238
239    /* Returns a count of how many items are outstanding to be sent for this
240     * stream's vBucket.
241     */
242    size_t getItemsRemaining();
243
244    uint64_t getLastReadSeqno() const;
245
246    uint64_t getLastSentSeqno() const;
247
248    const Logger& getLogger() const;
249
250    bool isSendMutationKeyOnlyEnabled() const;
251
252    // Runs on ActiveStreamCheckpointProcessorTask
253    void nextCheckpointItemTask();
254
255    /* Function to handle a slow stream that is supposedly hogging memory in
256       checkpoint mgr. Currently we handle the slow stream by switching from
257       in-memory to backfilling */
258    void handleSlowStream();
259
260protected:
261    // Returns the outstanding items for the stream's checkpoint cursor.
262    void getOutstandingItems(RCPtr<VBucket> &vb, std::vector<queued_item> &items);
263
264    // Given a set of queued items, create mutation responses for each item,
265    // and pass onto the producer associated with this stream.
266    void processItems(std::vector<queued_item>& items);
267
268    bool nextCheckpointItem();
269
270    DcpResponse* nextQueuedItem();
271
272    /* The transitionState function is protected (as opposed to private) for
273     * testing purposes.
274     */
275    void transitionState(stream_state_t newState);
276
277    /* Indicates that a backfill has been scheduled and has not yet completed.
278     * Is protected (as opposed to private) for testing purposes.
279     */
280    std::atomic<bool> isBackfillTaskRunning;
281
282    /* Indicates if another backfill must be scheduled following the completion
283     * of current running backfill.  Guarded by streamMutex.
284     * Is protected (as opposed to private) for testing purposes.
285     */
286    bool pendingBackfill;
287
288private:
289
290    DcpResponse* backfillPhase(LockHolder& lh);
291
292    DcpResponse* inMemoryPhase();
293
294    DcpResponse* takeoverSendPhase();
295
296    DcpResponse* takeoverWaitPhase();
297
298    DcpResponse* deadPhase();
299
300    void snapshot(std::deque<MutationResponse*>& snapshot, bool mark);
301
302    void endStream(end_stream_status_t reason);
303
304    /* reschedule = FALSE ==> First backfill on the stream
305     * reschedule = TRUE ==> Schedules another backfill on the stream that has
306     *                       finished backfilling once and still in
307     *                       STREAM_BACKFILLING state or in STREAM_IN_MEMORY
308     *                       state.
309     * Note: Expects the streamMutex to be acquired when called
310     */
311    void scheduleBackfill_UNLOCKED(bool reschedule);
312
313    const char* getEndStreamStatusStr(end_stream_status_t status);
314
315    bool isCurrentSnapshotCompleted() const;
316
317    /* Drop the cursor registered with the checkpoint manager.
318     * Note: Expects the streamMutex to be acquired when called
319     */
320    void dropCheckpointCursor_UNLOCKED();
321
322    /* The last sequence number queued from disk or memory, but is yet to be
323       snapshotted and put onto readyQ */
324    AtomicValue<uint64_t> lastReadSeqnoUnSnapshotted;
325
326    /* The last sequence number queued from disk or memory and is
327       snapshotted and put onto readyQ */
328    AtomicValue<uint64_t> lastReadSeqno;
329
330    //! The last sequence number sent to the network layer
331    AtomicValue<uint64_t> lastSentSeqno;
332
333    //! The last known seqno pointed to by the checkpoint cursor
334    AtomicValue<uint64_t> curChkSeqno;
335
336    //! The current vbucket state to send in the takeover stream
337    vbucket_state_t takeoverState;
338
339    /* backfillRemaining is a stat recording the amount of
340     * items remaining to be read from disk.  It is an atomic
341     * because otherwise the function incrBackfillRemaining
342     * must acquire the streamMutex lock.
343     */
344    AtomicValue <size_t> backfillRemaining;
345
346    //! Stats to track items read and sent from the backfill phase
347    struct {
348        AtomicValue<size_t> memory;
349        AtomicValue<size_t> disk;
350        AtomicValue<size_t> sent;
351    } backfillItems;
352
353    //! The amount of items that have been sent during the memory phase
354    AtomicValue<size_t> itemsFromMemoryPhase;
355
356    //! Whether ot not this is the first snapshot marker sent
357    bool firstMarkerSent;
358
359    AtomicValue<int> waitForSnapshot;
360
361    EventuallyPersistentEngine* engine;
362    dcp_producer_t producer;
363
364    struct {
365        AtomicValue<uint32_t> bytes;
366        AtomicValue<uint32_t> items;
367    } bufferedBackfill;
368
369    AtomicValue<rel_time_t> takeoverStart;
370    size_t takeoverSendMaxTime;
371
372    /* Enum indicating whether the stream mutations should contain key only or
373       both key and value */
374    MutationPayload payloadType;
375
376    //! Last snapshot end seqno sent to the DCP client
377    AtomicValue<uint64_t> lastSentSnapEndSeqno;
378
379    /* Flag used by checkpointCreatorTask that is set before all items are
380       extracted for given checkpoint cursor, and is unset after all retrieved
381       items are added to the readyQ */
382    AtomicValue<bool> chkptItemsExtractionInProgress;
383
384};
385
386
387class ActiveStreamCheckpointProcessorTask : public GlobalTask {
388public:
389    ActiveStreamCheckpointProcessorTask(EventuallyPersistentEngine& e)
390        : GlobalTask(&e, TaskId::ActiveStreamCheckpointProcessorTask,
391                     INT_MAX, false),
392      notified(false),
393      iterationsBeforeYield(e.getConfiguration()
394                            .getDcpProducerSnapshotMarkerYieldLimit()) { }
395
396    std::string getDescription() {
397        std::string rv("Process checkpoint(s) for DCP producer");
398        return rv;
399    }
400
401    bool run();
402    void schedule(stream_t stream);
403    void wakeup();
404    void clearQueues();
405
406private:
407
408    stream_t queuePop() {
409        stream_t rval;
410        LockHolder lh(workQueueLock);
411        if (!queue.empty()) {
412            rval = queue.front();
413            queue.pop();
414            queuedVbuckets.erase(rval->getVBucket());
415        }
416        return rval;
417    }
418
419    bool queueEmpty() {
420        LockHolder lh(workQueueLock);
421        return queue.empty();
422    }
423
424    void pushUnique(stream_t stream) {
425        LockHolder lh(workQueueLock);
426        if (queuedVbuckets.count(stream->getVBucket()) == 0) {
427            queue.push(stream);
428            queuedVbuckets.insert(stream->getVBucket());
429        }
430    }
431
432    Mutex workQueueLock;
433
434    /**
435     * Maintain a queue of unique stream_t
436     * There's no need to have the same stream in the queue more than once
437     */
438    std::queue<stream_t> queue;
439    std::set<uint16_t> queuedVbuckets;
440
441    AtomicValue<bool> notified;
442    size_t iterationsBeforeYield;
443};
444
445class NotifierStream : public Stream {
446public:
447    NotifierStream(EventuallyPersistentEngine* e, dcp_producer_t producer,
448                   const std::string &name, uint32_t flags, uint32_t opaque,
449                   uint16_t vb, uint64_t start_seqno, uint64_t end_seqno,
450                   uint64_t vb_uuid, uint64_t snap_start_seqno,
451                   uint64_t snap_end_seqno);
452
453    ~NotifierStream() {
454        transitionState(STREAM_DEAD);
455    }
456
457    DcpResponse* next();
458
459    uint32_t setDead(end_stream_status_t status);
460
461    void notifySeqnoAvailable(uint64_t seqno);
462
463private:
464
465    void transitionState(stream_state_t newState);
466
467    dcp_producer_t producer;
468};
469
470class PassiveStream : public Stream {
471public:
472    PassiveStream(EventuallyPersistentEngine* e, dcp_consumer_t consumer,
473                  const std::string &name, uint32_t flags, uint32_t opaque,
474                  uint16_t vb, uint64_t start_seqno, uint64_t end_seqno,
475                  uint64_t vb_uuid, uint64_t snap_start_seqno,
476                  uint64_t snap_end_seqno, uint64_t vb_high_seqno);
477
478    ~PassiveStream();
479
480    process_items_error_t processBufferedMessages(uint32_t &processed_bytes,
481                                                  size_t batchSize);
482
483    DcpResponse* next();
484
485    uint32_t setDead(end_stream_status_t status);
486
487    void acceptStream(uint16_t status, uint32_t add_opaque);
488
489    void reconnectStream(RCPtr<VBucket> &vb, uint32_t new_opaque,
490                         uint64_t start_seqno);
491
492    ENGINE_ERROR_CODE messageReceived(DcpResponse* response);
493
494    void addStats(ADD_STAT add_stat, const void *c);
495
496private:
497
498    ENGINE_ERROR_CODE processMutation(MutationResponse* mutation);
499
500    ENGINE_ERROR_CODE processDeletion(MutationResponse* deletion);
501
502    void handleSnapshotEnd(RCPtr<VBucket>& vb, uint64_t byseqno);
503
504    void processMarker(SnapshotMarker* marker);
505
506    void processSetVBucketState(SetVBucketState* state);
507
508    bool transitionState(stream_state_t newState);
509
510    uint32_t clearBuffer_UNLOCKED();
511
512    const char* getEndStreamStatusStr(end_stream_status_t status);
513
514    EventuallyPersistentEngine* engine;
515    dcp_consumer_t consumer;
516
517    AtomicValue<uint64_t> last_seqno;
518
519    AtomicValue<uint64_t> cur_snapshot_start;
520    AtomicValue<uint64_t> cur_snapshot_end;
521    AtomicValue<snapshot_type_t> cur_snapshot_type;
522    bool cur_snapshot_ack;
523
524    struct Buffer {
525        Buffer() : bytes(0), items(0) {}
526        size_t bytes;
527        size_t items;
528        /* Lock ordering w.r.t to streamMutex:
529           First acquire bufMutex and then streamMutex */
530        Mutex bufMutex;
531        std::queue<DcpResponse*> messages;
532    } buffer;
533};
534
535#endif  // SRC_DCP_STREAM_H_
536