1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2016 Couchbase, Inc.
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
18#pragma once
19
20#include "dcp/active_stream.h"
21#include "dcp/producer.h"
22#include "dcp/stream.h"
23
24class ActiveStreamCheckpointProcessorTask;
25struct BackfillScanBuffer;
26class MockActiveStream;
27class MockDcpMessageProducers;
28
29/*
30 * Mock of the DcpProducer class.  Wraps the real DcpProducer, but exposes
 * normally protected methods publicly for test purposes.
32 */
33class MockDcpProducer : public DcpProducer {
34public:
35    MockDcpProducer(EventuallyPersistentEngine& theEngine,
36                    const void* cookie,
37                    const std::string& name,
38                    uint32_t flags,
39                    bool startTask = true);
40
41    using DcpProducer::updateStreamsMap;
42
43    ENGINE_ERROR_CODE maybeDisconnect() {
44        return DcpProducer::maybeDisconnect();
45    }
46
47    ENGINE_ERROR_CODE maybeSendNoop(struct dcp_message_producers* producers) {
48        return DcpProducer::maybeSendNoop(producers);
49    }
50
51    void setLastReceiveTime(const rel_time_t timeValue) {
52        lastReceiveTime = timeValue;
53    }
54
55    void setNoopSendTime(const rel_time_t timeValue) {
56        noopCtx.sendTime = timeValue;
57    }
58
59    rel_time_t getNoopSendTime() {
60        return noopCtx.sendTime;
61    }
62
63    bool getNoopPendingRecv() {
64        return noopCtx.pendingRecv;
65    }
66
67    void setNoopEnabled(const bool booleanValue) {
68        noopCtx.enabled = booleanValue;
69    }
70
71    bool getNoopEnabled() {
72        return noopCtx.enabled;
73    }
74
75    void setDCPExpiry(bool value) {
76        enableExpiryOpcode = value;
77        if (enableExpiryOpcode) {
78            // Expiry opcode uses the same encoding as deleteV2 (includes
79            // delete time); therefore a client enabling expiry_opcode also
80            // implicitly enables includeDeletetime.
81            includeDeleteTime = IncludeDeleteTime::Yes;
82        }
83    }
84
85    bool getDCPExpiry() const {
86        return enableExpiryOpcode;
87    }
88
89    void setSyncReplication(SyncReplication value) {
90        supportsSyncReplication = value;
91    }
92
93    /**
94     * Create the ActiveStreamCheckpointProcessorTask and assign to
95     * checkpointCreator->task
96     */
97    void createCheckpointProcessorTask() {
98        DcpProducer::createCheckpointProcessorTask();
99    }
100
101    /**
102     * Schedule the checkpointCreator->task on the ExecutorPool
103     */
104    void scheduleCheckpointProcessorTask() {
105        DcpProducer::scheduleCheckpointProcessorTask();
106    }
107
108    ActiveStreamCheckpointProcessorTask* getCheckpointSnapshotTask() const;
109
110    /**
111     * Finds the stream for a given vbucket
112     */
113    std::shared_ptr<Stream> findStream(Vbid vbid);
114
115    /**
116     * Finds the stream for a given vbucket/sid
117     * @returns a pair where second indicates if the VB has no entries at all
118     *          first is the stream (or null if no stream)
119     */
120    std::pair<std::shared_ptr<Stream>, bool> findStream(
121            Vbid vbid, cb::mcbp::DcpStreamId sid);
122
123    /**
124     * Sets the backfill buffer size (max limit) to a particular value
125     */
126    void setBackfillBufferSize(size_t newSize);
127
128    bool getBackfillBufferFullStatus();
129
130    /*
131     * @return A reference to BackfillManager::scanBuffer
132     */
133    BackfillScanBuffer& public_getBackfillScanBuffer();
134
135    void bytesForceRead(size_t bytes);
136
137    BackfillManager& getBFM() {
138        return *(backfillMgr.load());
139    }
140
141    BackfillManager* getBFMPtr() {
142        return backfillMgr.load().get();
143    }
144
145    size_t getBytesOutstanding() const {
146        return log.getBytesOutstanding();
147    }
148
149    void ackBytesOutstanding(size_t bytes) {
150        return log.acknowledge(bytes);
151    }
152
153    VBReadyQueue& getReadyQueue() {
154        return ready;
155    }
156
157    /**
158     * Place a mock active stream into the producer
159     */
160    std::shared_ptr<MockActiveStream> mockActiveStreamRequest(
161            uint32_t flags,
162            uint32_t opaque,
163            VBucket& vb,
164            uint64_t start_seqno,
165            uint64_t end_seqno,
166            uint64_t vbucket_uuid,
167            uint64_t snap_start_seqno,
168            uint64_t snap_end_seqno,
169            IncludeValue includeValue = IncludeValue::Yes,
170            IncludeXattrs includeXattrs = IncludeXattrs::Yes,
171            IncludeDeletedUserXattrs includeDeleteUserXattrs =
172                    IncludeDeletedUserXattrs::No);
173
174    /**
175     * Step the producer and expect the opcode to be returned
176     */
177    ENGINE_ERROR_CODE stepAndExpect(MockDcpMessageProducers* producers,
178                                    cb::mcbp::ClientOpcode expectedOpcode);
179
180    /**
181     * Call step(), but wrap the producers with a DcpMsgProducersBorderGuard (
182     * in the same way it is called from EvPEngine::step().
183     *
184     * Useful when operating directly on a (Mock)DcpProducer object without
185     * ep_engine, but need to ensure currentEngine switching is still correct.
186     */
187    ENGINE_ERROR_CODE stepWithBorderGuard(dcp_message_producers& producers);
188
189    void enableMultipleStreamRequests() {
190        multipleStreamRequests = MultipleStreamRequests::Yes;
191    }
192
193    void enableStreamEndOnClientStreamClose() {
194        sendStreamEndOnClientStreamClose = true;
195    }
196
197    bool scheduleBackfillManager(VBucket& vb,
198                                 std::shared_ptr<ActiveStream> s,
199                                 uint64_t start,
200                                 uint64_t end) override {
201        beforeScheduleBackfillCB(end);
202        return DcpProducer::scheduleBackfillManager(
203                vb, std::move(s), start, end);
204    }
205
206    void setBeforeScheduleBackfillCB(std::function<void(uint64_t)> backfillCB) {
207        beforeScheduleBackfillCB = backfillCB;
208    }
209
210    std::function<void(uint64_t)> beforeScheduleBackfillCB = [](uint64_t) {};
211
212    void setCloseAllStreamsHook(std::function<void()> hook) {
213        closeAllStreamsHook = hook;
214    }
215
216    void setCloseAllStreamsPostLockHook(std::function<void()> hook) {
217        closeAllStreamsPostLockHook = hook;
218    }
219
220    void setCloseAllStreamsPreLockHook(std::function<void()> hook) {
221        closeAllStreamsPreLockHook = hook;
222    }
223
224    void setSeqnoAckHook(std::function<void()> hook) {
225        seqnoAckHook = hook;
226    }
227
228    IncludeValue public_getIncludeValue() const {
229        return includeValue;
230    }
231
232    IncludeXattrs public_getIncludeXattrs() const {
233        return includeXattrs;
234    }
235
236    IncludeDeletedUserXattrs public_getIncludeDeletedUserXattrs() const {
237        return includeDeletedUserXattrs;
238    }
239};
240