1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2013 Couchbase, Inc
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17 
18 #include <relaxed_atomic.h>
19 #include <stdlib.h>
20 
21 #include "collections/collections_types.h"
22 #include "item.h"
23 #include "mock_dcp.h"
24 
25 #include <memcached/protocol_binary.h>
26 
27 static EngineIface* engine_handle = nullptr;
28 static EngineIface* engine_handle_v1 = nullptr;
29 
30 std::vector<std::pair<uint64_t, uint64_t> > dcp_failover_log;
31 
mock_dcp_add_failover_log(vbucket_failover_t* entry, size_t nentries, gsl::not_null<const void*>)32 ENGINE_ERROR_CODE mock_dcp_add_failover_log(vbucket_failover_t* entry,
33                                             size_t nentries,
34                                             gsl::not_null<const void*>) {
35     while (!dcp_failover_log.empty()) {
36         dcp_failover_log.clear();
37     }
38 
39     if(nentries > 0) {
40         for (size_t i = 0; i < nentries; i--) {
41             std::pair<uint64_t, uint64_t> curr;
42             curr.first = entry[i].uuid;
43             curr.second = entry[i].seqno;
44             dcp_failover_log.push_back(curr);
45         }
46     }
47    return ENGINE_SUCCESS;
48 }
49 
get_failover_log(uint32_t opaque, Vbid vbucket)50 ENGINE_ERROR_CODE MockDcpMessageProducers::get_failover_log(uint32_t opaque,
51                                                             Vbid vbucket) {
52     clear_dcp_data();
53     return ENGINE_ENOTSUP;
54 }
55 
stream_req( uint32_t opaque, Vbid vbucket, uint32_t flags, uint64_t start_seqno, uint64_t end_seqno, uint64_t vbucket_uuid, uint64_t snap_start_seqno, uint64_t snap_end_seqno, const std::string& request_value)56 ENGINE_ERROR_CODE MockDcpMessageProducers::stream_req(
57         uint32_t opaque,
58         Vbid vbucket,
59         uint32_t flags,
60         uint64_t start_seqno,
61         uint64_t end_seqno,
62         uint64_t vbucket_uuid,
63         uint64_t snap_start_seqno,
64         uint64_t snap_end_seqno,
65         const std::string& request_value) {
66     clear_dcp_data();
67     last_op = cb::mcbp::ClientOpcode::DcpStreamReq;
68     last_opaque = opaque;
69     last_vbucket = vbucket;
70     last_flags = flags;
71     last_start_seqno = start_seqno;
72     last_end_seqno = end_seqno;
73     last_vbucket_uuid = vbucket_uuid;
74     last_packet_size = 64;
75     last_snap_start_seqno = snap_start_seqno;
76     last_snap_end_seqno = snap_end_seqno;
77     last_collection_filter = request_value;
78     return ENGINE_SUCCESS;
79 }
80 
add_stream_rsp( uint32_t opaque, uint32_t stream_opaque, cb::mcbp::Status status)81 ENGINE_ERROR_CODE MockDcpMessageProducers::add_stream_rsp(
82         uint32_t opaque, uint32_t stream_opaque, cb::mcbp::Status status) {
83     clear_dcp_data();
84     last_op = cb::mcbp::ClientOpcode::DcpAddStream;
85     last_opaque = opaque;
86     last_stream_opaque = stream_opaque;
87     last_status = status;
88     last_packet_size = 28;
89     return ENGINE_SUCCESS;
90 }
91 
marker_rsp(uint32_t opaque, cb::mcbp::Status status)92 ENGINE_ERROR_CODE MockDcpMessageProducers::marker_rsp(uint32_t opaque,
93                                                       cb::mcbp::Status status) {
94     clear_dcp_data();
95     last_op = cb::mcbp::ClientOpcode::DcpSnapshotMarker;
96     last_opaque = opaque;
97     last_status = status;
98     last_packet_size = 24;
99     return ENGINE_SUCCESS;
100 }
101 
set_vbucket_state_rsp( uint32_t opaque, cb::mcbp::Status status)102 ENGINE_ERROR_CODE MockDcpMessageProducers::set_vbucket_state_rsp(
103         uint32_t opaque, cb::mcbp::Status status) {
104     clear_dcp_data();
105     last_op = cb::mcbp::ClientOpcode::DcpSetVbucketState;
106     last_opaque = opaque;
107     last_status = status;
108     last_packet_size = 24;
109     return ENGINE_SUCCESS;
110 }
111 
stream_end( uint32_t opaque, Vbid vbucket, uint32_t flags, cb::mcbp::DcpStreamId sid)112 ENGINE_ERROR_CODE MockDcpMessageProducers::stream_end(
113         uint32_t opaque,
114         Vbid vbucket,
115         uint32_t flags,
116         cb::mcbp::DcpStreamId sid) {
117     clear_dcp_data();
118     last_op = cb::mcbp::ClientOpcode::DcpStreamEnd;
119     last_opaque = opaque;
120     last_vbucket = vbucket;
121     last_flags = flags;
122     last_packet_size = 28;
123     last_stream_id = sid;
124 
125     return ENGINE_SUCCESS;
126 }
127 
marker( uint32_t opaque, Vbid vbucket, uint64_t snap_start_seqno, uint64_t snap_end_seqno, uint32_t flags, boost::optional<uint64_t> highCompletedSeqno, boost::optional<uint64_t> maxVisibleSeqno, cb::mcbp::DcpStreamId sid)128 ENGINE_ERROR_CODE MockDcpMessageProducers::marker(
129         uint32_t opaque,
130         Vbid vbucket,
131         uint64_t snap_start_seqno,
132         uint64_t snap_end_seqno,
133         uint32_t flags,
134         boost::optional<uint64_t> highCompletedSeqno,
135         boost::optional<uint64_t> maxVisibleSeqno,
136         cb::mcbp::DcpStreamId sid) {
137     clear_dcp_data();
138     last_op = cb::mcbp::ClientOpcode::DcpSnapshotMarker;
139     last_opaque = opaque;
140     last_vbucket = vbucket;
141     last_packet_size = 44;
142     last_snap_start_seqno = snap_start_seqno;
143     last_snap_end_seqno = snap_end_seqno;
144     last_flags = flags;
145     last_stream_id = sid;
146     if (highCompletedSeqno) {
147         last_high_completed_seqno = *highCompletedSeqno;
148     }
149     return ENGINE_SUCCESS;
150 }
151 
mutation(uint32_t opaque, cb::unique_item_ptr itm, Vbid vbucket, uint64_t by_seqno, uint64_t rev_seqno, uint32_t lock_time, uint8_t nru, cb::mcbp::DcpStreamId sid)152 ENGINE_ERROR_CODE MockDcpMessageProducers::mutation(uint32_t opaque,
153                                                     cb::unique_item_ptr itm,
154                                                     Vbid vbucket,
155                                                     uint64_t by_seqno,
156                                                     uint64_t rev_seqno,
157                                                     uint32_t lock_time,
158                                                     uint8_t nru,
159                                                     cb::mcbp::DcpStreamId sid) {
160     auto result = handleMutationOrPrepare(cb::mcbp::ClientOpcode::DcpMutation,
161                                           opaque,
162                                           std::move(itm),
163                                           vbucket,
164                                           by_seqno,
165                                           rev_seqno,
166                                           lock_time,
167                                           {},
168                                           nru);
169     last_stream_id = sid;
170     return result;
171 }
172 
handleMutationOrPrepare( cb::mcbp::ClientOpcode opcode, uint32_t opaque, cb::unique_item_ptr itm, Vbid vbucket, uint64_t by_seqno, uint64_t rev_seqno, uint32_t lock_time, cb::const_char_buffer meta, uint8_t nru)173 ENGINE_ERROR_CODE MockDcpMessageProducers::handleMutationOrPrepare(
174         cb::mcbp::ClientOpcode opcode,
175         uint32_t opaque,
176         cb::unique_item_ptr itm,
177         Vbid vbucket,
178         uint64_t by_seqno,
179         uint64_t rev_seqno,
180         uint32_t lock_time,
181         cb::const_char_buffer meta,
182         uint8_t nru) {
183     clear_dcp_data();
184     Item* item = reinterpret_cast<Item*>(itm.get());
185     last_op = opcode;
186     last_opaque = opaque;
187     last_key.assign(item->getKey().c_str());
188     last_vbucket = vbucket;
189     last_byseqno = by_seqno;
190     last_revseqno = rev_seqno;
191     last_locktime = lock_time;
192     last_meta = to_string(meta);
193     last_value.assign(item->getData(), item->getNBytes());
194     last_nru = nru;
195 
196     // @todo: MB-24391: We are querying the header length with collections
197     // off, which if we extended our testapp tests to do collections may not be
198     // correct. For now collections testing is done via GTEST tests and isn't
199     // reliant on last_packet_size so this doesn't cause any problems.
200     last_packet_size = sizeof(cb::mcbp::Request) +
201                        sizeof(cb::mcbp::request::DcpMutationPayload);
202     last_packet_size = last_packet_size + last_key.length() +
203                        item->getNBytes() + meta.size();
204 
205     last_datatype = item->getDataType();
206     last_collection_id = item->getKey().getCollectionID();
207 
208     return mutationStatus;
209 }
210 
deletionInner( uint32_t opaque, cb::unique_item_ptr itm, Vbid vbucket, uint64_t by_seqno, uint64_t rev_seqno, uint32_t deleteTime, uint32_t extlen, DeleteSource deleteSource, cb::mcbp::DcpStreamId sid)211 ENGINE_ERROR_CODE MockDcpMessageProducers::deletionInner(
212         uint32_t opaque,
213         cb::unique_item_ptr itm,
214         Vbid vbucket,
215         uint64_t by_seqno,
216         uint64_t rev_seqno,
217         uint32_t deleteTime,
218         uint32_t extlen,
219         DeleteSource deleteSource,
220         cb::mcbp::DcpStreamId sid) {
221     clear_dcp_data();
222     auto* item = reinterpret_cast<Item*>(itm.get());
223     if (deleteSource == DeleteSource::TTL) {
224         last_op = cb::mcbp::ClientOpcode::DcpExpiration;
225     } else {
226         last_op = cb::mcbp::ClientOpcode::DcpDeletion;
227     }
228     last_opaque = opaque;
229     last_key.assign(item->getKey().c_str());
230     last_cas = item->getCas();
231     last_vbucket = vbucket;
232     last_byseqno = by_seqno;
233     last_revseqno = rev_seqno;
234 
235     // @todo: MB-24391 as above.
236     last_packet_size = sizeof(protocol_binary_request_header) +
237                        last_key.length() + item->getNBytes();
238     last_packet_size += extlen;
239 
240     last_value.assign(static_cast<const char*>(item->getData()),
241                       item->getNBytes());
242     last_delete_time = deleteTime;
243     last_collection_id = item->getKey().getCollectionID();
244 
245     last_stream_id = sid;
246 
247     return ENGINE_SUCCESS;
248 }
249 
deletion(uint32_t opaque, cb::unique_item_ptr itm, Vbid vbucket, uint64_t by_seqno, uint64_t rev_seqno, cb::mcbp::DcpStreamId sid)250 ENGINE_ERROR_CODE MockDcpMessageProducers::deletion(uint32_t opaque,
251                                                     cb::unique_item_ptr itm,
252                                                     Vbid vbucket,
253                                                     uint64_t by_seqno,
254                                                     uint64_t rev_seqno,
255                                                     cb::mcbp::DcpStreamId sid) {
256     return deletionInner(opaque,
257                          std::move(itm),
258                          vbucket,
259                          by_seqno,
260                          rev_seqno,
261                          0,
262                          sizeof(cb::mcbp::request::DcpDeletionV1Payload),
263                          DeleteSource::Explicit,
264                          sid);
265 }
266 
deletion_v2( uint32_t opaque, cb::unique_item_ptr itm, Vbid vbucket, uint64_t by_seqno, uint64_t rev_seqno, uint32_t deleteTime, cb::mcbp::DcpStreamId sid)267 ENGINE_ERROR_CODE MockDcpMessageProducers::deletion_v2(
268         uint32_t opaque,
269         cb::unique_item_ptr itm,
270         Vbid vbucket,
271         uint64_t by_seqno,
272         uint64_t rev_seqno,
273         uint32_t deleteTime,
274         cb::mcbp::DcpStreamId sid) {
275     return deletionInner(opaque,
276                          std::move(itm),
277                          vbucket,
278                          by_seqno,
279                          rev_seqno,
280                          deleteTime,
281                          sizeof(cb::mcbp::request::DcpDeletionV2Payload),
282                          DeleteSource::Explicit,
283                          sid);
284 }
285 
expiration( uint32_t opaque, cb::unique_item_ptr itm, Vbid vbucket, uint64_t by_seqno, uint64_t rev_seqno, uint32_t deleteTime, cb::mcbp::DcpStreamId sid)286 ENGINE_ERROR_CODE MockDcpMessageProducers::expiration(
287         uint32_t opaque,
288         cb::unique_item_ptr itm,
289         Vbid vbucket,
290         uint64_t by_seqno,
291         uint64_t rev_seqno,
292         uint32_t deleteTime,
293         cb::mcbp::DcpStreamId sid) {
294     return deletionInner(opaque,
295                          std::move(itm),
296                          vbucket,
297                          by_seqno,
298                          rev_seqno,
299                          deleteTime,
300                          sizeof(cb::mcbp::request::DcpExpirationPayload),
301                          DeleteSource::TTL,
302                          sid);
303 }
304 
set_vbucket_state( uint32_t opaque, Vbid vbucket, vbucket_state_t state)305 ENGINE_ERROR_CODE MockDcpMessageProducers::set_vbucket_state(
306         uint32_t opaque, Vbid vbucket, vbucket_state_t state) {
307     clear_dcp_data();
308     last_op = cb::mcbp::ClientOpcode::DcpSetVbucketState;
309     last_opaque = opaque;
310     last_vbucket = vbucket;
311     last_vbucket_state = state;
312     last_packet_size = 25;
313     return ENGINE_SUCCESS;
314 }
315 
noop(uint32_t opaque)316 ENGINE_ERROR_CODE MockDcpMessageProducers::noop(uint32_t opaque) {
317     clear_dcp_data();
318     last_op = cb::mcbp::ClientOpcode::DcpNoop;
319     last_opaque = opaque;
320     return ENGINE_SUCCESS;
321 }
322 
buffer_acknowledgement( uint32_t opaque, Vbid vbucket, uint32_t buffer_bytes)323 ENGINE_ERROR_CODE MockDcpMessageProducers::buffer_acknowledgement(
324         uint32_t opaque, Vbid vbucket, uint32_t buffer_bytes) {
325     clear_dcp_data();
326     last_op = cb::mcbp::ClientOpcode::DcpBufferAcknowledgement;
327     last_opaque = opaque;
328     last_vbucket = vbucket;
329     return ENGINE_SUCCESS;
330 }
331 
control( uint32_t opaque, cb::const_char_buffer key, cb::const_char_buffer value)332 ENGINE_ERROR_CODE MockDcpMessageProducers::control(
333         uint32_t opaque,
334         cb::const_char_buffer key,
335         cb::const_char_buffer value) {
336     clear_dcp_data();
337     last_op = cb::mcbp::ClientOpcode::DcpControl;
338     last_opaque = opaque;
339     last_key.assign(key.data(), key.size());
340     last_value.assign(value.data(), value.size());
341     return ENGINE_SUCCESS;
342 }
343 
system_event( uint32_t opaque, Vbid vbucket, mcbp::systemevent::id event, uint64_t bySeqno, mcbp::systemevent::version version, cb::const_byte_buffer key, cb::const_byte_buffer eventData, cb::mcbp::DcpStreamId sid)344 ENGINE_ERROR_CODE MockDcpMessageProducers::system_event(
345         uint32_t opaque,
346         Vbid vbucket,
347         mcbp::systemevent::id event,
348         uint64_t bySeqno,
349         mcbp::systemevent::version version,
350         cb::const_byte_buffer key,
351         cb::const_byte_buffer eventData,
352         cb::mcbp::DcpStreamId sid) {
353     clear_dcp_data();
354     last_op = cb::mcbp::ClientOpcode::DcpSystemEvent;
355     last_system_event = event;
356     last_system_event_data.insert(
357             last_system_event_data.begin(), eventData.begin(), eventData.end());
358     last_system_event_version = version;
359 
360     if (event == mcbp::systemevent::id::CreateCollection) {
361         last_collection_id =
362                 reinterpret_cast<const Collections::CreateEventDcpData*>(
363                         eventData.data())
364                         ->cid.to_host();
365         last_scope_id =
366                 reinterpret_cast<const Collections::CreateEventDcpData*>(
367                         eventData.data())
368                         ->sid.to_host();
369 
370         last_key.assign(reinterpret_cast<const char*>(key.data()), key.size());
371     } else if (event == mcbp::systemevent::id::DeleteCollection) {
372         last_collection_id =
373                 reinterpret_cast<const Collections::DropEventDcpData*>(
374                         eventData.data())
375                         ->cid.to_host();
376     }
377 
378     last_stream_id = sid;
379     return ENGINE_SUCCESS;
380 }
381 
prepare(uint32_t opaque, cb::unique_item_ptr itm, Vbid vbucket, uint64_t by_seqno, uint64_t rev_seqno, uint32_t lock_time, uint8_t nru, DocumentState document_state, cb::durability::Level)382 ENGINE_ERROR_CODE MockDcpMessageProducers::prepare(uint32_t opaque,
383                                                    cb::unique_item_ptr itm,
384                                                    Vbid vbucket,
385                                                    uint64_t by_seqno,
386                                                    uint64_t rev_seqno,
387                                                    uint32_t lock_time,
388                                                    uint8_t nru,
389                                                    DocumentState document_state,
390                                                    cb::durability::Level) {
391     return handleMutationOrPrepare(cb::mcbp::ClientOpcode::DcpPrepare,
392                                    opaque,
393                                    std::move(itm),
394                                    vbucket,
395                                    by_seqno,
396                                    rev_seqno,
397                                    lock_time,
398                                    {},
399                                    nru);
400 }
401 
seqno_acknowledged( uint32_t opaque, Vbid vbucket, uint64_t prepared_seqno)402 ENGINE_ERROR_CODE MockDcpMessageProducers::seqno_acknowledged(
403         uint32_t opaque, Vbid vbucket, uint64_t prepared_seqno) {
404     last_op = cb::mcbp::ClientOpcode::DcpSeqnoAcknowledged;
405     last_opaque = opaque;
406     last_vbucket = vbucket;
407     last_prepared_seqno = prepared_seqno;
408     return ENGINE_SUCCESS;
409 }
410 
commit(uint32_t opaque, Vbid vbucket, const DocKey& key, uint64_t prepare_seqno, uint64_t commit_seqno)411 ENGINE_ERROR_CODE MockDcpMessageProducers::commit(uint32_t opaque,
412                                                   Vbid vbucket,
413                                                   const DocKey& key,
414                                                   uint64_t prepare_seqno,
415                                                   uint64_t commit_seqno) {
416     last_op = cb::mcbp::ClientOpcode::DcpCommit;
417     last_opaque = opaque;
418     last_vbucket = vbucket;
419     last_prepared_seqno = prepare_seqno;
420     last_commit_seqno = commit_seqno;
421     return ENGINE_SUCCESS;
422 }
423 
abort(uint32_t opaque, Vbid vbucket, const DocKey& key, uint64_t prepared_seqno, uint64_t abort_seqno)424 ENGINE_ERROR_CODE MockDcpMessageProducers::abort(uint32_t opaque,
425                                                  Vbid vbucket,
426                                                  const DocKey& key,
427                                                  uint64_t prepared_seqno,
428                                                  uint64_t abort_seqno) {
429     last_op = cb::mcbp::ClientOpcode::DcpAbort;
430     last_opaque = opaque;
431     last_vbucket = vbucket;
432     last_prepared_seqno = prepared_seqno;
433     last_abort_seqno = abort_seqno;
434     return ENGINE_SUCCESS;
435 }
436 
get_error_map(uint32_t opaque, uint16_t version)437 ENGINE_ERROR_CODE MockDcpMessageProducers::get_error_map(uint32_t opaque,
438                                                          uint16_t version) {
439     clear_dcp_data();
440     last_op = cb::mcbp::ClientOpcode::GetErrorMap;
441     return ENGINE_SUCCESS;
442 }
443 
MockDcpMessageProducers(EngineIface* engine)444 MockDcpMessageProducers::MockDcpMessageProducers(EngineIface* engine) {
445     engine_handle = engine;
446     engine_handle_v1 = engine;
447 }
448 
setMutationStatus(ENGINE_ERROR_CODE code)449 void MockDcpMessageProducers::setMutationStatus(ENGINE_ERROR_CODE code) {
450     mutationStatus = code;
451 }
452 
clear_dcp_data()453 void MockDcpMessageProducers::clear_dcp_data() {
454     last_op = cb::mcbp::ClientOpcode::Invalid;
455     last_status = cb::mcbp::Status::Success;
456     last_nru = 0;
457     last_vbucket = Vbid(0);
458     last_opaque = 0;
459     last_flags = 0;
460     last_stream_opaque = 0;
461     last_locktime = 0;
462     last_cas = 0;
463     last_start_seqno = 0;
464     last_end_seqno = 0;
465     last_vbucket_uuid = 0;
466     last_snap_start_seqno = 0;
467     last_snap_end_seqno = 0;
468     last_byseqno = 0;
469     last_meta.clear();
470     last_value.clear();
471     last_key.clear();
472     last_vbucket_state = (vbucket_state_t)0;
473     last_delete_time = 0;
474     last_collection_id = 0;
475     last_system_event_data.clear();
476     last_system_event_version = mcbp::systemevent::version::version0;
477     last_collection_manifest_uid = 0;
478     last_stream_id = cb::mcbp::DcpStreamId{};
479     last_prepared_seqno = 0;
480     last_high_completed_seqno = 0;
481     last_commit_seqno = 0;
482     last_abort_seqno = 0;
483 }
484