1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2018 Couchbase, Inc.
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
18#include "mock_dcp_producer.h"
19
20#include "dcp/active_stream_checkpoint_processor_task.h"
21#include "dcp/msg_producers_border_guard.h"
22#include "dcp/response.h"
23#include "mock_dcp.h"
24#include "mock_dcp_backfill_mgr.h"
25#include "mock_stream.h"
26#include "vbucket.h"
27
28#include <folly/portability/GTest.h>
29
30extern cb::mcbp::ClientOpcode last_op;
31
32MockDcpProducer::MockDcpProducer(EventuallyPersistentEngine& theEngine,
33                                 const void* cookie,
34                                 const std::string& name,
35                                 uint32_t flags,
36                                 bool startTask)
37    : DcpProducer(theEngine, cookie, name, flags, startTask) {
38    backfillMgr = std::make_shared<MockDcpBackfillManager>(engine_);
39}
40
41std::shared_ptr<MockActiveStream> MockDcpProducer::mockActiveStreamRequest(
42        uint32_t flags,
43        uint32_t opaque,
44        VBucket& vb,
45        uint64_t start_seqno,
46        uint64_t end_seqno,
47        uint64_t vbucket_uuid,
48        uint64_t snap_start_seqno,
49        uint64_t snap_end_seqno,
50        IncludeValue includeValue,
51        IncludeXattrs includeXattrs,
52        IncludeDeletedUserXattrs includeDeletedUserXattrs) {
53    auto stream = std::make_shared<MockActiveStream>(
54            static_cast<EventuallyPersistentEngine*>(&engine_),
55            std::static_pointer_cast<MockDcpProducer>(shared_from_this()),
56            flags,
57            opaque,
58            vb,
59            start_seqno,
60            end_seqno,
61            vbucket_uuid,
62            snap_start_seqno,
63            snap_end_seqno,
64            includeValue,
65            includeXattrs,
66            includeDeletedUserXattrs);
67    stream->setActive();
68
69    auto baseStream = dynamic_pointer_cast<Stream>(stream);
70    updateStreamsMap(vb.getId(), stream->getStreamId(), baseStream);
71
72    auto found = streams.find(vb.getId().get());
73    if (found == streams.end()) {
74        throw std::logic_error(
75                "MockDcpProducer::mockActiveStreamRequest "
76                "failed to insert requested stream");
77    }
78    notifyStreamReady(vb.getId());
79    return stream;
80}
81
82ENGINE_ERROR_CODE MockDcpProducer::stepAndExpect(
83        MockDcpMessageProducers* producers,
84        cb::mcbp::ClientOpcode expectedOpcode) {
85    auto rv = step(producers);
86    EXPECT_EQ(expectedOpcode, producers->last_op);
87    return rv;
88}
89
90ENGINE_ERROR_CODE MockDcpProducer::stepWithBorderGuard(
91        dcp_message_producers& producers) {
92    DcpMsgProducersBorderGuard guardedProducers(producers);
93    return step(&guardedProducers);
94}
95
96std::shared_ptr<Stream> MockDcpProducer::findStream(Vbid vbid) {
97    auto rv = streams.find(vbid.get());
98    if (rv != streams.end()) {
99        auto handle = rv->second->rlock();
100        // An empty StreamContainer for this vbid is allowed
101        if (handle.size() == 0) {
102            return nullptr;
103        }
104
105        if (handle.size() != 1) {
106            throw std::logic_error(
107                    "MockDcpProducer::findStream against producer with many "
108                    "streams size:" +
109                    std::to_string(handle.size()));
110        }
111        return handle.get();
112    }
113    return nullptr;
114}
115
116ActiveStreamCheckpointProcessorTask*
117MockDcpProducer::getCheckpointSnapshotTask() const {
118    LockHolder guard(checkpointCreator->mutex);
119    return static_cast<ActiveStreamCheckpointProcessorTask*>(
120            checkpointCreator->task.get());
121}
122
123std::pair<std::shared_ptr<Stream>, bool> MockDcpProducer::findStream(
124        Vbid vbid, cb::mcbp::DcpStreamId sid) {
125    auto rv = streams.find(vbid.get());
126    if (rv != streams.end()) {
127        auto handle = rv->second->rlock();
128        // Try and locate a matching stream
129        for (; !handle.end(); handle.next()) {
130            if (handle.get()->compareStreamId(sid)) {
131                return {handle.get(), true};
132            }
133        }
134        return {nullptr, handle.size() > 0};
135    }
136    return {nullptr, false};
137}
138
139void MockDcpProducer::setBackfillBufferSize(size_t newSize) {
140    return std::dynamic_pointer_cast<MockDcpBackfillManager>(backfillMgr.load())
141            ->setBackfillBufferSize(newSize);
142}
143
144bool MockDcpProducer::getBackfillBufferFullStatus() {
145    return std::dynamic_pointer_cast<MockDcpBackfillManager>(backfillMgr.load())
146            ->getBackfillBufferFullStatus();
147}
148
149BackfillScanBuffer& MockDcpProducer::public_getBackfillScanBuffer() {
150    return std::dynamic_pointer_cast<MockDcpBackfillManager>(backfillMgr.load())
151            ->public_getBackfillScanBuffer();
152}
153
154void MockDcpProducer::bytesForceRead(size_t bytes) {
155    backfillMgr->bytesForceRead(bytes);
156}
157