/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
/*
 *     Copyright 2016 Couchbase, Inc.
 *
 *   Licensed under the Apache License, Version 2.0 (the "License");
 *   you may not use this file except in compliance with the License.
 *   You may obtain a copy of the License at
 *
 *       http://www.apache.org/licenses/LICENSE-2.0
 *
 *   Unless required by applicable law or agreed to in writing, software
 *   distributed under the License is distributed on an "AS IS" BASIS,
 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *   See the License for the specific language governing permissions and
 *   limitations under the License.
 */

#pragma once

#include "dcp/active_stream.h"
#include "dcp/producer.h"
#include "dcp/stream.h"

#include <cstddef>
#include <cstdint>
#include <functional>
#include <memory>
#include <string>
#include <utility>

// Forward declarations: these types are only used below by pointer, reference
// or as a smart-pointer template argument in declarations, so their full
// definitions are not needed in this header.
class ActiveStreamCheckpointProcessorTask;
struct BackfillScanBuffer;
class MockActiveStream;
class MockDcpMessageProducers;

29ef22f9b0SDave Rigby /*
30ef22f9b0SDave Rigby  * Mock of the DcpProducer class.  Wraps the real DcpProducer, but exposes
31ef22f9b0SDave Rigby  * normally protected methods publically for test purposes.
32ef22f9b0SDave Rigby  */
33ef22f9b0SDave Rigby class MockDcpProducer : public DcpProducer {
34ef22f9b0SDave Rigby public:
35ef22f9b0SDave Rigby     MockDcpProducer(EventuallyPersistentEngine& theEngine,
36ef22f9b0SDave Rigby                     const void* cookie,
37ef22f9b0SDave Rigby                     const std::string& name,
3814fcb066SJim Walker                     uint32_t flags,
393718696eSDave Rigby                     bool startTask = true);
40ef22f9b0SDave Rigby 
41d4c51818SJames Harrison     using DcpProducer::updateStreamsMap;
42d4c51818SJames Harrison 
maybeDisconnect()43ef22f9b0SDave Rigby     ENGINE_ERROR_CODE maybeDisconnect() {
44ef22f9b0SDave Rigby         return DcpProducer::maybeDisconnect();
45ef22f9b0SDave Rigby     }
46ef22f9b0SDave Rigby 
maybeSendNoop(struct dcp_message_producers * producers)47ef22f9b0SDave Rigby     ENGINE_ERROR_CODE maybeSendNoop(struct dcp_message_producers* producers) {
48ef22f9b0SDave Rigby         return DcpProducer::maybeSendNoop(producers);
49ef22f9b0SDave Rigby     }
50ef22f9b0SDave Rigby 
setLastReceiveTime(const rel_time_t timeValue)51ef22f9b0SDave Rigby     void setLastReceiveTime(const rel_time_t timeValue) {
52ef22f9b0SDave Rigby         lastReceiveTime = timeValue;
53ef22f9b0SDave Rigby     }
54ef22f9b0SDave Rigby 
setNoopSendTime(const rel_time_t timeValue)55ef22f9b0SDave Rigby     void setNoopSendTime(const rel_time_t timeValue) {
56ef22f9b0SDave Rigby         noopCtx.sendTime = timeValue;
57ef22f9b0SDave Rigby     }
58ef22f9b0SDave Rigby 
getNoopSendTime()59ef22f9b0SDave Rigby     rel_time_t getNoopSendTime() {
60ef22f9b0SDave Rigby         return noopCtx.sendTime;
61ef22f9b0SDave Rigby     }
62ef22f9b0SDave Rigby 
getNoopPendingRecv()63ef22f9b0SDave Rigby     bool getNoopPendingRecv() {
64ef22f9b0SDave Rigby         return noopCtx.pendingRecv;
65ef22f9b0SDave Rigby     }
66ef22f9b0SDave Rigby 
setNoopEnabled(const bool booleanValue)67ef22f9b0SDave Rigby     void setNoopEnabled(const bool booleanValue) {
68ef22f9b0SDave Rigby         noopCtx.enabled = booleanValue;
69ef22f9b0SDave Rigby     }
70ef22f9b0SDave Rigby 
getNoopEnabled()71ef22f9b0SDave Rigby     bool getNoopEnabled() {
72ef22f9b0SDave Rigby         return noopCtx.enabled;
73ef22f9b0SDave Rigby     }
74ef22f9b0SDave Rigby 
setDCPExpiry(bool value)7560b97802SChristopher Farman     void setDCPExpiry(bool value) {
7660b97802SChristopher Farman         enableExpiryOpcode = value;
77f199e12eSChristopher Farman         if (enableExpiryOpcode) {
78f199e12eSChristopher Farman             // Expiry opcode uses the same encoding as deleteV2 (includes
79f199e12eSChristopher Farman             // delete time); therefore a client enabling expiry_opcode also
80f199e12eSChristopher Farman             // implicitly enables includeDeletetime.
81f199e12eSChristopher Farman             includeDeleteTime = IncludeDeleteTime::Yes;
82f199e12eSChristopher Farman         }
8360b97802SChristopher Farman     }
8460b97802SChristopher Farman 
getDCPExpiry()8560b97802SChristopher Farman     bool getDCPExpiry() const {
8660b97802SChristopher Farman         return enableExpiryOpcode;
8760b97802SChristopher Farman     }
8860b97802SChristopher Farman 
setSyncReplication(SyncReplication value)89d148b3d9SBen Huddleston     void setSyncReplication(SyncReplication value) {
9013f946abSDave Rigby         supportsSyncReplication = value;
9113f946abSDave Rigby     }
92d148b3d9SBen Huddleston 
93ef22f9b0SDave Rigby     /**
94ef22f9b0SDave Rigby      * Create the ActiveStreamCheckpointProcessorTask and assign to
95136cdca7SJames Harrison      * checkpointCreator->task
96ef22f9b0SDave Rigby      */
createCheckpointProcessorTask()97ef22f9b0SDave Rigby     void createCheckpointProcessorTask() {
98ef22f9b0SDave Rigby         DcpProducer::createCheckpointProcessorTask();
99ef22f9b0SDave Rigby     }
100ef22f9b0SDave Rigby 
101ef22f9b0SDave Rigby     /**
102136cdca7SJames Harrison      * Schedule the checkpointCreator->task on the ExecutorPool
103ef22f9b0SDave Rigby      */
scheduleCheckpointProcessorTask()104ef22f9b0SDave Rigby     void scheduleCheckpointProcessorTask() {
105ef22f9b0SDave Rigby         DcpProducer::scheduleCheckpointProcessorTask();
106ef22f9b0SDave Rigby     }
107ef22f9b0SDave Rigby 
10874881e67SBen Huddleston     ActiveStreamCheckpointProcessorTask* getCheckpointSnapshotTask() const;
109ef22f9b0SDave Rigby 
110ef22f9b0SDave Rigby     /**
111ef22f9b0SDave Rigby      * Finds the stream for a given vbucket
112ef22f9b0SDave Rigby      */
11356d72278SJim Walker     std::shared_ptr<Stream> findStream(Vbid vbid);
114ef22f9b0SDave Rigby 
115ef22f9b0SDave Rigby     /**
116787071edSJim Walker      * Finds the stream for a given vbucket/sid
117787071edSJim Walker      * @returns a pair where second indicates if the VB has no entries at all
118787071edSJim Walker      *          first is the stream (or null if no stream)
119787071edSJim Walker      */
120787071edSJim Walker     std::pair<std::shared_ptr<Stream>, bool> findStream(
121787071edSJim Walker             Vbid vbid, cb::mcbp::DcpStreamId sid);
122787071edSJim Walker 
123787071edSJim Walker     /**
124ef22f9b0SDave Rigby      * Sets the backfill buffer size (max limit) to a particular value
125ef22f9b0SDave Rigby      */
1263718696eSDave Rigby     void setBackfillBufferSize(size_t newSize);
127ef22f9b0SDave Rigby 
1283718696eSDave Rigby     bool getBackfillBufferFullStatus();
12914fcb066SJim Walker 
130b0449c4fSPaolo Cocchi     /*
131b0449c4fSPaolo Cocchi      * @return A reference to BackfillManager::scanBuffer
132b0449c4fSPaolo Cocchi      */
1333718696eSDave Rigby     BackfillScanBuffer& public_getBackfillScanBuffer();
134b0449c4fSPaolo Cocchi 
1353718696eSDave Rigby     void bytesForceRead(size_t bytes);
13606433454SJames Harrison 
getBFM()13706433454SJames Harrison     BackfillManager& getBFM() {
138fa8692d7SDave Rigby         return *(backfillMgr.load());
13906433454SJames Harrison     }
14079cf5688SJim Walker 
getBFMPtr()141715d4c32SBen Huddleston     BackfillManager* getBFMPtr() {
142715d4c32SBen Huddleston         return backfillMgr.load().get();
143715d4c32SBen Huddleston     }
144715d4c32SBen Huddleston 
getBytesOutstanding()14579cf5688SJim Walker     size_t getBytesOutstanding() const {
14679cf5688SJim Walker         return log.getBytesOutstanding();
14779cf5688SJim Walker     }
14849d69f53SJim Walker 
ackBytesOutstanding(size_t bytes)1492e084d16SPaolo Cocchi     void ackBytesOutstanding(size_t bytes) {
1502e084d16SPaolo Cocchi         return log.acknowledge(bytes);
1512e084d16SPaolo Cocchi     }
1522e084d16SPaolo Cocchi 
getReadyQueue()15323091b85SBen Huddleston     VBReadyQueue& getReadyQueue() {
15466985a89SChristopher Farman         return ready;
15566985a89SChristopher Farman     }
15666985a89SChristopher Farman 
157787236b3SJim Walker     /**
158787236b3SJim Walker      * Place a mock active stream into the producer
159787236b3SJim Walker      */
16038ba8360SJim Walker     std::shared_ptr<MockActiveStream> mockActiveStreamRequest(
16138ba8360SJim Walker             uint32_t flags,
162787236b3SJim Walker             uint32_t opaque,
163787236b3SJim Walker             VBucket& vb,
164787236b3SJim Walker             uint64_t start_seqno,
165787236b3SJim Walker             uint64_t end_seqno,
166787236b3SJim Walker             uint64_t vbucket_uuid,
167787236b3SJim Walker             uint64_t snap_start_seqno,
168d4c51818SJames Harrison             uint64_t snap_end_seqno,
169d4c51818SJames Harrison             IncludeValue includeValue = IncludeValue::Yes,
170*3268b913SPaolo Cocchi             IncludeXattrs includeXattrs = IncludeXattrs::Yes,
171*3268b913SPaolo Cocchi             IncludeDeletedUserXattrs includeDeleteUserXattrs =
172*3268b913SPaolo Cocchi                     IncludeDeletedUserXattrs::No);
173db693082SJim Walker 
174db693082SJim Walker     /**
175db693082SJim Walker      * Step the producer and expect the opcode to be returned
176db693082SJim Walker      */
17751f63219SJim Walker     ENGINE_ERROR_CODE stepAndExpect(MockDcpMessageProducers* producers,
178db693082SJim Walker                                     cb::mcbp::ClientOpcode expectedOpcode);
1794dabb0baSJim Walker 
1802b76e028SDave Rigby     /**
1812b76e028SDave Rigby      * Call step(), but wrap the producers with a DcpMsgProducersBorderGuard (
1822b76e028SDave Rigby      * in the same way it is called from EvPEngine::step().
1832b76e028SDave Rigby      *
1842b76e028SDave Rigby      * Useful when operating directly on a (Mock)DcpProducer object without
1852b76e028SDave Rigby      * ep_engine, but need to ensure currentEngine switching is still correct.
1862b76e028SDave Rigby      */
1872b76e028SDave Rigby     ENGINE_ERROR_CODE stepWithBorderGuard(dcp_message_producers& producers);
1882b76e028SDave Rigby 
enableMultipleStreamRequests()1894dabb0baSJim Walker     void enableMultipleStreamRequests() {
1904dabb0baSJim Walker         multipleStreamRequests = MultipleStreamRequests::Yes;
1914dabb0baSJim Walker     }
192787071edSJim Walker 
enableStreamEndOnClientStreamClose()193787071edSJim Walker     void enableStreamEndOnClientStreamClose() {
194787071edSJim Walker         sendStreamEndOnClientStreamClose = true;
195787071edSJim Walker     }
196dee7c17eSJames Harrison 
scheduleBackfillManager(VBucket & vb,std::shared_ptr<ActiveStream> s,uint64_t start,uint64_t end)197dee7c17eSJames Harrison     bool scheduleBackfillManager(VBucket& vb,
198dee7c17eSJames Harrison                                  std::shared_ptr<ActiveStream> s,
199dee7c17eSJames Harrison                                  uint64_t start,
200dee7c17eSJames Harrison                                  uint64_t end) override {
201dee7c17eSJames Harrison         beforeScheduleBackfillCB(end);
202dee7c17eSJames Harrison         return DcpProducer::scheduleBackfillManager(
203dee7c17eSJames Harrison                 vb, std::move(s), start, end);
204dee7c17eSJames Harrison     }
205dee7c17eSJames Harrison 
setBeforeScheduleBackfillCB(std::function<void (uint64_t)> backfillCB)206dee7c17eSJames Harrison     void setBeforeScheduleBackfillCB(std::function<void(uint64_t)> backfillCB) {
207dee7c17eSJames Harrison         beforeScheduleBackfillCB = backfillCB;
208dee7c17eSJames Harrison     }
209dee7c17eSJames Harrison 
210dee7c17eSJames Harrison     std::function<void(uint64_t)> beforeScheduleBackfillCB = [](uint64_t) {};
211255c7552SBen Huddleston 
setCloseAllStreamsHook(std::function<void ()> hook)212255c7552SBen Huddleston     void setCloseAllStreamsHook(std::function<void()> hook) {
213255c7552SBen Huddleston         closeAllStreamsHook = hook;
214255c7552SBen Huddleston     }
215e6a018e4SBen Huddleston 
setCloseAllStreamsPostLockHook(std::function<void ()> hook)216715d4c32SBen Huddleston     void setCloseAllStreamsPostLockHook(std::function<void()> hook) {
217715d4c32SBen Huddleston         closeAllStreamsPostLockHook = hook;
218715d4c32SBen Huddleston     }
219715d4c32SBen Huddleston 
setCloseAllStreamsPreLockHook(std::function<void ()> hook)220715d4c32SBen Huddleston     void setCloseAllStreamsPreLockHook(std::function<void()> hook) {
221715d4c32SBen Huddleston         closeAllStreamsPreLockHook = hook;
222715d4c32SBen Huddleston     }
223715d4c32SBen Huddleston 
setSeqnoAckHook(std::function<void ()> hook)224e6a018e4SBen Huddleston     void setSeqnoAckHook(std::function<void()> hook) {
225e6a018e4SBen Huddleston         seqnoAckHook = hook;
226e6a018e4SBen Huddleston     }
227c3663847SPaolo Cocchi 
public_getIncludeValue()228*3268b913SPaolo Cocchi     IncludeValue public_getIncludeValue() const {
229*3268b913SPaolo Cocchi         return includeValue;
230*3268b913SPaolo Cocchi     }
231*3268b913SPaolo Cocchi 
public_getIncludeXattrs()232*3268b913SPaolo Cocchi     IncludeXattrs public_getIncludeXattrs() const {
233*3268b913SPaolo Cocchi         return includeXattrs;
234*3268b913SPaolo Cocchi     }
235*3268b913SPaolo Cocchi 
public_getIncludeDeletedUserXattrs()236c3663847SPaolo Cocchi     IncludeDeletedUserXattrs public_getIncludeDeletedUserXattrs() const {
237c3663847SPaolo Cocchi         return includeDeletedUserXattrs;
238c3663847SPaolo Cocchi     }
239ef22f9b0SDave Rigby };
240