xref: /3.0.3-GA/ep-engine/tests/mock/mock_dcp.cc (revision 90defd6d)
1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2013 Couchbase, Inc
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
18#include "config.h"
19
20#include <stdlib.h>
21
22#include "item.h"
23#include "mock_dcp.h"
24
25uint8_t dcp_last_op;
26uint8_t dcp_last_status;
27uint8_t dcp_last_nru;
28uint16_t dcp_last_vbucket;
29uint32_t dcp_last_opaque;
30uint32_t dcp_last_flags;
31uint32_t dcp_last_stream_opaque;
32uint32_t dcp_last_locktime;
33uint32_t dcp_last_packet_size;
34uint64_t dcp_last_cas;
35uint64_t dcp_last_start_seqno;
36uint64_t dcp_last_end_seqno;
37uint64_t dcp_last_vbucket_uuid;
38uint64_t dcp_last_snap_start_seqno;
39uint64_t dcp_last_snap_end_seqno;
40uint64_t dcp_last_byseqno;
41uint64_t dcp_last_revseqno;
42const void *dcp_last_meta;
43uint16_t dcp_last_nmeta;
44std::string dcp_last_key;
45vbucket_state_t dcp_last_vbucket_state;
46
47extern "C" {
48
49std::vector<std::pair<uint64_t, uint64_t> > dcp_failover_log;
50
51ENGINE_ERROR_CODE mock_dcp_add_failover_log(vbucket_failover_t* entry,
52                                            size_t nentries,
53                                            const void *cookie) {
54    (void) cookie;
55
56    while (!dcp_failover_log.empty()) {
57        dcp_failover_log.clear();
58    }
59
60    if(nentries > 0) {
61        for (size_t i = 0; i < nentries; i--) {
62            std::pair<uint64_t, uint64_t> curr;
63            curr.first = entry[i].uuid;
64            curr.second = entry[i].seqno;
65            dcp_failover_log.push_back(curr);
66        }
67    }
68   return ENGINE_SUCCESS;
69}
70
71static ENGINE_ERROR_CODE mock_get_failover_log(const void *cookie,
72                                               uint32_t opaque,
73                                               uint16_t vbucket) {
74    (void) cookie;
75    (void) opaque;
76    (void) vbucket;
77    clear_dcp_data();
78    return ENGINE_ENOTSUP;
79}
80
81static ENGINE_ERROR_CODE mock_stream_req(const void *cookie,
82                                         uint32_t opaque,
83                                         uint16_t vbucket,
84                                         uint32_t flags,
85                                         uint64_t start_seqno,
86                                         uint64_t end_seqno,
87                                         uint64_t vbucket_uuid,
88                                         uint64_t snap_start_seqno,
89                                         uint64_t snap_end_seqno) {
90    (void) cookie;
91    clear_dcp_data();
92    dcp_last_op = PROTOCOL_BINARY_CMD_DCP_STREAM_REQ;
93    dcp_last_opaque = opaque;
94    dcp_last_vbucket = vbucket;
95    dcp_last_flags = flags;
96    dcp_last_start_seqno = start_seqno;
97    dcp_last_end_seqno = end_seqno;
98    dcp_last_vbucket_uuid = vbucket_uuid;
99    dcp_last_packet_size = 64;
100    dcp_last_snap_start_seqno = snap_start_seqno;
101    dcp_last_snap_end_seqno = snap_end_seqno;
102    return ENGINE_SUCCESS;
103}
104
105static ENGINE_ERROR_CODE mock_add_stream_rsp(const void *cookie,
106                                             uint32_t opaque,
107                                             uint32_t stream_opaque,
108                                             uint8_t status) {
109    (void) cookie;
110    clear_dcp_data();
111    dcp_last_op = PROTOCOL_BINARY_CMD_DCP_ADD_STREAM;
112    dcp_last_opaque = opaque;
113    dcp_last_stream_opaque = stream_opaque;
114    dcp_last_status = status;
115    dcp_last_packet_size = 28;
116    return ENGINE_SUCCESS;
117}
118
119static ENGINE_ERROR_CODE mock_snapshot_marker_resp(const void *cookie,
120                                                   uint32_t opaque,
121                                                   uint8_t status) {
122    (void) cookie;
123    clear_dcp_data();
124    dcp_last_op = PROTOCOL_BINARY_CMD_DCP_SNAPSHOT_MARKER;
125    dcp_last_opaque = opaque;
126    dcp_last_status = status;
127    dcp_last_packet_size = 24;
128    return ENGINE_SUCCESS;
129}
130
131static ENGINE_ERROR_CODE mock_stream_end(const void *cookie,
132                                         uint32_t opaque,
133                                         uint16_t vbucket,
134                                         uint32_t flags) {
135    (void) cookie;
136    clear_dcp_data();
137    dcp_last_op = PROTOCOL_BINARY_CMD_DCP_STREAM_END;
138    dcp_last_opaque = opaque;
139    dcp_last_vbucket = vbucket;
140    dcp_last_flags = flags;
141    dcp_last_packet_size = 28;
142    return ENGINE_SUCCESS;
143}
144
145static ENGINE_ERROR_CODE mock_marker(const void *cookie,
146                                     uint32_t opaque,
147                                     uint16_t vbucket,
148                                     uint64_t snap_start_seqno,
149                                     uint64_t snap_end_seqno,
150                                     uint32_t flags) {
151    (void) cookie;
152    clear_dcp_data();
153    dcp_last_op = PROTOCOL_BINARY_CMD_DCP_SNAPSHOT_MARKER;
154    dcp_last_opaque = opaque;
155    dcp_last_vbucket = vbucket;
156    dcp_last_packet_size = 44;
157    dcp_last_snap_start_seqno = snap_start_seqno;
158    dcp_last_snap_end_seqno = snap_end_seqno;
159    dcp_last_flags = flags;
160    return ENGINE_SUCCESS;
161}
162
163static ENGINE_ERROR_CODE mock_mutation(const void* cookie,
164                                       uint32_t opaque,
165                                       item *itm,
166                                       uint16_t vbucket,
167                                       uint64_t by_seqno,
168                                       uint64_t rev_seqno,
169                                       uint32_t lock_time,
170                                       const void *meta,
171                                       uint16_t nmeta,
172                                       uint8_t nru) {
173    (void) cookie;
174    clear_dcp_data();
175    Item* item = reinterpret_cast<Item*>(itm);
176    dcp_last_op = PROTOCOL_BINARY_CMD_DCP_MUTATION;
177    dcp_last_opaque = opaque;
178    dcp_last_key.assign(item->getKey().c_str());
179    dcp_last_vbucket = vbucket;
180    dcp_last_byseqno = by_seqno;
181    dcp_last_revseqno = rev_seqno;
182    dcp_last_locktime = lock_time;
183    dcp_last_meta = meta;
184    dcp_last_nmeta = nmeta;
185    dcp_last_nru = nru;
186    dcp_last_packet_size = 55 + dcp_last_key.length() + item->getValMemSize();
187    return ENGINE_SUCCESS;
188}
189
190static ENGINE_ERROR_CODE mock_deletion(const void* cookie,
191                                       uint32_t opaque,
192                                       const void *key,
193                                       uint16_t nkey,
194                                       uint64_t cas,
195                                       uint16_t vbucket,
196                                       uint64_t by_seqno,
197                                       uint64_t rev_seqno,
198                                       const void *meta,
199                                       uint16_t nmeta) {
200    (void) cookie;
201    clear_dcp_data();
202    dcp_last_op = PROTOCOL_BINARY_CMD_DCP_DELETION;
203    dcp_last_opaque = opaque;
204    dcp_last_key.assign(static_cast<const char*>(key), nkey);
205    dcp_last_cas = cas;
206    dcp_last_vbucket = vbucket;
207    dcp_last_byseqno = by_seqno;
208    dcp_last_revseqno = rev_seqno;
209    dcp_last_meta = meta;
210    dcp_last_nmeta = nmeta;
211    dcp_last_packet_size = 42 + nkey;
212    return ENGINE_SUCCESS;
213}
214
215static ENGINE_ERROR_CODE mock_expiration(const void* cookie,
216                                         uint32_t opaque,
217                                         const void *key,
218                                         uint16_t nkey,
219                                         uint64_t cas,
220                                         uint16_t vbucket,
221                                         uint64_t by_seqno,
222                                         uint64_t rev_seqno,
223                                         const void *meta,
224                                         uint16_t nmeta) {
225    (void) cookie;
226    (void) opaque;
227    (void) key;
228    (void) nkey;
229    (void) cas;
230    (void) vbucket;
231    (void) by_seqno;
232    (void) rev_seqno;
233    (void)meta;
234    (void)nmeta;
235    clear_dcp_data();
236    return ENGINE_ENOTSUP;
237}
238
239static ENGINE_ERROR_CODE mock_flush(const void* cookie,
240                                    uint32_t opaque,
241                                    uint16_t vbucket) {
242    (void) cookie;
243    (void) opaque;
244    (void) vbucket;
245    clear_dcp_data();
246    return ENGINE_ENOTSUP;
247}
248
249static ENGINE_ERROR_CODE mock_set_vbucket_state(const void* cookie,
250                                                uint32_t opaque,
251                                                uint16_t vbucket,
252                                                vbucket_state_t state) {
253    (void) cookie;
254    clear_dcp_data();
255    dcp_last_op = PROTOCOL_BINARY_CMD_DCP_SET_VBUCKET_STATE;
256    dcp_last_opaque = opaque;
257    dcp_last_vbucket = vbucket;
258    dcp_last_vbucket_state = state;
259    dcp_last_packet_size = 25;
260    return ENGINE_SUCCESS;
261}
262
263static ENGINE_ERROR_CODE mock_noop(const void* cookie,
264                                   uint32_t opaque) {
265    (void) cookie;
266    clear_dcp_data();
267    dcp_last_op = PROTOCOL_BINARY_CMD_DCP_NOOP;
268    dcp_last_opaque = opaque;
269    return ENGINE_SUCCESS;
270}
271
272static ENGINE_ERROR_CODE mock_buffer_acknowledgement(const void* cookie,
273                                                     uint32_t opaque,
274                                                     uint16_t vbucket,
275                                                     uint32_t buffer_bytes) {
276    (void) cookie;
277    clear_dcp_data();
278    dcp_last_op = PROTOCOL_BINARY_CMD_DCP_BUFFER_ACKNOWLEDGEMENT;
279    dcp_last_opaque = opaque;
280    dcp_last_vbucket = vbucket;
281    return ENGINE_SUCCESS;
282}
283
284static ENGINE_ERROR_CODE mock_control(const void* cookie,
285                                           uint32_t opaque,
286                                           const void *key,
287                                           uint16_t nkey,
288                                           const void *value,
289                                           uint32_t nvalue) {
290    (void) cookie;
291    clear_dcp_data();
292    dcp_last_op = PROTOCOL_BINARY_CMD_DCP_CONTROL;
293    dcp_last_opaque = opaque;
294    return ENGINE_SUCCESS;
295}
296
297}
298
299void clear_dcp_data() {
300    dcp_last_op = 0;
301    dcp_last_status = 0;
302    dcp_last_nru = 0;
303    dcp_last_vbucket = 0;
304    dcp_last_opaque = 0;
305    dcp_last_flags = 0;
306    dcp_last_stream_opaque = 0;
307    dcp_last_locktime = 0;
308    dcp_last_cas = 0;
309    dcp_last_start_seqno = 0;
310    dcp_last_end_seqno = 0;
311    dcp_last_vbucket_uuid = 0;
312    dcp_last_snap_start_seqno = 0;
313    dcp_last_snap_end_seqno = 0;
314    dcp_last_meta = NULL;
315    dcp_last_nmeta = 0;
316    dcp_last_key.clear();
317    dcp_last_vbucket_state = (vbucket_state_t)0;
318}
319
320struct dcp_message_producers* get_dcp_producers() {
321    dcp_message_producers* producers =
322        (dcp_message_producers*)malloc(sizeof(dcp_message_producers));
323
324    producers->get_failover_log = mock_get_failover_log;
325    producers->stream_req = mock_stream_req;
326    producers->add_stream_rsp = mock_add_stream_rsp;
327    producers->marker_rsp = mock_snapshot_marker_resp;
328    producers->stream_end = mock_stream_end;
329    producers->marker = mock_marker;
330    producers->mutation = mock_mutation;
331    producers->deletion = mock_deletion;
332    producers->expiration = mock_expiration;
333    producers->flush = mock_flush;
334    producers->set_vbucket_state = mock_set_vbucket_state;
335    producers->noop = mock_noop;
336    producers->buffer_acknowledgement = mock_buffer_acknowledgement;
337    producers->control = mock_control;
338
339    return producers;
340}
341