1ef22f9b0SDave Rigby /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */ 2ef22f9b0SDave Rigby /* 3ef22f9b0SDave Rigby * Copyright 2016 Couchbase, Inc. 4ef22f9b0SDave Rigby * 5ef22f9b0SDave Rigby * Licensed under the Apache License, Version 2.0 (the "License"); 6ef22f9b0SDave Rigby * you may not use this file except in compliance with the License. 7ef22f9b0SDave Rigby * You may obtain a copy of the License at 8ef22f9b0SDave Rigby * 9ef22f9b0SDave Rigby * http://www.apache.org/licenses/LICENSE-2.0 10ef22f9b0SDave Rigby * 11ef22f9b0SDave Rigby * Unless required by applicable law or agreed to in writing, software 12ef22f9b0SDave Rigby * distributed under the License is distributed on an "AS IS" BASIS, 13ef22f9b0SDave Rigby * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14ef22f9b0SDave Rigby * See the License for the specific language governing permissions and 15ef22f9b0SDave Rigby * limitations under the License. 16ef22f9b0SDave Rigby */ 17ef22f9b0SDave Rigby 18ef22f9b0SDave Rigby #pragma once 19ef22f9b0SDave Rigby 20af43f7cfSDave Rigby #include "dcp/active_stream.h" 21ef22f9b0SDave Rigby #include "dcp/producer.h" 22ef22f9b0SDave Rigby #include "dcp/stream.h" 23ef22f9b0SDave Rigby 243718696eSDave Rigby class ActiveStreamCheckpointProcessorTask; 253718696eSDave Rigby struct BackfillScanBuffer; 2638ba8360SJim Walker class MockActiveStream; 2751f63219SJim Walker class MockDcpMessageProducers; 2838ba8360SJim Walker 29ef22f9b0SDave Rigby /* 30ef22f9b0SDave Rigby * Mock of the DcpProducer class. Wraps the real DcpProducer, but exposes 31ef22f9b0SDave Rigby * normally protected methods publically for test purposes. 32ef22f9b0SDave Rigby */ 33ef22f9b0SDave Rigby class MockDcpProducer : public DcpProducer { 34ef22f9b0SDave Rigby public: 35ef22f9b0SDave Rigby MockDcpProducer(EventuallyPersistentEngine& theEngine, 36ef22f9b0SDave Rigby const void* cookie, 37ef22f9b0SDave Rigby const std::string& name, 3814fcb066SJim Walker uint32_t flags, 393718696eSDave Rigby bool startTask = true); 40ef22f9b0SDave Rigby 41d4c51818SJames Harrison using DcpProducer::updateStreamsMap; 42d4c51818SJames Harrison maybeDisconnect()43ef22f9b0SDave Rigby ENGINE_ERROR_CODE maybeDisconnect() { 44ef22f9b0SDave Rigby return DcpProducer::maybeDisconnect(); 45ef22f9b0SDave Rigby } 46ef22f9b0SDave Rigby maybeSendNoop(struct dcp_message_producers * producers)47ef22f9b0SDave Rigby ENGINE_ERROR_CODE maybeSendNoop(struct dcp_message_producers* producers) { 48ef22f9b0SDave Rigby return DcpProducer::maybeSendNoop(producers); 49ef22f9b0SDave Rigby } 50ef22f9b0SDave Rigby setLastReceiveTime(const rel_time_t timeValue)51ef22f9b0SDave Rigby void setLastReceiveTime(const rel_time_t timeValue) { 52ef22f9b0SDave Rigby lastReceiveTime = timeValue; 53ef22f9b0SDave Rigby } 54ef22f9b0SDave Rigby setNoopSendTime(const rel_time_t timeValue)55ef22f9b0SDave Rigby void setNoopSendTime(const rel_time_t timeValue) { 56ef22f9b0SDave Rigby noopCtx.sendTime = timeValue; 57ef22f9b0SDave Rigby } 58ef22f9b0SDave Rigby getNoopSendTime()59ef22f9b0SDave Rigby rel_time_t getNoopSendTime() { 60ef22f9b0SDave Rigby return noopCtx.sendTime; 61ef22f9b0SDave Rigby } 62ef22f9b0SDave Rigby getNoopPendingRecv()63ef22f9b0SDave Rigby bool getNoopPendingRecv() { 64ef22f9b0SDave Rigby return noopCtx.pendingRecv; 65ef22f9b0SDave Rigby } 66ef22f9b0SDave Rigby setNoopEnabled(const bool booleanValue)67ef22f9b0SDave Rigby void setNoopEnabled(const bool booleanValue) { 68ef22f9b0SDave Rigby noopCtx.enabled = booleanValue; 69ef22f9b0SDave Rigby } 70ef22f9b0SDave Rigby getNoopEnabled()71ef22f9b0SDave Rigby bool getNoopEnabled() { 72ef22f9b0SDave Rigby return noopCtx.enabled; 73ef22f9b0SDave Rigby } 74ef22f9b0SDave Rigby setDCPExpiry(bool value)7560b97802SChristopher Farman void setDCPExpiry(bool value) { 7660b97802SChristopher Farman enableExpiryOpcode = value; 77f199e12eSChristopher Farman if (enableExpiryOpcode) { 78f199e12eSChristopher Farman // Expiry opcode uses the same encoding as deleteV2 (includes 79f199e12eSChristopher Farman // delete time); therefore a client enabling expiry_opcode also 80f199e12eSChristopher Farman // implicitly enables includeDeletetime. 81f199e12eSChristopher Farman includeDeleteTime = IncludeDeleteTime::Yes; 82f199e12eSChristopher Farman } 8360b97802SChristopher Farman } 8460b97802SChristopher Farman getDCPExpiry()8560b97802SChristopher Farman bool getDCPExpiry() const { 8660b97802SChristopher Farman return enableExpiryOpcode; 8760b97802SChristopher Farman } 8860b97802SChristopher Farman setSyncReplication(SyncReplication value)89d148b3d9SBen Huddleston void setSyncReplication(SyncReplication value) { 9013f946abSDave Rigby supportsSyncReplication = value; 9113f946abSDave Rigby } 92d148b3d9SBen Huddleston 93ef22f9b0SDave Rigby /** 94ef22f9b0SDave Rigby * Create the ActiveStreamCheckpointProcessorTask and assign to 95136cdca7SJames Harrison * checkpointCreator->task 96ef22f9b0SDave Rigby */ createCheckpointProcessorTask()97ef22f9b0SDave Rigby void createCheckpointProcessorTask() { 98ef22f9b0SDave Rigby DcpProducer::createCheckpointProcessorTask(); 99ef22f9b0SDave Rigby } 100ef22f9b0SDave Rigby 101ef22f9b0SDave Rigby /** 102136cdca7SJames Harrison * Schedule the checkpointCreator->task on the ExecutorPool 103ef22f9b0SDave Rigby */ scheduleCheckpointProcessorTask()104ef22f9b0SDave Rigby void scheduleCheckpointProcessorTask() { 105ef22f9b0SDave Rigby DcpProducer::scheduleCheckpointProcessorTask(); 106ef22f9b0SDave Rigby } 107ef22f9b0SDave Rigby 10874881e67SBen Huddleston ActiveStreamCheckpointProcessorTask* getCheckpointSnapshotTask() const; 109ef22f9b0SDave Rigby 110ef22f9b0SDave Rigby /** 111ef22f9b0SDave Rigby * Finds the stream for a given vbucket 112ef22f9b0SDave Rigby */ 11356d72278SJim Walker std::shared_ptr<Stream> findStream(Vbid vbid); 114ef22f9b0SDave Rigby 115ef22f9b0SDave Rigby /** 116787071edSJim Walker * Finds the stream for a given vbucket/sid 117787071edSJim Walker * @returns a pair where second indicates if the VB has no entries at all 118787071edSJim Walker * first is the stream (or null if no stream) 119787071edSJim Walker */ 120787071edSJim Walker std::pair<std::shared_ptr<Stream>, bool> findStream( 121787071edSJim Walker Vbid vbid, cb::mcbp::DcpStreamId sid); 122787071edSJim Walker 123787071edSJim Walker /** 124ef22f9b0SDave Rigby * Sets the backfill buffer size (max limit) to a particular value 125ef22f9b0SDave Rigby */ 1263718696eSDave Rigby void setBackfillBufferSize(size_t newSize); 127ef22f9b0SDave Rigby 1283718696eSDave Rigby bool getBackfillBufferFullStatus(); 12914fcb066SJim Walker 130b0449c4fSPaolo Cocchi /* 131b0449c4fSPaolo Cocchi * @return A reference to BackfillManager::scanBuffer 132b0449c4fSPaolo Cocchi */ 1333718696eSDave Rigby BackfillScanBuffer& public_getBackfillScanBuffer(); 134b0449c4fSPaolo Cocchi 1353718696eSDave Rigby void bytesForceRead(size_t bytes); 13606433454SJames Harrison getBFM()13706433454SJames Harrison BackfillManager& getBFM() { 138fa8692d7SDave Rigby return *(backfillMgr.load()); 13906433454SJames Harrison } 14079cf5688SJim Walker getBFMPtr()141715d4c32SBen Huddleston BackfillManager* getBFMPtr() { 142715d4c32SBen Huddleston return backfillMgr.load().get(); 143715d4c32SBen Huddleston } 144715d4c32SBen Huddleston getBytesOutstanding()14579cf5688SJim Walker size_t getBytesOutstanding() const { 14679cf5688SJim Walker return log.getBytesOutstanding(); 14779cf5688SJim Walker } 14849d69f53SJim Walker ackBytesOutstanding(size_t bytes)1492e084d16SPaolo Cocchi void ackBytesOutstanding(size_t bytes) { 1502e084d16SPaolo Cocchi return log.acknowledge(bytes); 1512e084d16SPaolo Cocchi } 1522e084d16SPaolo Cocchi getReadyQueue()15323091b85SBen Huddleston VBReadyQueue& getReadyQueue() { 15466985a89SChristopher Farman return ready; 15566985a89SChristopher Farman } 15666985a89SChristopher Farman 157787236b3SJim Walker /** 158787236b3SJim Walker * Place a mock active stream into the producer 159787236b3SJim Walker */ 16038ba8360SJim Walker std::shared_ptr<MockActiveStream> mockActiveStreamRequest( 16138ba8360SJim Walker uint32_t flags, 162787236b3SJim Walker uint32_t opaque, 163787236b3SJim Walker VBucket& vb, 164787236b3SJim Walker uint64_t start_seqno, 165787236b3SJim Walker uint64_t end_seqno, 166787236b3SJim Walker uint64_t vbucket_uuid, 167787236b3SJim Walker uint64_t snap_start_seqno, 168d4c51818SJames Harrison uint64_t snap_end_seqno, 169d4c51818SJames Harrison IncludeValue includeValue = IncludeValue::Yes, 170*3268b913SPaolo Cocchi IncludeXattrs includeXattrs = IncludeXattrs::Yes, 171*3268b913SPaolo Cocchi IncludeDeletedUserXattrs includeDeleteUserXattrs = 172*3268b913SPaolo Cocchi IncludeDeletedUserXattrs::No); 173db693082SJim Walker 174db693082SJim Walker /** 175db693082SJim Walker * Step the producer and expect the opcode to be returned 176db693082SJim Walker */ 17751f63219SJim Walker ENGINE_ERROR_CODE stepAndExpect(MockDcpMessageProducers* producers, 178db693082SJim Walker cb::mcbp::ClientOpcode expectedOpcode); 1794dabb0baSJim Walker 1802b76e028SDave Rigby /** 1812b76e028SDave Rigby * Call step(), but wrap the producers with a DcpMsgProducersBorderGuard ( 1822b76e028SDave Rigby * in the same way it is called from EvPEngine::step(). 1832b76e028SDave Rigby * 1842b76e028SDave Rigby * Useful when operating directly on a (Mock)DcpProducer object without 1852b76e028SDave Rigby * ep_engine, but need to ensure currentEngine switching is still correct. 1862b76e028SDave Rigby */ 1872b76e028SDave Rigby ENGINE_ERROR_CODE stepWithBorderGuard(dcp_message_producers& producers); 1882b76e028SDave Rigby enableMultipleStreamRequests()1894dabb0baSJim Walker void enableMultipleStreamRequests() { 1904dabb0baSJim Walker multipleStreamRequests = MultipleStreamRequests::Yes; 1914dabb0baSJim Walker } 192787071edSJim Walker enableStreamEndOnClientStreamClose()193787071edSJim Walker void enableStreamEndOnClientStreamClose() { 194787071edSJim Walker sendStreamEndOnClientStreamClose = true; 195787071edSJim Walker } 196dee7c17eSJames Harrison scheduleBackfillManager(VBucket & vb,std::shared_ptr<ActiveStream> s,uint64_t start,uint64_t end)197dee7c17eSJames Harrison bool scheduleBackfillManager(VBucket& vb, 198dee7c17eSJames Harrison std::shared_ptr<ActiveStream> s, 199dee7c17eSJames Harrison uint64_t start, 200dee7c17eSJames Harrison uint64_t end) override { 201dee7c17eSJames Harrison beforeScheduleBackfillCB(end); 202dee7c17eSJames Harrison return DcpProducer::scheduleBackfillManager( 203dee7c17eSJames Harrison vb, std::move(s), start, end); 204dee7c17eSJames Harrison } 205dee7c17eSJames Harrison setBeforeScheduleBackfillCB(std::function<void (uint64_t)> backfillCB)206dee7c17eSJames Harrison void setBeforeScheduleBackfillCB(std::function<void(uint64_t)> backfillCB) { 207dee7c17eSJames Harrison beforeScheduleBackfillCB = backfillCB; 208dee7c17eSJames Harrison } 209dee7c17eSJames Harrison 210dee7c17eSJames Harrison std::function<void(uint64_t)> beforeScheduleBackfillCB = [](uint64_t) {}; 211255c7552SBen Huddleston setCloseAllStreamsHook(std::function<void ()> hook)212255c7552SBen Huddleston void setCloseAllStreamsHook(std::function<void()> hook) { 213255c7552SBen Huddleston closeAllStreamsHook = hook; 214255c7552SBen Huddleston } 215e6a018e4SBen Huddleston setCloseAllStreamsPostLockHook(std::function<void ()> hook)216715d4c32SBen Huddleston void setCloseAllStreamsPostLockHook(std::function<void()> hook) { 217715d4c32SBen Huddleston closeAllStreamsPostLockHook = hook; 218715d4c32SBen Huddleston } 219715d4c32SBen Huddleston setCloseAllStreamsPreLockHook(std::function<void ()> hook)220715d4c32SBen Huddleston void setCloseAllStreamsPreLockHook(std::function<void()> hook) { 221715d4c32SBen Huddleston closeAllStreamsPreLockHook = hook; 222715d4c32SBen Huddleston } 223715d4c32SBen Huddleston setSeqnoAckHook(std::function<void ()> hook)224e6a018e4SBen Huddleston void setSeqnoAckHook(std::function<void()> hook) { 225e6a018e4SBen Huddleston seqnoAckHook = hook; 226e6a018e4SBen Huddleston } 227c3663847SPaolo Cocchi public_getIncludeValue()228*3268b913SPaolo Cocchi IncludeValue public_getIncludeValue() const { 229*3268b913SPaolo Cocchi return includeValue; 230*3268b913SPaolo Cocchi } 231*3268b913SPaolo Cocchi public_getIncludeXattrs()232*3268b913SPaolo Cocchi IncludeXattrs public_getIncludeXattrs() const { 233*3268b913SPaolo Cocchi return includeXattrs; 234*3268b913SPaolo Cocchi } 235*3268b913SPaolo Cocchi public_getIncludeDeletedUserXattrs()236c3663847SPaolo Cocchi IncludeDeletedUserXattrs public_getIncludeDeletedUserXattrs() const { 237c3663847SPaolo Cocchi return includeDeletedUserXattrs; 238c3663847SPaolo Cocchi } 239ef22f9b0SDave Rigby }; 240