1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3 * Copyright 2013 Couchbase, Inc
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18 #include "config.h"
19
20 #include <stdlib.h>
21
22 #include "item.h"
23 #include "mock_dcp.h"
24
25 uint8_t dcp_last_op;
26 uint8_t dcp_last_status;
27 uint8_t dcp_last_nru;
28 uint16_t dcp_last_vbucket;
29 uint32_t dcp_last_opaque;
30 uint32_t dcp_last_flags;
31 uint32_t dcp_last_stream_opaque;
32 uint32_t dcp_last_locktime;
33 uint32_t dcp_last_packet_size;
34 uint64_t dcp_last_cas;
35 uint64_t dcp_last_start_seqno;
36 uint64_t dcp_last_end_seqno;
37 uint64_t dcp_last_vbucket_uuid;
38 uint64_t dcp_last_snap_start_seqno;
39 uint64_t dcp_last_snap_end_seqno;
40 uint64_t dcp_last_byseqno;
41 uint64_t dcp_last_revseqno;
42 const void *dcp_last_meta;
43 uint16_t dcp_last_nmeta;
44 std::string dcp_last_key;
45 vbucket_state_t dcp_last_vbucket_state;
46
47 extern "C" {
48
49 std::vector<std::pair<uint64_t, uint64_t> > dcp_failover_log;
50
mock_dcp_add_failover_log(vbucket_failover_t* entry, size_t nentries, const void *cookie)51 ENGINE_ERROR_CODE mock_dcp_add_failover_log(vbucket_failover_t* entry,
52 size_t nentries,
53 const void *cookie) {
54 (void) cookie;
55
56 while (!dcp_failover_log.empty()) {
57 dcp_failover_log.clear();
58 }
59
60 if(nentries > 0) {
61 for (size_t i = 0; i < nentries; i--) {
62 std::pair<uint64_t, uint64_t> curr;
63 curr.first = entry[i].uuid;
64 curr.second = entry[i].seqno;
65 dcp_failover_log.push_back(curr);
66 }
67 }
68 return ENGINE_SUCCESS;
69 }
70
mock_get_failover_log(const void *cookie, uint32_t opaque, uint16_t vbucket)71 static ENGINE_ERROR_CODE mock_get_failover_log(const void *cookie,
72 uint32_t opaque,
73 uint16_t vbucket) {
74 (void) cookie;
75 (void) opaque;
76 (void) vbucket;
77 clear_dcp_data();
78 return ENGINE_ENOTSUP;
79 }
80
mock_stream_req(const void *cookie, uint32_t opaque, uint16_t vbucket, uint32_t flags, uint64_t start_seqno, uint64_t end_seqno, uint64_t vbucket_uuid, uint64_t snap_start_seqno, uint64_t snap_end_seqno)81 static ENGINE_ERROR_CODE mock_stream_req(const void *cookie,
82 uint32_t opaque,
83 uint16_t vbucket,
84 uint32_t flags,
85 uint64_t start_seqno,
86 uint64_t end_seqno,
87 uint64_t vbucket_uuid,
88 uint64_t snap_start_seqno,
89 uint64_t snap_end_seqno) {
90 (void) cookie;
91 clear_dcp_data();
92 dcp_last_op = PROTOCOL_BINARY_CMD_DCP_STREAM_REQ;
93 dcp_last_opaque = opaque;
94 dcp_last_vbucket = vbucket;
95 dcp_last_flags = flags;
96 dcp_last_start_seqno = start_seqno;
97 dcp_last_end_seqno = end_seqno;
98 dcp_last_vbucket_uuid = vbucket_uuid;
99 dcp_last_packet_size = 64;
100 dcp_last_snap_start_seqno = snap_start_seqno;
101 dcp_last_snap_end_seqno = snap_end_seqno;
102 return ENGINE_SUCCESS;
103 }
104
mock_add_stream_rsp(const void *cookie, uint32_t opaque, uint32_t stream_opaque, uint8_t status)105 static ENGINE_ERROR_CODE mock_add_stream_rsp(const void *cookie,
106 uint32_t opaque,
107 uint32_t stream_opaque,
108 uint8_t status) {
109 (void) cookie;
110 clear_dcp_data();
111 dcp_last_op = PROTOCOL_BINARY_CMD_DCP_ADD_STREAM;
112 dcp_last_opaque = opaque;
113 dcp_last_stream_opaque = stream_opaque;
114 dcp_last_status = status;
115 dcp_last_packet_size = 28;
116 return ENGINE_SUCCESS;
117 }
118
mock_snapshot_marker_resp(const void *cookie, uint32_t opaque, uint8_t status)119 static ENGINE_ERROR_CODE mock_snapshot_marker_resp(const void *cookie,
120 uint32_t opaque,
121 uint8_t status) {
122 (void) cookie;
123 clear_dcp_data();
124 dcp_last_op = PROTOCOL_BINARY_CMD_DCP_SNAPSHOT_MARKER;
125 dcp_last_opaque = opaque;
126 dcp_last_status = status;
127 dcp_last_packet_size = 24;
128 return ENGINE_SUCCESS;
129 }
130
mock_stream_end(const void *cookie, uint32_t opaque, uint16_t vbucket, uint32_t flags)131 static ENGINE_ERROR_CODE mock_stream_end(const void *cookie,
132 uint32_t opaque,
133 uint16_t vbucket,
134 uint32_t flags) {
135 (void) cookie;
136 clear_dcp_data();
137 dcp_last_op = PROTOCOL_BINARY_CMD_DCP_STREAM_END;
138 dcp_last_opaque = opaque;
139 dcp_last_vbucket = vbucket;
140 dcp_last_flags = flags;
141 dcp_last_packet_size = 28;
142 return ENGINE_SUCCESS;
143 }
144
mock_marker(const void *cookie, uint32_t opaque, uint16_t vbucket, uint64_t snap_start_seqno, uint64_t snap_end_seqno, uint32_t flags)145 static ENGINE_ERROR_CODE mock_marker(const void *cookie,
146 uint32_t opaque,
147 uint16_t vbucket,
148 uint64_t snap_start_seqno,
149 uint64_t snap_end_seqno,
150 uint32_t flags) {
151 (void) cookie;
152 clear_dcp_data();
153 dcp_last_op = PROTOCOL_BINARY_CMD_DCP_SNAPSHOT_MARKER;
154 dcp_last_opaque = opaque;
155 dcp_last_vbucket = vbucket;
156 dcp_last_packet_size = 44;
157 dcp_last_snap_start_seqno = snap_start_seqno;
158 dcp_last_snap_end_seqno = snap_end_seqno;
159 dcp_last_flags = flags;
160 return ENGINE_SUCCESS;
161 }
162
mock_mutation(const void* cookie, uint32_t opaque, item *itm, uint16_t vbucket, uint64_t by_seqno, uint64_t rev_seqno, uint32_t lock_time, const void *meta, uint16_t nmeta, uint8_t nru)163 static ENGINE_ERROR_CODE mock_mutation(const void* cookie,
164 uint32_t opaque,
165 item *itm,
166 uint16_t vbucket,
167 uint64_t by_seqno,
168 uint64_t rev_seqno,
169 uint32_t lock_time,
170 const void *meta,
171 uint16_t nmeta,
172 uint8_t nru) {
173 (void) cookie;
174 clear_dcp_data();
175 Item* item = reinterpret_cast<Item*>(itm);
176 dcp_last_op = PROTOCOL_BINARY_CMD_DCP_MUTATION;
177 dcp_last_opaque = opaque;
178 dcp_last_key.assign(item->getKey().c_str());
179 dcp_last_vbucket = vbucket;
180 dcp_last_byseqno = by_seqno;
181 dcp_last_revseqno = rev_seqno;
182 dcp_last_locktime = lock_time;
183 dcp_last_meta = meta;
184 dcp_last_nmeta = nmeta;
185 dcp_last_nru = nru;
186 dcp_last_packet_size = 55 + dcp_last_key.length() + item->getValMemSize();
187 return ENGINE_SUCCESS;
188 }
189
mock_deletion(const void* cookie, uint32_t opaque, const void *key, uint16_t nkey, uint64_t cas, uint16_t vbucket, uint64_t by_seqno, uint64_t rev_seqno, const void *meta, uint16_t nmeta)190 static ENGINE_ERROR_CODE mock_deletion(const void* cookie,
191 uint32_t opaque,
192 const void *key,
193 uint16_t nkey,
194 uint64_t cas,
195 uint16_t vbucket,
196 uint64_t by_seqno,
197 uint64_t rev_seqno,
198 const void *meta,
199 uint16_t nmeta) {
200 (void) cookie;
201 clear_dcp_data();
202 dcp_last_op = PROTOCOL_BINARY_CMD_DCP_DELETION;
203 dcp_last_opaque = opaque;
204 dcp_last_key.assign(static_cast<const char*>(key), nkey);
205 dcp_last_cas = cas;
206 dcp_last_vbucket = vbucket;
207 dcp_last_byseqno = by_seqno;
208 dcp_last_revseqno = rev_seqno;
209 dcp_last_meta = meta;
210 dcp_last_nmeta = nmeta;
211 dcp_last_packet_size = 42 + nkey;
212 return ENGINE_SUCCESS;
213 }
214
mock_expiration(const void* cookie, uint32_t opaque, const void *key, uint16_t nkey, uint64_t cas, uint16_t vbucket, uint64_t by_seqno, uint64_t rev_seqno, const void *meta, uint16_t nmeta)215 static ENGINE_ERROR_CODE mock_expiration(const void* cookie,
216 uint32_t opaque,
217 const void *key,
218 uint16_t nkey,
219 uint64_t cas,
220 uint16_t vbucket,
221 uint64_t by_seqno,
222 uint64_t rev_seqno,
223 const void *meta,
224 uint16_t nmeta) {
225 (void) cookie;
226 (void) opaque;
227 (void) key;
228 (void) nkey;
229 (void) cas;
230 (void) vbucket;
231 (void) by_seqno;
232 (void) rev_seqno;
233 (void)meta;
234 (void)nmeta;
235 clear_dcp_data();
236 return ENGINE_ENOTSUP;
237 }
238
mock_flush(const void* cookie, uint32_t opaque, uint16_t vbucket)239 static ENGINE_ERROR_CODE mock_flush(const void* cookie,
240 uint32_t opaque,
241 uint16_t vbucket) {
242 (void) cookie;
243 (void) opaque;
244 (void) vbucket;
245 clear_dcp_data();
246 return ENGINE_ENOTSUP;
247 }
248
mock_set_vbucket_state(const void* cookie, uint32_t opaque, uint16_t vbucket, vbucket_state_t state)249 static ENGINE_ERROR_CODE mock_set_vbucket_state(const void* cookie,
250 uint32_t opaque,
251 uint16_t vbucket,
252 vbucket_state_t state) {
253 (void) cookie;
254 clear_dcp_data();
255 dcp_last_op = PROTOCOL_BINARY_CMD_DCP_SET_VBUCKET_STATE;
256 dcp_last_opaque = opaque;
257 dcp_last_vbucket = vbucket;
258 dcp_last_vbucket_state = state;
259 dcp_last_packet_size = 25;
260 return ENGINE_SUCCESS;
261 }
262
mock_noop(const void* cookie, uint32_t opaque)263 static ENGINE_ERROR_CODE mock_noop(const void* cookie,
264 uint32_t opaque) {
265 (void) cookie;
266 clear_dcp_data();
267 dcp_last_op = PROTOCOL_BINARY_CMD_DCP_NOOP;
268 dcp_last_opaque = opaque;
269 return ENGINE_SUCCESS;
270 }
271
mock_buffer_acknowledgement(const void* cookie, uint32_t opaque, uint16_t vbucket, uint32_t buffer_bytes)272 static ENGINE_ERROR_CODE mock_buffer_acknowledgement(const void* cookie,
273 uint32_t opaque,
274 uint16_t vbucket,
275 uint32_t buffer_bytes) {
276 (void) cookie;
277 clear_dcp_data();
278 dcp_last_op = PROTOCOL_BINARY_CMD_DCP_BUFFER_ACKNOWLEDGEMENT;
279 dcp_last_opaque = opaque;
280 dcp_last_vbucket = vbucket;
281 return ENGINE_SUCCESS;
282 }
283
mock_control(const void* cookie, uint32_t opaque, const void *key, uint16_t nkey, const void *value, uint32_t nvalue)284 static ENGINE_ERROR_CODE mock_control(const void* cookie,
285 uint32_t opaque,
286 const void *key,
287 uint16_t nkey,
288 const void *value,
289 uint32_t nvalue) {
290 (void) cookie;
291 clear_dcp_data();
292 dcp_last_op = PROTOCOL_BINARY_CMD_DCP_CONTROL;
293 dcp_last_opaque = opaque;
294 dcp_last_key.assign(static_cast<const char*>(key), nkey);
295 return ENGINE_SUCCESS;
296 }
297
298 }
299
clear_dcp_data()300 void clear_dcp_data() {
301 dcp_last_op = 0;
302 dcp_last_status = 0;
303 dcp_last_nru = 0;
304 dcp_last_vbucket = 0;
305 dcp_last_opaque = 0;
306 dcp_last_flags = 0;
307 dcp_last_stream_opaque = 0;
308 dcp_last_locktime = 0;
309 dcp_last_cas = 0;
310 dcp_last_start_seqno = 0;
311 dcp_last_end_seqno = 0;
312 dcp_last_vbucket_uuid = 0;
313 dcp_last_snap_start_seqno = 0;
314 dcp_last_snap_end_seqno = 0;
315 dcp_last_meta = NULL;
316 dcp_last_nmeta = 0;
317 dcp_last_key.clear();
318 dcp_last_vbucket_state = (vbucket_state_t)0;
319 }
320
get_dcp_producers()321 struct dcp_message_producers* get_dcp_producers() {
322 dcp_message_producers* producers =
323 (dcp_message_producers*)malloc(sizeof(dcp_message_producers));
324
325 producers->get_failover_log = mock_get_failover_log;
326 producers->stream_req = mock_stream_req;
327 producers->add_stream_rsp = mock_add_stream_rsp;
328 producers->marker_rsp = mock_snapshot_marker_resp;
329 producers->stream_end = mock_stream_end;
330 producers->marker = mock_marker;
331 producers->mutation = mock_mutation;
332 producers->deletion = mock_deletion;
333 producers->expiration = mock_expiration;
334 producers->flush = mock_flush;
335 producers->set_vbucket_state = mock_set_vbucket_state;
336 producers->noop = mock_noop;
337 producers->buffer_acknowledgement = mock_buffer_acknowledgement;
338 producers->control = mock_control;
339
340 return producers;
341 }
342