1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */ 2/* 3 * Copyright 2016 Couchbase, Inc. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 */ 17 18#pragma once 19 20#include "dcp/active_stream.h" 21#include "dcp/producer.h" 22#include "dcp/stream.h" 23 24class ActiveStreamCheckpointProcessorTask; 25struct BackfillScanBuffer; 26class MockActiveStream; 27class MockDcpMessageProducers; 28 29/* 30 * Mock of the DcpProducer class. Wraps the real DcpProducer, but exposes 31 * normally protected methods publically for test purposes. 32 */ 33class MockDcpProducer : public DcpProducer { 34public: 35 MockDcpProducer(EventuallyPersistentEngine& theEngine, 36 const void* cookie, 37 const std::string& name, 38 uint32_t flags, 39 bool startTask = true); 40 41 using DcpProducer::updateStreamsMap; 42 43 ENGINE_ERROR_CODE maybeDisconnect() { 44 return DcpProducer::maybeDisconnect(); 45 } 46 47 ENGINE_ERROR_CODE maybeSendNoop(struct dcp_message_producers* producers) { 48 return DcpProducer::maybeSendNoop(producers); 49 } 50 51 void setLastReceiveTime(const rel_time_t timeValue) { 52 lastReceiveTime = timeValue; 53 } 54 55 void setNoopSendTime(const rel_time_t timeValue) { 56 noopCtx.sendTime = timeValue; 57 } 58 59 rel_time_t getNoopSendTime() { 60 return noopCtx.sendTime; 61 } 62 63 bool getNoopPendingRecv() { 64 return noopCtx.pendingRecv; 65 } 66 67 void setNoopEnabled(const bool booleanValue) { 68 noopCtx.enabled = booleanValue; 69 } 70 71 bool getNoopEnabled() { 72 return noopCtx.enabled; 73 } 74 75 void setDCPExpiry(bool value) { 76 enableExpiryOpcode = value; 77 if (enableExpiryOpcode) { 78 // Expiry opcode uses the same encoding as deleteV2 (includes 79 // delete time); therefore a client enabling expiry_opcode also 80 // implicitly enables includeDeletetime. 81 includeDeleteTime = IncludeDeleteTime::Yes; 82 } 83 } 84 85 bool getDCPExpiry() const { 86 return enableExpiryOpcode; 87 } 88 89 void setSyncReplication(SyncReplication value) { 90 supportsSyncReplication = value; 91 } 92 93 /** 94 * Create the ActiveStreamCheckpointProcessorTask and assign to 95 * checkpointCreator->task 96 */ 97 void createCheckpointProcessorTask() { 98 DcpProducer::createCheckpointProcessorTask(); 99 } 100 101 /** 102 * Schedule the checkpointCreator->task on the ExecutorPool 103 */ 104 void scheduleCheckpointProcessorTask() { 105 DcpProducer::scheduleCheckpointProcessorTask(); 106 } 107 108 ActiveStreamCheckpointProcessorTask* getCheckpointSnapshotTask() const; 109 110 /** 111 * Finds the stream for a given vbucket 112 */ 113 std::shared_ptr<Stream> findStream(Vbid vbid); 114 115 /** 116 * Finds the stream for a given vbucket/sid 117 * @returns a pair where second indicates if the VB has no entries at all 118 * first is the stream (or null if no stream) 119 */ 120 std::pair<std::shared_ptr<Stream>, bool> findStream( 121 Vbid vbid, cb::mcbp::DcpStreamId sid); 122 123 /** 124 * Sets the backfill buffer size (max limit) to a particular value 125 */ 126 void setBackfillBufferSize(size_t newSize); 127 128 bool getBackfillBufferFullStatus(); 129 130 /* 131 * @return A reference to BackfillManager::scanBuffer 132 */ 133 BackfillScanBuffer& public_getBackfillScanBuffer(); 134 135 void bytesForceRead(size_t bytes); 136 137 BackfillManager& getBFM() { 138 return *(backfillMgr.load()); 139 } 140 141 BackfillManager* getBFMPtr() { 142 return backfillMgr.load().get(); 143 } 144 145 size_t getBytesOutstanding() const { 146 return log.getBytesOutstanding(); 147 } 148 149 void ackBytesOutstanding(size_t bytes) { 150 return log.acknowledge(bytes); 151 } 152 153 VBReadyQueue& getReadyQueue() { 154 return ready; 155 } 156 157 /** 158 * Place a mock active stream into the producer 159 */ 160 std::shared_ptr<MockActiveStream> mockActiveStreamRequest( 161 uint32_t flags, 162 uint32_t opaque, 163 VBucket& vb, 164 uint64_t start_seqno, 165 uint64_t end_seqno, 166 uint64_t vbucket_uuid, 167 uint64_t snap_start_seqno, 168 uint64_t snap_end_seqno, 169 IncludeValue includeValue = IncludeValue::Yes, 170 IncludeXattrs includeXattrs = IncludeXattrs::Yes, 171 IncludeDeletedUserXattrs includeDeleteUserXattrs = 172 IncludeDeletedUserXattrs::No); 173 174 /** 175 * Step the producer and expect the opcode to be returned 176 */ 177 ENGINE_ERROR_CODE stepAndExpect(MockDcpMessageProducers* producers, 178 cb::mcbp::ClientOpcode expectedOpcode); 179 180 /** 181 * Call step(), but wrap the producers with a DcpMsgProducersBorderGuard ( 182 * in the same way it is called from EvPEngine::step(). 183 * 184 * Useful when operating directly on a (Mock)DcpProducer object without 185 * ep_engine, but need to ensure currentEngine switching is still correct. 186 */ 187 ENGINE_ERROR_CODE stepWithBorderGuard(dcp_message_producers& producers); 188 189 void enableMultipleStreamRequests() { 190 multipleStreamRequests = MultipleStreamRequests::Yes; 191 } 192 193 void enableStreamEndOnClientStreamClose() { 194 sendStreamEndOnClientStreamClose = true; 195 } 196 197 bool scheduleBackfillManager(VBucket& vb, 198 std::shared_ptr<ActiveStream> s, 199 uint64_t start, 200 uint64_t end) override { 201 beforeScheduleBackfillCB(end); 202 return DcpProducer::scheduleBackfillManager( 203 vb, std::move(s), start, end); 204 } 205 206 void setBeforeScheduleBackfillCB(std::function<void(uint64_t)> backfillCB) { 207 beforeScheduleBackfillCB = backfillCB; 208 } 209 210 std::function<void(uint64_t)> beforeScheduleBackfillCB = [](uint64_t) {}; 211 212 void setCloseAllStreamsHook(std::function<void()> hook) { 213 closeAllStreamsHook = hook; 214 } 215 216 void setCloseAllStreamsPostLockHook(std::function<void()> hook) { 217 closeAllStreamsPostLockHook = hook; 218 } 219 220 void setCloseAllStreamsPreLockHook(std::function<void()> hook) { 221 closeAllStreamsPreLockHook = hook; 222 } 223 224 void setSeqnoAckHook(std::function<void()> hook) { 225 seqnoAckHook = hook; 226 } 227 228 IncludeValue public_getIncludeValue() const { 229 return includeValue; 230 } 231 232 IncludeXattrs public_getIncludeXattrs() const { 233 return includeXattrs; 234 } 235 236 IncludeDeletedUserXattrs public_getIncludeDeletedUserXattrs() const { 237 return includeDeletedUserXattrs; 238 } 239}; 240