1 /* -*- MODE: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3 * Copyright 2016 Couchbase, Inc
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18 /*
19 * Testsuite for 'dcp' functionality in ep-engine.
20 */
21
22 // We need to include the tracer _before_ the header which defines
23 // the check macros to avoid conflicts with folly's headers
24 #include <memcached/tracer.h>
25
26 #include "ep_test_apis.h"
27 #include "ep_testsuite_common.h"
28 #include "mock/mock_dcp.h"
29 #include "programs/engine_testapp/mock_server.h"
30
31 #include <platform/cb_malloc.h>
32 #include <platform/cbassert.h>
33 #include <platform/compress.h>
34 #include <platform/platform_thread.h>
35 #include <condition_variable>
36 #include <thread>
37
38 using namespace std::string_literals;
39
40 // Helper functions ///////////////////////////////////////////////////////////
41
42 /**
43 * Converts the given engine to a DcpIface*. If engine doesn't implement
44 * DcpIface then throws.
45 * @returns non-null ptr to DcpIface.
46 */
requireDcpIface(EngineIface* engine)47 static gsl::not_null<DcpIface*> requireDcpIface(EngineIface* engine) {
48 return dynamic_cast<DcpIface*>(engine);
49 }
50
dcp_step(EngineIface* h, const void* cookie, MockDcpMessageProducers& producers)51 static void dcp_step(EngineIface* h,
52 const void* cookie,
53 MockDcpMessageProducers& producers) {
54 auto dcp = requireDcpIface(h);
55 ENGINE_ERROR_CODE err = dcp->step(cookie, &producers);
56 check(err == ENGINE_SUCCESS || err == ENGINE_EWOULDBLOCK,
57 "Expected success or engine_ewouldblock");
58 if (err == ENGINE_EWOULDBLOCK) {
59 producers.clear_dcp_data();
60 }
61 }
62
dcpHandleResponse(EngineIface* h, const void* cookie, protocol_binary_response_header* response, MockDcpMessageProducers& producers)63 static void dcpHandleResponse(EngineIface* h,
64 const void* cookie,
65 protocol_binary_response_header* response,
66 MockDcpMessageProducers& producers) {
67 auto dcp = requireDcpIface(h);
68 auto erroCode = dcp->response_handler(cookie, response);
69 check(erroCode == ENGINE_SUCCESS || erroCode == ENGINE_EWOULDBLOCK,
70 "Expected 'success' or 'engine_ewouldblock'");
71 if (erroCode == ENGINE_EWOULDBLOCK) {
72 producers.clear_dcp_data();
73 }
74 }
75
// File-local flag, initially false.
// NOTE(review): its users are not in the nearby code - presumably a
// "waiter has started" handshake for threaded tests; confirm before relying
// on it.
static bool wait_started(false);
77
/**
 * A pair of sequence numbers delimiting a range. DcpStreamCtx uses it both
 * for the requested stream range and for the snapshot range.
 */
struct SeqnoRange {
    // First seqno of the range
    uint64_t start;
    // Last seqno of the range
    uint64_t end;
};
82
/**
 * DeletionOpcode is used to determine whether or not to perform the deletion
 * path, or the expiration path.
 *
 * The underlying type is bool, so exactly these two enumerators exist.
 */
enum class DeletionOpcode : bool {
    // Count the operation as a deletion
    Deletion,
    // Count the operation as an expiration
    Expiration,
};
91
92 class DcpStreamCtx {
93 /**
94 * This class represents all attributes required for
95 * a stream. Objects of this class type are to be fed
96 * to TestDcpConsumer.
97 */
98 public:
DcpStreamCtx()99 DcpStreamCtx()
100 : vbucket(0),
101 flags(0),
102 vb_uuid(0),
103 exp_mutations(0),
104 exp_deletions(0),
105 exp_expirations(0),
106 exp_markers(0),
107 extra_takeover_ops(0),
108 exp_disk_snapshot(false),
109 exp_conflict_res(0),
110 skip_estimate_check(false),
111 live_frontend_client(false),
112 skip_verification(false),
113 exp_err(ENGINE_SUCCESS),
114 exp_rollback(0),
115 expected_values(0),
116 opaque(0) {
117 seqno = {0, static_cast<uint64_t>(~0)};
118 snapshot = {0, static_cast<uint64_t>(~0)};
119 }
120
121 /* Vbucket Id */
122 Vbid vbucket;
123 /* Stream flags */
124 uint32_t flags;
125 /* Vbucket UUID */
126 uint64_t vb_uuid;
127 /* Sequence number range */
128 SeqnoRange seqno;
129 /* Snapshot range */
130 SeqnoRange snapshot;
131 /* Number of mutations expected (for verification) */
132 size_t exp_mutations;
133 /* Number of deletions expected (for verification) */
134 size_t exp_deletions;
135 /* Number of expiries expected (for verification) */
136 size_t exp_expirations;
137 /* Number of snapshot markers expected (for verification) */
138 size_t exp_markers;
139 /* Extra front end mutations as part of takeover */
140 size_t extra_takeover_ops;
141 /* Flag - expect disk snapshot or not */
142 bool exp_disk_snapshot;
143 /* Expected conflict resolution flag */
144 uint8_t exp_conflict_res;
145 /* Skip estimate check during takeover */
146 bool skip_estimate_check;
147 /*
148 live_frontend_client to be set to true when streaming is done in parallel
149 with a client issuing writes to the vbucket. In this scenario, predicting
150 the number of snapshot markers received is difficult.
151 */
152 bool live_frontend_client;
153 /*
154 skip_verification to be set to true if verification of mutation count,
155 deletion count, marker count etc. is to be skipped at the end of
156 streaming.
157 */
158 bool skip_verification;
159 /* Expected error code on stream creation. We need this because rollback is
160 a valid operation and returns ENGINE_ROLLBACK (not ENGINE_SUCCESS) */
161 ENGINE_ERROR_CODE exp_err;
162 /* Expected rollback seqno */
163 uint64_t exp_rollback;
164 /* Expected number of values (from mutations or deleted_values) */
165 size_t expected_values;
166 /* stream opaque */
167 uint32_t opaque;
168 };
169
170 class TestDcpConsumer {
171 /**
172 * This class represents a DcpConsumer which is responsible
173 * for spawning a DcpProducer at the server and receiving
174 * messages from it.
175 */
176 public:
TestDcpConsumer(const std::string& _name, const void* _cookie, EngineIface* h)177 TestDcpConsumer(const std::string& _name,
178 const void* _cookie,
179 EngineIface* h)
180 : producers(h),
181 name(_name),
182 cookie(_cookie),
183 opaque(0),
184 total_bytes(0),
185 simulate_cursor_dropping(false),
186 flow_control_buf_size(1024),
187 disable_ack(false),
188 h(h),
189 dcp(requireDcpIface(h)),
190 nruCounter(2) {
191 }
192
getTotalBytes()193 uint64_t getTotalBytes() {
194 return total_bytes;
195 }
196
simulateCursorDropping()197 void simulateCursorDropping() {
198 simulate_cursor_dropping = true;
199 }
200
setFlowControlBufSize(uint64_t to)201 void setFlowControlBufSize(uint64_t to) {
202 flow_control_buf_size = to;
203 }
204
disableAcking()205 void disableAcking() {
206 disable_ack = true;
207 }
208
addStreamCtx(DcpStreamCtx &ctx)209 void addStreamCtx(DcpStreamCtx &ctx) {
210 stream_ctxs.push_back(ctx);
211 }
212
213 void run(bool openConn = true);
214
215 // Stop the thread if it is running. This is safe to be called from
216 // a different thread to the thread calling run().
217 void stop();
218
219 /**
220 * This method just opens a DCP connection. Note it does not open a stream
221 * and does not call the dcp step function to get all the items from the
222 * producer.
223 * @param flags Flags to pass to DCP_OPEN.
224 */
225 void openConnection(
226 uint32_t flags = cb::mcbp::request::DcpOpenPayload::Producer);
227
228 /* This method opens a stream on an existing DCP connection.
229 This does not call the dcp step function to get all the items from the
230 producer */
231 ENGINE_ERROR_CODE openStreams();
232
233 /* if clear is true, it will also clear the stream vector */
234 ENGINE_ERROR_CODE closeStreams(bool fClear = false);
235
236 ENGINE_ERROR_CODE sendControlMessage(const std::string& name,
237 const std::string& value);
238
getNruCounters() const239 const std::vector<int>& getNruCounters() const {
240 return nruCounter;
241 }
242
243 MockDcpMessageProducers producers;
244
245 private:
246 /* Vbucket-level stream stats used in test */
247 struct VBStats {
VBStatsTestDcpConsumer::VBStats248 VBStats()
249 : num_mutations(0),
250 num_deletions(0),
251 num_expirations(0),
252 num_snapshot_markers(0),
253 num_set_vbucket_pending(0),
254 num_set_vbucket_active(0),
255 pending_marker_ack(false),
256 marker_end(0),
257 last_by_seqno(0),
258 extra_takeover_ops(0),
259 exp_disk_snapshot(false),
260 exp_conflict_res(0),
261 num_values(0) {
262 }
263
264 size_t num_mutations;
265 size_t num_deletions;
266 size_t num_expirations;
267 size_t num_snapshot_markers;
268 size_t num_set_vbucket_pending;
269 size_t num_set_vbucket_active;
270 bool pending_marker_ack;
271 uint64_t marker_end;
272 uint64_t last_by_seqno;
273 size_t extra_takeover_ops;
274 bool exp_disk_snapshot;
275 uint8_t exp_conflict_res;
276 size_t num_values;
277 };
278
279 /* Connection name */
280 const std::string name;
281 /* Connection cookie */
282 const void *cookie;
283 /* Vector containing information of streams */
284 std::vector<DcpStreamCtx> stream_ctxs;
285 /* Opaque value in the connection */
286 uint32_t opaque;
287 /* Total bytes received */
288 uint64_t total_bytes;
289 /* Flag to simulate cursor dropping */
290 bool simulate_cursor_dropping;
291 /* Flow control buffer size */
292 uint64_t flow_control_buf_size;
293 /* Flag to disable acking */
294 bool disable_ack;
295 /* map of vbstats */
296 std::map<Vbid, VBStats> vb_stats;
297 EngineIface* h;
298 gsl::not_null<DcpIface*> dcp;
299 std::vector<int> nruCounter;
300
301 // Flag used by run() to check if it should continue to execute.
302 std::atomic<bool> done{false};
303
304 /**
305 * Helper function to perform the very similar resolution of a deletion
306 * and an expiry, triggered inside the run() case switch where one of these
307 * operations is returned as the last_op.
308 * @param stats The vbstats that will be updated by this function.
309 * @param bytes_read The current no of bytes read which will be updated by
310 * this function.
311 * @param all_bytes The total no of bytes read which will be updated by
312 * this function.
313 * @param vbid The vBucket ID.
314 * @param delOrExpire Determines whether to take the deletion case or the
315 * expiration case.
316 */
317 void deleteOrExpireCase(TestDcpConsumer::VBStats& stats,
318 uint32_t& bytes_read,
319 uint64_t& all_bytes,
320 Vbid vbid,
321 DeletionOpcode delOrExpire);
322 };
323
sendControlMessage( const std::string& name, const std::string& value)324 ENGINE_ERROR_CODE TestDcpConsumer::sendControlMessage(
325 const std::string& name, const std::string& value) {
326 auto dcp = requireDcpIface(h);
327 return dcp->control(cookie, ++opaque, name, value);
328 }
329
deleteOrExpireCase(TestDcpConsumer::VBStats& stats, uint32_t& bytes_read, uint64_t& all_bytes, Vbid vbid, DeletionOpcode delOrExpire)330 void TestDcpConsumer::deleteOrExpireCase(TestDcpConsumer::VBStats& stats,
331 uint32_t& bytes_read,
332 uint64_t& all_bytes,
333 Vbid vbid,
334 DeletionOpcode delOrExpire) {
335 cb_assert(vbid != static_cast<Vbid>(-1));
336 checklt(stats.last_by_seqno,
337 producers.last_byseqno.load(),
338 "Expected bigger seqno");
339 stats.last_by_seqno = producers.last_byseqno;
340 if (delOrExpire == DeletionOpcode::Deletion) {
341 stats.num_deletions++;
342 } else {
343 stats.num_expirations++;
344 }
345 bytes_read += producers.last_packet_size;
346 all_bytes += producers.last_packet_size;
347 if (stats.pending_marker_ack &&
348 producers.last_byseqno == stats.marker_end) {
349 sendDcpAck(h,
350 cookie,
351 cb::mcbp::ClientOpcode::DcpSnapshotMarker,
352 cb::mcbp::Status::Success,
353 producers.last_opaque);
354 }
355
356 if (!producers.last_value.empty()) {
357 stats.num_values++;
358 }
359 return;
360 }
361
run(bool openConn)362 void TestDcpConsumer::run(bool openConn) {
363 checkle(size_t{1}, stream_ctxs.size(), "No dcp_stream arguments provided!");
364
365 /* Open the connection with the DCP producer */
366 if (openConn) {
367 openConnection();
368 }
369
370 /* Open streams in the above open connection */
371 openStreams();
372
373 bool exp_all_items_streamed = true;
374 size_t num_stream_ends_received = 0;
375 uint32_t bytes_read = 0;
376 uint64_t all_bytes = 0;
377 uint64_t total_acked_bytes = 0;
378 uint64_t ack_limit = flow_control_buf_size / 2;
379
380 bool delay_buffer_acking = false;
381 if (simulate_cursor_dropping) {
382 /**
383 * Simulates cursor dropping by slowing down the initial buffer
384 * acknowledgement from the consmer.
385 *
386 * Note that the cursor may not be dropped if the memory usage
387 * is not over the cursor_dropping_upper_threshold or if the
388 * checkpoint_remover sleep time is high.
389 */
390 delay_buffer_acking = true;
391 }
392
393 do {
394 if (!disable_ack && (bytes_read > ack_limit)) {
395 if (delay_buffer_acking) {
396 std::this_thread::sleep_for(std::chrono::seconds(2));
397 delay_buffer_acking = false;
398 }
399 dcp->buffer_acknowledgement(cookie, ++opaque, Vbid(0), bytes_read);
400 total_acked_bytes += bytes_read;
401 bytes_read = 0;
402 }
403 ENGINE_ERROR_CODE err = dcp->step(cookie, &producers);
404 if (err == ENGINE_DISCONNECT) {
405 done = true;
406 } else {
407 const Vbid vbid = producers.last_vbucket;
408 auto &stats = vb_stats[vbid];
409 switch (producers.last_op) {
410 case cb::mcbp::ClientOpcode::DcpMutation:
411 cb_assert(vbid != static_cast<Vbid>(-1));
412 checklt(stats.last_by_seqno,
413 producers.last_byseqno.load(),
414 "Expected bigger seqno");
415 stats.last_by_seqno = producers.last_byseqno;
416 stats.num_mutations++;
417 bytes_read += producers.last_packet_size;
418 all_bytes += producers.last_packet_size;
419 if (stats.pending_marker_ack &&
420 producers.last_byseqno == stats.marker_end) {
421 sendDcpAck(h,
422 cookie,
423 cb::mcbp::ClientOpcode::DcpSnapshotMarker,
424 cb::mcbp::Status::Success,
425 producers.last_opaque);
426 }
427
428 if (producers.last_nru > 0) {
429 nruCounter[1]++;
430 } else {
431 nruCounter[0]++;
432 }
433 if (!producers.last_value.empty()) {
434 stats.num_values++;
435 }
436
437 break;
438 case cb::mcbp::ClientOpcode::DcpDeletion:
439 deleteOrExpireCase(stats,
440 bytes_read,
441 all_bytes,
442 vbid,
443 DeletionOpcode::Deletion);
444 break;
445 case cb::mcbp::ClientOpcode::DcpExpiration:
446 deleteOrExpireCase(stats,
447 bytes_read,
448 all_bytes,
449 vbid,
450 DeletionOpcode::Expiration);
451 break;
452 case cb::mcbp::ClientOpcode::DcpStreamEnd:
453 cb_assert(vbid != static_cast<Vbid>(-1));
454 if (++num_stream_ends_received == stream_ctxs.size()) {
455 done = true;
456 }
457 bytes_read += producers.last_packet_size;
458 all_bytes += producers.last_packet_size;
459 break;
460 case cb::mcbp::ClientOpcode::DcpSnapshotMarker:
461 cb_assert(vbid != static_cast<Vbid>(-1));
462 if (stats.exp_disk_snapshot &&
463 stats.num_snapshot_markers == 0) {
464 checkeq(uint32_t{1},
465 producers.last_flags,
466 "Expected disk snapshot");
467 }
468
469 if (producers.last_flags & 8) {
470 stats.pending_marker_ack = true;
471 stats.marker_end = producers.last_snap_end_seqno;
472 }
473
474 stats.num_snapshot_markers++;
475 bytes_read += producers.last_packet_size;
476 all_bytes += producers.last_packet_size;
477 break;
478 case cb::mcbp::ClientOpcode::DcpSetVbucketState:
479 cb_assert(vbid != static_cast<Vbid>(-1));
480 if (producers.last_vbucket_state == vbucket_state_pending) {
481 stats.num_set_vbucket_pending++;
482 for (size_t j = 0; j < stats.extra_takeover_ops; ++j) {
483 std::string key("key" + std::to_string(j));
484 checkeq(ENGINE_SUCCESS,
485 store(h,
486 NULL,
487 OPERATION_SET,
488 key.c_str(),
489 "data",
490 nullptr,
491 0,
492 vbid),
493 "Failed to store a value");
494 }
495 } else if (producers.last_vbucket_state ==
496 vbucket_state_active) {
497 stats.num_set_vbucket_active++;
498 }
499 bytes_read += producers.last_packet_size;
500 all_bytes += producers.last_packet_size;
501 sendDcpAck(h,
502 cookie,
503 cb::mcbp::ClientOpcode::DcpSetVbucketState,
504 cb::mcbp::Status::Success,
505 producers.last_opaque);
506 break;
507 case cb::mcbp::ClientOpcode::Invalid:
508 if (disable_ack && flow_control_buf_size &&
509 (bytes_read >= flow_control_buf_size)) {
510 /* If there is no acking and if flow control is enabled
511 we are done because producer should not send us any
512 more items. We need this to test that producer stops
513 sending items correctly when there are no acks while
514 flow control is enabled */
515 done = true;
516 exp_all_items_streamed = false;
517 } else {
518 /* No messages were ready on the last step call, so we
519 * wait till the conn is notified of new item.
520 * Note that we check for 0 because we clear the
521 * producers.last_op value below.
522 */
523 testHarness->lock_cookie(cookie);
524 /* waitfor_cookie() waits on a condition variable. But
525 the api expects the cookie to be locked before
526 calling it */
527 testHarness->waitfor_cookie(cookie);
528 testHarness->unlock_cookie(cookie);
529 }
530 break;
531 default:
532 // Aborting ...
533 std::stringstream ss;
534 ss << "Unknown DCP operation: " << to_string(producers.last_op);
535 check(false, ss.str().c_str());
536 }
537 producers.last_op = cb::mcbp::ClientOpcode::Invalid;
538 producers.last_nru = 0;
539 producers.last_vbucket = Vbid(-1);
540 }
541 } while (!done);
542
543 total_bytes += all_bytes;
544
545 for (const auto& ctx : stream_ctxs) {
546 if (!ctx.skip_verification) {
547 auto &stats = vb_stats[ctx.vbucket];
548 if (simulate_cursor_dropping) {
549 if (stats.num_snapshot_markers == 0) {
550 cb_assert(stats.num_mutations == 0 &&
551 stats.num_deletions == 0);
552 } else {
553 checkge(ctx.exp_mutations, stats.num_mutations,
554 "Invalid number of mutations");
555 checkge(ctx.exp_deletions, stats.num_deletions,
556 "Invalid number of deletes");
557 checkge(ctx.exp_expirations,
558 stats.num_expirations,
559 "Invalid number of expirations");
560 }
561 } else {
562 // Account for cursors that may have been dropped because
563 // of high memory usage
564 if (get_int_stat(h, "ep_cursors_dropped") > 0) {
565 // Hard to predict exact number of markers to be received
566 // if in case of a live parallel front end load
567 if (!ctx.live_frontend_client) {
568 checkle(stats.num_snapshot_markers, ctx.exp_markers,
569 "Invalid number of markers");
570 }
571 checkle(stats.num_mutations, ctx.exp_mutations,
572 "Invalid number of mutations");
573 checkle(stats.num_deletions, ctx.exp_deletions,
574 "Invalid number of deletions");
575 checkle(stats.num_expirations,
576 ctx.exp_expirations,
577 "Invalid number of expirations");
578 } else {
579 checkeq(ctx.exp_mutations, stats.num_mutations,
580 "Invalid number of mutations");
581 checkeq(ctx.exp_deletions, stats.num_deletions,
582 "Invalid number of deletes");
583 checkeq(ctx.exp_expirations,
584 stats.num_expirations,
585 "Invalid number of expirations");
586 if (ctx.live_frontend_client) {
587 // Hard to predict exact number of markers to be received
588 // if in case of a live parallel front end load
589 if (ctx.exp_mutations > 0 || ctx.exp_deletions > 0 ||
590 ctx.exp_expirations > 0) {
591 checkle(size_t{1},
592 stats.num_snapshot_markers,
593 "Snapshot marker count can't be zero");
594 }
595 } else {
596 checkeq(ctx.exp_markers, stats.num_snapshot_markers,
597 "Unexpected number of snapshot markers");
598 }
599 }
600 }
601
602 if (ctx.flags & DCP_ADD_STREAM_FLAG_TAKEOVER) {
603 checkeq(size_t{1},
604 stats.num_set_vbucket_pending,
605 "Didn't receive pending set state");
606 checkeq(size_t{1},
607 stats.num_set_vbucket_active,
608 "Didn't receive active set state");
609 }
610
611 /* Check if the readyQ size goes to zero after all items are streamed */
612 if (exp_all_items_streamed) {
613 std::stringstream stats_ready_queue_memory;
614 stats_ready_queue_memory << "eq_dcpq:" << name.c_str()
615 << ":stream_" << ctx.vbucket.get()
616 << "_ready_queue_memory";
617 checkeq(uint64_t{0},
618 get_ull_stat(h,
619 stats_ready_queue_memory.str().c_str(),
620 "dcp"),
621 "readyQ size did not go to zero");
622
623 std::string stats_backfill_buffer_items(
624 "eq_dcpq:" + name + ":stream_" +
625 std::to_string(ctx.vbucket.get()) +
626 "_backfill_buffer_items");
627 checkeq(uint64_t{0},
628 get_ull_stat(
629 h, stats_backfill_buffer_items.c_str(), "dcp"),
630 "backfill buffer items did not go to zero");
631 }
632 if (ctx.expected_values) {
633 checkeq(ctx.expected_values,
634 stats.num_values,
635 "Expected values didn't match");
636 }
637 }
638 }
639
640 /* Check if the producer has updated flow control stat correctly */
641 if (flow_control_buf_size) {
642 char stats_buffer[50] = {0};
643 snprintf(stats_buffer, sizeof(stats_buffer), "eq_dcpq:%s:unacked_bytes",
644 name.c_str());
645 checkeq((all_bytes - total_acked_bytes),
646 get_ull_stat(h, stats_buffer, "dcp"),
647 "Buffer Size did not get set correctly");
648 }
649 }
650
stop()651 void TestDcpConsumer::stop() {
652 this->done = true;
653 }
654
openConnection(uint32_t flags)655 void TestDcpConsumer::openConnection(uint32_t flags) {
656 /* Reset any stale dcp data */
657 producers.clear_dcp_data();
658
659 opaque = 1;
660
661 /* Set up Producer at server */
662 checkeq(ENGINE_SUCCESS,
663 dcp->open(cookie,
664 ++opaque,
665 0,
666 flags,
667 name,
668 R"({"consumer_name":"replica1"})"),
669 "Failed dcp producer open connection.");
670
671 /* Set flow control buffer size */
672 std::string flow_control_buf_sz(std::to_string(flow_control_buf_size));
673 checkeq(ENGINE_SUCCESS,
674 dcp->control(cookie,
675 ++opaque,
676 "connection_buffer_size",
677 flow_control_buf_sz),
678 "Failed to establish connection buffer");
679 char stats_buffer[50] = {0};
680 if (flow_control_buf_size) {
681 snprintf(stats_buffer, sizeof(stats_buffer),
682 "eq_dcpq:%s:max_buffer_bytes", name.c_str());
683 checkeq(static_cast<int>(flow_control_buf_size),
684 get_int_stat(h, stats_buffer, "dcp"),
685 "TestDcpConsumer::openConnection() : "
686 "Buffer Size did not get set correctly");
687 } else {
688 snprintf(stats_buffer, sizeof(stats_buffer),
689 "eq_dcpq:%s:flow_control", name.c_str());
690 std::string status = get_str_stat(h, stats_buffer, "dcp");
691 checkeq(status, std::string("disabled"), "Flow control enabled!");
692 }
693
694 checkeq(ENGINE_SUCCESS,
695 dcp->control(cookie, ++opaque, "enable_ext_metadata", "true"),
696 "Failed to enable xdcr extras");
697 }
698
openStreams()699 ENGINE_ERROR_CODE TestDcpConsumer::openStreams() {
700 for (auto& ctx : stream_ctxs) {
701 /* Different opaque for every stream created */
702 ++opaque;
703
704 /* Initiate stream request */
705 uint64_t rollback = 0;
706 ENGINE_ERROR_CODE rv = dcp->stream_req(cookie,
707 ctx.flags,
708 opaque,
709 ctx.vbucket,
710 ctx.seqno.start,
711 ctx.seqno.end,
712 ctx.vb_uuid,
713 ctx.snapshot.start,
714 ctx.snapshot.end,
715 &rollback,
716 mock_dcp_add_failover_log,
717 {});
718
719 checkeq(ctx.exp_err, rv, "Failed to initiate stream request");
720
721 if (rv == ENGINE_NOT_MY_VBUCKET || rv == ENGINE_ENOTSUP) {
722 return rv;
723 }
724
725 if (rv == ENGINE_ROLLBACK || rv == ENGINE_KEY_ENOENT) {
726 checkeq(ctx.exp_rollback, rollback,
727 "Rollback didn't match expected value");
728 return rv;
729 }
730
731 if (ctx.flags & DCP_ADD_STREAM_FLAG_TAKEOVER) {
732 ctx.seqno.end = std::numeric_limits<uint64_t>::max();
733 } else if (ctx.flags & DCP_ADD_STREAM_FLAG_LATEST ||
734 ctx.flags & DCP_ADD_STREAM_FLAG_DISKONLY) {
735 std::string high_seqno("vb_" + std::to_string(ctx.vbucket.get()) +
736 ":high_seqno");
737 ctx.seqno.end =
738 get_ull_stat(h, high_seqno.c_str(), "vbucket-seqno");
739 }
740
741 std::stringstream stats_flags;
742 stats_flags << "eq_dcpq:" << name.c_str() << ":stream_"
743 << ctx.vbucket.get() << "_flags";
744 checkeq(ctx.flags,
745 (uint32_t)get_int_stat(h, stats_flags.str().c_str(), "dcp"),
746 "Flags didn't match");
747
748 std::stringstream stats_opaque;
749 stats_opaque << "eq_dcpq:" << name.c_str() << ":stream_"
750 << ctx.vbucket.get() << "_opaque";
751 checkeq(opaque,
752 (uint32_t)get_int_stat(h, stats_opaque.str().c_str(), "dcp"),
753 "Opaque didn't match");
754 ctx.opaque = opaque;
755
756 std::stringstream stats_start_seqno;
757 stats_start_seqno << "eq_dcpq:" << name.c_str() << ":stream_"
758 << ctx.vbucket.get() << "_start_seqno";
759 checkeq(ctx.seqno.start,
760 (uint64_t)get_ull_stat(
761 h, stats_start_seqno.str().c_str(), "dcp"),
762 "Start Seqno Didn't match");
763
764 std::stringstream stats_end_seqno;
765 stats_end_seqno << "eq_dcpq:" << name.c_str() << ":stream_"
766 << ctx.vbucket.get() << "_end_seqno";
767 checkeq(ctx.seqno.end,
768 (uint64_t)get_ull_stat(h, stats_end_seqno.str().c_str(), "dcp"),
769 "End Seqno didn't match");
770
771 std::stringstream stats_vb_uuid;
772 stats_vb_uuid << "eq_dcpq:" << name.c_str() << ":stream_"
773 << ctx.vbucket.get() << "_vb_uuid";
774 checkeq(ctx.vb_uuid,
775 (uint64_t)get_ull_stat(h, stats_vb_uuid.str().c_str(), "dcp"),
776 "VBucket UUID didn't match");
777
778 std::stringstream stats_snap_seqno;
779 stats_snap_seqno << "eq_dcpq:" << name.c_str() << ":stream_"
780 << ctx.vbucket.get() << "_snap_start_seqno";
781 checkeq(ctx.snapshot.start,
782 (uint64_t)get_ull_stat(
783 h, stats_snap_seqno.str().c_str(), "dcp"),
784 "snap start seqno didn't match");
785
786 if ((ctx.flags & DCP_ADD_STREAM_FLAG_TAKEOVER) &&
787 !ctx.skip_estimate_check) {
788 std::string high_seqno_str(
789 "vb_" + std::to_string(ctx.vbucket.get()) + ":high_seqno");
790 uint64_t vb_high_seqno =
791 get_ull_stat(h, high_seqno_str.c_str(), "vbucket-seqno");
792 uint64_t est = vb_high_seqno - ctx.seqno.start;
793 std::stringstream stats_takeover;
794 stats_takeover << "dcp-vbtakeover " << ctx.vbucket.get() << " "
795 << name.c_str();
796 wait_for_stat_to_be_lte(
797 h, "estimate", est, stats_takeover.str().c_str());
798 }
799
800 if (ctx.flags & DCP_ADD_STREAM_FLAG_DISKONLY) {
801 /* Wait for backfill to start */
802 std::string stats_backfill_read_bytes("eq_dcpq:" + name +
803 ":backfill_buffer_bytes_read");
804 wait_for_stat_to_be_gte(
805 h, stats_backfill_read_bytes.c_str(), 0, "dcp");
806 /* Verify that we have no dcp cursors in the checkpoint. (There will
807 just be one persistence cursor) */
808 std::string stats_num_conn_cursors(
809 "vb_" + std::to_string(ctx.vbucket.get()) +
810 ":num_conn_cursors");
811 /* In case of persistent buckets there will be 1 persistent cursor,
812 in case of ephemeral buckets there will be no cursor */
813 checkge(1,
814 get_int_stat(h, stats_num_conn_cursors.c_str(),
815 "checkpoint"),
816 "DCP cursors not expected to be registered");
817 }
818
819 // Init stats used in test
820 VBStats stats;
821 stats.extra_takeover_ops = ctx.extra_takeover_ops;
822 stats.exp_disk_snapshot = ctx.exp_disk_snapshot;
823 stats.exp_conflict_res = ctx.exp_conflict_res;
824
825 vb_stats[ctx.vbucket] = stats;
826 }
827 return ENGINE_SUCCESS;
828 }
829
closeStreams(bool fClear)830 ENGINE_ERROR_CODE TestDcpConsumer::closeStreams(bool fClear) {
831 ENGINE_ERROR_CODE err = ENGINE_SUCCESS;
832 for (auto& ctx : stream_ctxs) {
833 if (ctx.opaque > 0) {
834 err = dcp->close_stream(cookie, ctx.opaque, Vbid(0), {});
835 if (ENGINE_SUCCESS != err) {
836 break;
837 }
838 }
839 }
840
841 if (fClear) {
842 stream_ctxs.clear();
843 }
844 return err;
845 }
846
notifier_request(EngineIface* h, const void* cookie, uint32_t opaque, Vbid vbucket, uint64_t start, bool shouldSucceed)847 static void notifier_request(EngineIface* h,
848 const void* cookie,
849 uint32_t opaque,
850 Vbid vbucket,
851 uint64_t start,
852 bool shouldSucceed) {
853 uint32_t flags = 0;
854 uint64_t rollback = 0;
855 uint64_t vb_uuid = get_ull_stat(h, "vb_0:0:id", "failovers");
856 uint64_t snap_start_seqno = get_ull_stat(h, "vb_0:0:seq", "failovers");
857 uint64_t snap_end_seqno = snap_start_seqno;
858 auto dcp = requireDcpIface(h);
859
860 ENGINE_ERROR_CODE err = dcp->stream_req(cookie,
861 flags,
862 opaque,
863 vbucket,
864 start,
865 0,
866 vb_uuid,
867 snap_start_seqno,
868 snap_end_seqno,
869 &rollback,
870 mock_dcp_add_failover_log,
871 {});
872 checkeq(ENGINE_SUCCESS, err, "Failed to initiate stream request");
873
874 std::string type = get_str_stat(h, "eq_dcpq:unittest:type", "dcp");
875 checkeq("notifier"s, type, "Consumer not found");
876
877 checkeq(flags,
878 static_cast<uint32_t>(
879 get_int_stat(h, "eq_dcpq:unittest:stream_0_flags", "dcp")),
880 "Flags didn't match");
881 checkeq(opaque,
882 static_cast<uint32_t>(
883 get_int_stat(h, "eq_dcpq:unittest:stream_0_opaque", "dcp")),
884 "Opaque didn't match");
885 checkeq(start,
886 get_ull_stat(h, "eq_dcpq:unittest:stream_0_start_seqno", "dcp"),
887 "Start Seqno Didn't match");
888 checkeq(uint64_t{0},
889 get_ull_stat(h, "eq_dcpq:unittest:stream_0_end_seqno", "dcp"),
890 "End Seqno didn't match");
891 checkeq(vb_uuid,
892 get_ull_stat(h, "eq_dcpq:unittest:stream_0_vb_uuid", "dcp"),
893 "VBucket UUID didn't match");
894 checkeq(snap_start_seqno,
895 get_ull_stat(h,"eq_dcpq:unittest:stream_0_snap_start_seqno", "dcp"),
896 "snap start seqno didn't match");
897 }
898
dcp_stream_to_replica(EngineIface* h, const void* cookie, uint32_t opaque, Vbid vbucket, uint32_t flags, uint64_t start, uint64_t end, uint64_t snap_start_seqno, uint64_t snap_end_seqno, uint8_t cas = 0x1, uint8_t datatype = 1, uint32_t exprtime = 0, uint32_t lockTime = 0, uint64_t revSeqno = 0)899 static void dcp_stream_to_replica(EngineIface* h,
900 const void* cookie,
901 uint32_t opaque,
902 Vbid vbucket,
903 uint32_t flags,
904 uint64_t start,
905 uint64_t end,
906 uint64_t snap_start_seqno,
907 uint64_t snap_end_seqno,
908 uint8_t cas = 0x1,
909 uint8_t datatype = 1,
910 uint32_t exprtime = 0,
911 uint32_t lockTime = 0,
912 uint64_t revSeqno = 0) {
913 /* Send snapshot marker */
914 auto dcp = requireDcpIface(h);
915 checkeq(ENGINE_SUCCESS,
916 dcp->snapshot_marker(cookie,
917 opaque,
918 vbucket,
919 snap_start_seqno,
920 snap_end_seqno,
921 flags,
922 0 /*HCS*/,
923 {} /*maxVisibleSeqno*/),
924 "Failed to send marker!");
925 const std::string data("data");
926 /* Send DCP mutations */
927 for (uint64_t i = start; i <= end; i++) {
928 const std::string key{"key" + std::to_string(i)};
929 const DocKey docKey{key, DocKeyEncodesCollectionId::No};
930 const cb::const_byte_buffer value{(uint8_t*)data.data(), data.size()};
931 checkeq(ENGINE_SUCCESS,
932 dcp->mutation(cookie,
933 opaque,
934 docKey,
935 value,
936 0, // priv bytes
937 PROTOCOL_BINARY_RAW_BYTES,
938 cas,
939 vbucket,
940 flags,
941 i, // by seqno
942 revSeqno,
943 exprtime,
944 lockTime,
945 {},
946 INITIAL_NRU_VALUE),
947 "Failed dcp mutate.");
948 }
949 }
950
951 /* This is a helper function to read items from an existing DCP Producer. It
952 reads items from start to end on the connection. (Note: this can work
953 correctly only in case there is one vbucket)
954 Currently this supports only streaming mutations, but can be extend to stream
955 deletion etc */
dcp_stream_from_producer_conn(EngineIface* h, const void* cookie, uint32_t opaque, uint64_t start, uint64_t end, uint64_t expSnapStart, MockDcpMessageProducers& producers)956 static void dcp_stream_from_producer_conn(EngineIface* h,
957 const void* cookie,
958 uint32_t opaque,
959 uint64_t start,
960 uint64_t end,
961 uint64_t expSnapStart,
962 MockDcpMessageProducers& producers) {
963 bool done = false;
964 size_t bytes_read = 0;
965 bool pending_marker_ack = false;
966 uint64_t marker_end = 0;
967 uint64_t num_mutations = 0;
968 uint64_t last_snap_start_seqno = 0;
969 auto dcp = requireDcpIface(h);
970
971 do {
972 if (bytes_read > 512) {
973 checkeq(ENGINE_SUCCESS,
974 dcp->buffer_acknowledgement(
975 cookie, ++opaque, Vbid(0), bytes_read),
976 "Failed to get dcp buffer ack");
977 bytes_read = 0;
978 }
979 ENGINE_ERROR_CODE err = dcp->step(cookie, &producers);
980 if (err == ENGINE_DISCONNECT) {
981 done = true;
982 } else {
983 switch (producers.last_op) {
984 case cb::mcbp::ClientOpcode::DcpMutation:
985 bytes_read += producers.last_packet_size;
986 if (pending_marker_ack &&
987 producers.last_byseqno == marker_end) {
988 sendDcpAck(h,
989 cookie,
990 cb::mcbp::ClientOpcode::DcpSnapshotMarker,
991 cb::mcbp::Status::Success,
992 producers.last_opaque);
993 }
994 num_mutations++;
995 break;
996 case cb::mcbp::ClientOpcode::DcpStreamEnd:
997 done = true;
998 bytes_read += producers.last_packet_size;
999 break;
1000 case cb::mcbp::ClientOpcode::DcpSnapshotMarker:
1001 if (producers.last_flags & 8) {
1002 pending_marker_ack = true;
1003 marker_end = producers.last_snap_end_seqno;
1004 }
1005 bytes_read += producers.last_packet_size;
1006 last_snap_start_seqno = producers.last_snap_start_seqno;
1007 break;
1008 case cb::mcbp::ClientOpcode::Invalid:
1009 break;
1010 default:
1011 // Aborting ...
1012 std::string err_string(
1013 "Unexpected DCP operation: " +
1014 to_string(producers.last_op) + " last_byseqno: " +
1015 std::to_string(producers.last_byseqno.load()) +
1016 " last_key: " + producers.last_key + " last_value: " +
1017 producers.last_value + " last_flags: " +
1018 std::to_string(producers.last_flags));
1019 check(false, err_string.c_str());
1020 }
1021 if (producers.last_byseqno >= end) {
1022 done = true;
1023 }
1024 producers.last_op = cb::mcbp::ClientOpcode::Invalid;
1025 }
1026 } while (!done);
1027
1028 /* Do buffer ack of the outstanding bytes */
1029 dcp->buffer_acknowledgement(cookie, ++opaque, Vbid(0), bytes_read);
1030 checkeq((end - start + 1), num_mutations, "Invalid number of mutations");
1031 if (expSnapStart) {
1032 checkge(last_snap_start_seqno,
1033 expSnapStart,
1034 "Incorrect snap start seqno");
1035 }
1036 }
1037
dcp_stream_expiries_to_replica(EngineIface* h, const void* cookie, uint32_t opaque, Vbid vbucket, uint32_t flags, uint64_t start, uint64_t end, uint64_t snap_start_seqno, uint64_t snap_end_seqno, uint32_t delTime, uint64_t revSeqno = 0, uint8_t cas = 0x1)1038 static void dcp_stream_expiries_to_replica(EngineIface* h,
1039 const void* cookie,
1040 uint32_t opaque,
1041 Vbid vbucket,
1042 uint32_t flags,
1043 uint64_t start,
1044 uint64_t end,
1045 uint64_t snap_start_seqno,
1046 uint64_t snap_end_seqno,
1047 uint32_t delTime,
1048 uint64_t revSeqno = 0,
1049 uint8_t cas = 0x1) {
1050 auto dcp = requireDcpIface(h);
1051 checkeq(ENGINE_SUCCESS,
1052 dcp->snapshot_marker(cookie,
1053 opaque,
1054 vbucket,
1055 snap_start_seqno,
1056 snap_end_seqno,
1057 flags,
1058 0 /*HCS*/,
1059 {} /*maxVisibleSeqno*/),
1060 "Failed to send marker!");
1061 const std::string data("data");
1062 /* Stream Expiries */
1063 for (uint64_t i = start; i <= end; i++) {
1064 const std::string key{"key" + std::to_string(i)};
1065 const DocKey docKey{key, DocKeyEncodesCollectionId::No};
1066 checkeq(ENGINE_SUCCESS,
1067 dcp->expiration(cookie,
1068 opaque,
1069 docKey,
1070 {},
1071 0, // priv bytes
1072 PROTOCOL_BINARY_RAW_BYTES,
1073 cas,
1074 vbucket,
1075 i,
1076 revSeqno,
1077 delTime),
1078 "Failed dcp expiry");
1079 }
1080 }
1081
1082 struct mb16357_ctx {
mb16357_ctxmb16357_ctx1083 mb16357_ctx(EngineIface* _h, int _items)
1084 : h(_h), dcp(requireDcpIface(h)), items(_items) {
1085 }
1086
1087 EngineIface* h;
1088 gsl::not_null<DcpIface*> dcp;
1089 int items;
1090 std::mutex mutex;
1091 std::condition_variable cv;
1092 bool compactor_waiting{false};
1093 bool compaction_start{false};
1094 };
1095
/**
 * Arguments handed (as a void*) to writer_thread().
 */
struct writer_thread_ctx {
    // Engine the items are stored into.
    EngineIface* h;
    // Number of items to store; writer_thread() uses keys "key_0" onwards.
    int items;
    // VBucket passed to every store() call.
    Vbid vbid;
};
1101
/**
 * Arguments handed (as a void*) to continuous_dcp_thread().
 */
struct continuous_dcp_ctx {
    // Engine queried for the vbucket's failover-table UUID.
    EngineIface* h;
    // Not read by continuous_dcp_thread() itself.
    // NOTE(review): presumably the cookie of the DCP connection - confirm
    // against the code which builds this context.
    const void *cookie;
    // VBucket the stream is requested for.
    Vbid vbid;
    // Held by reference: the referenced string must outlive this context.
    const std::string &name;
    // Used as the stream's start seqno and as both snapshot bounds.
    uint64_t start_seqno;
    // Consumer which the thread adds the stream to and then run()s.
    std::unique_ptr<TestDcpConsumer> dcpConsumer;
};
1110
// Forward declaration required for dcp_thread_func, which calls
// add_stream_for_consumer() (the definition is not part of this section of
// the file). Note that the default values for exp_snap_start and
// exp_snap_end are supplied by this declaration.
static uint32_t add_stream_for_consumer(EngineIface* h,
                                        const void* cookie,
                                        uint32_t opaque,
                                        Vbid vbucket,
                                        uint32_t flags,
                                        cb::mcbp::Status response,
                                        uint64_t exp_snap_start = 0,
                                        uint64_t exp_snap_end = 0);
1120
1121 extern "C" {
dcp_thread_func(void *args)1122 static void dcp_thread_func(void *args) {
1123 struct mb16357_ctx *ctx = static_cast<mb16357_ctx *>(args);
1124
1125 const void* cookie = testHarness->create_cookie();
1126 uint32_t opaque = 0xFFFF0000;
1127 uint32_t flags = 0;
1128 std::string name = "unittest";
1129
1130 // Wait for compaction thread to to ready (and waiting on cv) - as
1131 // we don't want the nofify_one() to be lost.
1132 for (;;) {
1133 std::lock_guard<std::mutex> lh(ctx->mutex);
1134 if (ctx->compactor_waiting) {
1135 break;
1136 }
1137 };
1138 // Now compactor is waiting to run (and we are just about to start DCP
1139 // stream, activate compaction.
1140 {
1141 std::lock_guard<std::mutex> lh(ctx->mutex);
1142 ctx->compaction_start = true;
1143 }
1144 ctx->cv.notify_one();
1145
1146 // Switch to replica
1147 check(set_vbucket_state(ctx->h, Vbid(0), vbucket_state_replica),
1148 "Failed to set vbucket state.");
1149
1150 // Open consumer connection
1151 checkeq(ctx->dcp->open(cookie,
1152 opaque,
1153 0,
1154 flags,
1155 name,
1156 R"({"consumer_name":"replica1"})"),
1157 ENGINE_SUCCESS,
1158 "Failed dcp Consumer open connection.");
1159
1160 add_stream_for_consumer(ctx->h,
1161 cookie,
1162 opaque++,
1163 Vbid(0),
1164 0,
1165 cb::mcbp::Status::Success);
1166
1167 uint32_t stream_opaque =
1168 get_int_stat(ctx->h, "eq_dcpq:unittest:stream_0_opaque", "dcp");
1169
1170 for (int i = 1; i <= ctx->items; i++) {
1171 std::stringstream ss;
1172 ss << "kamakeey-" << i;
1173
1174 // send mutations in single mutation snapshots to race more with compaction
1175 ctx->dcp->snapshot_marker(cookie,
1176 stream_opaque,
1177 Vbid(0),
1178 ctx->items,
1179 ctx->items + i,
1180 2,
1181 0 /*HCS*/,
1182 {} /*maxVisibleSeqno*/);
1183
1184 const std::string key = ss.str();
1185 const DocKey docKey{key, DocKeyEncodesCollectionId::No};
1186 ctx->dcp->mutation(cookie,
1187 stream_opaque,
1188 docKey,
1189 {(const uint8_t*)"value", 5},
1190 0, // priv bytes
1191 PROTOCOL_BINARY_RAW_BYTES,
1192 i * 3, // cas
1193 Vbid(0),
1194 0, // flags
1195 i + ctx->items, // by_seqno
1196 i + ctx->items, // rev_seqno
1197 0, // exptime
1198 0, // locktime
1199 {}, // meta
1200 INITIAL_NRU_VALUE);
1201 }
1202
1203 testHarness->destroy_cookie(cookie);
1204 }
1205
compact_thread_func(void *args)1206 static void compact_thread_func(void *args) {
1207 struct mb16357_ctx *ctx = static_cast<mb16357_ctx *>(args);
1208 std::unique_lock<std::mutex> lk(ctx->mutex);
1209 ctx->compactor_waiting = true;
1210 ctx->cv.wait(lk, [ctx]{return ctx->compaction_start;});
1211 compact_db(ctx->h, Vbid(0), Vbid(0), 99, ctx->items, 1);
1212 }
1213
writer_thread(void *args)1214 static void writer_thread(void *args) {
1215 struct writer_thread_ctx *wtc = static_cast<writer_thread_ctx *>(args);
1216
1217 for (int i = 0; i < wtc->items; ++i) {
1218 std::string key("key_" + std::to_string(i));
1219 checkeq(ENGINE_SUCCESS,
1220 store(wtc->h,
1221 nullptr,
1222 OPERATION_SET,
1223 key.c_str(),
1224 "somevalue",
1225 nullptr,
1226 0,
1227 wtc->vbid),
1228 "Failed to store value");
1229 }
1230 }
1231
continuous_dcp_thread(void *args)1232 static void continuous_dcp_thread(void *args) {
1233 struct continuous_dcp_ctx *cdc = static_cast<continuous_dcp_ctx *>(args);
1234
1235 DcpStreamCtx ctx;
1236 ctx.vbucket = cdc->vbid;
1237 std::string vbuuid_entry("vb_" + std::to_string(cdc->vbid.get()) +
1238 ":0:id");
1239 ctx.vb_uuid = get_ull_stat(cdc->h, vbuuid_entry.c_str(), "failovers");
1240 ctx.seqno = {cdc->start_seqno, std::numeric_limits<uint64_t>::max()};
1241 ctx.snapshot = {cdc->start_seqno, cdc->start_seqno};
1242 ctx.skip_verification = true;
1243
1244 cdc->dcpConsumer->addStreamCtx(ctx);
1245 cdc->dcpConsumer->run();
1246 }
1247 }
1248
1249 /* DCP step thread that keeps running till it reads upto 'exp_mutations'.
1250 Note: the exp_mutations is cumulative across all streams in the DCP
1251 connection */
dcp_waiting_step(EngineIface* h, const void* cookie, uint32_t opaque, uint64_t exp_mutations, MockDcpMessageProducers& producers)1252 static void dcp_waiting_step(EngineIface* h,
1253 const void* cookie,
1254 uint32_t opaque,
1255 uint64_t exp_mutations,
1256 MockDcpMessageProducers& producers) {
1257 bool done = false;
1258 size_t bytes_read = 0;
1259 bool pending_marker_ack = false;
1260 uint64_t marker_end = 0;
1261 uint64_t num_mutations = 0;
1262
1263 auto dcp = requireDcpIface(h);
1264
1265 do {
1266 if (bytes_read > 512) {
1267 checkeq(ENGINE_SUCCESS,
1268 dcp->buffer_acknowledgement(
1269 cookie, ++opaque, Vbid(0), bytes_read),
1270 "Failed to get dcp buffer ack");
1271 bytes_read = 0;
1272 }
1273 ENGINE_ERROR_CODE err = dcp->step(cookie, &producers);
1274 if (err == ENGINE_DISCONNECT) {
1275 done = true;
1276 } else {
1277 switch (producers.last_op) {
1278 case cb::mcbp::ClientOpcode::DcpMutation:
1279 bytes_read += producers.last_packet_size;
1280 if (pending_marker_ack &&
1281 producers.last_byseqno == marker_end) {
1282 sendDcpAck(h,
1283 cookie,
1284 cb::mcbp::ClientOpcode::DcpSnapshotMarker,
1285 cb::mcbp::Status::Success,
1286 producers.last_opaque);
1287 }
1288 ++num_mutations;
1289 break;
1290 case cb::mcbp::ClientOpcode::DcpStreamEnd:
1291 done = true;
1292 bytes_read += producers.last_packet_size;
1293 break;
1294 case cb::mcbp::ClientOpcode::DcpSnapshotMarker:
1295 if (producers.last_flags & 8) {
1296 pending_marker_ack = true;
1297 marker_end = producers.last_snap_end_seqno;
1298 }
1299 bytes_read += producers.last_packet_size;
1300 break;
1301 case cb::mcbp::ClientOpcode::Invalid:
1302 /* No messages were ready on the last step call, so we
1303 * wait till the conn is notified of new item.
1304 * Note that we check for 0 because we clear the
1305 * producers.last_op value below.
1306 */
1307 testHarness->lock_cookie(cookie);
1308 /* waitfor_cookie() waits on a condition variable. But
1309 the api expects the cookie to be locked before
1310 calling it */
1311 wait_started = true;
1312 testHarness->waitfor_cookie(cookie);
1313 testHarness->unlock_cookie(cookie);
1314 break;
1315 default:
1316 // Aborting ...
1317 std::string err_string("Unexpected DCP operation: " +
1318 to_string(producers.last_op));
1319 check(false, err_string.c_str());
1320 }
1321 if (num_mutations >= exp_mutations) {
1322 done = true;
1323 }
1324 producers.last_op = cb::mcbp::ClientOpcode::Invalid;
1325 }
1326 } while (!done);
1327
1328 /* Do buffer ack of the outstanding bytes */
1329 dcp->buffer_acknowledgement(cookie, ++opaque, Vbid(0), bytes_read);
1330 }
1331
1332 // Testcases //////////////////////////////////////////////////////////////////
1333
test_dcp_vbtakeover_no_stream(EngineIface* h)1334 static enum test_result test_dcp_vbtakeover_no_stream(EngineIface* h) {
1335 write_items(h, 10);
1336 if (isPersistentBucket(h) && is_full_eviction(h)) {
1337 // MB-21646: FE mode - curr_items (which is part of "estimate") is
1338 // updated as part of flush, and thus if the writes are flushed in
1339 // blocks < 10 we may see an estimate < 10
1340 wait_for_flusher_to_settle(h);
1341 }
1342
1343 const auto est = get_int_stat(h, "estimate", "dcp-vbtakeover 0");
1344 checkeq(10, est, "Invalid estimate for non-existent stream");
1345 checkeq(ENGINE_NOT_MY_VBUCKET,
1346 get_stats(h, "dcp-vbtakeover 1"_ccb, {}, add_stats),
1347 "Expected not my vbucket");
1348
1349 return SUCCESS;
1350 }
1351
1352 /*
1353 * The following test is similar to the test_dcp_consumer_open test and
1354 * test_dcp_producer_open test, in that it opens a connections, then
1355 * immediately closes it.
1356 * It then moves time forward and repeats the creation of a connection and
1357 * checks that the new connection was created after the previous connection.
1358 */
1359
test_dcp_notifier_open(EngineIface* h)1360 static enum test_result test_dcp_notifier_open(EngineIface* h) {
1361 const auto* cookie1 = testHarness->create_cookie();
1362 const std::string name("unittest");
1363 const uint32_t seqno = 0;
1364 uint32_t opaque = 0;
1365 auto dcp = requireDcpIface(h);
1366
1367 checkeq(ENGINE_SUCCESS,
1368 dcp->open(cookie1,
1369 opaque,
1370 seqno,
1371 cb::mcbp::request::DcpOpenPayload::Notifier,
1372 name,
1373 R"({"consumer_name":"replica1"})"),
1374 "Failed dcp consumer open connection.");
1375
1376 const std::string stat_type("eq_dcpq:" + name + ":type");
1377 auto type = get_str_stat(h, stat_type.c_str(), "dcp");
1378 const std::string stat_created("eq_dcpq:" + name + ":created");
1379 const auto created = get_int_stat(h, stat_created.c_str(), "dcp");
1380 checkeq(0, type.compare("notifier"), "Notifier not found");
1381 testHarness->destroy_cookie(cookie1);
1382
1383 testHarness->time_travel(600);
1384
1385 const auto* cookie2 = testHarness->create_cookie();
1386 checkeq(ENGINE_SUCCESS,
1387 dcp->open(cookie2,
1388 opaque,
1389 seqno,
1390 cb::mcbp::request::DcpOpenPayload::Notifier,
1391 name,
1392 R"({"consumer_name":"replica1"})"),
1393 "Failed dcp consumer open connection.");
1394
1395 type = get_str_stat(h, stat_type.c_str(), "dcp");
1396 checkeq(0, type.compare("notifier"), "Notifier not found");
1397 checkle((created + 600), get_int_stat(h, stat_created.c_str(), "dcp"),
1398 "New dcp stream is not newer");
1399 testHarness->destroy_cookie(cookie2);
1400
1401 return SUCCESS;
1402 }
1403
test_dcp_notifier(EngineIface* h)1404 static enum test_result test_dcp_notifier(EngineIface* h) {
1405 write_items(h, 10);
1406 const auto* cookie = testHarness->create_cookie();
1407 const std::string name("unittest");
1408 const uint32_t seqno = 0;
1409 const Vbid vbucket = Vbid(0);
1410 uint32_t opaque = 0;
1411 uint64_t start = 0;
1412 auto dcp = requireDcpIface(h);
1413 MockDcpMessageProducers producers(h);
1414
1415 checkeq(ENGINE_SUCCESS,
1416 dcp->open(cookie,
1417 opaque,
1418 seqno,
1419 cb::mcbp::request::DcpOpenPayload::Notifier,
1420 name,
1421 R"({"consumer_name":"replica1"})"),
1422 "Failed dcp notifier open connection.");
1423 // Get notification for an old item
1424 notifier_request(h, cookie, ++opaque, vbucket, start, true);
1425 dcp_step(h, cookie, producers);
1426 checkeq(cb::mcbp::ClientOpcode::DcpStreamEnd,
1427 producers.last_op,
1428 "Expected stream end");
1429 // Get notification when we're slightly behind
1430 start += 9;
1431 notifier_request(h, cookie, ++opaque, vbucket, start, true);
1432 dcp_step(h, cookie, producers);
1433 checkeq(cb::mcbp::ClientOpcode::DcpStreamEnd,
1434 producers.last_op,
1435 "Expected stream end");
1436 // Wait for notification of a future item
1437 start += 11;
1438 notifier_request(h, cookie, ++opaque, vbucket, start, true);
1439 dcp_step(h, cookie, producers);
1440 for (auto j = 0; j < 5; ++j) {
1441 const auto key = "key" + std::to_string(j);
1442 checkeq(ENGINE_SUCCESS,
1443 store(h,
1444 nullptr,
1445 OPERATION_SET,
1446 key.c_str(),
1447 "data",
1448 nullptr),
1449 "Failed to store a value");
1450 }
1451 // Shouldn't get a stream end yet
1452 dcp_step(h, cookie, producers);
1453 checkne(cb::mcbp::ClientOpcode::DcpStreamEnd,
1454 producers.last_op,
1455 "Wasn't expecting a stream end");
1456 for (auto j = 0; j < 6; ++j) {
1457 const auto key = "key" + std::to_string(j);
1458 checkeq(ENGINE_SUCCESS,
1459 store(h,
1460 nullptr,
1461 OPERATION_SET,
1462 key.c_str(),
1463 "data",
1464 nullptr),
1465 "Failed to store a value");
1466 }
1467 // Should get a stream end
1468 dcp_step(h, cookie, producers);
1469 checkeq(cb::mcbp::ClientOpcode::DcpStreamEnd,
1470 producers.last_op,
1471 "Expected stream end");
1472 testHarness->destroy_cookie(cookie);
1473
1474 return SUCCESS;
1475 }
1476
1477 /*
1478 * The following test is similar to the previous test
1479 * (test_dcp_notifier) , whereby a notifier connection is opened and
1480 * notifier_requests are made checking for the occurance of stream
1481 * end commands.
1482 *
1483 * In the following test we make a notifier_request equal to
1484 * the number of operations performed (in this case one). The test is
1485 * to ensure that a stream end is not received. A second operation is
1486 * then performed and this time we check for the occurance of a
1487 * stream end command.
1488 */
1489
test_dcp_notifier_equal_to_number_of_items( EngineIface* h)1490 static enum test_result test_dcp_notifier_equal_to_number_of_items(
1491 EngineIface* h) {
1492 const std::string key("key0");
1493 checkeq(ENGINE_SUCCESS,
1494 store(h, nullptr, OPERATION_SET, key.c_str(), "data", nullptr),
1495 "Failed to store a value");
1496
1497 const auto* cookie = testHarness->create_cookie();
1498 const std::string name("unittest");
1499 const uint32_t seqno = 0;
1500 const Vbid vbucket = Vbid(0);
1501 const uint64_t start = 1;
1502 uint32_t opaque = 0;
1503 auto dcp = requireDcpIface(h);
1504 MockDcpMessageProducers producers(h);
1505
1506 checkeq(ENGINE_SUCCESS,
1507 dcp->open(cookie,
1508 opaque,
1509 seqno,
1510 cb::mcbp::request::DcpOpenPayload::Notifier,
1511 name,
1512 R"({"consumer_name":"replica1"})"),
1513 "Failed dcp notifier open connection.");
1514 // Should not get a stream end
1515 notifier_request(h, cookie, ++opaque, vbucket, start, true);
1516 dcp_step(h, cookie, producers);
1517 checkne(cb::mcbp::ClientOpcode::DcpStreamEnd,
1518 producers.last_op,
1519 "Wasn't expecting a stream end");
1520 checkeq(ENGINE_SUCCESS,
1521 store(h, nullptr, OPERATION_SET, "key0", "data"),
1522 "Failed to store a value");
1523 dcp_step(h, cookie, producers);
1524 checkeq(cb::mcbp::ClientOpcode::DcpStreamEnd,
1525 producers.last_op,
1526 "Expected stream end");
1527 testHarness->destroy_cookie(cookie);
1528
1529 return SUCCESS;
1530 }
1531
test_dcp_consumer_open(EngineIface* h)1532 static enum test_result test_dcp_consumer_open(EngineIface* h) {
1533 const auto* cookie1 = testHarness->create_cookie();
1534 const std::string name("unittest");
1535 const uint32_t opaque = 0;
1536 const uint32_t seqno = 0;
1537 const uint32_t flags = 0;
1538 auto dcp = requireDcpIface(h);
1539
1540 checkeq(ENGINE_SUCCESS,
1541 dcp->open(cookie1,
1542 opaque,
1543 seqno,
1544 flags,
1545 name,
1546 R"({"consumer_name":"replica1"})"),
1547 "Failed dcp consumer open connection.");
1548
1549 const auto stat_type("eq_dcpq:" + name + ":type");
1550 auto type = get_str_stat(h, stat_type.c_str(), "dcp");
1551 const auto stat_created("eq_dcpq:" + name + ":created");
1552 const auto created = get_int_stat(h, stat_created.c_str(), "dcp");
1553 checkeq(0, type.compare("consumer"), "Consumer not found");
1554 testHarness->destroy_cookie(cookie1);
1555
1556 testHarness->time_travel(600);
1557
1558 const auto* cookie2 = testHarness->create_cookie();
1559 checkeq(ENGINE_SUCCESS,
1560 dcp->open(cookie2,
1561 opaque,
1562 seqno,
1563 flags,
1564 name,
1565 R"({"consumer_name":"replica1"})"),
1566 "Failed dcp consumer open connection.");
1567
1568 type = get_str_stat(h, stat_type.c_str(), "dcp");
1569 checkeq(0, type.compare("consumer"), "Consumer not found");
1570 checklt(created, get_int_stat(h, stat_created.c_str(), "dcp"),
1571 "New dcp stream is not newer");
1572 testHarness->destroy_cookie(cookie2);
1573
1574 return SUCCESS;
1575 }
1576
test_dcp_consumer_flow_control_none(EngineIface* h)1577 static enum test_result test_dcp_consumer_flow_control_none(EngineIface* h) {
1578 const auto* cookie1 = testHarness->create_cookie();
1579 const std::string name("unittest");
1580 const uint32_t opaque = 0;
1581 const uint32_t seqno = 0;
1582 const uint32_t flags = 0;
1583 auto dcp = requireDcpIface(h);
1584
1585 checkeq(ENGINE_SUCCESS,
1586 dcp->open(cookie1,
1587 opaque,
1588 seqno,
1589 flags,
1590 name,
1591 R"({"consumer_name":"replica1"})"),
1592 "Failed dcp consumer open connection.");
1593
1594 const auto stat_name("eq_dcpq:" + name + ":max_buffer_bytes");
1595 checkeq(0,
1596 get_int_stat(h, stat_name.c_str(), "dcp"),
1597 "Flow Control Buffer Size not zero");
1598 testHarness->destroy_cookie(cookie1);
1599
1600 return SUCCESS;
1601 }
1602
test_dcp_consumer_flow_control_static(EngineIface* h)1603 static enum test_result test_dcp_consumer_flow_control_static(EngineIface* h) {
1604 const auto* cookie1 = testHarness->create_cookie();
1605 const std::string name("unittest");
1606 const uint32_t opaque = 0;
1607 const uint32_t seqno = 0;
1608 const uint32_t flags = 0;
1609 const auto flow_ctl_buf_def_size = 10485760;
1610 auto dcp = requireDcpIface(h);
1611
1612 checkeq(ENGINE_SUCCESS,
1613 dcp->open(cookie1,
1614 opaque,
1615 seqno,
1616 flags,
1617 name,
1618 R"({"consumer_name":"replica1"})"),
1619 "Failed dcp consumer open connection.");
1620
1621 const auto stat_name("eq_dcpq:" + name + ":max_buffer_bytes");
1622 checkeq(flow_ctl_buf_def_size,
1623 get_int_stat(h, stat_name.c_str(), "dcp"),
1624 "Flow Control Buffer Size not equal to default value");
1625 testHarness->destroy_cookie(cookie1);
1626
1627 return SUCCESS;
1628 }
1629
test_dcp_consumer_flow_control_dynamic(EngineIface* h)1630 static enum test_result test_dcp_consumer_flow_control_dynamic(EngineIface* h) {
1631 const auto* cookie1 = testHarness->create_cookie();
1632 const std::string name("unittest");
1633 const uint32_t opaque = 0;
1634 const uint32_t seqno = 0;
1635 const uint32_t flags = 0;
1636 /* Check the min limit */
1637 set_param(h,
1638 cb::mcbp::request::SetParamPayload::Type::Flush,
1639 "max_size",
1640 "500000000");
1641 checkeq(500000000, get_int_stat(h, "ep_max_size"), "Incorrect new size.");
1642
1643 auto dcp = requireDcpIface(h);
1644 checkeq(ENGINE_SUCCESS,
1645 dcp->open(cookie1,
1646 opaque,
1647 seqno,
1648 flags,
1649 name,
1650 R"({"consumer_name":"replica1"})"),
1651 "Failed dcp consumer open connection.");
1652
1653 const auto stat_name("eq_dcpq:" + name + ":max_buffer_bytes");
1654 checkeq(10485760,
1655 get_int_stat(h, stat_name.c_str(), "dcp"),
1656 "Flow Control Buffer Size not equal to min");
1657 testHarness->destroy_cookie(cookie1);
1658
1659 /* Check the size as percentage of the bucket memory */
1660 const auto* cookie2 = testHarness->create_cookie();
1661 set_param(h,
1662 cb::mcbp::request::SetParamPayload::Type::Flush,
1663 "max_size",
1664 "2000000000");
1665 checkeq(2000000000, get_int_stat(h, "ep_max_size"), "Incorrect new size.");
1666
1667 checkeq(ENGINE_SUCCESS,
1668 dcp->open(cookie2,
1669 opaque,
1670 seqno,
1671 flags,
1672 name,
1673 R"({"consumer_name":"replica1"})"),
1674 "Failed dcp consumer open connection.");
1675
1676 checkeq(20000000,
1677 get_int_stat(h, stat_name.c_str(), "dcp"),
1678 "Flow Control Buffer Size not equal to 1% of mem size");
1679 testHarness->destroy_cookie(cookie2);
1680
1681 /* Check the case when mem used by flow control bufs hit the threshold */
1682 /* Create around 10 more connections to use more than 10% of the total
1683 memory */
1684 for (auto count = 0; count < 10; count++) {
1685 const auto* cookie = testHarness->create_cookie();
1686 checkeq(ENGINE_SUCCESS,
1687 dcp->open(cookie,
1688 opaque,
1689 seqno,
1690 flags,
1691 name,
1692 R"({"consumer_name":"replica1"})"),
1693 "Failed dcp consumer open connection.");
1694 testHarness->destroy_cookie(cookie);
1695 }
1696 /* By now mem used by flow control bufs would have crossed the threshold */
1697 const auto* cookie3 = testHarness->create_cookie();
1698 checkeq(ENGINE_SUCCESS,
1699 dcp->open(cookie3,
1700 opaque,
1701 seqno,
1702 flags,
1703 name,
1704 R"({"consumer_name":"replica1"})"),
1705 "Failed dcp consumer open connection.");
1706
1707 checkeq(10485760,
1708 get_int_stat(h, stat_name.c_str(), "dcp"),
1709 "Flow Control Buffer Size not equal to min after threshold is hit");
1710 testHarness->destroy_cookie(cookie3);
1711
1712 /* Check the max limit */
1713 const auto* cookie4 = testHarness->create_cookie();
1714 set_param(h,
1715 cb::mcbp::request::SetParamPayload::Type::Flush,
1716 "max_size",
1717 "7000000000");
1718 checkeq(static_cast<uint64_t>(7000000000),
1719 get_ull_stat(h, "ep_max_size"),
1720 "Incorrect new size.");
1721
1722 checkeq(ENGINE_SUCCESS,
1723 dcp->open(cookie4,
1724 opaque,
1725 seqno,
1726 flags,
1727 name,
1728 R"({"consumer_name":"replica1"})"),
1729 "Failed dcp consumer open connection.");
1730
1731 checkeq(52428800,
1732 get_int_stat(h, stat_name.c_str(), "dcp"),
1733 "Flow Control Buffer Size beyond max");
1734 testHarness->destroy_cookie(cookie4);
1735
1736 return SUCCESS;
1737 }
1738
test_dcp_consumer_flow_control_aggressive( EngineIface* h)1739 static enum test_result test_dcp_consumer_flow_control_aggressive(
1740 EngineIface* h) {
1741 const auto max_conns = 6;
1742 const void *cookie[max_conns];
1743 const auto flow_ctl_buf_max = 52428800;
1744 const auto flow_ctl_buf_min = 10485760;
1745 const auto ep_max_size = 1200000000;
1746 const auto bucketMemQuotaFraction = 0.05;
1747 set_param(h,
1748 cb::mcbp::request::SetParamPayload::Type::Flush,
1749 "max_size",
1750 std::to_string(ep_max_size).c_str());
1751 checkeq(ep_max_size, get_int_stat(h, "ep_max_size"), "Incorrect new size.");
1752
1753 std::vector<Vbid> vbuckets = {
1754 {Vbid{1}, Vbid{2}, Vbid{3}, Vbid{4}, Vbid{5}, Vbid{6}}};
1755 checkeq(std::size_t(max_conns),
1756 vbuckets.size(),
1757 "I need one vbucket per cookie");
1758 for (const auto& vb : vbuckets) {
1759 check(set_vbucket_state(h, vb, vbucket_state_replica),
1760 "Failed to set VBucket state.");
1761 }
1762
1763 /* Create first connection */
1764 const std::string name("unittest_");
1765 const auto name1(name + "0");
1766 const uint32_t opaque = 0;
1767 const uint32_t seqno = 0;
1768 const uint32_t flags = 0;
1769 cookie[0] = testHarness->create_cookie();
1770 auto dcp = requireDcpIface(h);
1771
1772 checkeq(ENGINE_SUCCESS,
1773 dcp->open(cookie[0],
1774 opaque,
1775 seqno,
1776 flags,
1777 name1,
1778 R"({"consumer_name":"replica1"})"),
1779 "Failed dcp consumer open connection.");
1780
1781 checkeq(ENGINE_SUCCESS,
1782 dcp->add_stream(cookie[0], 0, vbuckets[0], 0),
1783 "Failed to set up stream");
1784
1785 /* Check the max limit */
1786 auto stat_name = "eq_dcpq:" + name1 + ":max_buffer_bytes";
1787 checkeq(flow_ctl_buf_max,
1788 get_int_stat(h, stat_name.c_str(), "dcp"),
1789 "Flow Control Buffer Size not equal to max");
1790
1791 /* Create at least 4 more connections */
1792 for (auto count = 1; count < max_conns - 1; count++) {
1793 cookie[count] = testHarness->create_cookie();
1794 const auto name2(name + std::to_string(count));
1795 checkeq(ENGINE_SUCCESS,
1796 dcp->open(cookie[count],
1797 opaque,
1798 seqno,
1799 flags,
1800 name2,
1801 R"({"consumer_name":"replica1"})"),
1802 "Failed dcp consumer open connection.");
1803
1804 checkeq(ENGINE_SUCCESS,
1805 dcp->add_stream(cookie[count], 0, vbuckets[count], 0),
1806 "Failed to set up stream");
1807
1808 for (auto i = 0; i <= count; i++) {
1809 /* Check if the buffer size of all connections has changed */
1810 const auto stat_name("eq_dcpq:" + name + std::to_string(i) +
1811 ":max_buffer_bytes");
1812 checkeq((int)((ep_max_size * bucketMemQuotaFraction) / (count + 1)),
1813 get_int_stat(h, stat_name.c_str(), "dcp"),
1814 "Flow Control Buffer Size not correct");
1815 }
1816 }
1817
1818 /* Opening another connection should set the buffer size to min value */
1819 cookie[max_conns - 1] = testHarness->create_cookie();
1820 const auto name3(name + std::to_string(max_conns - 1));
1821 const auto stat_name2("eq_dcpq:" + name3 + ":max_buffer_bytes");
1822 checkeq(ENGINE_SUCCESS,
1823 dcp->open(cookie[max_conns - 1],
1824 opaque,
1825 seqno,
1826 flags,
1827 name3,
1828 R"({"consumer_name":"replica1"})"),
1829 "Failed dcp consumer open connection.");
1830
1831 checkeq(ENGINE_SUCCESS,
1832 dcp->add_stream(
1833 cookie[max_conns - 1], 0, vbuckets[max_conns - 1], 0),
1834 "Failed to set up stream");
1835
1836 checkeq(flow_ctl_buf_min,
1837 get_int_stat(h, stat_name2.c_str(), "dcp"),
1838 "Flow Control Buffer Size not equal to min");
1839
1840 /* Disconnect connections and see if flow control
1841 * buffer size of existing conns increase
1842 */
1843 for (auto count = 0; count < max_conns / 2; count++) {
1844 testHarness->destroy_cookie(cookie[count]);
1845 }
1846 /* Wait for disconnected connections to be deleted */
1847 wait_for_stat_to_be(h, "ep_dcp_dead_conn_count", 0, "dcp");
1848
1849 /* Check if the buffer size of all connections has increased */
1850 const auto exp_buf_size = (int)((ep_max_size * bucketMemQuotaFraction) /
1851 (max_conns - (max_conns / 2)));
1852
1853 /* Also check if we get control message indicating the flow control buffer
1854 size change from the consumer connections */
1855 MockDcpMessageProducers producers(h);
1856
1857 for (auto i = max_conns / 2; i < max_conns; i++) {
1858 /* Check if the buffer size of all connections has changed */
1859 const auto name4(name + std::to_string(i));
1860 const auto stat_name3("eq_dcpq:" + name4 + ":max_buffer_bytes");
1861 checkeq(exp_buf_size,
1862 get_int_stat(h, stat_name3.c_str(), "dcp"),
1863 "Flow Control Buffer Size not correct");
1864 checkeq(ENGINE_SUCCESS,
1865 dcp->step(cookie[i], &producers),
1866 "Pending flow control buffer change not processed");
1867 checkeq(cb::mcbp::ClientOpcode::DcpControl,
1868 producers.last_op,
1869 "Flow ctl buf size change control message not received");
1870 checkeq(0,
1871 producers.last_key.compare("connection_buffer_size"),
1872 "Flow ctl buf size change control message key error");
1873 checkeq(exp_buf_size,
1874 atoi(producers.last_value.c_str()),
1875 "Flow ctl buf size in control message not correct");
1876 }
1877 /* Disconnect remaining connections */
1878 for (auto count = max_conns / 2; count < max_conns; count++) {
1879 testHarness->destroy_cookie(cookie[count]);
1880 }
1881
1882 return SUCCESS;
1883 }
1884
test_dcp_producer_open(EngineIface* h)1885 static enum test_result test_dcp_producer_open(EngineIface* h) {
1886 const auto* cookie1 = testHarness->create_cookie();
1887 const std::string name("unittest");
1888 const uint32_t opaque = 0;
1889 const uint32_t seqno = 0;
1890 auto dcp = requireDcpIface(h);
1891
1892 checkeq(ENGINE_SUCCESS,
1893 dcp->open(cookie1,
1894 opaque,
1895 seqno,
1896 cb::mcbp::request::DcpOpenPayload::Producer,
1897 name,
1898 R"({"consumer_name":"replica1"})"),
1899 "Failed dcp producer open connection.");
1900 const auto stat_type("eq_dcpq:" + name + ":type");
1901 auto type = get_str_stat(h, stat_type.c_str(), "dcp");
1902 const auto stat_created("eq_dcpq:" + name + ":created");
1903 const auto created = get_int_stat(h, stat_created.c_str(), "dcp");
1904 checkeq(0, type.compare("producer"), "Producer not found");
1905 testHarness->destroy_cookie(cookie1);
1906
1907 testHarness->time_travel(600);
1908
1909 const auto* cookie2 = testHarness->create_cookie();
1910 checkeq(ENGINE_SUCCESS,
1911 dcp->open(cookie2,
1912 opaque,
1913 seqno,
1914 cb::mcbp::request::DcpOpenPayload::Producer,
1915 name,
1916 R"({"consumer_name":"replica1"})"),
1917 "Failed dcp producer open connection.");
1918 type = get_str_stat(h, stat_type.c_str(), "dcp");
1919 checkeq(0, type.compare("producer"), "Producer not found");
1920 checklt(created, get_int_stat(h, stat_created.c_str(), "dcp"),
1921 "New dcp stream is not newer");
1922 testHarness->destroy_cookie(cookie2);
1923
1924 return SUCCESS;
1925 }
1926
test_dcp_producer_open_same_cookie(EngineIface* h)1927 static enum test_result test_dcp_producer_open_same_cookie(EngineIface* h) {
1928 const auto* cookie = testHarness->create_cookie();
1929 const std::string name("unittest");
1930 uint32_t opaque = 0;
1931 const uint32_t seqno = 0;
1932 auto dcp = requireDcpIface(h);
1933
1934 checkeq(ENGINE_SUCCESS,
1935 dcp->open(cookie,
1936 opaque,
1937 seqno,
1938 cb::mcbp::request::DcpOpenPayload::Producer,
1939 name,
1940 R"({"consumer_name":"replica1"})"),
1941 "Failed dcp producer open connection.");
1942
1943 const auto stat_type("eq_dcpq:" + name + ":type");
1944 auto type = get_str_stat(h, stat_type.c_str(), "dcp");
1945 checkeq(0, type.compare("producer"), "Producer not found");
1946 /*
1947 * Number of references is 2 (as opposed to 1) because a
1948 * mock_connstuct is initialised to having 1 reference
1949 * to represent a client being connected to it.
1950 */
1951 checkeq(2,
1952 testHarness->get_number_of_mock_cookie_references(cookie),
1953 "Number of cookie references is not two");
1954 /*
1955 * engine_data needs to be reset so that it passes the check that
1956 * a connection does not already exist on the same socket.
1957 */
1958 testHarness->store_engine_specific(cookie, nullptr);
1959
1960 checkeq(ENGINE_DISCONNECT,
1961 dcp->open(cookie,
1962 opaque++,
1963 seqno,
1964 cb::mcbp::request::DcpOpenPayload::Producer,
1965 name,
1966 R"({"consumer_name":"replica1"})"),
1967 "Failed to return ENGINE_DISCONNECT");
1968
1969 checkeq(2,
1970 testHarness->get_number_of_mock_cookie_references(cookie),
1971 "Number of cookie references is not two");
1972
1973 testHarness->destroy_cookie(cookie);
1974
1975 checkeq(1,
1976 testHarness->get_number_of_mock_cookie_references(cookie),
1977 "Number of cookie references is not one");
1978
1979 return SUCCESS;
1980 }
1981
test_dcp_noop(EngineIface* h)1982 static enum test_result test_dcp_noop(EngineIface* h) {
1983 const auto* cookie = testHarness->create_cookie();
1984 const std::string name("unittest");
1985 const uint32_t seqno = 0;
1986 uint32_t opaque = 0;
1987 auto dcp = requireDcpIface(h);
1988 MockDcpMessageProducers producers(h);
1989
1990 checkeq(ENGINE_SUCCESS,
1991 dcp->open(cookie,
1992 opaque,
1993 seqno,
1994 cb::mcbp::request::DcpOpenPayload::Producer,
1995 name,
1996 R"({"consumer_name":"replica1"})"),
1997 "Failed dcp producer open connection.");
1998 const std::string param1_name("connection_buffer_size");
1999 const std::string param1_value("1024");
2000 checkeq(ENGINE_SUCCESS,
2001 dcp->control(cookie, ++opaque, param1_name, param1_value),
2002 "Failed to establish connection buffer");
2003 const std::string param2_name("enable_noop");
2004 const std::string param2_value("true");
2005 checkeq(ENGINE_SUCCESS,
2006 dcp->control(cookie, ++opaque, param2_name, param2_value),
2007 "Failed to enable no-ops");
2008
2009 testHarness->time_travel(201);
2010
2011 auto done = false;
2012 while (!done) {
2013 if (dcp->step(cookie, &producers) == ENGINE_DISCONNECT) {
2014 done = true;
2015 } else if (producers.last_op == cb::mcbp::ClientOpcode::DcpNoop) {
2016 done = true;
2017 // Producer opaques are hard coded to start from 10M
2018 checkeq(10000001,
2019 (int)producers.last_opaque,
2020 "last_opaque != 10,000,001");
2021 const auto stat_name("eq_dcpq:" + name + ":noop_wait");
2022 checkeq(1,
2023 get_int_stat(h, stat_name.c_str(), "dcp"),
2024 "Didn't send noop");
2025 sendDcpAck(h,
2026 cookie,
2027 cb::mcbp::ClientOpcode::DcpNoop,
2028 cb::mcbp::Status::Success,
2029 producers.last_opaque);
2030 checkeq(0,
2031 get_int_stat(h, stat_name.c_str(), "dcp"),
2032 "Didn't ack noop");
2033 } else if (producers.last_op != cb::mcbp::ClientOpcode::Invalid) {
2034 abort();
2035 }
2036 producers.last_op = cb::mcbp::ClientOpcode::Invalid;
2037 }
2038 testHarness->destroy_cookie(cookie);
2039
2040 return SUCCESS;
2041 }
2042
test_dcp_noop_fail(EngineIface* h)2043 static enum test_result test_dcp_noop_fail(EngineIface* h) {
2044 const auto* cookie = testHarness->create_cookie();
2045 const std::string name("unittest");
2046 const uint32_t seqno = 0;
2047 uint32_t opaque = 0;
2048 auto dcp = requireDcpIface(h);
2049
2050 checkeq(ENGINE_SUCCESS,
2051 dcp->open(cookie,
2052 opaque,
2053 seqno,
2054 cb::mcbp::request::DcpOpenPayload::Producer,
2055 name,
2056 R"({"consumer_name":"replica1"})"),
2057 "Failed dcp producer open connection.");
2058 const std::string param1_name("connection_buffer_size");
2059 const std::string param1_value("1024");
2060 checkeq(ENGINE_SUCCESS,
2061 dcp->control(cookie, ++opaque, param1_name, param1_value),
2062 "Failed to establish connection buffer");
2063 const std::string param2_name("enable_noop");
2064 const std::string param2_value("true");
2065 checkeq(ENGINE_SUCCESS,
2066 dcp->control(cookie, ++opaque, param2_name, param2_value),
2067 "Failed to enable no-ops");
2068
2069 testHarness->time_travel(201);
2070
2071 MockDcpMessageProducers producers(h);
2072 while (dcp->step(cookie, &producers) != ENGINE_DISCONNECT) {
2073 if (producers.last_op == cb::mcbp::ClientOpcode::DcpNoop) {
2074 // Producer opaques are hard coded to start from 10M
2075 checkeq(10000001,
2076 (int)producers.last_opaque,
2077 "last_opaque != 10,000,001");
2078 const auto stat_name("eq_dcpq:" + name + ":noop_wait");
2079 checkeq(1,
2080 get_int_stat(h, stat_name.c_str(), "dcp"),
2081 "Didn't send noop");
2082 testHarness->time_travel(201);
2083 } else if (producers.last_op != cb::mcbp::ClientOpcode::Invalid) {
2084 abort();
2085 }
2086 }
2087 testHarness->destroy_cookie(cookie);
2088
2089 return SUCCESS;
2090 }
2091
test_dcp_consumer_noop(EngineIface* h)2092 static enum test_result test_dcp_consumer_noop(EngineIface* h) {
2093 check(set_vbucket_state(h, Vbid(0), vbucket_state_replica),
2094 "Failed to set vbucket state.");
2095 const auto* cookie = testHarness->create_cookie();
2096 const std::string name("unittest");
2097 const uint32_t seqno = 0;
2098 const uint32_t flags = 0;
2099 const Vbid vbucket = Vbid(0);
2100 uint32_t opaque = 0;
2101 auto dcp = requireDcpIface(h);
2102
2103 // Open consumer connection
2104 checkeq(ENGINE_SUCCESS,
2105 dcp->open(cookie,
2106 opaque,
2107 seqno,
2108 flags,
2109 name,
2110 R"({"consumer_name":"replica1"})"),
2111 "Failed dcp Consumer open connection.");
2112 add_stream_for_consumer(
2113 h, cookie, opaque, vbucket, flags, cb::mcbp::Status::Success);
2114 testHarness->time_travel(201);
2115 // No-op not recieved for 201 seconds. Should be ok.
2116 MockDcpMessageProducers producers(h);
2117 checkeq(ENGINE_EWOULDBLOCK,
2118 dcp->step(cookie, &producers),
2119 "Expected engine would block");
2120
2121 testHarness->time_travel(200);
2122
2123 // Message not recieved for over 400 seconds. Should disconnect.
2124 checkeq(ENGINE_DISCONNECT,
2125 dcp->step(cookie, &producers),
2126 "Expected engine disconnect");
2127 testHarness->destroy_cookie(cookie);
2128
2129 return SUCCESS;
2130 }
2131
2132 /**
2133 * Creates a DCP producer stream with the specified values for noop_manditory,
2134 * noop_enabled, and XATTRs enabled, and then attempts to open a stream,
2135 * checking for the expectedResult code.
2136 */
test_dcp_noop_mandatory_combo(EngineIface* h, bool noopManditory, bool enableNoop, bool enableXAttrs, ENGINE_ERROR_CODE expectedResult)2137 static void test_dcp_noop_mandatory_combo(EngineIface* h,
2138 bool noopManditory,
2139 bool enableNoop,
2140 bool enableXAttrs,
2141 ENGINE_ERROR_CODE expectedResult) {
2142 const void* cookie = testHarness->create_cookie();
2143
2144 // Configure manditory noop as requested.
2145 set_param(h,
2146 cb::mcbp::request::SetParamPayload::Type::Flush,
2147 "dcp_noop_mandatory_for_v5_features",
2148 noopManditory ? "true" : "false");
2149 checkeq(noopManditory,
2150 get_bool_stat(h, "ep_dcp_noop_mandatory_for_v5_features"),
2151 "Incorrect value for dcp_noop_mandatory_for_v5_features");
2152
2153 // Create DCP consumer with requested flags.
2154 TestDcpConsumer tdc("dcp_noop_manditory_test", cookie, h);
2155 uint32_t flags = cb::mcbp::request::DcpOpenPayload::Producer;
2156 if (enableXAttrs) {
2157 flags |= cb::mcbp::request::DcpOpenPayload::IncludeXattrs;
2158 }
2159 tdc.openConnection(flags);
2160
2161 // Setup noop on the DCP connection.
2162 checkeq(ENGINE_SUCCESS,
2163 tdc.sendControlMessage("enable_noop",
2164 enableNoop ? "true" : "false"),
2165 "Failed to configure noop");
2166
2167 // Finally, attempt to create the stream and verify we get the expeced
2168 // response.
2169 DcpStreamCtx ctx;
2170 ctx.vb_uuid = get_ull_stat(h, "vb_0:0:id", "failovers");
2171 ctx.seqno = {0, static_cast<uint64_t>(-1)};
2172 ctx.exp_err = expectedResult;
2173 tdc.addStreamCtx(ctx);
2174
2175 tdc.openStreams();
2176
2177 testHarness->destroy_cookie(cookie);
2178 }
2179
test_dcp_noop_mandatory(EngineIface* h)2180 static enum test_result test_dcp_noop_mandatory(EngineIface* h) {
2181 // Test all combinations of {manditoryNoop, enable_noop, includeXAttr}
2182 test_dcp_noop_mandatory_combo(h, false, false, false, ENGINE_SUCCESS);
2183 test_dcp_noop_mandatory_combo(h, false, false, true, ENGINE_SUCCESS);
2184 test_dcp_noop_mandatory_combo(h, false, true, false, ENGINE_SUCCESS);
2185 test_dcp_noop_mandatory_combo(h, false, true, true, ENGINE_SUCCESS);
2186 test_dcp_noop_mandatory_combo(h, true, false, false, ENGINE_SUCCESS);
2187 test_dcp_noop_mandatory_combo(h, true, false, true, ENGINE_ENOTSUP);
2188 test_dcp_noop_mandatory_combo(h, true, true, false, ENGINE_SUCCESS);
2189 test_dcp_noop_mandatory_combo(h, true, true, true, ENGINE_SUCCESS);
2190
2191 return SUCCESS;
2192 }
2193
test_dcp_producer_stream_req_open(EngineIface* h)2194 static enum test_result test_dcp_producer_stream_req_open(EngineIface* h) {
2195 const void* cookie = testHarness->create_cookie();
2196 const int num_items = 3;
2197
2198 DcpStreamCtx ctx;
2199 ctx.vb_uuid = get_ull_stat(h, "vb_0:0:id", "failovers");
2200 ctx.seqno = {0, static_cast<uint64_t>(-1)};
2201
2202 std::string name("unittest");
2203 TestDcpConsumer tdc(name.c_str(), cookie, h);
2204 tdc.addStreamCtx(ctx);
2205
2206 tdc.openConnection();
2207
2208 /* Create a separate thread that does tries to get any DCP items */
2209 std::thread dcp_step_thread(
2210 dcp_waiting_step, h, cookie, 0, num_items, std::ref(tdc.producers));
2211
2212 /* We need to wait till the 'dcp_waiting_step' thread begins its wait */
2213 while (1) {
2214 /* Busy wait is ok here. To do a non busy wait we must use
2215 another condition variable which is an overkill here */
2216 testHarness->lock_cookie(cookie);
2217 if (wait_started) {
2218 testHarness->unlock_cookie(cookie);
2219 break;
2220 }
2221 testHarness->unlock_cookie(cookie);
2222 }
2223
2224 /* Now create a stream */
2225 tdc.openStreams();
2226
2227 /* Write items */
2228 write_items(h, num_items, 0);
2229 wait_for_flusher_to_settle(h);
2230 verify_curr_items(h, num_items, "Wrong amount of items");
2231
2232 /* If the notification (to 'dcp_waiting_step' thread upon writing an item)
2233 mechanism is efficient, we must see the 'dcp_waiting_step' finish before
2234 test time out */
2235 dcp_step_thread.join();
2236
2237 testHarness->destroy_cookie(cookie);
2238
2239 return SUCCESS;
2240 }
2241
test_dcp_producer_stream_req_partial(EngineIface* h)2242 static enum test_result test_dcp_producer_stream_req_partial(EngineIface* h) {
2243 // Should start at checkpoint_id 2
2244 const auto initial_ckpt_id =
2245 get_int_stat(h, "vb_0:open_checkpoint_id", "checkpoint");
2246 checkeq(2, initial_ckpt_id, "Expected to start at checkpoint ID 2");
2247
2248 // Create two 'full' checkpoints by storing exactly 2 x 'chk_max_items'
2249 // into the VBucket.
2250 const auto max_ckpt_items = get_int_stat(h, "ep_chk_max_items");
2251
2252 write_items(h, max_ckpt_items);
2253 wait_for_flusher_to_settle(h);
2254 wait_for_stat_to_be(h, "ep_items_rm_from_checkpoints", max_ckpt_items);
2255 checkeq(initial_ckpt_id + 1,
2256 get_int_stat(h, "vb_0:open_checkpoint_id", "checkpoint"),
2257 "Expected #checkpoints to increase by 1 after storing items");
2258
2259 write_items(h, max_ckpt_items, max_ckpt_items);
2260 wait_for_flusher_to_settle(h);
2261 wait_for_stat_to_be(h, "ep_items_rm_from_checkpoints", max_ckpt_items * 2);
2262 checkeq(initial_ckpt_id + 2,
2263 get_int_stat(h, "vb_0:open_checkpoint_id", "checkpoint"),
2264 "Expected #checkpoints to increase by 2 after storing 2x "
2265 "max_ckpt_items");
2266
2267 // Stop persistece (so the persistence cursor cannot advance into the
2268 // deletions below, and hence de-dupe them with respect to the
2269 // additions we just did).
2270 stop_persistence(h);
2271
2272 // Now delete half of the keys. Given that we have reached the
2273 // maximum checkpoint size above, all the deletes should be in a
2274 // subsequent checkpoint.
2275 for (int j = 0; j < max_ckpt_items; ++j) {
2276 std::stringstream ss;
2277 ss << "key" << j;
2278 checkeq(ENGINE_SUCCESS,
2279 del(h, ss.str().c_str(), 0, Vbid(0)),
2280 "Expected delete to succeed");
2281 }
2282
2283 const void* cookie = testHarness->create_cookie();
2284
2285 // Verify that we recieve full checkpoints when we only ask for
2286 // sequence numbers which lie within partial checkpoints. We
2287 // should have the following Checkpoints in existence:
2288 //
2289 // { 1,100} - MUTATE(key0..key99), from disk.
2290 // {101,200} - MUTATE(key100.key199), from disk.
2291 // {201,300} - DELETE(key0..key99), in memory (as persistence has been stopped).
2292 //
2293 // We request a start and end which lie in the middle of checkpoints -
2294 // start at 105 and end at 209. We should recieve to the end of
2295 // complete checkpoints, i.e. from 105 all the way to 300.
2296 DcpStreamCtx ctx;
2297 ctx.vb_uuid = get_ull_stat(h, "vb_0:0:id", "failovers");
2298 ctx.seqno = {105, 209};
2299 ctx.snapshot = {105, 105};
2300 ctx.exp_mutations = 95; // 105 to 200
2301 ctx.exp_deletions = 100; // 201 to 300
2302
2303 if (isPersistentBucket(h)) {
2304 ctx.exp_markers = 2;
2305 } else {
2306 // the ephemeral stream request won't be broken into two snapshots of
2307 // backfill from disk vs the checkpoint in memory
2308 ctx.exp_markers = 1;
2309 }
2310
2311 TestDcpConsumer tdc("unittest", cookie, h);
2312 tdc.addStreamCtx(ctx);
2313 tdc.run();
2314
2315 testHarness->destroy_cookie(cookie);
2316
2317 return SUCCESS;
2318 }
2319
test_dcp_producer_stream_req_full_merged_snapshots( EngineIface* h)2320 static enum test_result test_dcp_producer_stream_req_full_merged_snapshots(
2321 EngineIface* h) {
2322 const int num_items = 300, batch_items = 100;
2323 for (int start_seqno = 0; start_seqno < num_items;
2324 start_seqno += batch_items) {
2325 wait_for_flusher_to_settle(h);
2326 write_items(h, batch_items, start_seqno);
2327 }
2328
2329 wait_for_flusher_to_settle(h);
2330 verify_curr_items(h, num_items, "Wrong amount of items");
2331 wait_for_stat_to_be(h, "vb_0:num_checkpoints", 1, "checkpoint");
2332
2333 const void* cookie = testHarness->create_cookie();
2334
2335 DcpStreamCtx ctx;
2336 ctx.vb_uuid = get_ull_stat(h, "vb_0:0:id", "failovers");
2337 ctx.seqno = {0, get_ull_stat(h, "vb_0:high_seqno", "vbucket-seqno")};
2338 ctx.exp_mutations = num_items;
2339 /* Disk backfill sends all items in disk as one snapshot */
2340 ctx.exp_markers = 1;
2341
2342 TestDcpConsumer tdc("unittest", cookie, h);
2343 tdc.addStreamCtx(ctx);
2344 tdc.run();
2345
2346 testHarness->destroy_cookie(cookie);
2347
2348 return SUCCESS;
2349 }
2350
test_dcp_producer_stream_req_full(EngineIface* h)2351 static enum test_result test_dcp_producer_stream_req_full(EngineIface* h) {
2352 const int num_items = 300, batch_items = 100;
2353 for (int start_seqno = 0; start_seqno < num_items;
2354 start_seqno += batch_items) {
2355 wait_for_flusher_to_settle(h);
2356 write_items(h, batch_items, start_seqno);
2357 }
2358
2359 wait_for_flusher_to_settle(h);
2360 verify_curr_items(h, num_items, "Wrong amount of items");
2361 wait_for_stat_to_be(h, "vb_0:num_checkpoints", 1, "checkpoint");
2362
2363 checkne(num_items - get_stat<uint64_t>(h, "ep_items_rm_from_checkpoints"),
2364 uint64_t{0},
2365 "Require a non-zero number of items to still be present in "
2366 "CheckpointManager to be able to get 2x snapshot markers "
2367 "(1x disk, 1x memory)");
2368
2369 const void* cookie = testHarness->create_cookie();
2370
2371 DcpStreamCtx ctx;
2372 ctx.vb_uuid = get_ull_stat(h, "vb_0:0:id", "failovers");
2373 ctx.seqno = {0, get_ull_stat(h, "vb_0:high_seqno", "vbucket-seqno")};
2374 ctx.exp_mutations = num_items;
2375 /* Memory backfill sends items from checkpoint snapshots as much as possible
2376 Relies on backfill only when checkpoint snapshot is cleaned up */
2377 ctx.exp_markers = 2;
2378
2379 TestDcpConsumer tdc("unittest", cookie, h);
2380 tdc.addStreamCtx(ctx);
2381 tdc.run();
2382
2383 testHarness->destroy_cookie(cookie);
2384
2385 return SUCCESS;
2386 }
2387
2388 /*
2389 * Test that deleted items (with values) backfill correctly
2390 */
test_dcp_producer_deleted_item_backfill( EngineIface* h)2391 static enum test_result test_dcp_producer_deleted_item_backfill(
2392 EngineIface* h) {
2393 const int deletions = 10;
2394 write_items(h,
2395 deletions,
2396 0,
2397 "del",
2398 "value",
2399 0 /*exp*/,
2400 Vbid(0),
2401 DocumentState::Deleted);
2402 wait_for_flusher_to_settle(h);
2403
2404 const void* cookie = testHarness->create_cookie();
2405
2406 DcpStreamCtx ctx;
2407 ctx.vb_uuid = get_ull_stat(h, "vb_0:0:id", "failovers");
2408 ctx.seqno = {0, deletions};
2409 ctx.exp_deletions = deletions;
2410 ctx.expected_values = deletions;
2411 ctx.flags |= DCP_ADD_STREAM_FLAG_DISKONLY;
2412 ctx.exp_markers = 1;
2413
2414 TestDcpConsumer tdc("unittest", cookie, h);
2415 tdc.addStreamCtx(ctx);
2416 tdc.run();
2417
2418 testHarness->destroy_cookie(cookie);
2419
2420 return SUCCESS;
2421 }
2422
2423 // Function to parameterize whether expiries should be outputted or not.
testDcpProducerExpiredItemBackfill( EngineIface* h, EnableExpiryOutput enableExpiryOutput)2424 static test_result testDcpProducerExpiredItemBackfill(
2425 EngineIface* h, EnableExpiryOutput enableExpiryOutput) {
2426 const int expiries = 5;
2427 const int start_seqno = 0;
2428 write_items(h,
2429 expiries,
2430 start_seqno,
2431 "exp",
2432 "value",
2433 5 /*exp*/,
2434 Vbid(0),
2435 DocumentState::Alive);
2436
2437 wait_for_flusher_to_settle(h);
2438 verify_curr_items(h, expiries, "Wrong number of items");
2439
2440 testHarness->time_travel(256);
2441 const void* cookie1 = testHarness->create_cookie();
2442 for (int i = 0; i < expiries; ++i) {
2443 std::string key("exp" + std::to_string(i + start_seqno));
2444 cb::EngineErrorItemPair ret =
2445 get(h, cookie1, key.c_str(), Vbid(0), DocStateFilter::Alive);
2446 checkeq(cb::engine_errc::no_such_key,
2447 ret.first,
2448 "Expected get to return 'no_such_key'");
2449 }
2450 testHarness->destroy_cookie(cookie1);
2451
2452 wait_for_flusher_to_settle(h);
2453 checkeq(get_stat<int>(h, "vb_active_expired"),
2454 expiries,
2455 "Expected vb_active_expired to contain correct number of expiries");
2456
2457 DcpStreamCtx ctx;
2458 ctx.vb_uuid = get_ull_stat(h, "vb_0:0:id", "failovers");
2459 ctx.seqno = {0, expiries * 2}; // doubled as each will cause a mutation,
2460 // as well as an expiry
2461
2462 if (enableExpiryOutput == EnableExpiryOutput::Yes) {
2463 ctx.exp_expirations = expiries;
2464 } else {
2465 ctx.exp_deletions = expiries;
2466 }
2467
2468 ctx.flags |= DCP_ADD_STREAM_FLAG_DISKONLY;
2469 ctx.exp_markers = 1;
2470
2471 const void* cookie = testHarness->create_cookie();
2472 TestDcpConsumer tdc("unittest", cookie, h);
2473 uint32_t flags = cb::mcbp::request::DcpOpenPayload::Producer;
2474
2475 if (enableExpiryOutput == EnableExpiryOutput::Yes) {
2476 // dcp expiry requires the connection to opt in to delete times
2477 flags |= cb::mcbp::request::DcpOpenPayload::IncludeDeleteTimes;
2478 }
2479 tdc.openConnection(flags);
2480
2481 if (enableExpiryOutput == EnableExpiryOutput::Yes) {
2482 checkeq(ENGINE_SUCCESS,
2483 tdc.sendControlMessage("enable_expiry_opcode", "true"),
2484 "Failed to enable_expiry_opcode");
2485 }
2486
2487 tdc.addStreamCtx(ctx);
2488
2489 tdc.run(false);
2490
2491 testHarness->destroy_cookie(cookie);
2492
2493 return SUCCESS;
2494 }
2495
test_dcp_producer_stream_req_backfill(EngineIface* h)2496 static enum test_result test_dcp_producer_stream_req_backfill(EngineIface* h) {
2497 const int num_items = 400, batch_items = 200;
2498 for (int start_seqno = 0; start_seqno < num_items;
2499 start_seqno += batch_items) {
2500 if (200 == start_seqno) {
2501 wait_for_flusher_to_settle(h);
2502 wait_for_stat_to_be(h, "ep_items_rm_from_checkpoints", 200);
2503 stop_persistence(h);
2504 }
2505 write_items(h, batch_items, start_seqno);
2506 }
2507
2508 wait_for_stat_to_be_gte(h, "vb_0:num_checkpoints", 2, "checkpoint");
2509
2510 const void* cookie = testHarness->create_cookie();
2511
2512 DcpStreamCtx ctx;
2513 ctx.vb_uuid = get_ull_stat(h, "vb_0:0:id", "failovers");
2514 ctx.seqno = {0, 200};
2515 ctx.exp_mutations = 200;
2516 ctx.exp_markers = 1;
2517
2518 TestDcpConsumer tdc("unittest", cookie, h);
2519 tdc.addStreamCtx(ctx);
2520 tdc.run();
2521
2522 testHarness->destroy_cookie(cookie);
2523
2524 return SUCCESS;
2525 }
2526
2527 /*
2528 * Test that expired items (with values) backfill correctly with Expiry Output
2529 * disabled
2530 */
test_dcp_producer_expired_item_backfill_delete( EngineIface* h)2531 static enum test_result test_dcp_producer_expired_item_backfill_delete(
2532 EngineIface* h) {
2533 return testDcpProducerExpiredItemBackfill(h, EnableExpiryOutput::No);
2534 }
2535
2536 /*
2537 * Test that expired items (with values) backfill correctly with Expiry Output
2538 * enabled
2539 */
test_dcp_producer_expired_item_backfill_expire( EngineIface* h)2540 static enum test_result test_dcp_producer_expired_item_backfill_expire(
2541 EngineIface* h) {
2542 return testDcpProducerExpiredItemBackfill(h, EnableExpiryOutput::Yes);
2543 }
2544
test_dcp_producer_stream_req_diskonly(EngineIface* h)2545 static enum test_result test_dcp_producer_stream_req_diskonly(EngineIface* h) {
2546 const int num_items = 300, batch_items = 100;
2547 for (int start_seqno = 0; start_seqno < num_items;
2548 start_seqno += batch_items) {
2549 wait_for_flusher_to_settle(h);
2550 write_items(h, batch_items, start_seqno);
2551 }
2552
2553 wait_for_flusher_to_settle(h);
2554 verify_curr_items(h, num_items, "Wrong amount of items");
2555 wait_for_stat_to_be(h, "vb_0:num_checkpoints", 1, "checkpoint");
2556
2557 const void* cookie = testHarness->create_cookie();
2558
2559 DcpStreamCtx ctx;
2560 ctx.flags = DCP_ADD_STREAM_FLAG_DISKONLY;
2561 ctx.vb_uuid = get_ull_stat(h, "vb_0:0:id", "failovers");
2562 ctx.seqno = {0, static_cast<uint64_t>(-1)};
2563 ctx.exp_mutations = 300;
2564 ctx.exp_markers = 1;
2565
2566 TestDcpConsumer tdc("unittest", cookie, h);
2567 tdc.addStreamCtx(ctx);
2568 tdc.run();
2569
2570 testHarness->destroy_cookie(cookie);
2571
2572 return SUCCESS;
2573 }
2574
test_dcp_producer_disk_backfill_limits(EngineIface* h)2575 static enum test_result test_dcp_producer_disk_backfill_limits(EngineIface* h) {
2576 const int num_items = 3;
2577 write_items(h, num_items);
2578
2579 wait_for_flusher_to_settle(h);
2580 verify_curr_items(h, num_items, "Wrong amount of items");
2581 wait_for_stat_to_be(h, "vb_0:num_checkpoints", 1, "checkpoint");
2582
2583 const void* cookie = testHarness->create_cookie();
2584
2585 DcpStreamCtx ctx;
2586 ctx.flags = DCP_ADD_STREAM_FLAG_DISKONLY;
2587 ctx.vb_uuid = get_ull_stat(h, "vb_0:0:id", "failovers");
2588 ctx.seqno = {0, static_cast<uint64_t>(-1)};
2589 ctx.exp_mutations = 3;
2590 ctx.exp_markers = 1;
2591
2592 TestDcpConsumer tdc("unittest", cookie, h);
2593 tdc.addStreamCtx(ctx);
2594 tdc.run();
2595
2596 uint64_t exp_backfill_task_runs;
2597 if (isPersistentBucket(h)) {
2598 /* Backfill task runs are expected as below:
2599 once for backfill_state_init + once for backfill_state_completing +
2600 once post all backfills are run finished. Here since we have
2601 dcp_scan_byte_limit = 100, we expect the backfill task to run
2602 additional 'num_items' during backfill_state_scanning state. */
2603 exp_backfill_task_runs = 3 + num_items;
2604 } else {
2605 /* Backfill task runs are expected as below:
2606 once for backfill_state_init + once for backfill_state_completing.
2607 Here since we have dcp_scan_byte_limit = 100, we expect the backfill
2608 task to run additional 'num_items' during BackfillState::scanning
2609 state. */
2610 exp_backfill_task_runs = 2 + num_items;
2611 }
2612 checkeq(exp_backfill_task_runs,
2613 get_histo_stat(h,
2614 "BackfillManagerTask[auxIO]",
2615 "runtimes",
2616 Histo_stat_info::TOTAL_COUNT),
2617 "backfill_tasks did not run expected number of times");
2618
2619 testHarness->destroy_cookie(cookie);
2620
2621 return SUCCESS;
2622 }
2623
test_dcp_producer_disk_backfill_buffer_limits( EngineIface* h)2624 static enum test_result test_dcp_producer_disk_backfill_buffer_limits(
2625 EngineIface* h) {
2626 const int num_items = 3;
2627 write_items(h, num_items);
2628
2629 wait_for_flusher_to_settle(h);
2630 verify_curr_items(h, num_items, "Wrong amount of items");
2631
2632 /* Wait for the checkpoint to be removed so that upon DCP connection
2633 backfill is scheduled */
2634 wait_for_stat_to_be(h, "ep_items_rm_from_checkpoints", num_items);
2635
2636 const void* cookie = testHarness->create_cookie();
2637
2638 DcpStreamCtx ctx;
2639 ctx.vb_uuid = get_ull_stat(h, "vb_0:0:id", "failovers");
2640 ctx.seqno = {0, num_items};
2641 ctx.exp_mutations = 3;
2642 ctx.exp_markers = 1;
2643
2644 TestDcpConsumer tdc("unittest", cookie, h);
2645 tdc.addStreamCtx(ctx);
2646 tdc.run();
2647
2648 testHarness->destroy_cookie(cookie);
2649
2650 return SUCCESS;
2651 }
2652
test_dcp_producer_stream_req_mem(EngineIface* h)2653 static enum test_result test_dcp_producer_stream_req_mem(EngineIface* h) {
2654 const int num_items = 300, batch_items = 100;
2655 for (int start_seqno = 0; start_seqno < num_items;
2656 start_seqno += batch_items) {
2657 wait_for_flusher_to_settle(h);
2658 write_items(h, batch_items, start_seqno);
2659 }
2660
2661 wait_for_flusher_to_settle(h);
2662 verify_curr_items(h, num_items, "Wrong amount of items");
2663
2664 const void* cookie = testHarness->create_cookie();
2665
2666 DcpStreamCtx ctx;
2667 ctx.vb_uuid = get_ull_stat(h, "vb_0:0:id", "failovers");
2668 ctx.seqno = {200, 300};
2669 ctx.snapshot = {200, 200};
2670 ctx.exp_mutations = 100;
2671 ctx.exp_markers = 1;
2672
2673 TestDcpConsumer tdc("unittest", cookie, h);
2674 tdc.addStreamCtx(ctx);
2675 tdc.run();
2676
2677 testHarness->destroy_cookie(cookie);
2678
2679 return SUCCESS;
2680 }
2681
2682 /**
2683 * Test that a DCP stream request in DGM scenarios correctly receives items
2684 * from both memory and disk.
2685 */
test_dcp_producer_stream_req_dgm(EngineIface* h)2686 static enum test_result test_dcp_producer_stream_req_dgm(EngineIface* h) {
2687 // Test only works for the now removed 2-bit LRU eviction algorithm as it
2688 // relies on looking at the LRU state.
2689 // @todo Investigate converting the test to work with the new hifi_mfu
2690 // eviction algorithm.
2691 return SUCCESS;
2692
2693 const void* cookie = testHarness->create_cookie();
2694
2695 int i = 0; // Item count
2696 while (true) {
2697 // Gathering stats on every store is expensive, just check every 100 iterations
2698 if ((i % 100) == 0) {
2699 if (get_int_stat(h, "vb_active_perc_mem_resident") < 50) {
2700 break;
2701 }
2702 }
2703
2704 std::stringstream ss;
2705 ss << "key" << i;
2706 ENGINE_ERROR_CODE ret =
2707 store(h, cookie, OPERATION_SET, ss.str().c_str(), "somevalue");
2708 if (ret == ENGINE_SUCCESS) {
2709 i++;
2710 }
2711 }
2712
2713 // Sanity check - ensure we have enough vBucket quota (max_size)
2714 // such that we have 1000 items - enough to give us 0.1%
2715 // granuarity in any residency calculations. */
2716 checkge(i, 1000,
2717 "Does not have expected min items; Check max_size setting");
2718
2719 wait_for_flusher_to_settle(h);
2720
2721 verify_curr_items(h, i, "Wrong number of items");
2722 double num_non_resident = get_int_stat(h, "vb_active_num_non_resident");
2723 checkge(num_non_resident,
2724 i * 0.5,
2725 "Expected at least 50% of items to be non-resident");
2726
2727 // Reduce max_size from 6291456 to 6000000
2728 set_param(h,
2729 cb::mcbp::request::SetParamPayload::Type::Flush,
2730 "max_size",
2731 "6000000");
2732 checkgt(50,
2733 get_int_stat(h, "vb_active_perc_mem_resident"),
2734 "Too high percentage of memory resident");
2735
2736 DcpStreamCtx ctx;
2737 ctx.vb_uuid = get_ull_stat(h, "vb_0:0:id", "failovers");
2738 ctx.seqno = {0, get_ull_stat(h, "vb_0:high_seqno", "vbucket-seqno")};
2739 ctx.exp_mutations = i;
2740 ctx.exp_markers = 1;
2741
2742 TestDcpConsumer tdc("unittest", cookie, h);
2743 tdc.addStreamCtx(ctx);
2744 tdc.run();
2745
2746 testHarness->destroy_cookie(cookie);
2747
2748 return SUCCESS;
2749 }
2750
2751 /**
2752 * Test that eviction hotness data is passed in DCP stream.
2753 */
test_dcp_producer_stream_req_coldness(EngineIface* h)2754 static enum test_result test_dcp_producer_stream_req_coldness(EngineIface* h) {
2755 const void* cookie = testHarness->create_cookie();
2756
2757 for (int ii = 0; ii < 10; ii++) {
2758 std::stringstream ss;
2759 ss << "key" << ii;
2760 store(h, cookie, OPERATION_SET, ss.str().c_str(), "somevalue");
2761 }
2762
2763 wait_for_flusher_to_settle(h);
2764 verify_curr_items(h, 10, "Wrong number of items");
2765
2766 for (int ii = 0; ii < 5; ii++) {
2767 std::stringstream ss;
2768 ss << "key" << ii;
2769 evict_key(h, ss.str().c_str(), Vbid(0), "Ejected.");
2770 }
2771 wait_for_flusher_to_settle(h);
2772 wait_for_stat_to_be(h, "ep_num_value_ejects", 5);
2773
2774 TestDcpConsumer tdc("unittest", cookie, h);
2775 uint32_t flags = cb::mcbp::request::DcpOpenPayload::Producer;
2776
2777 tdc.openConnection(flags);
2778
2779 checkeq(ENGINE_SUCCESS,
2780 tdc.sendControlMessage("supports_hifi_MFU", "true"),
2781 "Failed to configure MFU");
2782
2783 DcpStreamCtx ctx;
2784 ctx.vb_uuid = get_ull_stat(h, "vb_0:0:id", "failovers");
2785 ctx.seqno = {0, get_ull_stat(h, "vb_0:high_seqno", "vbucket-seqno")};
2786 ctx.exp_mutations = 10;
2787 ctx.exp_markers = 1;
2788
2789 // Only stream from disk to ensure that we only ever get a single snapshot.
2790 // If we got unlucky we could see 2 snapshots due to creation of a second
2791 // checkpoint if we were streaming from the checkpoint manager.
2792 ctx.flags |= DCP_ADD_STREAM_FLAG_DISKONLY;
2793
2794 tdc.addStreamCtx(ctx);
2795 tdc.run(false);
2796
2797 checkeq(tdc.getNruCounters()[1],
2798 5, "unexpected number of hot items");
2799 checkeq(tdc.getNruCounters()[0],
2800 5, "unexpected number of cold items");
2801 testHarness->destroy_cookie(cookie);
2802
2803 return SUCCESS;
2804 }
2805
2806 /**
2807 * Test that eviction hotness data is picked up by the DCP consumer
2808 */
test_dcp_consumer_hotness_data(EngineIface* h)2809 static enum test_result test_dcp_consumer_hotness_data(EngineIface* h) {
2810 const void* cookie = testHarness->create_cookie();
2811 uint32_t opaque = 0xFFFF0000;
2812 uint32_t flags = 0;
2813 Vbid vbid = Vbid(0);
2814 const char* name = "unittest";
2815
2816 // Set vbucket 0 to a replica so we can consume a mutation over DCP
2817 check(set_vbucket_state(h, vbid, vbucket_state_replica),
2818 "Failed to set vbucket state.");
2819
2820 // Open consumer connection
2821 auto dcp = requireDcpIface(h);
2822 checkeq(ENGINE_SUCCESS,
2823 dcp->open(cookie,
2824 opaque,
2825 0,
2826 flags,
2827 name,
2828 R"({"consumer_name":"replica1"})"),
2829 "Failed dcp Consumer open connection.");
2830
2831 add_stream_for_consumer(h,
2832 cookie,
2833 opaque++,
2834 vbid,
2835 DCP_ADD_STREAM_FLAG_TAKEOVER,
2836 cb::mcbp::Status::Success);
2837
2838 uint32_t stream_opaque =
2839 get_int_stat(h, "eq_dcpq:unittest:stream_0_opaque", "dcp");
2840
2841 // Snapshot marker indicating a mutation will follow
2842 checkeq(ENGINE_SUCCESS,
2843 dcp->snapshot_marker(cookie,
2844 stream_opaque,
2845 vbid,
2846 0,
2847 1,
2848 0,
2849 {} /*HCS*/,
2850 {} /*maxVisibleSeqno*/),
2851 "Failed to send marker!");
2852
2853 const DocKey docKey("key", DocKeyEncodesCollectionId::No);
2854 checkeq(ENGINE_SUCCESS,
2855 dcp->mutation(cookie,
2856 stream_opaque,
2857 docKey,
2858 {(const uint8_t*)"value", 5},
2859 0, // privileged bytes
2860 PROTOCOL_BINARY_RAW_BYTES,
2861 0, // cas
2862 vbid,
2863 0, // flags
2864 1, // by_seqno
2865 0, // rev_seqno
2866 0, // expiration
2867 0, // lock_time
2868 {}, // meta
2869 128 // frequency value
2870 ),
2871 "Failed to send dcp mutation");
2872
2873 // Set vbucket 0 to active so we can perform a get
2874 check(set_vbucket_state(h, vbid, vbucket_state_active),
2875 "Failed to set vbucket state.");
2876
2877 // Perform a get to retrieve the frequency counter value
2878 auto rv = get(h, cookie, "key", vbid, DocStateFilter::AliveOrDeleted);
2879 checkeq(cb::engine_errc::success, rv.first, "Failed to fetch");
2880 const Item* it = reinterpret_cast<const Item*>(rv.second.get());
2881
2882 // Confirm that the item that was consumed over DCP has picked up
2883 // the correct hotness data value.
2884 // Performing the get may increase the hotness value by 1 and therefore
2885 // it is valid for the value to be 128 or 129.
2886 checkle(128,
2887 int(it->getFreqCounterValue()),
2888 "Failed to set the hotness data to the correct value");
2889 checkge(129,
2890 int(it->getFreqCounterValue()),
2891 "Failed to set the hotness data to the correct value");
2892
2893 testHarness->destroy_cookie(cookie);
2894
2895 return SUCCESS;
2896 }
2897
test_dcp_producer_stream_latest(EngineIface* h)2898 static enum test_result test_dcp_producer_stream_latest(EngineIface* h) {
2899 const int num_items = 300, batch_items = 100;
2900 for (int start_seqno = 0; start_seqno < num_items;
2901 start_seqno += batch_items) {
2902 wait_for_flusher_to_settle(h);
2903 write_items(h, batch_items, start_seqno);
2904 }
2905
2906 wait_for_flusher_to_settle(h);
2907 verify_curr_items(h, num_items, "Wrong amount of items");
2908
2909 const void* cookie = testHarness->create_cookie();
2910
2911 DcpStreamCtx ctx;
2912 ctx.flags = DCP_ADD_STREAM_FLAG_LATEST;
2913 ctx.vb_uuid = get_ull_stat(h, "vb_0:0:id", "failovers");
2914 ctx.seqno = {200, 205};
2915 ctx.snapshot = {200, 200};
2916 ctx.exp_mutations = 100;
2917 ctx.exp_markers = 1;
2918
2919 TestDcpConsumer tdc("unittest", cookie, h);
2920 tdc.addStreamCtx(ctx);
2921 tdc.run();
2922
2923 testHarness->destroy_cookie(cookie);
2924
2925 return SUCCESS;
2926 }
2927
test_dcp_producer_keep_stream_open(EngineIface* h)2928 static enum test_result test_dcp_producer_keep_stream_open(EngineIface* h) {
2929 const std::string conn_name("unittest");
2930 const int num_items = 2, vb = 0;
2931
2932 write_items(h, num_items);
2933
2934 wait_for_flusher_to_settle(h);
2935 verify_curr_items(h, num_items, "Wrong amount of items");
2936
2937 const void* cookie = testHarness->create_cookie();
2938
2939 /* We want to stream items till end and keep the stream open. Then we want
2940 to verify the stream is still open */
2941 struct continuous_dcp_ctx cdc = {
2942 h,
2943 cookie,
2944 Vbid(0),
2945 conn_name,
2946 0,
2947 std::make_unique<TestDcpConsumer>(conn_name, cookie, h)};
2948 cb_thread_t dcp_thread;
2949 cb_assert(cb_create_thread(&dcp_thread, continuous_dcp_thread, &cdc, 0)
2950 == 0);
2951
2952 /* Wait for producer to be created */
2953 wait_for_stat_to_be(h, "ep_dcp_producer_count", 1, "dcp");
2954
2955 /* Wait for an active stream to be created */
2956 const std::string stat_stream_count("eq_dcpq:" + conn_name +
2957 ":num_streams");
2958 wait_for_stat_to_be(h, stat_stream_count.c_str(), 1, "dcp");
2959
2960 /* Wait for the dcp test client to receive upto highest seqno we have */
2961 cb::RelaxedAtomic<uint64_t> exp_items(num_items);
2962 wait_for_val_to_be("last_sent_seqno",
2963 cdc.dcpConsumer->producers.last_byseqno,
2964 exp_items);
2965
2966 /* Check if the stream is still open after sending out latest items */
2967 std::string stat_stream_state("eq_dcpq:" + conn_name + ":stream_" +
2968 std::to_string(vb) + "_state");
2969 std::string state = get_str_stat(h, stat_stream_state.c_str(), "dcp");
2970 checkeq(state.compare("in-memory"), 0, "Stream is not open");
2971
2972 /* Before closing the connection stop the thread that continuously polls
2973 for dcp data */
2974 cdc.dcpConsumer->stop();
2975 testHarness->notify_io_complete(cookie, ENGINE_SUCCESS);
2976 cb_assert(cb_join_thread(dcp_thread) == 0);
2977 testHarness->destroy_cookie(cookie);
2978
2979 return SUCCESS;
2980 }
2981
test_dcp_producer_keep_stream_open_replica( EngineIface* h)2982 static enum test_result test_dcp_producer_keep_stream_open_replica(
2983 EngineIface* h) {
2984 /* This test case validates if a replica vbucket correctly sends items
2985 and snapshot end seqno when a stream requests for items till end of time
2986 (end_seqno in req is 2^64 - 1).
2987 The test has 2 parts.
2988 (i) Set up replica vbucket such that it has items to be streamed from
2989 backfill and memory.
2990 (ii) Open a stream (in a DCP conn) and see if all the items are received
2991 correctly */
2992
2993 /* Part (i): Set up replica vbucket such that it has items to be streamed
2994 from backfill and memory. */
2995 check(set_vbucket_state(h, Vbid(0), vbucket_state_replica),
2996 "Failed to set vbucket state.");
2997
2998 const void* cookie = testHarness->create_cookie();
2999 uint32_t opaque = 0xFFFF0000;
3000 uint32_t seqno = 0;
3001 uint32_t flags = 0;
3002 const int num_items = 10;
3003 const std::string conn_name("unittest");
3004 int vb = 0;
3005 auto dcp = requireDcpIface(h);
3006
3007 /* Open an DCP consumer connection */
3008 checkeq(ENGINE_SUCCESS,
3009 dcp->open(cookie,
3010 opaque,
3011 seqno,
3012 flags,
3013 conn_name,
3014 R"({"consumer_name":"replica1"})"),
3015 "Failed dcp producer open connection.");
3016
3017 std::string type = get_str_stat(h, "eq_dcpq:unittest:type", "dcp");
3018 checkeq(0, type.compare("consumer"), "Consumer not found");
3019
3020 opaque = add_stream_for_consumer(
3021 h, cookie, opaque, Vbid(0), 0, cb::mcbp::Status::Success);
3022
3023 /* Send DCP mutations with in memory flag (0x01) */
3024 dcp_stream_to_replica(
3025 h, cookie, opaque, Vbid(0), 0x01, 1, num_items, 0, num_items);
3026
3027 /* Send 10 more DCP mutations with checkpoint creation flag (0x04) */
3028 uint64_t start = num_items;
3029 dcp_stream_to_replica(h,
3030 cookie,
3031 opaque,
3032 Vbid(0),
3033 0x04,
3034 start + 1,
3035 start + 10,
3036 start,
3037 start + 10);
3038
3039 wait_for_flusher_to_settle(h);
3040 stop_persistence(h);
3041 checkeq(2 * num_items,
3042 get_int_stat(h, "vb_replica_curr_items"),
3043 "wrong number of items in replica vbucket");
3044
3045 /* Add 10 more items to the replica node on a new checkpoint.
3046 Send with flag (0x04) indicating checkpoint creation */
3047 start = 2 * num_items;
3048 dcp_stream_to_replica(h,
3049 cookie,
3050 opaque,
3051 Vbid(0),
3052 0x04,
3053 start + 1,
3054 start + 10,
3055 start,
3056 start + 10);
3057
3058 /* Expecting for Disk backfill + in memory snapshot merge.
3059 Wait for a checkpoint to be removed */
3060 wait_for_stat_to_be_lte(h, "vb_0:num_checkpoints", 2, "checkpoint");
3061
3062 /* Part (ii): Open a stream (in a DCP conn) and see if all the items are
3063 received correctly */
3064 /* We want to stream items till end and keep the stream open. Then we want
3065 to verify the stream is still open */
3066 const void* cookie1 = testHarness->create_cookie();
3067 const std::string conn_name1("unittest1");
3068 struct continuous_dcp_ctx cdc = {
3069 h,
3070 cookie1,
3071 Vbid(0),
3072 conn_name1,
3073 0,
3074 std::make_unique<TestDcpConsumer>(conn_name1, cookie1, h)};
3075 cb_thread_t dcp_thread;
3076 cb_assert(cb_create_thread(&dcp_thread, continuous_dcp_thread, &cdc, 0)
3077 == 0);
3078
3079 /* Wait for producer to be created */
3080 wait_for_stat_to_be(h, "ep_dcp_producer_count", 1, "dcp");
3081
3082 /* Wait for an active stream to be created */
3083 const std::string stat_stream_count("eq_dcpq:" + conn_name1 +
3084 ":num_streams");
3085 wait_for_stat_to_be(h, stat_stream_count.c_str(), 1, "dcp");
3086
3087 /* Wait for the dcp test client to receive upto highest seqno we have */
3088 cb::RelaxedAtomic<uint64_t> exp_items(3 * num_items);
3089 wait_for_val_to_be("last_sent_seqno",
3090 cdc.dcpConsumer->producers.last_byseqno,
3091 exp_items);
3092
3093 /* Check if correct snap end seqno is sent */
3094 std::string stat_stream_last_sent_snap_end_seqno("eq_dcpq:" + conn_name1 +
3095 ":stream_" +
3096 std::to_string(vb) +
3097 "_last_sent_snap_end_seqno");
3098 wait_for_stat_to_be(h,
3099 stat_stream_last_sent_snap_end_seqno.c_str(),
3100 3 * num_items,
3101 "dcp");
3102
3103 /* Check if the stream is still open after sending out latest items */
3104 std::string stat_stream_state("eq_dcpq:" + conn_name1 + ":stream_" +
3105 std::to_string(vb) + "_state");
3106 std::string state = get_str_stat(h, stat_stream_state.c_str(), "dcp");
3107 checkeq(state.compare("in-memory"), 0, "Stream is not open");
3108
3109 /* Before closing the connection stop the thread that continuously polls
3110 for dcp data */
3111 cdc.dcpConsumer->stop();
3112 testHarness->notify_io_complete(cookie1, ENGINE_SUCCESS);
3113 cb_assert(cb_join_thread(dcp_thread) == 0);
3114
3115 testHarness->destroy_cookie(cookie);
3116 testHarness->destroy_cookie(cookie1);
3117
3118 return SUCCESS;
3119 }
3120
test_dcp_producer_stream_cursor_movement( EngineIface* h)3121 static enum test_result test_dcp_producer_stream_cursor_movement(
3122 EngineIface* h) {
3123 const std::string conn_name("unittest");
3124 const int num_items = 30;
3125 uint64_t curr_chkpt_id = 0;
3126 for (int j = 0; j < num_items; ++j) {
3127 if (j % 10 == 0) {
3128 wait_for_flusher_to_settle(h);
3129 }
3130 if (j == (num_items - 1)) {
3131 /* Since checkpoint max items is set to 10 and we are going to
3132 write 30th item, a new checkpoint could be added after
3133 writing and persisting the 30th item. I mean, if the checkpoint
3134 id is got outside the while loop, there could be an error due to
3135 race (flusher and checkpoint remover could run before getting
3136 this stat) */
3137 curr_chkpt_id =
3138 get_ull_stat(h, "vb_0:open_checkpoint_id", "checkpoint");
3139 }
3140 std::string key("key" + std::to_string(j));
3141 checkeq(ENGINE_SUCCESS,
3142 store(h, NULL, OPERATION_SET, key.c_str(), "data"),
3143 "Failed to store a value");
3144 }
3145
3146 wait_for_flusher_to_settle(h);
3147 verify_curr_items(h, num_items, "Wrong amount of items");
3148
3149 const void* cookie = testHarness->create_cookie();
3150
3151 /* We want to stream items till end and keep the stream open. We want to
3152 verify if the DCP cursor has moved to new open checkpoint */
3153 struct continuous_dcp_ctx cdc = {
3154 h,
3155 cookie,
3156 Vbid(0),
3157 conn_name,
3158 20,
3159 std::make_unique<TestDcpConsumer>(conn_name, cookie, h)};
3160 cb_thread_t dcp_thread;
3161 cb_assert(cb_create_thread(&dcp_thread, continuous_dcp_thread, &cdc, 0)
3162 == 0);
3163
3164 /* Wait for producer to be created */
3165 wait_for_stat_to_be(h, "ep_dcp_producer_count", 1, "dcp");
3166
3167 /* Wait for an active stream to be created */
3168 const std::string stat_stream_count("eq_dcpq:" + conn_name +
3169 ":num_streams");
3170 wait_for_stat_to_be(h, stat_stream_count.c_str(), 1, "dcp");
3171
3172 /* Wait for the dcp test client to receive upto highest seqno we have */
3173 cb::RelaxedAtomic<uint64_t> exp_items(num_items);
3174 wait_for_val_to_be("last_sent_seqno",
3175 cdc.dcpConsumer->producers.last_byseqno,
3176 exp_items);
3177
3178 /* Wait for new open (empty) checkpoint to be added */
3179 wait_for_stat_to_be(
3180 h, "vb_0:open_checkpoint_id", curr_chkpt_id + 1, "checkpoint");
3181
3182 /* We want to make sure that no cursors are lingering on any of the previous
3183 checkpoints. For that we wait for checkpoint remover to remove all but
3184 the latest open checkpoint cursor */
3185 wait_for_stat_to_be(h, "vb_0:num_checkpoints", 1, "checkpoint");
3186
3187 /* Before closing the connection stop the thread that continuously polls
3188 for dcp data */
3189 cdc.dcpConsumer->stop();
3190 testHarness->notify_io_complete(cookie, ENGINE_SUCCESS);
3191 cb_assert(cb_join_thread(dcp_thread) == 0);
3192 testHarness->destroy_cookie(cookie);
3193
3194 return SUCCESS;
3195 }
3196
test_dcp_producer_stream_req_nmvb(EngineIface* h)3197 static test_result test_dcp_producer_stream_req_nmvb(EngineIface* h) {
3198 const void* cookie1 = testHarness->create_cookie();
3199 uint32_t opaque = 0;
3200 uint32_t seqno = 0;
3201 uint32_t flags = cb::mcbp::request::DcpOpenPayload::Producer;
3202 const char *name = "unittest";
3203 auto dcp = requireDcpIface(h);
3204
3205 checkeq(ENGINE_SUCCESS,
3206 dcp->open(cookie1,
3207 opaque,
3208 seqno,
3209 flags,
3210 name,
3211 R"({"consumer_name":"replica1"})"),
3212 "Failed dcp producer open connection.");
3213
3214 Vbid req_vbucket = Vbid(1);
3215 uint64_t rollback = 0;
3216
3217 checkeq(ENGINE_NOT_MY_VBUCKET,
3218 dcp->stream_req(cookie1,
3219 0,
3220 0,
3221 req_vbucket,
3222 0,
3223 0,
3224 0,
3225 0,
3226 0,
3227 &rollback,
3228 mock_dcp_add_failover_log,
3229 {}),
3230 "Expected not my vbucket");
3231 testHarness->destroy_cookie(cookie1);
3232
3233 return SUCCESS;
3234 }
3235
test_dcp_agg_stats(EngineIface* h)3236 static test_result test_dcp_agg_stats(EngineIface* h) {
3237 const int num_items = 300, batch_items = 100;
3238 for (int start_seqno = 0; start_seqno < num_items;
3239 start_seqno += batch_items) {
3240 wait_for_flusher_to_settle(h);
3241 write_items(h, batch_items, start_seqno);
3242 }
3243
3244 wait_for_flusher_to_settle(h);
3245 verify_curr_items(h, num_items, "Wrong amount of items");
3246
3247 const void *cookie[5];
3248
3249 uint64_t total_bytes = 0;
3250 for (int j = 0; j < 5; ++j) {
3251 std::string name("unittest_" + std::to_string(j));
3252 cookie[j] = testHarness->create_cookie();
3253
3254 DcpStreamCtx ctx;
3255 ctx.vb_uuid = get_ull_stat(h, "vb_0:0:id", "failovers");
3256 ctx.seqno = {200, 300};
3257 ctx.snapshot = {200, 200};
3258 ctx.exp_mutations = 100;
3259 ctx.exp_markers = 1;
3260
3261 TestDcpConsumer tdc(name, cookie[j], h);
3262