1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3 * Copyright 2018 Couchbase, Inc.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18 #include "mock_dcp_producer.h"
19
20 #include "dcp/active_stream_checkpoint_processor_task.h"
21 #include "dcp/msg_producers_border_guard.h"
22 #include "dcp/response.h"
23 #include "mock_dcp.h"
24 #include "mock_dcp_backfill_mgr.h"
25 #include "mock_stream.h"
26 #include "vbucket.h"
27
28 #include <folly/portability/GTest.h>
29
30 extern cb::mcbp::ClientOpcode last_op;
31
MockDcpProducer(EventuallyPersistentEngine& theEngine, const void* cookie, const std::string& name, uint32_t flags, bool startTask)32 MockDcpProducer::MockDcpProducer(EventuallyPersistentEngine& theEngine,
33 const void* cookie,
34 const std::string& name,
35 uint32_t flags,
36 bool startTask)
37 : DcpProducer(theEngine, cookie, name, flags, startTask) {
38 backfillMgr = std::make_shared<MockDcpBackfillManager>(engine_);
39 }
40
mockActiveStreamRequest( uint32_t flags, uint32_t opaque, VBucket& vb, uint64_t start_seqno, uint64_t end_seqno, uint64_t vbucket_uuid, uint64_t snap_start_seqno, uint64_t snap_end_seqno, IncludeValue includeValue, IncludeXattrs includeXattrs, IncludeDeletedUserXattrs includeDeletedUserXattrs)41 std::shared_ptr<MockActiveStream> MockDcpProducer::mockActiveStreamRequest(
42 uint32_t flags,
43 uint32_t opaque,
44 VBucket& vb,
45 uint64_t start_seqno,
46 uint64_t end_seqno,
47 uint64_t vbucket_uuid,
48 uint64_t snap_start_seqno,
49 uint64_t snap_end_seqno,
50 IncludeValue includeValue,
51 IncludeXattrs includeXattrs,
52 IncludeDeletedUserXattrs includeDeletedUserXattrs) {
53 auto stream = std::make_shared<MockActiveStream>(
54 static_cast<EventuallyPersistentEngine*>(&engine_),
55 std::static_pointer_cast<MockDcpProducer>(shared_from_this()),
56 flags,
57 opaque,
58 vb,
59 start_seqno,
60 end_seqno,
61 vbucket_uuid,
62 snap_start_seqno,
63 snap_end_seqno,
64 includeValue,
65 includeXattrs,
66 includeDeletedUserXattrs);
67 stream->setActive();
68
69 auto baseStream = dynamic_pointer_cast<Stream>(stream);
70 updateStreamsMap(vb.getId(), stream->getStreamId(), baseStream);
71
72 auto found = streams.find(vb.getId().get());
73 if (found == streams.end()) {
74 throw std::logic_error(
75 "MockDcpProducer::mockActiveStreamRequest "
76 "failed to insert requested stream");
77 }
78 notifyStreamReady(vb.getId());
79 return stream;
80 }
81
stepAndExpect( MockDcpMessageProducers* producers, cb::mcbp::ClientOpcode expectedOpcode)82 ENGINE_ERROR_CODE MockDcpProducer::stepAndExpect(
83 MockDcpMessageProducers* producers,
84 cb::mcbp::ClientOpcode expectedOpcode) {
85 auto rv = step(producers);
86 EXPECT_EQ(expectedOpcode, producers->last_op);
87 return rv;
88 }
89
stepWithBorderGuard( dcp_message_producers& producers)90 ENGINE_ERROR_CODE MockDcpProducer::stepWithBorderGuard(
91 dcp_message_producers& producers) {
92 DcpMsgProducersBorderGuard guardedProducers(producers);
93 return step(&guardedProducers);
94 }
95
findStream(Vbid vbid)96 std::shared_ptr<Stream> MockDcpProducer::findStream(Vbid vbid) {
97 auto rv = streams.find(vbid.get());
98 if (rv != streams.end()) {
99 auto handle = rv->second->rlock();
100 // An empty StreamContainer for this vbid is allowed
101 if (handle.size() == 0) {
102 return nullptr;
103 }
104
105 if (handle.size() != 1) {
106 throw std::logic_error(
107 "MockDcpProducer::findStream against producer with many "
108 "streams size:" +
109 std::to_string(handle.size()));
110 }
111 return handle.get();
112 }
113 return nullptr;
114 }
115
116 ActiveStreamCheckpointProcessorTask*
getCheckpointSnapshotTask() const117 MockDcpProducer::getCheckpointSnapshotTask() const {
118 LockHolder guard(checkpointCreator->mutex);
119 return static_cast<ActiveStreamCheckpointProcessorTask*>(
120 checkpointCreator->task.get());
121 }
122
findStream( Vbid vbid, cb::mcbp::DcpStreamId sid)123 std::pair<std::shared_ptr<Stream>, bool> MockDcpProducer::findStream(
124 Vbid vbid, cb::mcbp::DcpStreamId sid) {
125 auto rv = streams.find(vbid.get());
126 if (rv != streams.end()) {
127 auto handle = rv->second->rlock();
128 // Try and locate a matching stream
129 for (; !handle.end(); handle.next()) {
130 if (handle.get()->compareStreamId(sid)) {
131 return {handle.get(), true};
132 }
133 }
134 return {nullptr, handle.size() > 0};
135 }
136 return {nullptr, false};
137 }
138
setBackfillBufferSize(size_t newSize)139 void MockDcpProducer::setBackfillBufferSize(size_t newSize) {
140 return std::dynamic_pointer_cast<MockDcpBackfillManager>(backfillMgr.load())
141 ->setBackfillBufferSize(newSize);
142 }
143
getBackfillBufferFullStatus()144 bool MockDcpProducer::getBackfillBufferFullStatus() {
145 return std::dynamic_pointer_cast<MockDcpBackfillManager>(backfillMgr.load())
146 ->getBackfillBufferFullStatus();
147 }
148
public_getBackfillScanBuffer()149 BackfillScanBuffer& MockDcpProducer::public_getBackfillScanBuffer() {
150 return std::dynamic_pointer_cast<MockDcpBackfillManager>(backfillMgr.load())
151 ->public_getBackfillScanBuffer();
152 }
153
bytesForceRead(size_t bytes)154 void MockDcpProducer::bytesForceRead(size_t bytes) {
155 backfillMgr->bytesForceRead(bytes);
156 }
157