/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
/*
 *     Copyright 2018 Couchbase, Inc.
 *
 *   Licensed under the Apache License, Version 2.0 (the "License");
 *   you may not use this file except in compliance with the License.
 *   You may obtain a copy of the License at
 *
 *       http://www.apache.org/licenses/LICENSE-2.0
 *
 *   Unless required by applicable law or agreed to in writing, software
 *   distributed under the License is distributed on an "AS IS" BASIS,
 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *   See the License for the specific language governing permissions and
 *   limitations under the License.
 */
17787236b3SJim Walker 
#include "mock_dcp_producer.h"

#include "dcp/active_stream_checkpoint_processor_task.h"
#include "dcp/msg_producers_border_guard.h"
#include "dcp/response.h"
#include "mock_dcp.h"
#include "mock_dcp_backfill_mgr.h"
#include "mock_stream.h"
#include "vbucket.h"

#include <folly/portability/GTest.h>

#include <stdexcept>
29db693082SJim Walker 
3051f63219SJim Walker extern cb::mcbp::ClientOpcode last_op;
31db693082SJim Walker 
MockDcpProducer(EventuallyPersistentEngine & theEngine,const void * cookie,const std::string & name,uint32_t flags,bool startTask)323718696eSDave Rigby MockDcpProducer::MockDcpProducer(EventuallyPersistentEngine& theEngine,
333718696eSDave Rigby                                  const void* cookie,
343718696eSDave Rigby                                  const std::string& name,
353718696eSDave Rigby                                  uint32_t flags,
363718696eSDave Rigby                                  bool startTask)
373718696eSDave Rigby     : DcpProducer(theEngine, cookie, name, flags, startTask) {
383718696eSDave Rigby     backfillMgr = std::make_shared<MockDcpBackfillManager>(engine_);
393718696eSDave Rigby }
403718696eSDave Rigby 
mockActiveStreamRequest(uint32_t flags,uint32_t opaque,VBucket & vb,uint64_t start_seqno,uint64_t end_seqno,uint64_t vbucket_uuid,uint64_t snap_start_seqno,uint64_t snap_end_seqno,IncludeValue includeValue,IncludeXattrs includeXattrs,IncludeDeletedUserXattrs includeDeletedUserXattrs)4138ba8360SJim Walker std::shared_ptr<MockActiveStream> MockDcpProducer::mockActiveStreamRequest(
4238ba8360SJim Walker         uint32_t flags,
43787236b3SJim Walker         uint32_t opaque,
44787236b3SJim Walker         VBucket& vb,
45787236b3SJim Walker         uint64_t start_seqno,
46787236b3SJim Walker         uint64_t end_seqno,
47787236b3SJim Walker         uint64_t vbucket_uuid,
48787236b3SJim Walker         uint64_t snap_start_seqno,
49d4c51818SJames Harrison         uint64_t snap_end_seqno,
50d4c51818SJames Harrison         IncludeValue includeValue,
51*3268b913SPaolo Cocchi         IncludeXattrs includeXattrs,
52*3268b913SPaolo Cocchi         IncludeDeletedUserXattrs includeDeletedUserXattrs) {
5349d69f53SJim Walker     auto stream = std::make_shared<MockActiveStream>(
54787236b3SJim Walker             static_cast<EventuallyPersistentEngine*>(&engine_),
5549d69f53SJim Walker             std::static_pointer_cast<MockDcpProducer>(shared_from_this()),
56787236b3SJim Walker             flags,
57787236b3SJim Walker             opaque,
58787236b3SJim Walker             vb,
59787236b3SJim Walker             start_seqno,
60787236b3SJim Walker             end_seqno,
61787236b3SJim Walker             vbucket_uuid,
62787236b3SJim Walker             snap_start_seqno,
63d4c51818SJames Harrison             snap_end_seqno,
64d4c51818SJames Harrison             includeValue,
65*3268b913SPaolo Cocchi             includeXattrs,
66*3268b913SPaolo Cocchi             includeDeletedUserXattrs);
67787236b3SJim Walker     stream->setActive();
6856d72278SJim Walker 
693d03132aSBen Huddleston     auto baseStream = dynamic_pointer_cast<Stream>(stream);
703d03132aSBen Huddleston     updateStreamsMap(vb.getId(), stream->getStreamId(), baseStream);
713d03132aSBen Huddleston 
723d03132aSBen Huddleston     auto found = streams.find(vb.getId().get());
733d03132aSBen Huddleston     if (found == streams.end()) {
74787236b3SJim Walker         throw std::logic_error(
75787236b3SJim Walker                 "MockDcpProducer::mockActiveStreamRequest "
76787236b3SJim Walker                 "failed to insert requested stream");
77787236b3SJim Walker     }
78787236b3SJim Walker     notifyStreamReady(vb.getId());
7938ba8360SJim Walker     return stream;
80787236b3SJim Walker }
81db693082SJim Walker 
stepAndExpect(MockDcpMessageProducers * producers,cb::mcbp::ClientOpcode expectedOpcode)82db693082SJim Walker ENGINE_ERROR_CODE MockDcpProducer::stepAndExpect(
8351f63219SJim Walker         MockDcpMessageProducers* producers,
84db693082SJim Walker         cb::mcbp::ClientOpcode expectedOpcode) {
85db693082SJim Walker     auto rv = step(producers);
8651f63219SJim Walker     EXPECT_EQ(expectedOpcode, producers->last_op);
87db693082SJim Walker     return rv;
88db693082SJim Walker }
8956d72278SJim Walker 
stepWithBorderGuard(dcp_message_producers & producers)902b76e028SDave Rigby ENGINE_ERROR_CODE MockDcpProducer::stepWithBorderGuard(
912b76e028SDave Rigby         dcp_message_producers& producers) {
922b76e028SDave Rigby     DcpMsgProducersBorderGuard guardedProducers(producers);
932b76e028SDave Rigby     return step(&guardedProducers);
942b76e028SDave Rigby }
952b76e028SDave Rigby 
findStream(Vbid vbid)9656d72278SJim Walker std::shared_ptr<Stream> MockDcpProducer::findStream(Vbid vbid) {
973d03132aSBen Huddleston     auto rv = streams.find(vbid.get());
983d03132aSBen Huddleston     if (rv != streams.end()) {
993d03132aSBen Huddleston         auto handle = rv->second->rlock();
100a35b2f33SBen Huddleston         // An empty StreamContainer for this vbid is allowed
101a35b2f33SBen Huddleston         if (handle.size() == 0) {
102a35b2f33SBen Huddleston             return nullptr;
103a35b2f33SBen Huddleston         }
104a35b2f33SBen Huddleston 
10556d72278SJim Walker         if (handle.size() != 1) {
10656d72278SJim Walker             throw std::logic_error(
10756d72278SJim Walker                     "MockDcpProducer::findStream against producer with many "
10856d72278SJim Walker                     "streams size:" +
10956d72278SJim Walker                     std::to_string(handle.size()));
11056d72278SJim Walker         }
11156d72278SJim Walker         return handle.get();
11256d72278SJim Walker     }
11356d72278SJim Walker     return nullptr;
11456d72278SJim Walker }
115787071edSJim Walker 
11674881e67SBen Huddleston ActiveStreamCheckpointProcessorTask*
getCheckpointSnapshotTask() const1173718696eSDave Rigby MockDcpProducer::getCheckpointSnapshotTask() const {
1183718696eSDave Rigby     LockHolder guard(checkpointCreator->mutex);
11974881e67SBen Huddleston     return static_cast<ActiveStreamCheckpointProcessorTask*>(
1203718696eSDave Rigby             checkpointCreator->task.get());
1213718696eSDave Rigby }
1223718696eSDave Rigby 
findStream(Vbid vbid,cb::mcbp::DcpStreamId sid)123787071edSJim Walker std::pair<std::shared_ptr<Stream>, bool> MockDcpProducer::findStream(
124787071edSJim Walker         Vbid vbid, cb::mcbp::DcpStreamId sid) {
1253d03132aSBen Huddleston     auto rv = streams.find(vbid.get());
1263d03132aSBen Huddleston     if (rv != streams.end()) {
1273d03132aSBen Huddleston         auto handle = rv->second->rlock();
128787071edSJim Walker         // Try and locate a matching stream
129787071edSJim Walker         for (; !handle.end(); handle.next()) {
130787071edSJim Walker             if (handle.get()->compareStreamId(sid)) {
131787071edSJim Walker                 return {handle.get(), true};
132787071edSJim Walker             }
133787071edSJim Walker         }
1343d03132aSBen Huddleston         return {nullptr, handle.size() > 0};
135787071edSJim Walker     }
136787071edSJim Walker     return {nullptr, false};
137787071edSJim Walker }
1383718696eSDave Rigby 
setBackfillBufferSize(size_t newSize)1393718696eSDave Rigby void MockDcpProducer::setBackfillBufferSize(size_t newSize) {
1403718696eSDave Rigby     return std::dynamic_pointer_cast<MockDcpBackfillManager>(backfillMgr.load())
1413718696eSDave Rigby             ->setBackfillBufferSize(newSize);
1423718696eSDave Rigby }
1433718696eSDave Rigby 
getBackfillBufferFullStatus()1443718696eSDave Rigby bool MockDcpProducer::getBackfillBufferFullStatus() {
1453718696eSDave Rigby     return std::dynamic_pointer_cast<MockDcpBackfillManager>(backfillMgr.load())
1463718696eSDave Rigby             ->getBackfillBufferFullStatus();
1473718696eSDave Rigby }
1483718696eSDave Rigby 
public_getBackfillScanBuffer()1493718696eSDave Rigby BackfillScanBuffer& MockDcpProducer::public_getBackfillScanBuffer() {
1503718696eSDave Rigby     return std::dynamic_pointer_cast<MockDcpBackfillManager>(backfillMgr.load())
1513718696eSDave Rigby             ->public_getBackfillScanBuffer();
1523718696eSDave Rigby }
1533718696eSDave Rigby 
bytesForceRead(size_t bytes)1543718696eSDave Rigby void MockDcpProducer::bytesForceRead(size_t bytes) {
1553718696eSDave Rigby     backfillMgr->bytesForceRead(bytes);
1563718696eSDave Rigby }
157