1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2017 Couchbase, Inc
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17 
18 #include "mock_synchronous_ep_engine.h"
19 
20 #include "checkpoint_config.h"
21 #include "checkpoint_remover.h"
22 #include "dcp/dcpconnmap.h"
23 #include "dcp/flow-control-manager.h"
24 #include "item.h"
25 #include "mock_dcp_conn_map.h"
26 #include "mock_ep_bucket.h"
27 #include "mock_ephemeral_bucket.h"
28 #include "replicationthrottle.h"
29 #include <platform/cbassert.h>
30 #include <programs/engine_testapp/mock_server.h>
31 #include <string>
32 
SynchronousEPEngine(std::string extra_config)33 SynchronousEPEngine::SynchronousEPEngine(std::string extra_config)
34     : EventuallyPersistentEngine(get_mock_server_api) {
35     // Tests may need to create multiple failover table entries, so allow that
36     maxFailoverEntries = 5;
37 
38     // Default to a reduced number of vBuckets & shards to speed up test
39     // setup / teardown (fewer VBucket & other related objects).
40     // Tests which require additional vbuckets can specify that in
41     // extra_config string.
42     if (!configuration.parseConfiguration("max_vbuckets=4;max_num_shards=2",
43                                           serverApi)) {
44         throw std::invalid_argument(
45                 "SynchronousEPEngine: Unable to set reduced max_vbuckets & "
46                 "max_num_shards");
47     }
48 
49     // Merge any extra config into the main configuration.
50     if (extra_config.size() > 0) {
51         if (!configuration.parseConfiguration(extra_config.c_str(),
52                                               serverApi)) {
53             throw std::invalid_argument("Unable to parse config string: " +
54                                         extra_config);
55         }
56     }
57 
58     name = "SynchronousEPEngine";
59 
60     // workload is needed by EPStore's constructor (to construct the
61     // VBucketMap).
62     auto shards = configuration.getMaxNumShards();
63     workload = new WorkLoadPolicy(/*workers*/ 1, shards);
64 
65     // dcpConnMap_ is needed by EPStore's constructor.
66     dcpConnMap_ = std::make_unique<MockDcpConnMap>(*this);
67 
68     // checkpointConfig is needed by CheckpointManager (via EPStore).
69     checkpointConfig = new CheckpointConfig(*this);
70 
71     dcpFlowControlManager_ = std::make_unique<DcpFlowControlManager>(*this);
72 
73     enableTraffic(true);
74 
75     maxItemSize = configuration.getMaxItemSize();
76 
77     setCompressionMode(configuration.getCompressionMode());
78     allowDelWithMetaPruneUserData =
79             configuration.isAllowDelWithMetaPruneUserData();
80 }
81 
setKVBucket(std::unique_ptr<KVBucket> store)82 void SynchronousEPEngine::setKVBucket(std::unique_ptr<KVBucket> store) {
83     cb_assert(kvBucket == nullptr);
84     kvBucket = std::move(store);
85 }
86 
setDcpConnMap( std::unique_ptr<DcpConnMap> dcpConnMap)87 void SynchronousEPEngine::setDcpConnMap(
88         std::unique_ptr<DcpConnMap> dcpConnMap) {
89     dcpConnMap_ = std::move(dcpConnMap);
90 }
91 
build( const std::string& config)92 SynchronousEPEngineUniquePtr SynchronousEPEngine::build(
93         const std::string& config) {
94     SynchronousEPEngineUniquePtr engine(new SynchronousEPEngine(config));
95 
96     // switch current thread to this new engine, so all sub-created objects
97     // are accounted in it's mem_used.
98     ObjectRegistry::onSwitchThread(engine.get());
99 
100     engine->setKVBucket(
101             engine->public_makeMockBucket(engine->getConfiguration()));
102 
103     // Ensure that EPEngine is told about necessary server callbacks
104     // (client disconnect, bucket delete).
105     engine->public_initializeEngineCallbacks();
106 
107     return engine;
108 }
109 
operator ()(SynchronousEPEngine* engine)110 void SynchronousEPEngineDeleter::operator()(SynchronousEPEngine* engine) {
111     ObjectRegistry::onSwitchThread(engine);
112     delete engine;
113     ObjectRegistry::onSwitchThread(nullptr);
114 }
115 
initializeConnmap()116 void SynchronousEPEngine::initializeConnmap() {
117     dcpConnMap_->initialize();
118 }
119 
public_makeMockBucket( Configuration& config)120 std::unique_ptr<KVBucket> SynchronousEPEngine::public_makeMockBucket(
121         Configuration& config) {
122     const auto bucketType = config.getBucketType();
123     if (bucketType == "persistent") {
124         return std::make_unique<MockEPBucket>(*this);
125     } else if (bucketType == "ephemeral") {
126         EphemeralBucket::reconfigureForEphemeral(configuration);
127         return std::make_unique<MockEphemeralBucket>(*this);
128     }
129     throw std::invalid_argument(bucketType +
130                                 " is not a recognized bucket "
131                                 "type");
132 }
133 
public_makeBucket( Configuration& config)134 std::unique_ptr<KVBucket> SynchronousEPEngine::public_makeBucket(
135         Configuration& config) {
136     return makeBucket(config);
137 }
138 
public_setWithMeta( Vbid vbucket, DocKey key, cb::const_byte_buffer value, ItemMetaData itemMeta, bool isDeleted, protocol_binary_datatype_t datatype, uint64_t& cas, uint64_t* seqno, const void* cookie, PermittedVBStates permittedVBStates, CheckConflicts checkConflicts, bool allowExisting, GenerateBySeqno genBySeqno, GenerateCas genCas, cb::const_byte_buffer emd)139 ENGINE_ERROR_CODE SynchronousEPEngine::public_setWithMeta(
140         Vbid vbucket,
141         DocKey key,
142         cb::const_byte_buffer value,
143         ItemMetaData itemMeta,
144         bool isDeleted,
145         protocol_binary_datatype_t datatype,
146         uint64_t& cas,
147         uint64_t* seqno,
148         const void* cookie,
149         PermittedVBStates permittedVBStates,
150         CheckConflicts checkConflicts,
151         bool allowExisting,
152         GenerateBySeqno genBySeqno,
153         GenerateCas genCas,
154         cb::const_byte_buffer emd) {
155     return setWithMeta(vbucket,
156                        key,
157                        value,
158                        itemMeta,
159                        isDeleted,
160                        datatype,
161                        cas,
162                        seqno,
163                        cookie,
164                        permittedVBStates,
165                        checkConflicts,
166                        allowExisting,
167                        genBySeqno,
168                        genCas,
169                        emd);
170 }
171 
public_makeDocKey(const void* cookie, const std::string& key)172 DocKey SynchronousEPEngine::public_makeDocKey(const void* cookie,
173                                               const std::string& key) {
174     const auto buf = cb::const_byte_buffer{
175             reinterpret_cast<const uint8_t*>(key.data()), key.size()};
176     return makeDocKey(cookie, buf);
177 }
178