1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2012 Couchbase, Inc
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17 
18 #include "config.h"
19 #include "ep_test_apis.h"
20 
21 #include <memcached/util.h>
22 #include <platform/platform.h>
23 #include <stdlib.h>
24 #include <string.h>
25 
26 #include <algorithm>
27 #include <iostream>
28 #include <sstream>
29 
30 #include "mock/mock_dcp.h"
31 
// Evaluate expr; when it is false, pass the stringified expression, msg and
// the current line number to abort_msg().
#define check(expr, msg) \
    static_cast<void>((expr) ? 0 : abort_msg(#expr, msg, __LINE__))

// Defined outside this file; called by check() when an expectation fails.
extern "C" bool abort_msg(const char *expr, const char *msg, int line);

// Stats recorded by add_stats(), keyed by stat name.
std::map<std::string, std::string> vals;
// When true, add_stats() also prints every stat it receives to stdout.
bool dump_stats = false;
// Status of the most recent response handled by add_response().
protocol_binary_response_status last_status =
    static_cast<protocol_binary_response_status>(0);
// Body length of the most recent response (add_response / vb_map_response).
uint32_t last_bodylen = 0;
// malloc'd copy of the most recent response key; NULL when it had no key.
char *last_key = NULL;
// malloc'd copy of the most recent response body; NULL when it had no body.
char *last_body = NULL;
// Deleted flag decoded by add_response_get_meta().
bool last_deleted_flag = false;
// CAS of the most recent response handled by add_response().
uint64_t last_cas = 0;
// Datatype of the most recent response handled by add_response().
uint8_t last_datatype = 0x00;
// Item metadata decoded by add_response_get_meta() / add_response_ret_meta().
ItemMetaData last_meta;

// Forward declarations of helpers defined further down in this file.
extern "C" bool add_response_get_meta(const void *key, uint16_t keylen,
                                      const void *ext, uint8_t extlen,
                                      const void *body, uint32_t bodylen,
                                      uint8_t datatype, uint16_t status,
                                      uint64_t cas, const void *cookie);
void encodeExt(char *buffer, uint32_t val);
void encodeWithMetaExt(char *buffer, ItemMetaData *meta);
56 
/**
 * Sleep for *sleepTime microseconds, then double *sleepTime for the next
 * call, never letting it exceed 500000.
 *
 * @param sleepTime in/out: the delay to use now; updated with the next delay.
 */
void decayingSleep(useconds_t *sleepTime) {
    static const useconds_t maxSleepTime = 500000;

    const useconds_t current = *sleepTime;
    usleep(current);

    const useconds_t doubled = current << 1;
    *sleepTime = (doubled < maxSleepTime) ? doubled : maxSleepTime;
}
62 
vb_map_response(const void *cookie, const void *map, size_t mapsize)63 ENGINE_ERROR_CODE vb_map_response(const void *cookie,
64                                   const void *map,
65                                   size_t mapsize) {
66     (void)cookie;
67     last_bodylen = mapsize;
68     if (last_body) {
69         free(last_body);
70         last_body = NULL;
71     }
72     if (mapsize > 0) {
73         last_body = static_cast<char*>(malloc(mapsize));
74         cb_assert(last_body);
75         memcpy(last_body, map, mapsize);
76     }
77     return ENGINE_SUCCESS;
78 }
79 
add_response(const void *key, uint16_t keylen, const void *ext, uint8_t extlen, const void *body, uint32_t bodylen, uint8_t datatype, uint16_t status, uint64_t cas, const void *cookie)80 bool add_response(const void *key, uint16_t keylen, const void *ext,
81                   uint8_t extlen, const void *body, uint32_t bodylen,
82                   uint8_t datatype, uint16_t status, uint64_t cas,
83                   const void *cookie) {
84     (void)ext;
85     (void)extlen;
86     (void)cookie;
87     last_bodylen = bodylen;
88     last_status = static_cast<protocol_binary_response_status>(status);
89     if (last_body) {
90         free(last_body);
91         last_body = NULL;
92     }
93     if (bodylen > 0) {
94         last_body = static_cast<char*>(malloc(bodylen + 1));
95         cb_assert(last_body);
96         memcpy(last_body, body, bodylen);
97         last_body[bodylen] = '\0';
98     }
99     if (last_key) {
100         free(last_key);
101         last_key = NULL;
102     }
103     if (keylen > 0) {
104         last_key = static_cast<char*>(malloc(keylen + 1));
105         cb_assert(last_key);
106         memcpy(last_key, key, keylen);
107         last_key[keylen] = '\0';
108     }
109     last_cas = cas;
110     last_datatype = datatype;
111     return true;
112 }
113 
add_response_get_meta(const void *key, uint16_t keylen, const void *ext, uint8_t extlen, const void *body, uint32_t bodylen, uint8_t datatype, uint16_t status, uint64_t cas, const void *cookie)114 bool add_response_get_meta(const void *key, uint16_t keylen, const void *ext,
115                            uint8_t extlen, const void *body, uint32_t bodylen,
116                            uint8_t datatype, uint16_t status, uint64_t cas,
117                            const void *cookie) {
118     (void)cookie;
119     const uint8_t* ext_bytes = reinterpret_cast<const uint8_t*> (ext);
120     if (ext && extlen > 0) {
121         uint32_t flags;
122         memcpy(&flags, ext_bytes, 4);
123         last_deleted_flag = ntohl(flags) & GET_META_ITEM_DELETED_FLAG;
124         memcpy(&last_meta.flags, ext_bytes + 4, 4);
125         memcpy(&last_meta.exptime, ext_bytes + 8, 4);
126         last_meta.exptime = ntohl(last_meta.exptime);
127         memcpy(&last_meta.revSeqno, ext_bytes + 12, 8);
128         last_meta.revSeqno = ntohll(last_meta.revSeqno);
129         last_meta.cas = cas;
130     }
131     return add_response(key, keylen, ext, extlen, body, bodylen, datatype,
132                         status, cas, cookie);
133 }
134 
add_response_ret_meta(const void *key, uint16_t keylen, const void *ext, uint8_t extlen, const void *body, uint32_t bodylen, uint8_t datatype, uint16_t status, uint64_t cas, const void *cookie)135 bool add_response_ret_meta(const void *key, uint16_t keylen, const void *ext,
136                            uint8_t extlen, const void *body, uint32_t bodylen,
137                            uint8_t datatype, uint16_t status, uint64_t cas,
138                            const void *cookie) {
139     (void)cookie;
140     const uint8_t* ext_bytes = reinterpret_cast<const uint8_t*> (ext);
141     if (ext && extlen == 16) {
142         memcpy(&last_meta.flags, ext_bytes, 4);
143         memcpy(&last_meta.exptime, ext_bytes + 4, 4);
144         last_meta.exptime = ntohl(last_meta.exptime);
145         memcpy(&last_meta.revSeqno, ext_bytes + 8, 8);
146         last_meta.revSeqno = ntohll(last_meta.revSeqno);
147         last_meta.cas = cas;
148     }
149     return add_response(key, keylen, ext, extlen, body, bodylen, datatype,
150                         status, cas, cookie);
151 }
152 
add_stats(const char *key, const uint16_t klen, const char *val, const uint32_t vlen, const void *cookie)153 void add_stats(const char *key, const uint16_t klen, const char *val,
154                const uint32_t vlen, const void *cookie) {
155     (void)cookie;
156     std::string k(key, klen);
157     std::string v(val, vlen);
158 
159     if (dump_stats) {
160         std::cout << "stat[" << k << "] = " << v << std::endl;
161     }
162 
163     vals[k] = v;
164 }
165 
/**
 * Write val into buffer as a 4-byte value in network byte order.
 *
 * @param buffer destination; must have room for sizeof(uint32_t) bytes
 * @param val    value to encode (host byte order)
 */
void encodeExt(char *buffer, uint32_t val) {
    const uint32_t wire = htonl(val);
    memcpy(buffer, &wire, sizeof(wire));
}
170 
encodeWithMetaExt(char *buffer, ItemMetaData *meta)171 void encodeWithMetaExt(char *buffer, ItemMetaData *meta) {
172     uint32_t flags = meta->flags;
173     uint32_t exp = htonl(meta->exptime);
174     uint64_t seqno = htonll(meta->revSeqno);
175     uint64_t cas = htonll(meta->cas);
176 
177     memcpy(buffer, (char*)&flags, sizeof(flags));
178     memcpy(buffer + 4, (char*)&exp, sizeof(exp));
179     memcpy(buffer + 8, (char*)&seqno, sizeof(seqno));
180     memcpy(buffer + 16, (char*)&cas, sizeof(cas));
181 }
182 
createPacket(uint8_t opcode, uint16_t vbid, uint64_t cas, const char *ext, uint8_t extlen, const char *key, uint32_t keylen, const char *val, uint32_t vallen, uint8_t datatype)183 protocol_binary_request_header* createPacket(uint8_t opcode,
184                                              uint16_t vbid,
185                                              uint64_t cas,
186                                              const char *ext,
187                                              uint8_t extlen,
188                                              const char *key,
189                                              uint32_t keylen,
190                                              const char *val,
191                                              uint32_t vallen,
192                                              uint8_t datatype) {
193     char *pkt_raw;
194     uint32_t headerlen = sizeof(protocol_binary_request_header);
195     pkt_raw = static_cast<char*>(calloc(1, headerlen + extlen + keylen + vallen));
196     cb_assert(pkt_raw);
197     protocol_binary_request_header *req =
198         (protocol_binary_request_header*)pkt_raw;
199     req->request.opcode = opcode;
200     req->request.keylen = htons(keylen);
201     req->request.extlen = extlen;
202     req->request.vbucket = htons(vbid);
203     req->request.bodylen = htonl(keylen + vallen + extlen);
204     req->request.cas = htonll(cas);
205     req->request.datatype = datatype;
206 
207     if (extlen > 0) {
208         memcpy(pkt_raw + headerlen, ext, extlen);
209     }
210 
211     if (keylen > 0) {
212         memcpy(pkt_raw + headerlen + extlen, key, keylen);
213     }
214 
215     if (vallen > 0) {
216         memcpy(pkt_raw + headerlen + extlen + keylen, val, vallen);
217     }
218 
219     return req;
220 }
221 
add_with_meta(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char *key, const size_t keylen, const char *val, const size_t vallen, const uint32_t vb, ItemMetaData *itemMeta, bool skipConflictResolution)222 void add_with_meta(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char *key,
223                    const size_t keylen, const char *val, const size_t vallen,
224                    const uint32_t vb, ItemMetaData *itemMeta,
225                    bool skipConflictResolution) {
226     int blen = skipConflictResolution ? 28 : 24;
227     char *ext = new char[blen];
228     encodeWithMetaExt(ext, itemMeta);
229 
230     if (skipConflictResolution) {
231         uint32_t flag = SKIP_CONFLICT_RESOLUTION_FLAG;
232         flag = htonl(flag);
233         memcpy(ext + 24, (char*)&flag, sizeof(flag));
234     }
235 
236     protocol_binary_request_header *pkt;
237     pkt = createPacket(PROTOCOL_BINARY_CMD_ADD_WITH_META, vb, 0, ext, blen, key, keylen,
238                        val, vallen);
239     check(h1->unknown_command(h, NULL, pkt, add_response) == ENGINE_SUCCESS,
240           "Expected to be able to store with meta");
241     delete[] ext;
242 }
243 
changeVBFilter(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, std::string name, std::map<uint16_t, uint64_t> &filtermap)244 void changeVBFilter(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, std::string name,
245                     std::map<uint16_t, uint64_t> &filtermap) {
246     std::stringstream value;
247     uint16_t vbs = htons(filtermap.size());
248     std::map<uint16_t, uint64_t>::iterator it;
249 
250     value.write((char*) &vbs, sizeof(uint16_t));
251     for (it = filtermap.begin(); it != filtermap.end(); ++it) {
252         uint16_t vb = htons(it->first);
253         uint64_t chkid = htonll(it->second);
254         value.write((char*) &vb, sizeof(uint16_t));
255         value.write((char*) &chkid, sizeof(uint64_t));
256     }
257 
258     protocol_binary_request_header *request;
259     request = createPacket(PROTOCOL_BINARY_CMD_CHANGE_VB_FILTER, 0, 0, NULL, 0, name.c_str(),
260                        name.length(), value.str().data(), value.str().length());
261     check(h1->unknown_command(h, NULL, request, add_response) == ENGINE_SUCCESS,
262           "Failed to change the TAP VB filter.");
263     free(request);
264 }
265 
createCheckpoint(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1)266 void createCheckpoint(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
267     protocol_binary_request_header *request = createPacket(PROTOCOL_BINARY_CMD_CREATE_CHECKPOINT);
268     check(h1->unknown_command(h, NULL, request, add_response) == ENGINE_SUCCESS,
269           "Failed to create a new checkpoint.");
270     free(request);
271 }
272 
del(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char *key, uint64_t cas, uint16_t vbucket, const void* cookie)273 ENGINE_ERROR_CODE del(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char *key,
274                       uint64_t cas, uint16_t vbucket, const void* cookie) {
275     return h1->remove(h, cookie, key, strlen(key), &cas, vbucket);
276 }
277 
del_with_meta(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char *key, const size_t keylen, const uint32_t vb, ItemMetaData *itemMeta, uint64_t cas_for_delete, bool skipConflictResolution)278 void del_with_meta(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char *key,
279                    const size_t keylen, const uint32_t vb,
280                    ItemMetaData *itemMeta, uint64_t cas_for_delete,
281                    bool skipConflictResolution) {
282     int blen = skipConflictResolution ? 28 : 24;
283     char *ext = new char[blen];
284     encodeWithMetaExt(ext, itemMeta);
285 
286     if (skipConflictResolution) {
287         uint32_t flag = SKIP_CONFLICT_RESOLUTION_FLAG;
288         flag = htonl(flag);
289         memcpy(ext + 24, (char*)&flag, sizeof(flag));
290     }
291     protocol_binary_request_header *pkt;
292     pkt = createPacket(PROTOCOL_BINARY_CMD_DEL_WITH_META, vb, cas_for_delete, ext, blen, key,
293                        keylen);
294     check(h1->unknown_command(h, NULL, pkt, add_response) == ENGINE_SUCCESS,
295           "Expected to be able to delete with meta");
296     delete[] ext;
297 }
298 
evict_key(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char *key, uint16_t vbucketId, const char *msg, bool expectError)299 void evict_key(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char *key,
300                uint16_t vbucketId, const char *msg, bool expectError) {
301     int nonResidentItems = get_int_stat(h, h1, "ep_num_non_resident");
302     int numEjectedItems = get_int_stat(h, h1, "ep_num_value_ejects");
303     protocol_binary_request_header *pkt = createPacket(PROTOCOL_BINARY_CMD_EVICT_KEY, 0, 0,
304                                                        NULL, 0, key, strlen(key));
305     pkt->request.vbucket = htons(vbucketId);
306 
307     check(h1->unknown_command(h, NULL, pkt, add_response) == ENGINE_SUCCESS,
308           "Failed to evict key.");
309 
310     if (expectError) {
311         check(last_status == PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS,
312               "Expected exists when evicting key.");
313     } else {
314         if (strcmp(last_body, "Already ejected.") != 0) {
315             nonResidentItems++;
316             numEjectedItems++;
317         }
318         check(last_status == PROTOCOL_BINARY_RESPONSE_SUCCESS,
319               "Expected success evicting key.");
320     }
321 
322     check(get_int_stat(h, h1, "ep_num_non_resident") == nonResidentItems,
323           "Incorrect number of non-resident items");
324     check(get_int_stat(h, h1, "ep_num_value_ejects") == numEjectedItems,
325           "Incorrect number of ejected items");
326 
327     if (msg != NULL && strcmp(last_body, msg) != 0) {
328         fprintf(stderr, "Expected evict to return ``%s'', but it returned ``%s''\n",
329                 msg, last_body);
330         abort();
331     }
332 }
333 
estimateVBucketMove(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, uint16_t vbid, const char* tap_name)334 size_t estimateVBucketMove(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
335                          uint16_t vbid, const char* tap_name) {
336     std::stringstream ss;
337     ss << "tap-vbtakeover " << vbid;
338     if (tap_name) {
339       ss << " " << tap_name;
340     }
341     return get_int_stat(h, h1, "estimate", ss.str().c_str());
342 }
343 
checkpointPersistence(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, uint64_t checkpoint_id, uint16_t vb)344 ENGINE_ERROR_CODE checkpointPersistence(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
345                                         uint64_t checkpoint_id, uint16_t vb) {
346     checkpoint_id = htonll(checkpoint_id);
347     protocol_binary_request_header *request;
348     request = createPacket(PROTOCOL_BINARY_CMD_CHECKPOINT_PERSISTENCE, vb, 0, NULL, 0, NULL, 0,
349                            (const char *)&checkpoint_id, sizeof(uint64_t));
350     ENGINE_ERROR_CODE rv = h1->unknown_command(h, NULL, request, add_response);
351     free(request);
352     return rv;
353 }
354 
seqnoPersistence(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, uint16_t vbucket, uint64_t seqno)355 ENGINE_ERROR_CODE seqnoPersistence(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
356                                    uint16_t vbucket, uint64_t seqno) {
357     seqno = htonll(seqno);
358     char buffer[8];
359     memcpy(buffer, &seqno, sizeof(uint64_t));
360     protocol_binary_request_header* request =
361         createPacket(PROTOCOL_BINARY_CMD_SEQNO_PERSISTENCE, vbucket, 0, buffer, 8);
362 
363     ENGINE_ERROR_CODE rv = h1->unknown_command(h, NULL, request, add_response);
364     free(request);
365     return rv;
366 }
367 
gat(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char* key, uint16_t vb, uint32_t exp, bool quiet)368 void gat(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char* key,
369          uint16_t vb, uint32_t exp, bool quiet) {
370     char ext[4];
371     uint8_t opcode = quiet ? PROTOCOL_BINARY_CMD_GATQ : PROTOCOL_BINARY_CMD_GAT;
372     uint32_t keylen = key ? strlen(key) : 0;
373     protocol_binary_request_header *request;
374     encodeExt(ext, exp);
375     request = createPacket(opcode, vb, 0, ext, 4, key, keylen);
376 
377     check(h1->unknown_command(h, NULL, request, add_response) == ENGINE_SUCCESS,
378           "Failed to call gat");
379     free(request);
380 }
381 
get_item_info(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, item_info *info, const char* key, uint16_t vb)382 bool get_item_info(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, item_info *info,
383                    const char* key, uint16_t vb) {
384     item *i = NULL;
385     if (h1->get(h, NULL, &i, key, strlen(key), vb) != ENGINE_SUCCESS) {
386         return false;
387     }
388     info->nvalue = 1;
389     if (!h1->get_item_info(h, NULL, i, info)) {
390         h1->release(h, NULL, i);
391         fprintf(stderr, "get_item_info failed\n");
392         return false;
393     }
394 
395     h1->release(h, NULL, i);
396     return true;
397 }
398 
get_key(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, item *i, std::string &key)399 bool get_key(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, item *i,
400              std::string &key) {
401 
402     item_info info;
403     info.nvalue = 1;
404     if (!h1->get_item_info(h, NULL, i, &info)) {
405         fprintf(stderr, "get_item_info failed\n");
406         return false;
407     }
408 
409     key.assign((const char*)info.key, info.nkey);
410     return true;
411 }
412 
getl(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char* key, uint16_t vb, uint32_t lock_timeout)413 void getl(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char* key,
414           uint16_t vb, uint32_t lock_timeout) {
415     char ext[4];
416     uint32_t keylen = key ? strlen(key) : 0;
417     protocol_binary_request_header *request;
418     encodeExt(ext, lock_timeout);
419     request = createPacket(PROTOCOL_BINARY_CMD_GET_LOCKED, vb, 0, ext, 4, key, keylen);
420 
421     check(h1->unknown_command(h, NULL, request, add_response) == ENGINE_SUCCESS,
422           "Failed to call getl");
423     free(request);
424 }
425 
get_meta(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char* key)426 bool get_meta(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char* key) {
427     protocol_binary_request_header *req = createPacket(PROTOCOL_BINARY_CMD_GET_META, 0, 0, NULL,
428                                                        0, key, strlen(key));
429 
430     ENGINE_ERROR_CODE ret = h1->unknown_command(h, NULL, req,
431                                                 add_response_get_meta);
432     check(ret == ENGINE_SUCCESS, "Expected get_meta call to be successful");
433     if (last_status == PROTOCOL_BINARY_RESPONSE_SUCCESS) {
434         return true;
435     }
436     return false;
437 }
438 
observe(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, std::map<std::string, uint16_t> obskeys)439 void observe(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
440              std::map<std::string, uint16_t> obskeys) {
441     std::stringstream value;
442     std::map<std::string, uint16_t>::iterator it;
443     for (it = obskeys.begin(); it != obskeys.end(); ++it) {
444         uint16_t vb = htons(it->second);
445         uint16_t keylen = htons(it->first.length());
446         value.write((char*) &vb, sizeof(uint16_t));
447         value.write((char*) &keylen, sizeof(uint16_t));
448         value.write(it->first.c_str(), it->first.length());
449     }
450 
451     protocol_binary_request_header *request;
452     request = createPacket(PROTOCOL_BINARY_CMD_OBSERVE, 0, 0, NULL, 0, NULL, 0,
453                            value.str().data(), value.str().length());
454     check(h1->unknown_command(h, NULL, request, add_response) == ENGINE_SUCCESS,
455           "Observe call failed");
456     free(request);
457 }
458 
get_replica(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char* key, uint16_t vbid)459 void get_replica(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char* key,
460                  uint16_t vbid) {
461     protocol_binary_request_header *pkt;
462     pkt = createPacket(PROTOCOL_BINARY_CMD_GET_REPLICA, vbid, 0, NULL, 0, key, strlen(key));
463     check(h1->unknown_command(h, NULL, pkt, add_response) == ENGINE_SUCCESS,
464                               "Get Replica Failed");
465     free(pkt);
466 }
467 
prepare_get_replica(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, vbucket_state_t state, bool makeinvalidkey)468 protocol_binary_request_header* prepare_get_replica(ENGINE_HANDLE *h,
469                                                     ENGINE_HANDLE_V1 *h1,
470                                                     vbucket_state_t state,
471                                                     bool makeinvalidkey) {
472     uint16_t id = 0;
473     const char *key = "k0";
474     protocol_binary_request_header *pkt;
475     pkt = createPacket(PROTOCOL_BINARY_CMD_GET_REPLICA, id, 0, NULL, 0, key, strlen(key));
476 
477     if (!makeinvalidkey) {
478         item *i = NULL;
479         check(store(h, h1, NULL, OPERATION_SET, key, "replicadata", &i, 0, id)
480               == ENGINE_SUCCESS, "Get Replica Failed");
481         h1->release(h, NULL, i);
482 
483         check(set_vbucket_state(h, h1, id, state),
484               "Failed to set vbucket active state, Get Replica Failed");
485     }
486 
487     return pkt;
488 }
489 
set_param(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, protocol_binary_engine_param_t paramtype, const char *param, const char *val)490 bool set_param(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, protocol_binary_engine_param_t paramtype,
491                const char *param, const char *val) {
492     char ext[4];
493     protocol_binary_request_header *pkt;
494     encodeExt(ext, static_cast<uint32_t>(paramtype));
495     pkt = createPacket(PROTOCOL_BINARY_CMD_SET_PARAM, 0, 0, ext, sizeof(protocol_binary_engine_param_t), param,
496                        strlen(param), val, strlen(val));
497 
498     if (h1->unknown_command(h, NULL, pkt, add_response) != ENGINE_SUCCESS) {
499         return false;
500     }
501 
502     return last_status == PROTOCOL_BINARY_RESPONSE_SUCCESS;
503 }
504 
set_vbucket_state(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, uint16_t vb, vbucket_state_t state)505 bool set_vbucket_state(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
506                        uint16_t vb, vbucket_state_t state) {
507 
508     char ext[4];
509     protocol_binary_request_header *pkt;
510     encodeExt(ext, static_cast<uint32_t>(state));
511     pkt = createPacket(PROTOCOL_BINARY_CMD_SET_VBUCKET, vb, 0, ext, 4);
512 
513     if (h1->unknown_command(h, NULL, pkt, add_response) != ENGINE_SUCCESS) {
514         return false;
515     }
516 
517     free(pkt);
518     return last_status == PROTOCOL_BINARY_RESPONSE_SUCCESS;
519 }
520 
set_with_meta(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char *key, const size_t keylen, const char *val, const size_t vallen, const uint32_t vb, ItemMetaData *itemMeta, uint64_t cas_for_set, bool skipConflictResolution, uint8_t datatype)521 void set_with_meta(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char *key,
522                    const size_t keylen, const char *val, const size_t vallen,
523                    const uint32_t vb, ItemMetaData *itemMeta,
524                    uint64_t cas_for_set, bool skipConflictResolution,
525                    uint8_t datatype) {
526     int blen = skipConflictResolution ? 28 : 24;
527     char *ext = new char[blen];
528     encodeWithMetaExt(ext, itemMeta);
529 
530     if (skipConflictResolution) {
531         uint32_t flag = SKIP_CONFLICT_RESOLUTION_FLAG;
532         flag = htonl(flag);
533         memcpy(ext + 24, (char*)&flag, sizeof(flag));
534     }
535 
536     protocol_binary_request_header *pkt;
537     pkt = createPacket(PROTOCOL_BINARY_CMD_SET_WITH_META, vb, cas_for_set, ext, blen, key, keylen,
538                        val, vallen, datatype);
539     check(h1->unknown_command(h, NULL, pkt, add_response) == ENGINE_SUCCESS,
540           "Expected to be able to store with meta");
541     delete[] ext;
542 }
543 
return_meta(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char *key, const size_t keylen, const char *val, const size_t vallen, const uint32_t vb, const uint64_t cas, const uint32_t flags, const uint32_t exp, const uint32_t type, uint8_t datatype)544 void return_meta(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char *key,
545                  const size_t keylen, const char *val, const size_t vallen,
546                  const uint32_t vb, const uint64_t cas, const uint32_t flags,
547                  const uint32_t exp, const uint32_t type, uint8_t datatype) {
548     char ext[12];
549     encodeExt(ext, type);
550     encodeExt(ext + 4, flags);
551     encodeExt(ext + 8, exp);
552     protocol_binary_request_header *pkt;
553     pkt = createPacket(PROTOCOL_BINARY_CMD_RETURN_META, vb, cas, ext, 12, key, keylen, val,
554                        vallen, datatype);
555     check(h1->unknown_command(h, NULL, pkt, add_response_ret_meta)
556               == ENGINE_SUCCESS, "Expected to be able to store ret meta");
557     free(pkt);
558 }
559 
set_ret_meta(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char *key, const size_t keylen, const char *val, const size_t vallen, const uint32_t vb, const uint64_t cas, const uint32_t flags, const uint32_t exp, uint8_t datatype)560 void set_ret_meta(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char *key,
561                   const size_t keylen, const char *val, const size_t vallen,
562                   const uint32_t vb, const uint64_t cas, const uint32_t flags,
563                   const uint32_t exp, uint8_t datatype) {
564     return_meta(h, h1, key, keylen, val, vallen, vb, cas, flags, exp,
565                 SET_RET_META, datatype);
566 }
567 
add_ret_meta(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char *key, const size_t keylen, const char *val, const size_t vallen, const uint32_t vb, const uint64_t cas, const uint32_t flags, const uint32_t exp, uint8_t datatype)568 void add_ret_meta(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char *key,
569                   const size_t keylen, const char *val, const size_t vallen,
570                   const uint32_t vb, const uint64_t cas, const uint32_t flags,
571                   const uint32_t exp, uint8_t datatype) {
572     return_meta(h, h1, key, keylen, val, vallen, vb, cas, flags, exp,
573                 ADD_RET_META, datatype);
574 }
575 
del_ret_meta(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char *key, const size_t keylen, const uint32_t vb, const uint64_t cas)576 void del_ret_meta(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char *key,
577                   const size_t keylen, const uint32_t vb, const uint64_t cas) {
578     return_meta(h, h1, key, keylen, NULL, 0, vb, cas, 0, 0,
579                 DEL_RET_META);
580 }
581 
disable_traffic(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1)582 void disable_traffic(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
583     protocol_binary_request_header *pkt = createPacket(PROTOCOL_BINARY_CMD_DISABLE_TRAFFIC);
584     check(h1->unknown_command(h, NULL, pkt, add_response) == ENGINE_SUCCESS,
585           "Failed to send data traffic command to the server");
586     check(last_status == PROTOCOL_BINARY_RESPONSE_SUCCESS,
587           "Failed to disable data traffic");
588     free(pkt);
589 }
590 
enable_traffic(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1)591 void enable_traffic(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
592     protocol_binary_request_header *pkt = createPacket(PROTOCOL_BINARY_CMD_ENABLE_TRAFFIC);
593     check(h1->unknown_command(h, NULL, pkt, add_response) == ENGINE_SUCCESS,
594           "Failed to send data traffic command to the server");
595     check(last_status == PROTOCOL_BINARY_RESPONSE_SUCCESS,
596           "Failed to enable data traffic");
597     free(pkt);
598 }
599 
start_persistence(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1)600 void start_persistence(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
601     protocol_binary_request_header *pkt = createPacket(PROTOCOL_BINARY_CMD_START_PERSISTENCE);
602     check(h1->unknown_command(h, NULL, pkt, add_response) == ENGINE_SUCCESS,
603           "Failed to stop persistence.");
604     check(last_status == PROTOCOL_BINARY_RESPONSE_SUCCESS,
605           "Error starting persistence.");
606     free(pkt);
607 }
608 
stop_persistence(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1)609 void stop_persistence(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
610     useconds_t sleepTime = 128;
611     while (true) {
612         if (get_str_stat(h, h1, "ep_flusher_state", 0) == "running") {
613             break;
614         }
615         decayingSleep(&sleepTime);
616     }
617 
618     protocol_binary_request_header *pkt = createPacket(PROTOCOL_BINARY_CMD_STOP_PERSISTENCE);
619     check(h1->unknown_command(h, NULL, pkt, add_response) == ENGINE_SUCCESS,
620           "Failed to stop persistence.");
621     check(last_status == PROTOCOL_BINARY_RESPONSE_SUCCESS,
622           "Error stopping persistence.");
623     free(pkt);
624 }
625 
store(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const void *cookie, ENGINE_STORE_OPERATION op, const char *key, const char *value, item **outitem, uint64_t casIn, uint16_t vb, uint32_t exp, uint8_t datatype)626 ENGINE_ERROR_CODE store(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
627                         const void *cookie, ENGINE_STORE_OPERATION op,
628                         const char *key, const char *value, item **outitem,
629                         uint64_t casIn, uint16_t vb, uint32_t exp,
630                         uint8_t datatype) {
631     return storeCasVb11(h, h1, cookie, op, key, value, strlen(value),
632                         9258, outitem, casIn, vb, exp, datatype);
633 }
634 
storeCasVb11(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const void *cookie, ENGINE_STORE_OPERATION op, const char *key, const char *value, size_t vlen, uint32_t flags, item **outitem, uint64_t casIn, uint16_t vb, uint32_t exp, uint8_t datatype)635 ENGINE_ERROR_CODE storeCasVb11(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
636                                const void *cookie, ENGINE_STORE_OPERATION op,
637                                const char *key, const char *value, size_t vlen,
638                                uint32_t flags, item **outitem, uint64_t casIn,
639                                uint16_t vb, uint32_t exp, uint8_t datatype) {
640     item *it = NULL;
641     uint64_t cas = 0;
642 
643     ENGINE_ERROR_CODE rv = h1->allocate(h, cookie, &it,
644                                         key, strlen(key),
645                                         vlen, flags, exp,
646                                         datatype);
647     check(rv == ENGINE_SUCCESS, "Allocation failed.");
648 
649     item_info info;
650     info.nvalue = 1;
651     if (!h1->get_item_info(h, cookie, it, &info)) {
652         abort();
653     }
654 
655     cb_assert(info.value[0].iov_len == vlen);
656     memcpy(info.value[0].iov_base, value, vlen);
657     h1->item_set_cas(h, cookie, it, casIn);
658 
659     rv = h1->store(h, cookie, it, &cas, op, vb);
660 
661     if (outitem) {
662         *outitem = it;
663     } else {
664         h1->release(h, NULL, it);
665     }
666 
667     return rv;
668 }
669 
touch(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char* key, uint16_t vb, uint32_t exp)670 void touch(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char* key,
671            uint16_t vb, uint32_t exp) {
672     char ext[4];
673     uint32_t keylen = key ? strlen(key) : 0;
674     protocol_binary_request_header *request;
675     encodeExt(ext, exp);
676     request = createPacket(PROTOCOL_BINARY_CMD_TOUCH, vb, 0, ext, 4, key, keylen);
677 
678     check(h1->unknown_command(h, NULL, request, add_response) == ENGINE_SUCCESS,
679           "Failed to call touch");
680     free(request);
681 }
682 
unl(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char* key, uint16_t vb, uint64_t cas)683 void unl(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char* key,
684          uint16_t vb, uint64_t cas) {
685     uint32_t keylen = key ? strlen(key) : 0;
686     protocol_binary_request_header *request;
687     request = createPacket(PROTOCOL_BINARY_CMD_UNLOCK_KEY, vb, cas, NULL, 0, key, keylen);
688 
689     check(h1->unknown_command(h, NULL, request, add_response) == ENGINE_SUCCESS,
690           "Failed to call unl");
691     free(request);
692 }
693 
compact_db(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const uint16_t vbucket_id, const uint64_t purge_before_ts, const uint64_t purge_before_seq, const uint8_t drop_deletes)694 void compact_db(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
695                      const uint16_t vbucket_id,
696                      const uint64_t purge_before_ts,
697                      const uint64_t purge_before_seq,
698                      const uint8_t  drop_deletes) {
699     protocol_binary_request_compact_db req;
700     memset(&req, 0, sizeof(req));
701     req.message.body.purge_before_ts  = htonll(purge_before_ts);
702     req.message.body.purge_before_seq = htonll(purge_before_seq);
703     req.message.body.drop_deletes     = drop_deletes;
704 
705     const char *args = (const char *)&(req.message.body);
706     uint32_t argslen = 24;
707 
708     protocol_binary_request_header *pkt =
709         createPacket(PROTOCOL_BINARY_CMD_COMPACT_DB, vbucket_id, 0, args, argslen,  NULL, 0,
710                      NULL, 0);
711     check(h1->unknown_command(h, NULL, pkt, add_response) == ENGINE_SUCCESS,
712           "Failed to request compact vbucket");
713 }
714 
vbucketDelete(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, uint16_t vb, const char* args)715 void vbucketDelete(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, uint16_t vb,
716                    const char* args) {
717     uint32_t argslen = args ? strlen(args) : 0;
718     protocol_binary_request_header *pkt =
719         createPacket(PROTOCOL_BINARY_CMD_DEL_VBUCKET, vb, 0, NULL, 0, NULL, 0,
720                      args, argslen);
721     check(h1->unknown_command(h, NULL, pkt, add_response) == ENGINE_SUCCESS,
722           "Failed to request delete bucket");
723 }
724 
verify_key(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char* key, uint16_t vbucket)725 ENGINE_ERROR_CODE verify_key(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
726                              const char* key, uint16_t vbucket) {
727     item *i = NULL;
728     ENGINE_ERROR_CODE rv = h1->get(h, NULL, &i, key, strlen(key), vbucket);
729     if (rv == ENGINE_SUCCESS) {
730         h1->release(h, NULL, i);
731     }
732     return rv;
733 }
734 
verify_vbucket_missing(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, uint16_t vb)735 bool verify_vbucket_missing(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
736                             uint16_t vb) {
737     char vbid[8];
738     snprintf(vbid, sizeof(vbid), "vb_%d", vb);
739     std::string vbstr(vbid);
740 
741     // Try up to three times to verify the bucket is missing.  Bucket
742     // state changes are async.
743     vals.clear();
744     check(h1->get_stats(h, NULL, NULL, 0, add_stats) == ENGINE_SUCCESS,
745           "Failed to get stats.");
746 
747     if (vals.find(vbstr) == vals.end()) {
748         return true;
749     }
750 
751     std::cerr << "Expected bucket missing, got " << vals[vbstr] << std::endl;
752 
753     return false;
754 }
755 
verify_vbucket_state(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, uint16_t vb, vbucket_state_t expected, bool mute)756 bool verify_vbucket_state(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, uint16_t vb,
757                           vbucket_state_t expected, bool mute) {
758     protocol_binary_request_header *pkt;
759     pkt = createPacket(PROTOCOL_BINARY_CMD_GET_VBUCKET, vb, 0);
760 
761     ENGINE_ERROR_CODE errcode = h1->unknown_command(h, NULL, pkt, add_response);
762     if (errcode != ENGINE_SUCCESS) {
763         if (!mute) {
764             fprintf(stderr, "Error code when getting vbucket %d\n", errcode);
765         }
766         return false;
767     }
768 
769     if (last_status != PROTOCOL_BINARY_RESPONSE_SUCCESS) {
770         if (!mute) {
771             fprintf(stderr, "Last protocol status was %d (%s)\n",
772                     last_status, last_body ? last_body : "unknown");
773         }
774         return false;
775     }
776 
777     vbucket_state_t state;
778     memcpy(&state, last_body, sizeof(state));
779     state = static_cast<vbucket_state_t>(ntohl(state));
780     return state == expected;
781 }
782 
sendDcpAck(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const void* cookie, protocol_binary_command opcode, protocol_binary_response_status status, uint32_t opaque)783 void sendDcpAck(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
784                 const void* cookie, protocol_binary_command opcode,
785                 protocol_binary_response_status status, uint32_t opaque) {
786     protocol_binary_response_header pkt;
787     pkt.response.magic = PROTOCOL_BINARY_RES;
788     pkt.response.opcode = opcode;
789     pkt.response.status = htons(status);
790     pkt.response.opaque = opaque;
791 
792     check(h1->dcp.response_handler(h, cookie, &pkt) == ENGINE_SUCCESS,
793           "Expected success");
794 }
795 
get_int_stat(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char *statname, const char *statkey)796 int get_int_stat(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char *statname,
797                  const char *statkey) {
798     vals.clear();
799     check(h1->get_stats(h, NULL, statkey, statkey == NULL ? 0 : strlen(statkey),
800                         add_stats) == ENGINE_SUCCESS, "Failed to get stats.");
801     std::string s = vals[statname];
802     return atoi(s.c_str());
803 }
804 
get_ull_stat(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char *statname, const char *statkey)805 uint64_t get_ull_stat(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char *statname,
806                       const char *statkey) {
807     vals.clear();
808     check(h1->get_stats(h, NULL, statkey, statkey == NULL ? 0 : strlen(statkey),
809                         add_stats) == ENGINE_SUCCESS, "Failed to get stats.");
810     std::string s = vals[statname];
811     return strtoull(s.c_str(), NULL, 10);
812 }
813 
get_str_stat(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char *statname, const char *statkey)814 std::string get_str_stat(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
815                          const char *statname, const char *statkey) {
816     vals.clear();
817     check(h1->get_stats(h, NULL, statkey, statkey == NULL ? 0 : strlen(statkey),
818                         add_stats) == ENGINE_SUCCESS, "Failed to get stats.");
819     std::string s = vals[statname];
820     return s;
821 }
822 
verify_curr_items(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, int exp, const char *msg)823 void verify_curr_items(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, int exp,
824                        const char *msg) {
825     int curr_items = get_int_stat(h, h1, "curr_items");
826     if (curr_items != exp) {
827         std::cerr << "Expected "<< exp << " curr_items after " << msg
828                   << ", got " << curr_items << std::endl;
829         abort();
830     }
831 }
832 
wait_for_stat_change(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char *stat, int initial, const char *statkey)833 void wait_for_stat_change(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
834                           const char *stat, int initial,
835                           const char *statkey) {
836     useconds_t sleepTime = 128;
837     while (get_int_stat(h, h1, stat, statkey) == initial) {
838         decayingSleep(&sleepTime);
839     }
840 }
841 
wait_for_stat_to_be(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char *stat, int final, const char* stat_key)842 void wait_for_stat_to_be(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
843                          const char *stat, int final, const char* stat_key) {
844     useconds_t sleepTime = 128;
845     while (get_int_stat(h, h1, stat, stat_key) != final) {
846         decayingSleep(&sleepTime);
847     }
848 }
849 
wait_for_str_stat_to_be(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char *stat, const char* final, const char* stat_key)850 void wait_for_str_stat_to_be(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
851                              const char *stat, const char* final,
852                              const char* stat_key) {
853     useconds_t sleepTime = 128;
854     while (get_str_stat(h, h1, stat, stat_key).compare(final) != 0) {
855         decayingSleep(&sleepTime);
856     }
857 }
858 
wait_for_memory_usage_below(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, int mem_threshold)859 void wait_for_memory_usage_below(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
860                                  int mem_threshold) {
861     useconds_t sleepTime = 128;
862     while (get_int_stat(h, h1, "mem_used") > mem_threshold) {
863         decayingSleep(&sleepTime);
864     }
865 }
866 
wait_for_warmup_complete(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1)867 bool wait_for_warmup_complete(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
868     useconds_t sleepTime = 128;
869     while (h1->get_stats(h, NULL, "warmup", 6, add_stats) == ENGINE_SUCCESS) {
870         std::string s = vals["ep_warmup_thread"];
871         if (strcmp(s.c_str(), "complete") == 0) {
872             break;
873         }
874         decayingSleep(&sleepTime);
875         vals.clear();
876     }
877     return true;
878 }
879 
wait_for_flusher_to_settle(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1)880 void wait_for_flusher_to_settle(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
881     useconds_t sleepTime = 128;
882     while (get_int_stat(h, h1, "ep_queue_size") > 0) {
883         decayingSleep(&sleepTime);
884     }
885     wait_for_stat_change(h, h1, "ep_commit_num", 0);
886 }
887 
wait_for_persisted_value(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char *key, const char *val, uint16_t vbucketId)888 void wait_for_persisted_value(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
889                               const char *key, const char *val,
890                               uint16_t vbucketId) {
891 
892     item *i = NULL;
893     int commitNum = get_int_stat(h, h1, "ep_commit_num");
894     check(ENGINE_SUCCESS == store(h, h1, NULL, OPERATION_SET, key, val, &i, 0,
895                                   vbucketId),
896             "Failed to store an item.");
897 
898     // Wait for persistence...
899     wait_for_flusher_to_settle(h, h1);
900     wait_for_stat_change(h, h1, "ep_commit_num", commitNum);
901     h1->release(h, NULL, i);
902 }
903 
dcp_step(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const void* cookie)904 void dcp_step(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const void* cookie) {
905     struct dcp_message_producers* producers = get_dcp_producers();
906     ENGINE_ERROR_CODE err = h1->dcp.step(h, cookie, producers);
907     check(err == ENGINE_SUCCESS || err == ENGINE_WANT_MORE,
908             "Expected success or engine_want_more");
909     if (err == ENGINE_SUCCESS) {
910         clear_dcp_data();
911     }
912     free(producers);
913 }
914