1787236b3SJim Walker /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2787236b3SJim Walker /*
3787236b3SJim Walker * Copyright 2018 Couchbase, Inc.
4787236b3SJim Walker *
5787236b3SJim Walker * Licensed under the Apache License, Version 2.0 (the "License");
6787236b3SJim Walker * you may not use this file except in compliance with the License.
7787236b3SJim Walker * You may obtain a copy of the License at
8787236b3SJim Walker *
9787236b3SJim Walker * http://www.apache.org/licenses/LICENSE-2.0
10787236b3SJim Walker *
11787236b3SJim Walker * Unless required by applicable law or agreed to in writing, software
12787236b3SJim Walker * distributed under the License is distributed on an "AS IS" BASIS,
13787236b3SJim Walker * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14787236b3SJim Walker * See the License for the specific language governing permissions and
15787236b3SJim Walker * limitations under the License.
16787236b3SJim Walker */
17787236b3SJim Walker
18787236b3SJim Walker #include "mock_dcp_producer.h"
199880aab0SDave Rigby
203718696eSDave Rigby #include "dcp/active_stream_checkpoint_processor_task.h"
212b76e028SDave Rigby #include "dcp/msg_producers_border_guard.h"
229880aab0SDave Rigby #include "dcp/response.h"
2351f63219SJim Walker #include "mock_dcp.h"
243718696eSDave Rigby #include "mock_dcp_backfill_mgr.h"
25787236b3SJim Walker #include "mock_stream.h"
263718696eSDave Rigby #include "vbucket.h"
27787236b3SJim Walker
282abcba17Sben_huddleston #include <folly/portability/GTest.h>
29db693082SJim Walker
3051f63219SJim Walker extern cb::mcbp::ClientOpcode last_op;
31db693082SJim Walker
MockDcpProducer(EventuallyPersistentEngine & theEngine,const void * cookie,const std::string & name,uint32_t flags,bool startTask)323718696eSDave Rigby MockDcpProducer::MockDcpProducer(EventuallyPersistentEngine& theEngine,
333718696eSDave Rigby const void* cookie,
343718696eSDave Rigby const std::string& name,
353718696eSDave Rigby uint32_t flags,
363718696eSDave Rigby bool startTask)
373718696eSDave Rigby : DcpProducer(theEngine, cookie, name, flags, startTask) {
383718696eSDave Rigby backfillMgr = std::make_shared<MockDcpBackfillManager>(engine_);
393718696eSDave Rigby }
403718696eSDave Rigby
mockActiveStreamRequest(uint32_t flags,uint32_t opaque,VBucket & vb,uint64_t start_seqno,uint64_t end_seqno,uint64_t vbucket_uuid,uint64_t snap_start_seqno,uint64_t snap_end_seqno,IncludeValue includeValue,IncludeXattrs includeXattrs,IncludeDeletedUserXattrs includeDeletedUserXattrs)4138ba8360SJim Walker std::shared_ptr<MockActiveStream> MockDcpProducer::mockActiveStreamRequest(
4238ba8360SJim Walker uint32_t flags,
43787236b3SJim Walker uint32_t opaque,
44787236b3SJim Walker VBucket& vb,
45787236b3SJim Walker uint64_t start_seqno,
46787236b3SJim Walker uint64_t end_seqno,
47787236b3SJim Walker uint64_t vbucket_uuid,
48787236b3SJim Walker uint64_t snap_start_seqno,
49d4c51818SJames Harrison uint64_t snap_end_seqno,
50d4c51818SJames Harrison IncludeValue includeValue,
51*3268b913SPaolo Cocchi IncludeXattrs includeXattrs,
52*3268b913SPaolo Cocchi IncludeDeletedUserXattrs includeDeletedUserXattrs) {
5349d69f53SJim Walker auto stream = std::make_shared<MockActiveStream>(
54787236b3SJim Walker static_cast<EventuallyPersistentEngine*>(&engine_),
5549d69f53SJim Walker std::static_pointer_cast<MockDcpProducer>(shared_from_this()),
56787236b3SJim Walker flags,
57787236b3SJim Walker opaque,
58787236b3SJim Walker vb,
59787236b3SJim Walker start_seqno,
60787236b3SJim Walker end_seqno,
61787236b3SJim Walker vbucket_uuid,
62787236b3SJim Walker snap_start_seqno,
63d4c51818SJames Harrison snap_end_seqno,
64d4c51818SJames Harrison includeValue,
65*3268b913SPaolo Cocchi includeXattrs,
66*3268b913SPaolo Cocchi includeDeletedUserXattrs);
67787236b3SJim Walker stream->setActive();
6856d72278SJim Walker
693d03132aSBen Huddleston auto baseStream = dynamic_pointer_cast<Stream>(stream);
703d03132aSBen Huddleston updateStreamsMap(vb.getId(), stream->getStreamId(), baseStream);
713d03132aSBen Huddleston
723d03132aSBen Huddleston auto found = streams.find(vb.getId().get());
733d03132aSBen Huddleston if (found == streams.end()) {
74787236b3SJim Walker throw std::logic_error(
75787236b3SJim Walker "MockDcpProducer::mockActiveStreamRequest "
76787236b3SJim Walker "failed to insert requested stream");
77787236b3SJim Walker }
78787236b3SJim Walker notifyStreamReady(vb.getId());
7938ba8360SJim Walker return stream;
80787236b3SJim Walker }
81db693082SJim Walker
stepAndExpect(MockDcpMessageProducers * producers,cb::mcbp::ClientOpcode expectedOpcode)82db693082SJim Walker ENGINE_ERROR_CODE MockDcpProducer::stepAndExpect(
8351f63219SJim Walker MockDcpMessageProducers* producers,
84db693082SJim Walker cb::mcbp::ClientOpcode expectedOpcode) {
85db693082SJim Walker auto rv = step(producers);
8651f63219SJim Walker EXPECT_EQ(expectedOpcode, producers->last_op);
87db693082SJim Walker return rv;
88db693082SJim Walker }
8956d72278SJim Walker
stepWithBorderGuard(dcp_message_producers & producers)902b76e028SDave Rigby ENGINE_ERROR_CODE MockDcpProducer::stepWithBorderGuard(
912b76e028SDave Rigby dcp_message_producers& producers) {
922b76e028SDave Rigby DcpMsgProducersBorderGuard guardedProducers(producers);
932b76e028SDave Rigby return step(&guardedProducers);
942b76e028SDave Rigby }
952b76e028SDave Rigby
findStream(Vbid vbid)9656d72278SJim Walker std::shared_ptr<Stream> MockDcpProducer::findStream(Vbid vbid) {
973d03132aSBen Huddleston auto rv = streams.find(vbid.get());
983d03132aSBen Huddleston if (rv != streams.end()) {
993d03132aSBen Huddleston auto handle = rv->second->rlock();
100a35b2f33SBen Huddleston // An empty StreamContainer for this vbid is allowed
101a35b2f33SBen Huddleston if (handle.size() == 0) {
102a35b2f33SBen Huddleston return nullptr;
103a35b2f33SBen Huddleston }
104a35b2f33SBen Huddleston
10556d72278SJim Walker if (handle.size() != 1) {
10656d72278SJim Walker throw std::logic_error(
10756d72278SJim Walker "MockDcpProducer::findStream against producer with many "
10856d72278SJim Walker "streams size:" +
10956d72278SJim Walker std::to_string(handle.size()));
11056d72278SJim Walker }
11156d72278SJim Walker return handle.get();
11256d72278SJim Walker }
11356d72278SJim Walker return nullptr;
11456d72278SJim Walker }
115787071edSJim Walker
11674881e67SBen Huddleston ActiveStreamCheckpointProcessorTask*
getCheckpointSnapshotTask() const1173718696eSDave Rigby MockDcpProducer::getCheckpointSnapshotTask() const {
1183718696eSDave Rigby LockHolder guard(checkpointCreator->mutex);
11974881e67SBen Huddleston return static_cast<ActiveStreamCheckpointProcessorTask*>(
1203718696eSDave Rigby checkpointCreator->task.get());
1213718696eSDave Rigby }
1223718696eSDave Rigby
findStream(Vbid vbid,cb::mcbp::DcpStreamId sid)123787071edSJim Walker std::pair<std::shared_ptr<Stream>, bool> MockDcpProducer::findStream(
124787071edSJim Walker Vbid vbid, cb::mcbp::DcpStreamId sid) {
1253d03132aSBen Huddleston auto rv = streams.find(vbid.get());
1263d03132aSBen Huddleston if (rv != streams.end()) {
1273d03132aSBen Huddleston auto handle = rv->second->rlock();
128787071edSJim Walker // Try and locate a matching stream
129787071edSJim Walker for (; !handle.end(); handle.next()) {
130787071edSJim Walker if (handle.get()->compareStreamId(sid)) {
131787071edSJim Walker return {handle.get(), true};
132787071edSJim Walker }
133787071edSJim Walker }
1343d03132aSBen Huddleston return {nullptr, handle.size() > 0};
135787071edSJim Walker }
136787071edSJim Walker return {nullptr, false};
137787071edSJim Walker }
1383718696eSDave Rigby
setBackfillBufferSize(size_t newSize)1393718696eSDave Rigby void MockDcpProducer::setBackfillBufferSize(size_t newSize) {
1403718696eSDave Rigby return std::dynamic_pointer_cast<MockDcpBackfillManager>(backfillMgr.load())
1413718696eSDave Rigby ->setBackfillBufferSize(newSize);
1423718696eSDave Rigby }
1433718696eSDave Rigby
getBackfillBufferFullStatus()1443718696eSDave Rigby bool MockDcpProducer::getBackfillBufferFullStatus() {
1453718696eSDave Rigby return std::dynamic_pointer_cast<MockDcpBackfillManager>(backfillMgr.load())
1463718696eSDave Rigby ->getBackfillBufferFullStatus();
1473718696eSDave Rigby }
1483718696eSDave Rigby
public_getBackfillScanBuffer()1493718696eSDave Rigby BackfillScanBuffer& MockDcpProducer::public_getBackfillScanBuffer() {
1503718696eSDave Rigby return std::dynamic_pointer_cast<MockDcpBackfillManager>(backfillMgr.load())
1513718696eSDave Rigby ->public_getBackfillScanBuffer();
1523718696eSDave Rigby }
1533718696eSDave Rigby
bytesForceRead(size_t bytes)1543718696eSDave Rigby void MockDcpProducer::bytesForceRead(size_t bytes) {
1553718696eSDave Rigby backfillMgr->bytesForceRead(bytes);
1563718696eSDave Rigby }
157