1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2013 Couchbase, Inc
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
18#include <relaxed_atomic.h>
19#include <stdlib.h>
20
21#include "collections/collections_types.h"
22#include "item.h"
23#include "mock_dcp.h"
24
25#include <memcached/protocol_binary.h>
26
27static EngineIface* engine_handle = nullptr;
28static EngineIface* engine_handle_v1 = nullptr;
29
30std::vector<std::pair<uint64_t, uint64_t> > dcp_failover_log;
31
32ENGINE_ERROR_CODE mock_dcp_add_failover_log(vbucket_failover_t* entry,
33                                            size_t nentries,
34                                            gsl::not_null<const void*>) {
35    while (!dcp_failover_log.empty()) {
36        dcp_failover_log.clear();
37    }
38
39    if(nentries > 0) {
40        for (size_t i = 0; i < nentries; i--) {
41            std::pair<uint64_t, uint64_t> curr;
42            curr.first = entry[i].uuid;
43            curr.second = entry[i].seqno;
44            dcp_failover_log.push_back(curr);
45        }
46    }
47   return ENGINE_SUCCESS;
48}
49
50ENGINE_ERROR_CODE MockDcpMessageProducers::get_failover_log(uint32_t opaque,
51                                                            Vbid vbucket) {
52    clear_dcp_data();
53    return ENGINE_ENOTSUP;
54}
55
56ENGINE_ERROR_CODE MockDcpMessageProducers::stream_req(
57        uint32_t opaque,
58        Vbid vbucket,
59        uint32_t flags,
60        uint64_t start_seqno,
61        uint64_t end_seqno,
62        uint64_t vbucket_uuid,
63        uint64_t snap_start_seqno,
64        uint64_t snap_end_seqno,
65        const std::string& request_value) {
66    clear_dcp_data();
67    last_op = cb::mcbp::ClientOpcode::DcpStreamReq;
68    last_opaque = opaque;
69    last_vbucket = vbucket;
70    last_flags = flags;
71    last_start_seqno = start_seqno;
72    last_end_seqno = end_seqno;
73    last_vbucket_uuid = vbucket_uuid;
74    last_packet_size = 64;
75    last_snap_start_seqno = snap_start_seqno;
76    last_snap_end_seqno = snap_end_seqno;
77    last_collection_filter = request_value;
78    return ENGINE_SUCCESS;
79}
80
81ENGINE_ERROR_CODE MockDcpMessageProducers::add_stream_rsp(
82        uint32_t opaque, uint32_t stream_opaque, cb::mcbp::Status status) {
83    clear_dcp_data();
84    last_op = cb::mcbp::ClientOpcode::DcpAddStream;
85    last_opaque = opaque;
86    last_stream_opaque = stream_opaque;
87    last_status = status;
88    last_packet_size = 28;
89    return ENGINE_SUCCESS;
90}
91
92ENGINE_ERROR_CODE MockDcpMessageProducers::marker_rsp(uint32_t opaque,
93                                                      cb::mcbp::Status status) {
94    clear_dcp_data();
95    last_op = cb::mcbp::ClientOpcode::DcpSnapshotMarker;
96    last_opaque = opaque;
97    last_status = status;
98    last_packet_size = 24;
99    return ENGINE_SUCCESS;
100}
101
102ENGINE_ERROR_CODE MockDcpMessageProducers::set_vbucket_state_rsp(
103        uint32_t opaque, cb::mcbp::Status status) {
104    clear_dcp_data();
105    last_op = cb::mcbp::ClientOpcode::DcpSetVbucketState;
106    last_opaque = opaque;
107    last_status = status;
108    last_packet_size = 24;
109    return ENGINE_SUCCESS;
110}
111
112ENGINE_ERROR_CODE MockDcpMessageProducers::stream_end(
113        uint32_t opaque,
114        Vbid vbucket,
115        uint32_t flags,
116        cb::mcbp::DcpStreamId sid) {
117    clear_dcp_data();
118    last_op = cb::mcbp::ClientOpcode::DcpStreamEnd;
119    last_opaque = opaque;
120    last_vbucket = vbucket;
121    last_flags = flags;
122    last_packet_size = 28;
123    last_stream_id = sid;
124
125    return ENGINE_SUCCESS;
126}
127
128ENGINE_ERROR_CODE MockDcpMessageProducers::marker(
129        uint32_t opaque,
130        Vbid vbucket,
131        uint64_t snap_start_seqno,
132        uint64_t snap_end_seqno,
133        uint32_t flags,
134        boost::optional<uint64_t> highCompletedSeqno,
135        boost::optional<uint64_t> maxVisibleSeqno,
136        cb::mcbp::DcpStreamId sid) {
137    clear_dcp_data();
138    last_op = cb::mcbp::ClientOpcode::DcpSnapshotMarker;
139    last_opaque = opaque;
140    last_vbucket = vbucket;
141    last_packet_size = 44;
142    last_snap_start_seqno = snap_start_seqno;
143    last_snap_end_seqno = snap_end_seqno;
144    last_flags = flags;
145    last_stream_id = sid;
146    if (highCompletedSeqno) {
147        last_high_completed_seqno = *highCompletedSeqno;
148    }
149    return ENGINE_SUCCESS;
150}
151
152ENGINE_ERROR_CODE MockDcpMessageProducers::mutation(uint32_t opaque,
153                                                    cb::unique_item_ptr itm,
154                                                    Vbid vbucket,
155                                                    uint64_t by_seqno,
156                                                    uint64_t rev_seqno,
157                                                    uint32_t lock_time,
158                                                    uint8_t nru,
159                                                    cb::mcbp::DcpStreamId sid) {
160    auto result = handleMutationOrPrepare(cb::mcbp::ClientOpcode::DcpMutation,
161                                          opaque,
162                                          std::move(itm),
163                                          vbucket,
164                                          by_seqno,
165                                          rev_seqno,
166                                          lock_time,
167                                          {},
168                                          nru);
169    last_stream_id = sid;
170    return result;
171}
172
173ENGINE_ERROR_CODE MockDcpMessageProducers::handleMutationOrPrepare(
174        cb::mcbp::ClientOpcode opcode,
175        uint32_t opaque,
176        cb::unique_item_ptr itm,
177        Vbid vbucket,
178        uint64_t by_seqno,
179        uint64_t rev_seqno,
180        uint32_t lock_time,
181        cb::const_char_buffer meta,
182        uint8_t nru) {
183    clear_dcp_data();
184    Item* item = reinterpret_cast<Item*>(itm.get());
185    last_op = opcode;
186    last_opaque = opaque;
187    last_key.assign(item->getKey().c_str());
188    last_vbucket = vbucket;
189    last_byseqno = by_seqno;
190    last_revseqno = rev_seqno;
191    last_locktime = lock_time;
192    last_meta = to_string(meta);
193    last_value.assign(item->getData(), item->getNBytes());
194    last_nru = nru;
195
196    // @todo: MB-24391: We are querying the header length with collections
197    // off, which if we extended our testapp tests to do collections may not be
198    // correct. For now collections testing is done via GTEST tests and isn't
199    // reliant on last_packet_size so this doesn't cause any problems.
200    last_packet_size = sizeof(cb::mcbp::Request) +
201                       sizeof(cb::mcbp::request::DcpMutationPayload);
202    last_packet_size = last_packet_size + last_key.length() +
203                       item->getNBytes() + meta.size();
204
205    last_datatype = item->getDataType();
206    last_collection_id = item->getKey().getCollectionID();
207
208    return mutationStatus;
209}
210
211ENGINE_ERROR_CODE MockDcpMessageProducers::deletionInner(
212        uint32_t opaque,
213        cb::unique_item_ptr itm,
214        Vbid vbucket,
215        uint64_t by_seqno,
216        uint64_t rev_seqno,
217        uint32_t deleteTime,
218        uint32_t extlen,
219        DeleteSource deleteSource,
220        cb::mcbp::DcpStreamId sid) {
221    clear_dcp_data();
222    auto* item = reinterpret_cast<Item*>(itm.get());
223    if (deleteSource == DeleteSource::TTL) {
224        last_op = cb::mcbp::ClientOpcode::DcpExpiration;
225    } else {
226        last_op = cb::mcbp::ClientOpcode::DcpDeletion;
227    }
228    last_opaque = opaque;
229    last_key.assign(item->getKey().c_str());
230    last_cas = item->getCas();
231    last_vbucket = vbucket;
232    last_byseqno = by_seqno;
233    last_revseqno = rev_seqno;
234
235    // @todo: MB-24391 as above.
236    last_packet_size = sizeof(protocol_binary_request_header) +
237                       last_key.length() + item->getNBytes();
238    last_packet_size += extlen;
239
240    last_value.assign(static_cast<const char*>(item->getData()),
241                      item->getNBytes());
242    last_delete_time = deleteTime;
243    last_collection_id = item->getKey().getCollectionID();
244
245    last_stream_id = sid;
246
247    return ENGINE_SUCCESS;
248}
249
250ENGINE_ERROR_CODE MockDcpMessageProducers::deletion(uint32_t opaque,
251                                                    cb::unique_item_ptr itm,
252                                                    Vbid vbucket,
253                                                    uint64_t by_seqno,
254                                                    uint64_t rev_seqno,
255                                                    cb::mcbp::DcpStreamId sid) {
256    return deletionInner(opaque,
257                         std::move(itm),
258                         vbucket,
259                         by_seqno,
260                         rev_seqno,
261                         0,
262                         sizeof(cb::mcbp::request::DcpDeletionV1Payload),
263                         DeleteSource::Explicit,
264                         sid);
265}
266
267ENGINE_ERROR_CODE MockDcpMessageProducers::deletion_v2(
268        uint32_t opaque,
269        cb::unique_item_ptr itm,
270        Vbid vbucket,
271        uint64_t by_seqno,
272        uint64_t rev_seqno,
273        uint32_t deleteTime,
274        cb::mcbp::DcpStreamId sid) {
275    return deletionInner(opaque,
276                         std::move(itm),
277                         vbucket,
278                         by_seqno,
279                         rev_seqno,
280                         deleteTime,
281                         sizeof(cb::mcbp::request::DcpDeletionV2Payload),
282                         DeleteSource::Explicit,
283                         sid);
284}
285
286ENGINE_ERROR_CODE MockDcpMessageProducers::expiration(
287        uint32_t opaque,
288        cb::unique_item_ptr itm,
289        Vbid vbucket,
290        uint64_t by_seqno,
291        uint64_t rev_seqno,
292        uint32_t deleteTime,
293        cb::mcbp::DcpStreamId sid) {
294    return deletionInner(opaque,
295                         std::move(itm),
296                         vbucket,
297                         by_seqno,
298                         rev_seqno,
299                         deleteTime,
300                         sizeof(cb::mcbp::request::DcpExpirationPayload),
301                         DeleteSource::TTL,
302                         sid);
303}
304
305ENGINE_ERROR_CODE MockDcpMessageProducers::set_vbucket_state(
306        uint32_t opaque, Vbid vbucket, vbucket_state_t state) {
307    clear_dcp_data();
308    last_op = cb::mcbp::ClientOpcode::DcpSetVbucketState;
309    last_opaque = opaque;
310    last_vbucket = vbucket;
311    last_vbucket_state = state;
312    last_packet_size = 25;
313    return ENGINE_SUCCESS;
314}
315
316ENGINE_ERROR_CODE MockDcpMessageProducers::noop(uint32_t opaque) {
317    clear_dcp_data();
318    last_op = cb::mcbp::ClientOpcode::DcpNoop;
319    last_opaque = opaque;
320    return ENGINE_SUCCESS;
321}
322
323ENGINE_ERROR_CODE MockDcpMessageProducers::buffer_acknowledgement(
324        uint32_t opaque, Vbid vbucket, uint32_t buffer_bytes) {
325    clear_dcp_data();
326    last_op = cb::mcbp::ClientOpcode::DcpBufferAcknowledgement;
327    last_opaque = opaque;
328    last_vbucket = vbucket;
329    return ENGINE_SUCCESS;
330}
331
332ENGINE_ERROR_CODE MockDcpMessageProducers::control(
333        uint32_t opaque,
334        cb::const_char_buffer key,
335        cb::const_char_buffer value) {
336    clear_dcp_data();
337    last_op = cb::mcbp::ClientOpcode::DcpControl;
338    last_opaque = opaque;
339    last_key.assign(key.data(), key.size());
340    last_value.assign(value.data(), value.size());
341    return ENGINE_SUCCESS;
342}
343
344ENGINE_ERROR_CODE MockDcpMessageProducers::system_event(
345        uint32_t opaque,
346        Vbid vbucket,
347        mcbp::systemevent::id event,
348        uint64_t bySeqno,
349        mcbp::systemevent::version version,
350        cb::const_byte_buffer key,
351        cb::const_byte_buffer eventData,
352        cb::mcbp::DcpStreamId sid) {
353    clear_dcp_data();
354    last_op = cb::mcbp::ClientOpcode::DcpSystemEvent;
355    last_system_event = event;
356    last_system_event_data.insert(
357            last_system_event_data.begin(), eventData.begin(), eventData.end());
358    last_system_event_version = version;
359
360    if (event == mcbp::systemevent::id::CreateCollection) {
361        last_collection_id =
362                reinterpret_cast<const Collections::CreateEventDcpData*>(
363                        eventData.data())
364                        ->cid.to_host();
365        last_scope_id =
366                reinterpret_cast<const Collections::CreateEventDcpData*>(
367                        eventData.data())
368                        ->sid.to_host();
369
370        last_key.assign(reinterpret_cast<const char*>(key.data()), key.size());
371    } else if (event == mcbp::systemevent::id::DeleteCollection) {
372        last_collection_id =
373                reinterpret_cast<const Collections::DropEventDcpData*>(
374                        eventData.data())
375                        ->cid.to_host();
376    }
377
378    last_stream_id = sid;
379    return ENGINE_SUCCESS;
380}
381
382ENGINE_ERROR_CODE MockDcpMessageProducers::prepare(uint32_t opaque,
383                                                   cb::unique_item_ptr itm,
384                                                   Vbid vbucket,
385                                                   uint64_t by_seqno,
386                                                   uint64_t rev_seqno,
387                                                   uint32_t lock_time,
388                                                   uint8_t nru,
389                                                   DocumentState document_state,
390                                                   cb::durability::Level) {
391    return handleMutationOrPrepare(cb::mcbp::ClientOpcode::DcpPrepare,
392                                   opaque,
393                                   std::move(itm),
394                                   vbucket,
395                                   by_seqno,
396                                   rev_seqno,
397                                   lock_time,
398                                   {},
399                                   nru);
400}
401
402ENGINE_ERROR_CODE MockDcpMessageProducers::seqno_acknowledged(
403        uint32_t opaque, Vbid vbucket, uint64_t prepared_seqno) {
404    last_op = cb::mcbp::ClientOpcode::DcpSeqnoAcknowledged;
405    last_opaque = opaque;
406    last_vbucket = vbucket;
407    last_prepared_seqno = prepared_seqno;
408    return ENGINE_SUCCESS;
409}
410
411ENGINE_ERROR_CODE MockDcpMessageProducers::commit(uint32_t opaque,
412                                                  Vbid vbucket,
413                                                  const DocKey& key,
414                                                  uint64_t prepare_seqno,
415                                                  uint64_t commit_seqno) {
416    last_op = cb::mcbp::ClientOpcode::DcpCommit;
417    last_opaque = opaque;
418    last_vbucket = vbucket;
419    last_prepared_seqno = prepare_seqno;
420    last_commit_seqno = commit_seqno;
421    return ENGINE_SUCCESS;
422}
423
424ENGINE_ERROR_CODE MockDcpMessageProducers::abort(uint32_t opaque,
425                                                 Vbid vbucket,
426                                                 const DocKey& key,
427                                                 uint64_t prepared_seqno,
428                                                 uint64_t abort_seqno) {
429    last_op = cb::mcbp::ClientOpcode::DcpAbort;
430    last_opaque = opaque;
431    last_vbucket = vbucket;
432    last_prepared_seqno = prepared_seqno;
433    last_abort_seqno = abort_seqno;
434    return ENGINE_SUCCESS;
435}
436
437ENGINE_ERROR_CODE MockDcpMessageProducers::get_error_map(uint32_t opaque,
438                                                         uint16_t version) {
439    clear_dcp_data();
440    last_op = cb::mcbp::ClientOpcode::GetErrorMap;
441    return ENGINE_SUCCESS;
442}
443
444MockDcpMessageProducers::MockDcpMessageProducers(EngineIface* engine) {
445    engine_handle = engine;
446    engine_handle_v1 = engine;
447}
448
449void MockDcpMessageProducers::setMutationStatus(ENGINE_ERROR_CODE code) {
450    mutationStatus = code;
451}
452
453void MockDcpMessageProducers::clear_dcp_data() {
454    last_op = cb::mcbp::ClientOpcode::Invalid;
455    last_status = cb::mcbp::Status::Success;
456    last_nru = 0;
457    last_vbucket = Vbid(0);
458    last_opaque = 0;
459    last_flags = 0;
460    last_stream_opaque = 0;
461    last_locktime = 0;
462    last_cas = 0;
463    last_start_seqno = 0;
464    last_end_seqno = 0;
465    last_vbucket_uuid = 0;
466    last_snap_start_seqno = 0;
467    last_snap_end_seqno = 0;
468    last_byseqno = 0;
469    last_meta.clear();
470    last_value.clear();
471    last_key.clear();
472    last_vbucket_state = (vbucket_state_t)0;
473    last_delete_time = 0;
474    last_collection_id = 0;
475    last_system_event_data.clear();
476    last_system_event_version = mcbp::systemevent::version::version0;
477    last_collection_manifest_uid = 0;
478    last_stream_id = cb::mcbp::DcpStreamId{};
479    last_prepared_seqno = 0;
480    last_high_completed_seqno = 0;
481    last_commit_seqno = 0;
482    last_abort_seqno = 0;
483}
484