1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2017 Couchbase, Inc
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
18#pragma once
19
20#include "collections/vbucket_filter.h"
21#include "dcp/consumer.h"
22#include "dcp/producer.h"
23#include "dcp/stream.h"
24#include "tests/mock/mock_dcp_producer.h"
25
/*
 * Mock of the ActiveStream class. Wraps the real ActiveStream, but exposes
 * normally protected methods publicly for test purposes.
 */
30class MockActiveStream : public ActiveStream {
31public:
32    MockActiveStream(EventuallyPersistentEngine* e,
33                     std::shared_ptr<MockDcpProducer> p,
34                     uint32_t flags,
35                     uint32_t opaque,
36                     VBucket& vb,
37                     uint64_t st_seqno,
38                     uint64_t en_seqno,
39                     uint64_t vb_uuid,
40                     uint64_t snap_start_seqno,
41                     uint64_t snap_end_seqno,
42                     IncludeValue includeValue = IncludeValue::Yes,
43                     IncludeXattrs includeXattrs = IncludeXattrs::Yes)
44        : ActiveStream(e,
45                       p,
46                       p->getName(),
47                       flags,
48                       opaque,
49                       vb,
50                       st_seqno,
51                       en_seqno,
52                       vb_uuid,
53                       snap_start_seqno,
54                       snap_end_seqno,
55                       includeValue,
56                       includeXattrs,
57                       IncludeDeleteTime::No,
58                       p->getFilter(),
59                       vb.getManifest()) {
60    }
61
62    // Expose underlying protected ActiveStream methods as public
63    std::vector<queued_item> public_getOutstandingItems(VBucket& vb) {
64        return getOutstandingItems(vb);
65    }
66
67    void public_processItems(std::vector<queued_item>& items) {
68        LockHolder lh(streamMutex);
69        processItems(items, lh);
70    }
71
72    bool public_nextCheckpointItem() {
73        return nextCheckpointItem();
74    }
75
76    const std::queue<std::unique_ptr<DcpResponse>>& public_readyQ() {
77        return readyQ;
78    }
79
80    std::unique_ptr<DcpResponse> public_nextQueuedItem() {
81        return nextQueuedItem();
82    }
83
84    void public_setBackfillTaskRunning(bool b) {
85        isBackfillTaskRunning = b;
86    }
87
88    bool public_isBackfillTaskRunning() const {
89        return isBackfillTaskRunning;
90    }
91
92    bool public_getPendingBackfill() const {
93        return pendingBackfill;
94    }
95
96    void transitionStateToBackfilling() {
97        transitionState(StreamState::Backfilling);
98    }
99
100    void transitionStateToInMemory() {
101        transitionState(StreamState::InMemory);
102    }
103
104    void transitionStateToTakeoverSend() {
105        transitionState(StreamState::TakeoverSend);
106    }
107
108    void transitionStateToTakeoverWait() {
109        transitionState(StreamState::TakeoverWait);
110    }
111
112    void transitionStateToTakeoverDead() {
113        transitionState(StreamState::Dead);
114    }
115
116    int getNumBackfillItems() const {
117        return backfillItems.memory + backfillItems.disk;
118    }
119
120    int getLastReadSeqno() const {
121        return lastReadSeqno;
122    }
123
124    int getNumBackfillItemsRemaining() const {
125        return backfillRemaining;
126    }
127
128    std::unique_ptr<DcpResponse> public_makeResponseFromItem(
129            queued_item& item) {
130        return makeResponseFromItem(item);
131    }
132
133    /**
134     * Consumes numItems from the stream readyQ
135     */
136    void consumeBackfillItems(int numItems) {
137        std::lock_guard<std::mutex> lh(streamMutex);
138        for (int items = 0; items < numItems;) {
139            auto resp = backfillPhase(lh);
140            if (resp) {
141                ++items;
142            }
143        }
144    }
145
146    bool public_handleSlowStream() {
147        return handleSlowStream();
148    }
149
150    void setState(StreamState state) {
151        state_ = state;
152    }
153
154    virtual std::vector<queued_item> getOutstandingItems(VBucket& vb) override {
155        preGetOutstandingItemsCallback();
156        return ActiveStream::getOutstandingItems(vb);
157    }
158
159    /// A callback to allow tests to inject code before we access the checkpoint
160    std::function<void()> preGetOutstandingItemsCallback = [] { return; };
161};
162
/**
  * Variation of the MockActiveStream class, which overrides the registerCursor
  * method.  In addition it provides two setter methods
  * (setCallbackBeforeRegisterCursor and setCallbackAfterRegisterCursor), which
  * are used to inject work immediately before and after registerCursor is
  * executed.
  */
169class MockActiveStreamWithOverloadedRegisterCursor : public MockActiveStream {
170public:
171    MockActiveStreamWithOverloadedRegisterCursor(
172            EventuallyPersistentEngine* e,
173            std::shared_ptr<MockDcpProducer> p,
174            uint32_t flags,
175            uint32_t opaque,
176            VBucket& vb,
177            uint64_t st_seqno,
178            uint64_t en_seqno,
179            uint64_t vb_uuid,
180            uint64_t snap_start_seqno,
181            uint64_t snap_end_seqno,
182            IncludeValue includeValue = IncludeValue::Yes,
183            IncludeXattrs includeXattrs = IncludeXattrs::Yes)
184        : MockActiveStream(e,
185                           p,
186                           flags,
187                           opaque,
188                           vb,
189                           st_seqno,
190                           en_seqno,
191                           vb_uuid,
192                           snap_start_seqno,
193                           snap_end_seqno,
194                           includeValue,
195                           includeXattrs) {
196    }
197
198    /**
199      * Overload of the ActiveStream registerCursor method.  It first executes
200      * the callback function which is used to inject additional work prior to
201      * the execution of the method ActiveStream::registerCursor.
202      */
203    virtual void registerCursor(CheckpointManager& chkptmgr,
204                                uint64_t lastProcessedSeqno) override {
205        callbackBeforeRegisterCursor();
206        ActiveStream::registerCursor(chkptmgr, lastProcessedSeqno);
207        callbackAfterRegisterCursor();
208    }
209
210    // Function that sets the callback function.  The callback is invoked at the
211    // start of the overloaded registerCursor method.
212    void setCallbackBeforeRegisterCursor(std::function<void()> func) {
213        callbackBeforeRegisterCursor = func;
214    }
215
216    // Function that sets the callback function.  The callback is invoked at the
217    // end of the overloaded registerCursor method.
218    void setCallbackAfterRegisterCursor(std::function<void()> func) {
219        callbackAfterRegisterCursor = func;
220    }
221
222    /**
223      * The callback function which is used to perform additional work on its
224      * first invocation.  The function moves checkpoints forward, whilst in the
225      * middle of performing a backfill.
226      */
227    std::function<void()> callbackBeforeRegisterCursor;
228
229    // The callback function which is used to check the state of
230    // pendingBackfill after the call to ActiveStream::registerCursor.
231    std::function<void()> callbackAfterRegisterCursor;
232};
233
/* Mock of the PassiveStream class. Wraps the real PassiveStream, but exposes
 * normally protected methods publicly for test purposes.
 */
237class MockPassiveStream : public PassiveStream {
238public:
239    MockPassiveStream(EventuallyPersistentEngine& e,
240                      std::shared_ptr<DcpConsumer> consumer,
241                      const std::string& name,
242                      uint32_t flags,
243                      uint32_t opaque,
244                      uint16_t vb,
245                      uint64_t start_seqno,
246                      uint64_t end_seqno,
247                      uint64_t vb_uuid,
248                      uint64_t snap_start_seqno,
249                      uint64_t snap_end_seqno,
250                      uint64_t vb_high_seqno)
251        : PassiveStream(&e,
252                        consumer,
253                        name,
254                        flags,
255                        opaque,
256                        vb,
257                        start_seqno,
258                        end_seqno,
259                        vb_uuid,
260                        snap_start_seqno,
261                        snap_end_seqno,
262                        vb_high_seqno) {
263    }
264
265    void transitionStateToDead() {
266        transitionState(StreamState::Dead);
267    }
268
269    ENGINE_ERROR_CODE messageReceived(
270            std::unique_ptr<DcpResponse> dcpResponse) override {
271        responseMessageSize = dcpResponse->getMessageSize();
272        return PassiveStream::messageReceived(std::move(dcpResponse));
273    }
274
275    size_t getNumBufferItems() const {
276        LockHolder lh(buffer.bufMutex);
277        return buffer.messages.size();
278    }
279
280    uint32_t responseMessageSize;
281};
282