1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3 * Copyright 2013 Couchbase, Inc
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18 #include <relaxed_atomic.h>
19 #include <stdlib.h>
20
21 #include "collections/collections_types.h"
22 #include "item.h"
23 #include "mock_dcp.h"
24
25 #include <memcached/protocol_binary.h>
26
27 static EngineIface* engine_handle = nullptr;
28 static EngineIface* engine_handle_v1 = nullptr;
29
30 std::vector<std::pair<uint64_t, uint64_t> > dcp_failover_log;
31
mock_dcp_add_failover_log(vbucket_failover_t* entry, size_t nentries, gsl::not_null<const void*>)32 ENGINE_ERROR_CODE mock_dcp_add_failover_log(vbucket_failover_t* entry,
33 size_t nentries,
34 gsl::not_null<const void*>) {
35 while (!dcp_failover_log.empty()) {
36 dcp_failover_log.clear();
37 }
38
39 if(nentries > 0) {
40 for (size_t i = 0; i < nentries; i--) {
41 std::pair<uint64_t, uint64_t> curr;
42 curr.first = entry[i].uuid;
43 curr.second = entry[i].seqno;
44 dcp_failover_log.push_back(curr);
45 }
46 }
47 return ENGINE_SUCCESS;
48 }
49
get_failover_log(uint32_t opaque, Vbid vbucket)50 ENGINE_ERROR_CODE MockDcpMessageProducers::get_failover_log(uint32_t opaque,
51 Vbid vbucket) {
52 clear_dcp_data();
53 return ENGINE_ENOTSUP;
54 }
55
stream_req( uint32_t opaque, Vbid vbucket, uint32_t flags, uint64_t start_seqno, uint64_t end_seqno, uint64_t vbucket_uuid, uint64_t snap_start_seqno, uint64_t snap_end_seqno, const std::string& request_value)56 ENGINE_ERROR_CODE MockDcpMessageProducers::stream_req(
57 uint32_t opaque,
58 Vbid vbucket,
59 uint32_t flags,
60 uint64_t start_seqno,
61 uint64_t end_seqno,
62 uint64_t vbucket_uuid,
63 uint64_t snap_start_seqno,
64 uint64_t snap_end_seqno,
65 const std::string& request_value) {
66 clear_dcp_data();
67 last_op = cb::mcbp::ClientOpcode::DcpStreamReq;
68 last_opaque = opaque;
69 last_vbucket = vbucket;
70 last_flags = flags;
71 last_start_seqno = start_seqno;
72 last_end_seqno = end_seqno;
73 last_vbucket_uuid = vbucket_uuid;
74 last_packet_size = 64;
75 last_snap_start_seqno = snap_start_seqno;
76 last_snap_end_seqno = snap_end_seqno;
77 last_collection_filter = request_value;
78 return ENGINE_SUCCESS;
79 }
80
add_stream_rsp( uint32_t opaque, uint32_t stream_opaque, cb::mcbp::Status status)81 ENGINE_ERROR_CODE MockDcpMessageProducers::add_stream_rsp(
82 uint32_t opaque, uint32_t stream_opaque, cb::mcbp::Status status) {
83 clear_dcp_data();
84 last_op = cb::mcbp::ClientOpcode::DcpAddStream;
85 last_opaque = opaque;
86 last_stream_opaque = stream_opaque;
87 last_status = status;
88 last_packet_size = 28;
89 return ENGINE_SUCCESS;
90 }
91
marker_rsp(uint32_t opaque, cb::mcbp::Status status)92 ENGINE_ERROR_CODE MockDcpMessageProducers::marker_rsp(uint32_t opaque,
93 cb::mcbp::Status status) {
94 clear_dcp_data();
95 last_op = cb::mcbp::ClientOpcode::DcpSnapshotMarker;
96 last_opaque = opaque;
97 last_status = status;
98 last_packet_size = 24;
99 return ENGINE_SUCCESS;
100 }
101
set_vbucket_state_rsp( uint32_t opaque, cb::mcbp::Status status)102 ENGINE_ERROR_CODE MockDcpMessageProducers::set_vbucket_state_rsp(
103 uint32_t opaque, cb::mcbp::Status status) {
104 clear_dcp_data();
105 last_op = cb::mcbp::ClientOpcode::DcpSetVbucketState;
106 last_opaque = opaque;
107 last_status = status;
108 last_packet_size = 24;
109 return ENGINE_SUCCESS;
110 }
111
stream_end( uint32_t opaque, Vbid vbucket, uint32_t flags, cb::mcbp::DcpStreamId sid)112 ENGINE_ERROR_CODE MockDcpMessageProducers::stream_end(
113 uint32_t opaque,
114 Vbid vbucket,
115 uint32_t flags,
116 cb::mcbp::DcpStreamId sid) {
117 clear_dcp_data();
118 last_op = cb::mcbp::ClientOpcode::DcpStreamEnd;
119 last_opaque = opaque;
120 last_vbucket = vbucket;
121 last_flags = flags;
122 last_packet_size = 28;
123 last_stream_id = sid;
124
125 return ENGINE_SUCCESS;
126 }
127
marker( uint32_t opaque, Vbid vbucket, uint64_t snap_start_seqno, uint64_t snap_end_seqno, uint32_t flags, boost::optional<uint64_t> highCompletedSeqno, boost::optional<uint64_t> maxVisibleSeqno, cb::mcbp::DcpStreamId sid)128 ENGINE_ERROR_CODE MockDcpMessageProducers::marker(
129 uint32_t opaque,
130 Vbid vbucket,
131 uint64_t snap_start_seqno,
132 uint64_t snap_end_seqno,
133 uint32_t flags,
134 boost::optional<uint64_t> highCompletedSeqno,
135 boost::optional<uint64_t> maxVisibleSeqno,
136 cb::mcbp::DcpStreamId sid) {
137 clear_dcp_data();
138 last_op = cb::mcbp::ClientOpcode::DcpSnapshotMarker;
139 last_opaque = opaque;
140 last_vbucket = vbucket;
141 last_packet_size = 44;
142 last_snap_start_seqno = snap_start_seqno;
143 last_snap_end_seqno = snap_end_seqno;
144 last_flags = flags;
145 last_stream_id = sid;
146 if (highCompletedSeqno) {
147 last_high_completed_seqno = *highCompletedSeqno;
148 }
149 return ENGINE_SUCCESS;
150 }
151
mutation(uint32_t opaque, cb::unique_item_ptr itm, Vbid vbucket, uint64_t by_seqno, uint64_t rev_seqno, uint32_t lock_time, uint8_t nru, cb::mcbp::DcpStreamId sid)152 ENGINE_ERROR_CODE MockDcpMessageProducers::mutation(uint32_t opaque,
153 cb::unique_item_ptr itm,
154 Vbid vbucket,
155 uint64_t by_seqno,
156 uint64_t rev_seqno,
157 uint32_t lock_time,
158 uint8_t nru,
159 cb::mcbp::DcpStreamId sid) {
160 auto result = handleMutationOrPrepare(cb::mcbp::ClientOpcode::DcpMutation,
161 opaque,
162 std::move(itm),
163 vbucket,
164 by_seqno,
165 rev_seqno,
166 lock_time,
167 {},
168 nru);
169 last_stream_id = sid;
170 return result;
171 }
172
handleMutationOrPrepare( cb::mcbp::ClientOpcode opcode, uint32_t opaque, cb::unique_item_ptr itm, Vbid vbucket, uint64_t by_seqno, uint64_t rev_seqno, uint32_t lock_time, cb::const_char_buffer meta, uint8_t nru)173 ENGINE_ERROR_CODE MockDcpMessageProducers::handleMutationOrPrepare(
174 cb::mcbp::ClientOpcode opcode,
175 uint32_t opaque,
176 cb::unique_item_ptr itm,
177 Vbid vbucket,
178 uint64_t by_seqno,
179 uint64_t rev_seqno,
180 uint32_t lock_time,
181 cb::const_char_buffer meta,
182 uint8_t nru) {
183 clear_dcp_data();
184 Item* item = reinterpret_cast<Item*>(itm.get());
185 last_op = opcode;
186 last_opaque = opaque;
187 last_key.assign(item->getKey().c_str());
188 last_vbucket = vbucket;
189 last_byseqno = by_seqno;
190 last_revseqno = rev_seqno;
191 last_locktime = lock_time;
192 last_meta = to_string(meta);
193 last_value.assign(item->getData(), item->getNBytes());
194 last_nru = nru;
195
196 // @todo: MB-24391: We are querying the header length with collections
197 // off, which if we extended our testapp tests to do collections may not be
198 // correct. For now collections testing is done via GTEST tests and isn't
199 // reliant on last_packet_size so this doesn't cause any problems.
200 last_packet_size = sizeof(cb::mcbp::Request) +
201 sizeof(cb::mcbp::request::DcpMutationPayload);
202 last_packet_size = last_packet_size + last_key.length() +
203 item->getNBytes() + meta.size();
204
205 last_datatype = item->getDataType();
206 last_collection_id = item->getKey().getCollectionID();
207
208 return mutationStatus;
209 }
210
deletionInner( uint32_t opaque, cb::unique_item_ptr itm, Vbid vbucket, uint64_t by_seqno, uint64_t rev_seqno, uint32_t deleteTime, uint32_t extlen, DeleteSource deleteSource, cb::mcbp::DcpStreamId sid)211 ENGINE_ERROR_CODE MockDcpMessageProducers::deletionInner(
212 uint32_t opaque,
213 cb::unique_item_ptr itm,
214 Vbid vbucket,
215 uint64_t by_seqno,
216 uint64_t rev_seqno,
217 uint32_t deleteTime,
218 uint32_t extlen,
219 DeleteSource deleteSource,
220 cb::mcbp::DcpStreamId sid) {
221 clear_dcp_data();
222 auto* item = reinterpret_cast<Item*>(itm.get());
223 if (deleteSource == DeleteSource::TTL) {
224 last_op = cb::mcbp::ClientOpcode::DcpExpiration;
225 } else {
226 last_op = cb::mcbp::ClientOpcode::DcpDeletion;
227 }
228 last_opaque = opaque;
229 last_key.assign(item->getKey().c_str());
230 last_cas = item->getCas();
231 last_vbucket = vbucket;
232 last_byseqno = by_seqno;
233 last_revseqno = rev_seqno;
234
235 // @todo: MB-24391 as above.
236 last_packet_size = sizeof(protocol_binary_request_header) +
237 last_key.length() + item->getNBytes();
238 last_packet_size += extlen;
239
240 last_value.assign(static_cast<const char*>(item->getData()),
241 item->getNBytes());
242 last_delete_time = deleteTime;
243 last_collection_id = item->getKey().getCollectionID();
244
245 last_stream_id = sid;
246
247 return ENGINE_SUCCESS;
248 }
249
deletion(uint32_t opaque, cb::unique_item_ptr itm, Vbid vbucket, uint64_t by_seqno, uint64_t rev_seqno, cb::mcbp::DcpStreamId sid)250 ENGINE_ERROR_CODE MockDcpMessageProducers::deletion(uint32_t opaque,
251 cb::unique_item_ptr itm,
252 Vbid vbucket,
253 uint64_t by_seqno,
254 uint64_t rev_seqno,
255 cb::mcbp::DcpStreamId sid) {
256 return deletionInner(opaque,
257 std::move(itm),
258 vbucket,
259 by_seqno,
260 rev_seqno,
261 0,
262 sizeof(cb::mcbp::request::DcpDeletionV1Payload),
263 DeleteSource::Explicit,
264 sid);
265 }
266
deletion_v2( uint32_t opaque, cb::unique_item_ptr itm, Vbid vbucket, uint64_t by_seqno, uint64_t rev_seqno, uint32_t deleteTime, cb::mcbp::DcpStreamId sid)267 ENGINE_ERROR_CODE MockDcpMessageProducers::deletion_v2(
268 uint32_t opaque,
269 cb::unique_item_ptr itm,
270 Vbid vbucket,
271 uint64_t by_seqno,
272 uint64_t rev_seqno,
273 uint32_t deleteTime,
274 cb::mcbp::DcpStreamId sid) {
275 return deletionInner(opaque,
276 std::move(itm),
277 vbucket,
278 by_seqno,
279 rev_seqno,
280 deleteTime,
281 sizeof(cb::mcbp::request::DcpDeletionV2Payload),
282 DeleteSource::Explicit,
283 sid);
284 }
285
expiration( uint32_t opaque, cb::unique_item_ptr itm, Vbid vbucket, uint64_t by_seqno, uint64_t rev_seqno, uint32_t deleteTime, cb::mcbp::DcpStreamId sid)286 ENGINE_ERROR_CODE MockDcpMessageProducers::expiration(
287 uint32_t opaque,
288 cb::unique_item_ptr itm,
289 Vbid vbucket,
290 uint64_t by_seqno,
291 uint64_t rev_seqno,
292 uint32_t deleteTime,
293 cb::mcbp::DcpStreamId sid) {
294 return deletionInner(opaque,
295 std::move(itm),
296 vbucket,
297 by_seqno,
298 rev_seqno,
299 deleteTime,
300 sizeof(cb::mcbp::request::DcpExpirationPayload),
301 DeleteSource::TTL,
302 sid);
303 }
304
set_vbucket_state( uint32_t opaque, Vbid vbucket, vbucket_state_t state)305 ENGINE_ERROR_CODE MockDcpMessageProducers::set_vbucket_state(
306 uint32_t opaque, Vbid vbucket, vbucket_state_t state) {
307 clear_dcp_data();
308 last_op = cb::mcbp::ClientOpcode::DcpSetVbucketState;
309 last_opaque = opaque;
310 last_vbucket = vbucket;
311 last_vbucket_state = state;
312 last_packet_size = 25;
313 return ENGINE_SUCCESS;
314 }
315
noop(uint32_t opaque)316 ENGINE_ERROR_CODE MockDcpMessageProducers::noop(uint32_t opaque) {
317 clear_dcp_data();
318 last_op = cb::mcbp::ClientOpcode::DcpNoop;
319 last_opaque = opaque;
320 return ENGINE_SUCCESS;
321 }
322
buffer_acknowledgement( uint32_t opaque, Vbid vbucket, uint32_t buffer_bytes)323 ENGINE_ERROR_CODE MockDcpMessageProducers::buffer_acknowledgement(
324 uint32_t opaque, Vbid vbucket, uint32_t buffer_bytes) {
325 clear_dcp_data();
326 last_op = cb::mcbp::ClientOpcode::DcpBufferAcknowledgement;
327 last_opaque = opaque;
328 last_vbucket = vbucket;
329 return ENGINE_SUCCESS;
330 }
331
control( uint32_t opaque, cb::const_char_buffer key, cb::const_char_buffer value)332 ENGINE_ERROR_CODE MockDcpMessageProducers::control(
333 uint32_t opaque,
334 cb::const_char_buffer key,
335 cb::const_char_buffer value) {
336 clear_dcp_data();
337 last_op = cb::mcbp::ClientOpcode::DcpControl;
338 last_opaque = opaque;
339 last_key.assign(key.data(), key.size());
340 last_value.assign(value.data(), value.size());
341 return ENGINE_SUCCESS;
342 }
343
system_event( uint32_t opaque, Vbid vbucket, mcbp::systemevent::id event, uint64_t bySeqno, mcbp::systemevent::version version, cb::const_byte_buffer key, cb::const_byte_buffer eventData, cb::mcbp::DcpStreamId sid)344 ENGINE_ERROR_CODE MockDcpMessageProducers::system_event(
345 uint32_t opaque,
346 Vbid vbucket,
347 mcbp::systemevent::id event,
348 uint64_t bySeqno,
349 mcbp::systemevent::version version,
350 cb::const_byte_buffer key,
351 cb::const_byte_buffer eventData,
352 cb::mcbp::DcpStreamId sid) {
353 clear_dcp_data();
354 last_op = cb::mcbp::ClientOpcode::DcpSystemEvent;
355 last_system_event = event;
356 last_system_event_data.insert(
357 last_system_event_data.begin(), eventData.begin(), eventData.end());
358 last_system_event_version = version;
359
360 if (event == mcbp::systemevent::id::CreateCollection) {
361 last_collection_id =
362 reinterpret_cast<const Collections::CreateEventDcpData*>(
363 eventData.data())
364 ->cid.to_host();
365 last_scope_id =
366 reinterpret_cast<const Collections::CreateEventDcpData*>(
367 eventData.data())
368 ->sid.to_host();
369
370 last_key.assign(reinterpret_cast<const char*>(key.data()), key.size());
371 } else if (event == mcbp::systemevent::id::DeleteCollection) {
372 last_collection_id =
373 reinterpret_cast<const Collections::DropEventDcpData*>(
374 eventData.data())
375 ->cid.to_host();
376 }
377
378 last_stream_id = sid;
379 return ENGINE_SUCCESS;
380 }
381
prepare(uint32_t opaque, cb::unique_item_ptr itm, Vbid vbucket, uint64_t by_seqno, uint64_t rev_seqno, uint32_t lock_time, uint8_t nru, DocumentState document_state, cb::durability::Level)382 ENGINE_ERROR_CODE MockDcpMessageProducers::prepare(uint32_t opaque,
383 cb::unique_item_ptr itm,
384 Vbid vbucket,
385 uint64_t by_seqno,
386 uint64_t rev_seqno,
387 uint32_t lock_time,
388 uint8_t nru,
389 DocumentState document_state,
390 cb::durability::Level) {
391 return handleMutationOrPrepare(cb::mcbp::ClientOpcode::DcpPrepare,
392 opaque,
393 std::move(itm),
394 vbucket,
395 by_seqno,
396 rev_seqno,
397 lock_time,
398 {},
399 nru);
400 }
401
seqno_acknowledged( uint32_t opaque, Vbid vbucket, uint64_t prepared_seqno)402 ENGINE_ERROR_CODE MockDcpMessageProducers::seqno_acknowledged(
403 uint32_t opaque, Vbid vbucket, uint64_t prepared_seqno) {
404 last_op = cb::mcbp::ClientOpcode::DcpSeqnoAcknowledged;
405 last_opaque = opaque;
406 last_vbucket = vbucket;
407 last_prepared_seqno = prepared_seqno;
408 return ENGINE_SUCCESS;
409 }
410
commit(uint32_t opaque, Vbid vbucket, const DocKey& key, uint64_t prepare_seqno, uint64_t commit_seqno)411 ENGINE_ERROR_CODE MockDcpMessageProducers::commit(uint32_t opaque,
412 Vbid vbucket,
413 const DocKey& key,
414 uint64_t prepare_seqno,
415 uint64_t commit_seqno) {
416 last_op = cb::mcbp::ClientOpcode::DcpCommit;
417 last_opaque = opaque;
418 last_vbucket = vbucket;
419 last_prepared_seqno = prepare_seqno;
420 last_commit_seqno = commit_seqno;
421 return ENGINE_SUCCESS;
422 }
423
abort(uint32_t opaque, Vbid vbucket, const DocKey& key, uint64_t prepared_seqno, uint64_t abort_seqno)424 ENGINE_ERROR_CODE MockDcpMessageProducers::abort(uint32_t opaque,
425 Vbid vbucket,
426 const DocKey& key,
427 uint64_t prepared_seqno,
428 uint64_t abort_seqno) {
429 last_op = cb::mcbp::ClientOpcode::DcpAbort;
430 last_opaque = opaque;
431 last_vbucket = vbucket;
432 last_prepared_seqno = prepared_seqno;
433 last_abort_seqno = abort_seqno;
434 return ENGINE_SUCCESS;
435 }
436
get_error_map(uint32_t opaque, uint16_t version)437 ENGINE_ERROR_CODE MockDcpMessageProducers::get_error_map(uint32_t opaque,
438 uint16_t version) {
439 clear_dcp_data();
440 last_op = cb::mcbp::ClientOpcode::GetErrorMap;
441 return ENGINE_SUCCESS;
442 }
443
MockDcpMessageProducers(EngineIface* engine)444 MockDcpMessageProducers::MockDcpMessageProducers(EngineIface* engine) {
445 engine_handle = engine;
446 engine_handle_v1 = engine;
447 }
448
setMutationStatus(ENGINE_ERROR_CODE code)449 void MockDcpMessageProducers::setMutationStatus(ENGINE_ERROR_CODE code) {
450 mutationStatus = code;
451 }
452
clear_dcp_data()453 void MockDcpMessageProducers::clear_dcp_data() {
454 last_op = cb::mcbp::ClientOpcode::Invalid;
455 last_status = cb::mcbp::Status::Success;
456 last_nru = 0;
457 last_vbucket = Vbid(0);
458 last_opaque = 0;
459 last_flags = 0;
460 last_stream_opaque = 0;
461 last_locktime = 0;
462 last_cas = 0;
463 last_start_seqno = 0;
464 last_end_seqno = 0;
465 last_vbucket_uuid = 0;
466 last_snap_start_seqno = 0;
467 last_snap_end_seqno = 0;
468 last_byseqno = 0;
469 last_meta.clear();
470 last_value.clear();
471 last_key.clear();
472 last_vbucket_state = (vbucket_state_t)0;
473 last_delete_time = 0;
474 last_collection_id = 0;
475 last_system_event_data.clear();
476 last_system_event_version = mcbp::systemevent::version::version0;
477 last_collection_manifest_uid = 0;
478 last_stream_id = cb::mcbp::DcpStreamId{};
479 last_prepared_seqno = 0;
480 last_high_completed_seqno = 0;
481 last_commit_seqno = 0;
482 last_abort_seqno = 0;
483 }
484