1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2017 Couchbase, Inc
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
18#pragma once
19
20#include "dcp/active_stream.h"
21#include "dcp/passive_stream.h"
22
23class CheckpointManager;
24class MockDcpProducer;
25
26/*
27 * Mock of the ActiveStream class. Wraps the real ActiveStream, but exposes
28 * normally protected methods publically for test purposes.
29 */
30class MockActiveStream : public ActiveStream {
31public:
32    MockActiveStream(
33            EventuallyPersistentEngine* e,
34            std::shared_ptr<MockDcpProducer> p,
35            uint32_t flags,
36            uint32_t opaque,
37            VBucket& vb,
38            uint64_t st_seqno = std::numeric_limits<uint64_t>::min(),
39            uint64_t en_seqno = std::numeric_limits<uint64_t>::max(),
40            uint64_t vb_uuid = 0,
41            uint64_t snap_start_seqno = std::numeric_limits<uint64_t>::min(),
42            uint64_t snap_end_seqno = std::numeric_limits<uint64_t>::max(),
43            IncludeValue includeValue = IncludeValue::Yes,
44            IncludeXattrs includeXattrs = IncludeXattrs::Yes,
45            IncludeDeletedUserXattrs includeDeletedUserXattrs =
46                    IncludeDeletedUserXattrs::No);
47
48    // Expose underlying protected ActiveStream methods as public
49    OutstandingItemsResult public_getOutstandingItems(VBucket& vb) {
50        return getOutstandingItems(vb);
51    }
52
53    void public_processItems(OutstandingItemsResult& result) {
54        LockHolder lh(streamMutex);
55        processItems(result, lh);
56    }
57
58    bool public_nextCheckpointItem() {
59        return nextCheckpointItem();
60    }
61
62    const std::queue<std::unique_ptr<DcpResponse>>& public_readyQ() {
63        return readyQ;
64    }
65
66    size_t public_readyQSize() {
67        LockHolder lh(streamMutex);
68        return readyQ.size();
69    }
70
71    std::unique_ptr<DcpResponse> public_nextQueuedItem();
72
73    void public_setBackfillTaskRunning(bool b) {
74        isBackfillTaskRunning = b;
75    }
76
77    bool public_isBackfillTaskRunning() const {
78        return isBackfillTaskRunning;
79    }
80
81    bool public_getPendingBackfill() const {
82        return pendingBackfill;
83    }
84
85    void transitionStateToBackfilling() {
86        transitionState(StreamState::Backfilling);
87    }
88
89    void transitionStateToInMemory() {
90        transitionState(StreamState::InMemory);
91    }
92
93    void transitionStateToTakeoverSend() {
94        transitionState(StreamState::TakeoverSend);
95    }
96
97    void transitionStateToTakeoverWait() {
98        transitionState(StreamState::TakeoverWait);
99    }
100
101    void transitionStateToTakeoverDead() {
102        transitionState(StreamState::Dead);
103    }
104
105    int getNumBackfillItems() const {
106        return backfillItems.memory + backfillItems.disk;
107    }
108
109    int getLastReadSeqno() const {
110        return lastReadSeqno;
111    }
112
113    boost::optional<cb::NonNegativeCounter<size_t>>
114    getNumBackfillItemsRemaining() const {
115        return backfillRemaining;
116    }
117
118    std::unique_ptr<DcpResponse> public_makeResponseFromItem(
119            queued_item& item,
120            SendCommitSyncWriteAs sendMutationInsteadOfCommit);
121
122    /**
123     * Consumes numItems from the stream readyQ
124     */
125    void consumeBackfillItems(int numItems);
126
127    /// Consumes all ready backfill items.
128    void consumeAllBackfillItems();
129
130    bool public_handleSlowStream() {
131        return handleSlowStream();
132    }
133
134    void setState(StreamState state) {
135        state_ = state;
136    }
137
138    virtual OutstandingItemsResult getOutstandingItems(VBucket& vb) override {
139        preGetOutstandingItemsCallback();
140        return ActiveStream::getOutstandingItems(vb);
141    }
142
143    /// A callback to allow tests to inject code before we access the checkpoint
144    std::function<void()> preGetOutstandingItemsCallback = [] { return; };
145
146    void public_registerCursor(CheckpointManager& manager,
147                               const std::string& name,
148                               int64_t seqno);
149
150    bool isDead() { return ActiveStream::getState() == StreamState::Dead; };
151
152    std::unique_ptr<DcpResponse> public_popFromReadyQ();
153
154    bool public_supportSyncReplication() const {
155        return supportSyncReplication();
156    }
157
158    cb::mcbp::DcpStreamId getStreamId() const {
159        return sid;
160    }
161
162    void setCompleteBackfillHook(std::function<void()> hook) {
163        completeBackfillHook = hook;
164    }
165
166    void setNextHook(std::function<void()> hook) {
167        nextHook = hook;
168    }
169
170    IncludeValue public_getIncludeValue() const {
171        return includeValue;
172    }
173
174    IncludeXattrs public_getIncludeXattrs() const {
175        return includeXattributes;
176    }
177
178    IncludeDeletedUserXattrs public_getIncludeDeletedUserXattrs() const {
179        return includeDeletedUserXattrs;
180    }
181};
182
183/**
184  * Variation of the MockActiveStream class, which overloads the registerCursor
185  * method.  In addition it implements two additional methods
186  * (continueRegisterCursor and waitForRegisterCursor), which are used to
187  * control the when registerCursor is executed.
188  */
189class MockActiveStreamWithOverloadedRegisterCursor : public MockActiveStream {
190public:
191    MockActiveStreamWithOverloadedRegisterCursor(
192            EventuallyPersistentEngine* e,
193            std::shared_ptr<MockDcpProducer> p,
194            uint32_t flags,
195            uint32_t opaque,
196            VBucket& vb,
197            uint64_t st_seqno,
198            uint64_t en_seqno,
199            uint64_t vb_uuid,
200            uint64_t snap_start_seqno,
201            uint64_t snap_end_seqno,
202            IncludeValue includeValue = IncludeValue::Yes,
203            IncludeXattrs includeXattrs = IncludeXattrs::Yes)
204        : MockActiveStream(e,
205                           p,
206                           flags,
207                           opaque,
208                           vb,
209                           st_seqno,
210                           en_seqno,
211                           vb_uuid,
212                           snap_start_seqno,
213                           snap_end_seqno,
214                           includeValue,
215                           includeXattrs) {
216    }
217
218    /**
219      * Overload of the ActiveStream registerCursor method.  It first executes
220      * the callback function which is used to inject additional work prior to
221      * the execution of the method ActiveStream::registerCursor.
222      */
223    virtual void registerCursor(CheckpointManager& chkptmgr,
224                                uint64_t lastProcessedSeqno) override {
225        callbackBeforeRegisterCursor();
226        ActiveStream::registerCursor(chkptmgr, lastProcessedSeqno);
227        callbackAfterRegisterCursor();
228    }
229
230    // Function that sets the callback function.  The callback is invoked at the
231    // start of the overloaded registerCursor method.
232    void setCallbackBeforeRegisterCursor(std::function<void()> func) {
233        callbackBeforeRegisterCursor = func;
234    }
235
236    // Function that sets the callback function.  The callback is invoked at the
237    // end of the overloaded registerCursor method.
238    void setCallbackAfterRegisterCursor(std::function<void()> func) {
239        callbackAfterRegisterCursor = func;
240    }
241
242    /**
243      * The callback function which is used to perform additional work on its
244      * first invocation.  The function moves checkpoints forward, whilst in the
245      * middle of performing a backfill.
246      */
247    std::function<void()> callbackBeforeRegisterCursor;
248
249    // The callback function which is used to check the state of
250    // pendingBackfill after the call to ActiveStream::registerCursor.
251    std::function<void()> callbackAfterRegisterCursor;
252};
253
254/* Mock of the PassiveStream class. Wraps the real PassiveStream, but exposes
255 * normally protected methods publically for test purposes.
256 */
257class MockPassiveStream : public PassiveStream {
258public:
259    MockPassiveStream(EventuallyPersistentEngine& e,
260                      std::shared_ptr<DcpConsumer> consumer,
261                      const std::string& name,
262                      uint32_t flags,
263                      uint32_t opaque,
264                      Vbid vb,
265                      uint64_t start_seqno,
266                      uint64_t end_seqno,
267                      uint64_t vb_uuid,
268                      uint64_t snap_start_seqno,
269                      uint64_t snap_end_seqno,
270                      uint64_t vb_high_seqno,
271                      const Collections::ManifestUid vb_manifest_uid)
272        : PassiveStream(&e,
273                        consumer,
274                        name,
275                        flags,
276                        opaque,
277                        vb,
278                        start_seqno,
279                        end_seqno,
280                        vb_uuid,
281                        snap_start_seqno,
282                        snap_end_seqno,
283                        vb_high_seqno,
284                        vb_manifest_uid) {
285    }
286
287    void transitionStateToDead() {
288        transitionState(StreamState::Dead);
289    }
290
291    ENGINE_ERROR_CODE messageReceived(
292            std::unique_ptr<DcpResponse> dcpResponse) override;
293
294    void processMarker(SnapshotMarker* marker) override {
295        PassiveStream::processMarker(marker);
296    }
297
298    ENGINE_ERROR_CODE processMutation(
299            MutationConsumerMessage* mutation) override {
300        return PassiveStream::processMutation(mutation);
301    }
302
303    size_t getNumBufferItems() const {
304        LockHolder lh(buffer.bufMutex);
305        return buffer.messages.size();
306    }
307
308    auto& getBufferMessages() const {
309        return buffer.messages;
310    }
311
312    void setProcessBufferedMessages_postFront_Hook(
313            std::function<void()>& hook) {
314        processBufferedMessages_postFront_Hook = hook;
315    }
316
317    std::unique_ptr<DcpResponse> public_popFromReadyQ();
318
319    const std::queue<std::unique_ptr<DcpResponse>>& public_readyQ() const {
320        return readyQ;
321    }
322
323    const std::string public_createStreamReqValue() const {
324        return createStreamReqValue();
325    }
326
327    bool getCurSnapshotPrepare() const {
328        return cur_snapshot_prepare.load(std::memory_order_relaxed);
329    }
330
331    uint32_t responseMessageSize;
332};
333