xref: /4.6.4/ep-engine/src/dcp/stream.h (revision 370f70cd)
1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2013 Couchbase, Inc
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
18#ifndef SRC_DCP_STREAM_H_
19#define SRC_DCP_STREAM_H_ 1
20
21#include "config.h"
22
23#include "ep_engine.h"
24#include "ext_meta_parser.h"
25#include "dcp/dcp-types.h"
26#include "dcp/producer.h"
27#include "response.h"
28#include "vbucket.h"
29
30#include <atomic>
31#include <climits>
32#include <queue>
33
34class EventuallyPersistentEngine;
35class MutationResponse;
36class SetVBucketState;
37class SnapshotMarker;
38class DcpResponse;
39
/**
 * States a DCP stream can be in.
 *
 * Stream::isActive() reports every state other than STREAM_DEAD as active;
 * ActiveStream::setActive() moves a stream from STREAM_PENDING to
 * STREAM_BACKFILLING. The numeric values below are written out explicitly
 * and are identical to the implicit values the enumerators would receive.
 */
enum stream_state_t {
    STREAM_PENDING       = 0,
    STREAM_BACKFILLING   = 1,
    STREAM_IN_MEMORY     = 2,
    STREAM_TAKEOVER_SEND = 3,
    STREAM_TAKEOVER_WAIT = 4,
    STREAM_READING       = 5,
    STREAM_DEAD          = 6
};
49
/**
 * Reason a stream was ended (passed to setDead()).
 *
 * These values are part of the externally-visible API, so they are pinned
 * explicitly; they match the implicit numbering exactly.
 */
enum end_stream_status_t {
    //! The stream ended due to all items being streamed
    END_STREAM_OK = 0,
    //! The stream closed early due to a close stream message
    END_STREAM_CLOSED = 1,
    //! The stream closed early because the vbucket state changed
    END_STREAM_STATE = 2,
    //! The stream closed early because the connection was disconnected
    END_STREAM_DISCONNECTED = 3,
    //! The stream was closed early because it was too slow (currently unused,
    //! but not deleted because it is part of the externally-visible API)
    END_STREAM_SLOW = 4
};
63
/**
 * Kind of stream, as reported by Stream::getType().
 * Presumably one value per concrete Stream subclass (ActiveStream,
 * NotifierStream, PassiveStream) — TODO confirm against stream.cc.
 */
enum stream_type_t {
    STREAM_ACTIVE   = 0,
    STREAM_NOTIFIER = 1,
    STREAM_PASSIVE  = 2
};
69
/**
 * Type of the snapshot currently being received (see
 * PassiveStream::cur_snapshot_type). NOTE(review): presumably `disk` and
 * `memory` mirror the snapshot marker's source and `none` means no snapshot
 * is in progress — confirm against the marker handling code.
 */
enum snapshot_type_t {
    none   = 0,
    disk   = 1,
    memory = 2
};
75
/**
 * Result of PassiveStream::processBufferedMessages(): whether the buffered
 * messages were all processed, some remain, or processing could not proceed.
 */
enum process_items_error_t {
    all_processed   = 0,
    more_to_process = 1,
    cannot_process  = 2
};
81
/**
 * Where a backfilled item was read from; passed to
 * ActiveStream::backfillReceived().
 */
enum backfill_source_t {
    BACKFILL_FROM_MEMORY = 0,
    BACKFILL_FROM_DISK   = 1
};
86
87class Stream : public RCValue {
88public:
89    Stream(const std::string &name, uint32_t flags, uint32_t opaque,
90           uint16_t vb, uint64_t start_seqno, uint64_t end_seqno,
91           uint64_t vb_uuid, uint64_t snap_start_seqno,
92           uint64_t snap_end_seqno);
93
94    virtual ~Stream();
95
96    uint32_t getFlags() { return flags_; }
97
98    uint16_t getVBucket() { return vb_; }
99
100    uint32_t getOpaque() { return opaque_; }
101
102    uint64_t getStartSeqno() { return start_seqno_; }
103
104    uint64_t getEndSeqno() { return end_seqno_; }
105
106    uint64_t getVBucketUUID() { return vb_uuid_; }
107
108    uint64_t getSnapStartSeqno() { return snap_start_seqno_; }
109
110    uint64_t getSnapEndSeqno() { return snap_end_seqno_; }
111
112    stream_state_t getState() { return state_; }
113
114    stream_type_t getType() { return type_; }
115
116    virtual void addStats(ADD_STAT add_stat, const void *c);
117
118    virtual DcpResponse* next() = 0;
119
120    virtual uint32_t setDead(end_stream_status_t status) = 0;
121
122    virtual void notifySeqnoAvailable(uint64_t seqno) {}
123
124    const std::string& getName() {
125        return name_;
126    }
127
128    virtual void setActive() {
129        // Stream defaults to do nothing
130    }
131
132    bool isActive() {
133        return state_ != STREAM_DEAD;
134    }
135
136    void clear() {
137        LockHolder lh(streamMutex);
138        clear_UNLOCKED();
139    }
140
141    /// Return a string describing the given stream state.
142    static const char* stateName(stream_state_t st);
143
144protected:
145
146    void clear_UNLOCKED();
147
148    /* To be called after getting streamMutex lock */
149    void pushToReadyQ(DcpResponse* resp);
150
151    /* To be called after getting streamMutex lock */
152    void popFromReadyQ(void);
153
154    uint64_t getReadyQueueMemory(void);
155
156    const std::string &name_;
157    uint32_t flags_;
158    uint32_t opaque_;
159    uint16_t vb_;
160    uint64_t start_seqno_;
161    uint64_t end_seqno_;
162    uint64_t vb_uuid_;
163    uint64_t snap_start_seqno_;
164    uint64_t snap_end_seqno_;
165    AtomicValue<stream_state_t> state_;
166    stream_type_t type_;
167
168    AtomicValue<bool> itemsReady;
169    Mutex streamMutex;
170    std::queue<DcpResponse*> readyQ;
171
172    // Number of items in the readyQ that are not meta items. Used for
173    // calculating getItemsRemaining(). Atomic so it can be safely read by
174    // getItemsRemaining() without acquiring streamMutex.
175    AtomicValue<size_t> readyQ_non_meta_items;
176
177    const static uint64_t dcpMaxSeqno;
178
179private:
180    /* readyQueueMemory tracks the memory occupied by elements
181     * in the readyQ.  It is an atomic because otherwise
182       getReadyQueueMemory would need to acquire streamMutex.
183     */
184    AtomicValue <uint64_t> readyQueueMemory;
185};
186
187
188class ActiveStreamCheckpointProcessorTask;
189
190class ActiveStream : public Stream {
191public:
192    ActiveStream(EventuallyPersistentEngine* e, dcp_producer_t p,
193                 const std::string &name, uint32_t flags, uint32_t opaque,
194                 uint16_t vb, uint64_t st_seqno, uint64_t en_seqno,
195                 uint64_t vb_uuid, uint64_t snap_start_seqno,
196                 uint64_t snap_end_seqno);
197
198    ~ActiveStream();
199
200    DcpResponse* next();
201
202    void setActive() {
203        LockHolder lh(streamMutex);
204        if (state_ == STREAM_PENDING) {
205            transitionState(STREAM_BACKFILLING);
206        }
207    }
208
209    uint32_t setDead(end_stream_status_t status);
210
211    void notifySeqnoAvailable(uint64_t seqno);
212
213    void snapshotMarkerAckReceived();
214
215    void setVBucketStateAckRecieved();
216
217    void incrBackfillRemaining(size_t by) {
218        backfillRemaining.fetch_add(by, std::memory_order_relaxed);
219    }
220
221    void markDiskSnapshot(uint64_t startSeqno, uint64_t endSeqno);
222
223    bool backfillReceived(Item* itm, backfill_source_t backfill_source);
224
225    void completeBackfill();
226
227    bool isCompressionEnabled();
228
229    void addStats(ADD_STAT add_stat, const void *c);
230
231    void addTakeoverStats(ADD_STAT add_stat, const void *c);
232
233    /* Returns a count of how many items are outstanding to be sent for this
234     * stream's vBucket.
235     */
236    size_t getItemsRemaining();
237
238    uint64_t getLastReadSeqno() const;
239
240    uint64_t getLastSentSeqno() const;
241
242    const Logger& getLogger() const;
243
244    bool isSendMutationKeyOnlyEnabled() const;
245
246    // Runs on ActiveStreamCheckpointProcessorTask
247    void nextCheckpointItemTask();
248
249    /* Function to handle a slow stream that is supposedly hogging memory in
250       checkpoint mgr. Currently we handle the slow stream by switching from
251       in-memory to backfilling */
252    void handleSlowStream();
253
254protected:
255    // Returns the outstanding items for the stream's checkpoint cursor.
256    void getOutstandingItems(RCPtr<VBucket> &vb, std::vector<queued_item> &items);
257
258    // Given a set of queued items, create mutation responses for each item,
259    // and pass onto the producer associated with this stream.
260    void processItems(std::vector<queued_item>& items);
261
262    bool nextCheckpointItem();
263
264    DcpResponse* nextQueuedItem();
265
266    /* The transitionState function is protected (as opposed to private) for
267     * testing purposes.
268     */
269    void transitionState(stream_state_t newState);
270
271    /* Indicates that a backfill has been scheduled and has not yet completed.
272     * Is protected (as opposed to private) for testing purposes.
273     */
274    std::atomic<bool> isBackfillTaskRunning;
275
276    /* Indicates if another backfill must be scheduled following the completion
277     * of current running backfill.  Guarded by streamMutex.
278     * Is protected (as opposed to private) for testing purposes.
279     */
280    bool pendingBackfill;
281
282private:
283
284    DcpResponse* backfillPhase();
285
286    DcpResponse* inMemoryPhase();
287
288    DcpResponse* takeoverSendPhase();
289
290    DcpResponse* takeoverWaitPhase();
291
292    DcpResponse* deadPhase();
293
294    void snapshot(std::deque<MutationResponse*>& snapshot, bool mark);
295
296    void endStream(end_stream_status_t reason);
297
298    /* reschedule = FALSE ==> First backfill on the stream
299     * reschedule = TRUE ==> Schedules another backfill on the stream that has
300     *                       finished backfilling once and still in
301     *                       STREAM_BACKFILLING state or in STREAM_IN_MEMORY
302     *                       state.
303     * Note: Expects the streamMutex to be acquired when called
304     */
305    void scheduleBackfill_UNLOCKED(bool reschedule);
306
307    const char* getEndStreamStatusStr(end_stream_status_t status);
308
309    bool isCurrentSnapshotCompleted() const;
310
311    /* Drop the cursor registered with the checkpoint manager.
312     * Note: Expects the streamMutex to be acquired when called
313     */
314    void dropCheckpointCursor_UNLOCKED();
315
316    /* The last sequence number queued from disk or memory, but is yet to be
317       snapshotted and put onto readyQ */
318    AtomicValue<uint64_t> lastReadSeqnoUnSnapshotted;
319
320    /* The last sequence number queued from disk or memory and is
321       snapshotted and put onto readyQ */
322    AtomicValue<uint64_t> lastReadSeqno;
323
324    //! The last sequence number sent to the network layer
325    AtomicValue<uint64_t> lastSentSeqno;
326
327    //! The last known seqno pointed to by the checkpoint cursor
328    AtomicValue<uint64_t> curChkSeqno;
329
330    //! The current vbucket state to send in the takeover stream
331    vbucket_state_t takeoverState;
332
333    /* backfillRemaining is a stat recording the amount of
334     * items remaining to be read from disk.  It is an atomic
335     * because otherwise the function incrBackfillRemaining
336     * must acquire the streamMutex lock.
337     */
338    AtomicValue <size_t> backfillRemaining;
339
340    //! Stats to track items read and sent from the backfill phase
341    struct {
342        AtomicValue<size_t> memory;
343        AtomicValue<size_t> disk;
344        AtomicValue<size_t> sent;
345    } backfillItems;
346
347    //! The amount of items that have been sent during the memory phase
348    AtomicValue<size_t> itemsFromMemoryPhase;
349
350    //! Whether ot not this is the first snapshot marker sent
351    bool firstMarkerSent;
352
353    AtomicValue<int> waitForSnapshot;
354
355    EventuallyPersistentEngine* engine;
356    dcp_producer_t producer;
357
358    struct {
359        AtomicValue<uint32_t> bytes;
360        AtomicValue<uint32_t> items;
361    } bufferedBackfill;
362
363    AtomicValue<rel_time_t> takeoverStart;
364    size_t takeoverSendMaxTime;
365
366    /* Enum indicating whether the stream mutations should contain key only or
367       both key and value */
368    MutationPayload payloadType;
369
370    //! Last snapshot end seqno sent to the DCP client
371    AtomicValue<uint64_t> lastSentSnapEndSeqno;
372
373    /* Flag used by checkpointCreatorTask that is set before all items are
374       extracted for given checkpoint cursor, and is unset after all retrieved
375       items are added to the readyQ */
376    AtomicValue<bool> chkptItemsExtractionInProgress;
377
378};
379
380
381class ActiveStreamCheckpointProcessorTask : public GlobalTask {
382public:
383    ActiveStreamCheckpointProcessorTask(EventuallyPersistentEngine& e)
384        : GlobalTask(&e, TaskId::ActiveStreamCheckpointProcessorTask,
385                     INT_MAX, false),
386      notified(false),
387      iterationsBeforeYield(e.getConfiguration()
388                            .getDcpProducerSnapshotMarkerYieldLimit()) { }
389
390    std::string getDescription() {
391        std::string rv("Process checkpoint(s) for DCP producer");
392        return rv;
393    }
394
395    bool run();
396    void schedule(stream_t stream);
397    void wakeup();
398    void clearQueues();
399
400private:
401
402    stream_t queuePop() {
403        stream_t rval;
404        LockHolder lh(workQueueLock);
405        if (!queue.empty()) {
406            rval = queue.front();
407            queue.pop();
408            queuedVbuckets.erase(rval->getVBucket());
409        }
410        return rval;
411    }
412
413    bool queueEmpty() {
414        LockHolder lh(workQueueLock);
415        return queue.empty();
416    }
417
418    void pushUnique(stream_t stream) {
419        LockHolder lh(workQueueLock);
420        if (queuedVbuckets.count(stream->getVBucket()) == 0) {
421            queue.push(stream);
422            queuedVbuckets.insert(stream->getVBucket());
423        }
424    }
425
426    Mutex workQueueLock;
427
428    /**
429     * Maintain a queue of unique stream_t
430     * There's no need to have the same stream in the queue more than once
431     */
432    std::queue<stream_t> queue;
433    std::set<uint16_t> queuedVbuckets;
434
435    AtomicValue<bool> notified;
436    size_t iterationsBeforeYield;
437};
438
439class NotifierStream : public Stream {
440public:
441    NotifierStream(EventuallyPersistentEngine* e, dcp_producer_t producer,
442                   const std::string &name, uint32_t flags, uint32_t opaque,
443                   uint16_t vb, uint64_t start_seqno, uint64_t end_seqno,
444                   uint64_t vb_uuid, uint64_t snap_start_seqno,
445                   uint64_t snap_end_seqno);
446
447    ~NotifierStream() {
448        transitionState(STREAM_DEAD);
449    }
450
451    DcpResponse* next();
452
453    uint32_t setDead(end_stream_status_t status);
454
455    void notifySeqnoAvailable(uint64_t seqno);
456
457private:
458
459    void transitionState(stream_state_t newState);
460
461    dcp_producer_t producer;
462};
463
464class PassiveStream : public Stream {
465public:
466    PassiveStream(EventuallyPersistentEngine* e, dcp_consumer_t consumer,
467                  const std::string &name, uint32_t flags, uint32_t opaque,
468                  uint16_t vb, uint64_t start_seqno, uint64_t end_seqno,
469                  uint64_t vb_uuid, uint64_t snap_start_seqno,
470                  uint64_t snap_end_seqno, uint64_t vb_high_seqno);
471
472    ~PassiveStream();
473
474    process_items_error_t processBufferedMessages(uint32_t &processed_bytes,
475                                                  size_t batchSize);
476
477    DcpResponse* next();
478
479    uint32_t setDead(end_stream_status_t status);
480
481    void acceptStream(uint16_t status, uint32_t add_opaque);
482
483    void reconnectStream(RCPtr<VBucket> &vb, uint32_t new_opaque,
484                         uint64_t start_seqno);
485
486    ENGINE_ERROR_CODE messageReceived(DcpResponse* response);
487
488    void addStats(ADD_STAT add_stat, const void *c);
489
490private:
491
492    ENGINE_ERROR_CODE processMutation(MutationResponse* mutation);
493
494    ENGINE_ERROR_CODE processDeletion(MutationResponse* deletion);
495
496    void handleSnapshotEnd(RCPtr<VBucket>& vb, uint64_t byseqno);
497
498    void processMarker(SnapshotMarker* marker);
499
500    void processSetVBucketState(SetVBucketState* state);
501
502    bool transitionState(stream_state_t newState);
503
504    uint32_t clearBuffer_UNLOCKED();
505
506    const char* getEndStreamStatusStr(end_stream_status_t status);
507
508    EventuallyPersistentEngine* engine;
509    dcp_consumer_t consumer;
510
511    AtomicValue<uint64_t> last_seqno;
512
513    AtomicValue<uint64_t> cur_snapshot_start;
514    AtomicValue<uint64_t> cur_snapshot_end;
515    AtomicValue<snapshot_type_t> cur_snapshot_type;
516    bool cur_snapshot_ack;
517
518    struct Buffer {
519        Buffer() : bytes(0), items(0) {}
520        size_t bytes;
521        size_t items;
522        /* Lock ordering w.r.t to streamMutex:
523           First acquire bufMutex and then streamMutex */
524        Mutex bufMutex;
525        std::queue<DcpResponse*> messages;
526    } buffer;
527};
528
529#endif  // SRC_DCP_STREAM_H_
530