1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2018 Couchbase, Inc.
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17 
18 #include "mock_dcp_producer.h"
19 
20 #include "dcp/active_stream_checkpoint_processor_task.h"
21 #include "dcp/msg_producers_border_guard.h"
22 #include "dcp/response.h"
23 #include "mock_dcp.h"
24 #include "mock_dcp_backfill_mgr.h"
25 #include "mock_stream.h"
26 #include "vbucket.h"
27 
28 #include <folly/portability/GTest.h>
29 
30 extern cb::mcbp::ClientOpcode last_op;
31 
MockDcpProducer(EventuallyPersistentEngine& theEngine, const void* cookie, const std::string& name, uint32_t flags, bool startTask)32 MockDcpProducer::MockDcpProducer(EventuallyPersistentEngine& theEngine,
33                                  const void* cookie,
34                                  const std::string& name,
35                                  uint32_t flags,
36                                  bool startTask)
37     : DcpProducer(theEngine, cookie, name, flags, startTask) {
38     backfillMgr = std::make_shared<MockDcpBackfillManager>(engine_);
39 }
40 
mockActiveStreamRequest( uint32_t flags, uint32_t opaque, VBucket& vb, uint64_t start_seqno, uint64_t end_seqno, uint64_t vbucket_uuid, uint64_t snap_start_seqno, uint64_t snap_end_seqno, IncludeValue includeValue, IncludeXattrs includeXattrs, IncludeDeletedUserXattrs includeDeletedUserXattrs)41 std::shared_ptr<MockActiveStream> MockDcpProducer::mockActiveStreamRequest(
42         uint32_t flags,
43         uint32_t opaque,
44         VBucket& vb,
45         uint64_t start_seqno,
46         uint64_t end_seqno,
47         uint64_t vbucket_uuid,
48         uint64_t snap_start_seqno,
49         uint64_t snap_end_seqno,
50         IncludeValue includeValue,
51         IncludeXattrs includeXattrs,
52         IncludeDeletedUserXattrs includeDeletedUserXattrs) {
53     auto stream = std::make_shared<MockActiveStream>(
54             static_cast<EventuallyPersistentEngine*>(&engine_),
55             std::static_pointer_cast<MockDcpProducer>(shared_from_this()),
56             flags,
57             opaque,
58             vb,
59             start_seqno,
60             end_seqno,
61             vbucket_uuid,
62             snap_start_seqno,
63             snap_end_seqno,
64             includeValue,
65             includeXattrs,
66             includeDeletedUserXattrs);
67     stream->setActive();
68 
69     auto baseStream = dynamic_pointer_cast<Stream>(stream);
70     updateStreamsMap(vb.getId(), stream->getStreamId(), baseStream);
71 
72     auto found = streams.find(vb.getId().get());
73     if (found == streams.end()) {
74         throw std::logic_error(
75                 "MockDcpProducer::mockActiveStreamRequest "
76                 "failed to insert requested stream");
77     }
78     notifyStreamReady(vb.getId());
79     return stream;
80 }
81 
stepAndExpect( MockDcpMessageProducers* producers, cb::mcbp::ClientOpcode expectedOpcode)82 ENGINE_ERROR_CODE MockDcpProducer::stepAndExpect(
83         MockDcpMessageProducers* producers,
84         cb::mcbp::ClientOpcode expectedOpcode) {
85     auto rv = step(producers);
86     EXPECT_EQ(expectedOpcode, producers->last_op);
87     return rv;
88 }
89 
stepWithBorderGuard( dcp_message_producers& producers)90 ENGINE_ERROR_CODE MockDcpProducer::stepWithBorderGuard(
91         dcp_message_producers& producers) {
92     DcpMsgProducersBorderGuard guardedProducers(producers);
93     return step(&guardedProducers);
94 }
95 
findStream(Vbid vbid)96 std::shared_ptr<Stream> MockDcpProducer::findStream(Vbid vbid) {
97     auto rv = streams.find(vbid.get());
98     if (rv != streams.end()) {
99         auto handle = rv->second->rlock();
100         // An empty StreamContainer for this vbid is allowed
101         if (handle.size() == 0) {
102             return nullptr;
103         }
104 
105         if (handle.size() != 1) {
106             throw std::logic_error(
107                     "MockDcpProducer::findStream against producer with many "
108                     "streams size:" +
109                     std::to_string(handle.size()));
110         }
111         return handle.get();
112     }
113     return nullptr;
114 }
115 
116 ActiveStreamCheckpointProcessorTask*
getCheckpointSnapshotTask() const117 MockDcpProducer::getCheckpointSnapshotTask() const {
118     LockHolder guard(checkpointCreator->mutex);
119     return static_cast<ActiveStreamCheckpointProcessorTask*>(
120             checkpointCreator->task.get());
121 }
122 
findStream( Vbid vbid, cb::mcbp::DcpStreamId sid)123 std::pair<std::shared_ptr<Stream>, bool> MockDcpProducer::findStream(
124         Vbid vbid, cb::mcbp::DcpStreamId sid) {
125     auto rv = streams.find(vbid.get());
126     if (rv != streams.end()) {
127         auto handle = rv->second->rlock();
128         // Try and locate a matching stream
129         for (; !handle.end(); handle.next()) {
130             if (handle.get()->compareStreamId(sid)) {
131                 return {handle.get(), true};
132             }
133         }
134         return {nullptr, handle.size() > 0};
135     }
136     return {nullptr, false};
137 }
138 
setBackfillBufferSize(size_t newSize)139 void MockDcpProducer::setBackfillBufferSize(size_t newSize) {
140     return std::dynamic_pointer_cast<MockDcpBackfillManager>(backfillMgr.load())
141             ->setBackfillBufferSize(newSize);
142 }
143 
getBackfillBufferFullStatus()144 bool MockDcpProducer::getBackfillBufferFullStatus() {
145     return std::dynamic_pointer_cast<MockDcpBackfillManager>(backfillMgr.load())
146             ->getBackfillBufferFullStatus();
147 }
148 
public_getBackfillScanBuffer()149 BackfillScanBuffer& MockDcpProducer::public_getBackfillScanBuffer() {
150     return std::dynamic_pointer_cast<MockDcpBackfillManager>(backfillMgr.load())
151             ->public_getBackfillScanBuffer();
152 }
153 
bytesForceRead(size_t bytes)154 void MockDcpProducer::bytesForceRead(size_t bytes) {
155     backfillMgr->bytesForceRead(bytes);
156 }
157