1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */ 2/* 3 * Copyright 2018 Couchbase, Inc. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 */ 17 18#include "mock_dcp_producer.h" 19 20#include "dcp/active_stream_checkpoint_processor_task.h" 21#include "dcp/msg_producers_border_guard.h" 22#include "dcp/response.h" 23#include "mock_dcp.h" 24#include "mock_dcp_backfill_mgr.h" 25#include "mock_stream.h" 26#include "vbucket.h" 27 28#include <folly/portability/GTest.h> 29 30extern cb::mcbp::ClientOpcode last_op; 31 32MockDcpProducer::MockDcpProducer(EventuallyPersistentEngine& theEngine, 33 const void* cookie, 34 const std::string& name, 35 uint32_t flags, 36 bool startTask) 37 : DcpProducer(theEngine, cookie, name, flags, startTask) { 38 backfillMgr = std::make_shared<MockDcpBackfillManager>(engine_); 39} 40 41std::shared_ptr<MockActiveStream> MockDcpProducer::mockActiveStreamRequest( 42 uint32_t flags, 43 uint32_t opaque, 44 VBucket& vb, 45 uint64_t start_seqno, 46 uint64_t end_seqno, 47 uint64_t vbucket_uuid, 48 uint64_t snap_start_seqno, 49 uint64_t snap_end_seqno, 50 IncludeValue includeValue, 51 IncludeXattrs includeXattrs, 52 IncludeDeletedUserXattrs includeDeletedUserXattrs) { 53 auto stream = std::make_shared<MockActiveStream>( 54 static_cast<EventuallyPersistentEngine*>(&engine_), 55 std::static_pointer_cast<MockDcpProducer>(shared_from_this()), 56 flags, 57 opaque, 58 vb, 59 start_seqno, 60 end_seqno, 61 vbucket_uuid, 62 snap_start_seqno, 63 snap_end_seqno, 64 includeValue, 65 includeXattrs, 66 includeDeletedUserXattrs); 67 stream->setActive(); 68 69 auto baseStream = dynamic_pointer_cast<Stream>(stream); 70 updateStreamsMap(vb.getId(), stream->getStreamId(), baseStream); 71 72 auto found = streams.find(vb.getId().get()); 73 if (found == streams.end()) { 74 throw std::logic_error( 75 "MockDcpProducer::mockActiveStreamRequest " 76 "failed to insert requested stream"); 77 } 78 notifyStreamReady(vb.getId()); 79 return stream; 80} 81 82ENGINE_ERROR_CODE MockDcpProducer::stepAndExpect( 83 MockDcpMessageProducers* producers, 84 cb::mcbp::ClientOpcode expectedOpcode) { 85 auto rv = step(producers); 86 EXPECT_EQ(expectedOpcode, producers->last_op); 87 return rv; 88} 89 90ENGINE_ERROR_CODE MockDcpProducer::stepWithBorderGuard( 91 dcp_message_producers& producers) { 92 DcpMsgProducersBorderGuard guardedProducers(producers); 93 return step(&guardedProducers); 94} 95 96std::shared_ptr<Stream> MockDcpProducer::findStream(Vbid vbid) { 97 auto rv = streams.find(vbid.get()); 98 if (rv != streams.end()) { 99 auto handle = rv->second->rlock(); 100 // An empty StreamContainer for this vbid is allowed 101 if (handle.size() == 0) { 102 return nullptr; 103 } 104 105 if (handle.size() != 1) { 106 throw std::logic_error( 107 "MockDcpProducer::findStream against producer with many " 108 "streams size:" + 109 std::to_string(handle.size())); 110 } 111 return handle.get(); 112 } 113 return nullptr; 114} 115 116ActiveStreamCheckpointProcessorTask* 117MockDcpProducer::getCheckpointSnapshotTask() const { 118 LockHolder guard(checkpointCreator->mutex); 119 return static_cast<ActiveStreamCheckpointProcessorTask*>( 120 checkpointCreator->task.get()); 121} 122 123std::pair<std::shared_ptr<Stream>, bool> MockDcpProducer::findStream( 124 Vbid vbid, cb::mcbp::DcpStreamId sid) { 125 auto rv = streams.find(vbid.get()); 126 if (rv != streams.end()) { 127 auto handle = rv->second->rlock(); 128 // Try and locate a matching stream 129 for (; !handle.end(); handle.next()) { 130 if (handle.get()->compareStreamId(sid)) { 131 return {handle.get(), true}; 132 } 133 } 134 return {nullptr, handle.size() > 0}; 135 } 136 return {nullptr, false}; 137} 138 139void MockDcpProducer::setBackfillBufferSize(size_t newSize) { 140 return std::dynamic_pointer_cast<MockDcpBackfillManager>(backfillMgr.load()) 141 ->setBackfillBufferSize(newSize); 142} 143 144bool MockDcpProducer::getBackfillBufferFullStatus() { 145 return std::dynamic_pointer_cast<MockDcpBackfillManager>(backfillMgr.load()) 146 ->getBackfillBufferFullStatus(); 147} 148 149BackfillScanBuffer& MockDcpProducer::public_getBackfillScanBuffer() { 150 return std::dynamic_pointer_cast<MockDcpBackfillManager>(backfillMgr.load()) 151 ->public_getBackfillScanBuffer(); 152} 153 154void MockDcpProducer::bytesForceRead(size_t bytes) { 155 backfillMgr->bytesForceRead(bytes); 156} 157