1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2013 Couchbase, Inc
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17 
18 #include "config.h"
19 
20 #include <stdlib.h>
21 
22 #include "item.h"
23 #include "mock_dcp.h"
24 
25 uint8_t dcp_last_op;
26 uint8_t dcp_last_status;
27 uint8_t dcp_last_nru;
28 uint16_t dcp_last_vbucket;
29 uint32_t dcp_last_opaque;
30 uint32_t dcp_last_flags;
31 uint32_t dcp_last_stream_opaque;
32 uint32_t dcp_last_locktime;
33 uint32_t dcp_last_packet_size;
34 uint64_t dcp_last_cas;
35 uint64_t dcp_last_start_seqno;
36 uint64_t dcp_last_end_seqno;
37 uint64_t dcp_last_vbucket_uuid;
38 uint64_t dcp_last_snap_start_seqno;
39 uint64_t dcp_last_snap_end_seqno;
40 uint64_t dcp_last_byseqno;
41 uint64_t dcp_last_revseqno;
42 const void *dcp_last_meta;
43 uint16_t dcp_last_nmeta;
44 std::string dcp_last_key;
45 vbucket_state_t dcp_last_vbucket_state;
46 
47 extern "C" {
48 
49 std::vector<std::pair<uint64_t, uint64_t> > dcp_failover_log;
50 
mock_dcp_add_failover_log(vbucket_failover_t* entry, size_t nentries, const void *cookie)51 ENGINE_ERROR_CODE mock_dcp_add_failover_log(vbucket_failover_t* entry,
52                                             size_t nentries,
53                                             const void *cookie) {
54     (void) cookie;
55 
56     while (!dcp_failover_log.empty()) {
57         dcp_failover_log.clear();
58     }
59 
60     if(nentries > 0) {
61         for (size_t i = 0; i < nentries; i--) {
62             std::pair<uint64_t, uint64_t> curr;
63             curr.first = entry[i].uuid;
64             curr.second = entry[i].seqno;
65             dcp_failover_log.push_back(curr);
66         }
67     }
68    return ENGINE_SUCCESS;
69 }
70 
mock_get_failover_log(const void *cookie, uint32_t opaque, uint16_t vbucket)71 static ENGINE_ERROR_CODE mock_get_failover_log(const void *cookie,
72                                                uint32_t opaque,
73                                                uint16_t vbucket) {
74     (void) cookie;
75     (void) opaque;
76     (void) vbucket;
77     clear_dcp_data();
78     return ENGINE_ENOTSUP;
79 }
80 
mock_stream_req(const void *cookie, uint32_t opaque, uint16_t vbucket, uint32_t flags, uint64_t start_seqno, uint64_t end_seqno, uint64_t vbucket_uuid, uint64_t snap_start_seqno, uint64_t snap_end_seqno)81 static ENGINE_ERROR_CODE mock_stream_req(const void *cookie,
82                                          uint32_t opaque,
83                                          uint16_t vbucket,
84                                          uint32_t flags,
85                                          uint64_t start_seqno,
86                                          uint64_t end_seqno,
87                                          uint64_t vbucket_uuid,
88                                          uint64_t snap_start_seqno,
89                                          uint64_t snap_end_seqno) {
90     (void) cookie;
91     clear_dcp_data();
92     dcp_last_op = PROTOCOL_BINARY_CMD_DCP_STREAM_REQ;
93     dcp_last_opaque = opaque;
94     dcp_last_vbucket = vbucket;
95     dcp_last_flags = flags;
96     dcp_last_start_seqno = start_seqno;
97     dcp_last_end_seqno = end_seqno;
98     dcp_last_vbucket_uuid = vbucket_uuid;
99     dcp_last_packet_size = 64;
100     dcp_last_snap_start_seqno = snap_start_seqno;
101     dcp_last_snap_end_seqno = snap_end_seqno;
102     return ENGINE_SUCCESS;
103 }
104 
mock_add_stream_rsp(const void *cookie, uint32_t opaque, uint32_t stream_opaque, uint8_t status)105 static ENGINE_ERROR_CODE mock_add_stream_rsp(const void *cookie,
106                                              uint32_t opaque,
107                                              uint32_t stream_opaque,
108                                              uint8_t status) {
109     (void) cookie;
110     clear_dcp_data();
111     dcp_last_op = PROTOCOL_BINARY_CMD_DCP_ADD_STREAM;
112     dcp_last_opaque = opaque;
113     dcp_last_stream_opaque = stream_opaque;
114     dcp_last_status = status;
115     dcp_last_packet_size = 28;
116     return ENGINE_SUCCESS;
117 }
118 
mock_snapshot_marker_resp(const void *cookie, uint32_t opaque, uint8_t status)119 static ENGINE_ERROR_CODE mock_snapshot_marker_resp(const void *cookie,
120                                                    uint32_t opaque,
121                                                    uint8_t status) {
122     (void) cookie;
123     clear_dcp_data();
124     dcp_last_op = PROTOCOL_BINARY_CMD_DCP_SNAPSHOT_MARKER;
125     dcp_last_opaque = opaque;
126     dcp_last_status = status;
127     dcp_last_packet_size = 24;
128     return ENGINE_SUCCESS;
129 }
130 
mock_stream_end(const void *cookie, uint32_t opaque, uint16_t vbucket, uint32_t flags)131 static ENGINE_ERROR_CODE mock_stream_end(const void *cookie,
132                                          uint32_t opaque,
133                                          uint16_t vbucket,
134                                          uint32_t flags) {
135     (void) cookie;
136     clear_dcp_data();
137     dcp_last_op = PROTOCOL_BINARY_CMD_DCP_STREAM_END;
138     dcp_last_opaque = opaque;
139     dcp_last_vbucket = vbucket;
140     dcp_last_flags = flags;
141     dcp_last_packet_size = 28;
142     return ENGINE_SUCCESS;
143 }
144 
mock_marker(const void *cookie, uint32_t opaque, uint16_t vbucket, uint64_t snap_start_seqno, uint64_t snap_end_seqno, uint32_t flags)145 static ENGINE_ERROR_CODE mock_marker(const void *cookie,
146                                      uint32_t opaque,
147                                      uint16_t vbucket,
148                                      uint64_t snap_start_seqno,
149                                      uint64_t snap_end_seqno,
150                                      uint32_t flags) {
151     (void) cookie;
152     clear_dcp_data();
153     dcp_last_op = PROTOCOL_BINARY_CMD_DCP_SNAPSHOT_MARKER;
154     dcp_last_opaque = opaque;
155     dcp_last_vbucket = vbucket;
156     dcp_last_packet_size = 44;
157     dcp_last_snap_start_seqno = snap_start_seqno;
158     dcp_last_snap_end_seqno = snap_end_seqno;
159     dcp_last_flags = flags;
160     return ENGINE_SUCCESS;
161 }
162 
mock_mutation(const void* cookie, uint32_t opaque, item *itm, uint16_t vbucket, uint64_t by_seqno, uint64_t rev_seqno, uint32_t lock_time, const void *meta, uint16_t nmeta, uint8_t nru)163 static ENGINE_ERROR_CODE mock_mutation(const void* cookie,
164                                        uint32_t opaque,
165                                        item *itm,
166                                        uint16_t vbucket,
167                                        uint64_t by_seqno,
168                                        uint64_t rev_seqno,
169                                        uint32_t lock_time,
170                                        const void *meta,
171                                        uint16_t nmeta,
172                                        uint8_t nru) {
173     (void) cookie;
174     clear_dcp_data();
175     Item* item = reinterpret_cast<Item*>(itm);
176     dcp_last_op = PROTOCOL_BINARY_CMD_DCP_MUTATION;
177     dcp_last_opaque = opaque;
178     dcp_last_key.assign(item->getKey().c_str());
179     dcp_last_vbucket = vbucket;
180     dcp_last_byseqno = by_seqno;
181     dcp_last_revseqno = rev_seqno;
182     dcp_last_locktime = lock_time;
183     dcp_last_meta = meta;
184     dcp_last_nmeta = nmeta;
185     dcp_last_nru = nru;
186     dcp_last_packet_size = 55 + dcp_last_key.length() + item->getValMemSize();
187     return ENGINE_SUCCESS;
188 }
189 
mock_deletion(const void* cookie, uint32_t opaque, const void *key, uint16_t nkey, uint64_t cas, uint16_t vbucket, uint64_t by_seqno, uint64_t rev_seqno, const void *meta, uint16_t nmeta)190 static ENGINE_ERROR_CODE mock_deletion(const void* cookie,
191                                        uint32_t opaque,
192                                        const void *key,
193                                        uint16_t nkey,
194                                        uint64_t cas,
195                                        uint16_t vbucket,
196                                        uint64_t by_seqno,
197                                        uint64_t rev_seqno,
198                                        const void *meta,
199                                        uint16_t nmeta) {
200     (void) cookie;
201     clear_dcp_data();
202     dcp_last_op = PROTOCOL_BINARY_CMD_DCP_DELETION;
203     dcp_last_opaque = opaque;
204     dcp_last_key.assign(static_cast<const char*>(key), nkey);
205     dcp_last_cas = cas;
206     dcp_last_vbucket = vbucket;
207     dcp_last_byseqno = by_seqno;
208     dcp_last_revseqno = rev_seqno;
209     dcp_last_meta = meta;
210     dcp_last_nmeta = nmeta;
211     dcp_last_packet_size = 42 + nkey;
212     return ENGINE_SUCCESS;
213 }
214 
mock_expiration(const void* cookie, uint32_t opaque, const void *key, uint16_t nkey, uint64_t cas, uint16_t vbucket, uint64_t by_seqno, uint64_t rev_seqno, const void *meta, uint16_t nmeta)215 static ENGINE_ERROR_CODE mock_expiration(const void* cookie,
216                                          uint32_t opaque,
217                                          const void *key,
218                                          uint16_t nkey,
219                                          uint64_t cas,
220                                          uint16_t vbucket,
221                                          uint64_t by_seqno,
222                                          uint64_t rev_seqno,
223                                          const void *meta,
224                                          uint16_t nmeta) {
225     (void) cookie;
226     (void) opaque;
227     (void) key;
228     (void) nkey;
229     (void) cas;
230     (void) vbucket;
231     (void) by_seqno;
232     (void) rev_seqno;
233     (void)meta;
234     (void)nmeta;
235     clear_dcp_data();
236     return ENGINE_ENOTSUP;
237 }
238 
mock_flush(const void* cookie, uint32_t opaque, uint16_t vbucket)239 static ENGINE_ERROR_CODE mock_flush(const void* cookie,
240                                     uint32_t opaque,
241                                     uint16_t vbucket) {
242     (void) cookie;
243     (void) opaque;
244     (void) vbucket;
245     clear_dcp_data();
246     return ENGINE_ENOTSUP;
247 }
248 
mock_set_vbucket_state(const void* cookie, uint32_t opaque, uint16_t vbucket, vbucket_state_t state)249 static ENGINE_ERROR_CODE mock_set_vbucket_state(const void* cookie,
250                                                 uint32_t opaque,
251                                                 uint16_t vbucket,
252                                                 vbucket_state_t state) {
253     (void) cookie;
254     clear_dcp_data();
255     dcp_last_op = PROTOCOL_BINARY_CMD_DCP_SET_VBUCKET_STATE;
256     dcp_last_opaque = opaque;
257     dcp_last_vbucket = vbucket;
258     dcp_last_vbucket_state = state;
259     dcp_last_packet_size = 25;
260     return ENGINE_SUCCESS;
261 }
262 
mock_noop(const void* cookie, uint32_t opaque)263 static ENGINE_ERROR_CODE mock_noop(const void* cookie,
264                                    uint32_t opaque) {
265     (void) cookie;
266     clear_dcp_data();
267     dcp_last_op = PROTOCOL_BINARY_CMD_DCP_NOOP;
268     dcp_last_opaque = opaque;
269     return ENGINE_SUCCESS;
270 }
271 
mock_buffer_acknowledgement(const void* cookie, uint32_t opaque, uint16_t vbucket, uint32_t buffer_bytes)272 static ENGINE_ERROR_CODE mock_buffer_acknowledgement(const void* cookie,
273                                                      uint32_t opaque,
274                                                      uint16_t vbucket,
275                                                      uint32_t buffer_bytes) {
276     (void) cookie;
277     clear_dcp_data();
278     dcp_last_op = PROTOCOL_BINARY_CMD_DCP_BUFFER_ACKNOWLEDGEMENT;
279     dcp_last_opaque = opaque;
280     dcp_last_vbucket = vbucket;
281     return ENGINE_SUCCESS;
282 }
283 
mock_control(const void* cookie, uint32_t opaque, const void *key, uint16_t nkey, const void *value, uint32_t nvalue)284 static ENGINE_ERROR_CODE mock_control(const void* cookie,
285                                            uint32_t opaque,
286                                            const void *key,
287                                            uint16_t nkey,
288                                            const void *value,
289                                            uint32_t nvalue) {
290     (void) cookie;
291     clear_dcp_data();
292     dcp_last_op = PROTOCOL_BINARY_CMD_DCP_CONTROL;
293     dcp_last_opaque = opaque;
294     dcp_last_key.assign(static_cast<const char*>(key), nkey);
295     return ENGINE_SUCCESS;
296 }
297 
298 }
299 
clear_dcp_data()300 void clear_dcp_data() {
301     dcp_last_op = 0;
302     dcp_last_status = 0;
303     dcp_last_nru = 0;
304     dcp_last_vbucket = 0;
305     dcp_last_opaque = 0;
306     dcp_last_flags = 0;
307     dcp_last_stream_opaque = 0;
308     dcp_last_locktime = 0;
309     dcp_last_cas = 0;
310     dcp_last_start_seqno = 0;
311     dcp_last_end_seqno = 0;
312     dcp_last_vbucket_uuid = 0;
313     dcp_last_snap_start_seqno = 0;
314     dcp_last_snap_end_seqno = 0;
315     dcp_last_meta = NULL;
316     dcp_last_nmeta = 0;
317     dcp_last_key.clear();
318     dcp_last_vbucket_state = (vbucket_state_t)0;
319 }
320 
get_dcp_producers()321 struct dcp_message_producers* get_dcp_producers() {
322     dcp_message_producers* producers =
323         (dcp_message_producers*)malloc(sizeof(dcp_message_producers));
324 
325     producers->get_failover_log = mock_get_failover_log;
326     producers->stream_req = mock_stream_req;
327     producers->add_stream_rsp = mock_add_stream_rsp;
328     producers->marker_rsp = mock_snapshot_marker_resp;
329     producers->stream_end = mock_stream_end;
330     producers->marker = mock_marker;
331     producers->mutation = mock_mutation;
332     producers->deletion = mock_deletion;
333     producers->expiration = mock_expiration;
334     producers->flush = mock_flush;
335     producers->set_vbucket_state = mock_set_vbucket_state;
336     producers->noop = mock_noop;
337     producers->buffer_acknowledgement = mock_buffer_acknowledgement;
338     producers->control = mock_control;
339 
340     return producers;
341 }
342