1 /* -*- MODE: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2016 Couchbase, Inc
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17 
18 /*
19  * Testsuite for 'dcp' functionality in ep-engine.
20  */
21 
22 // We need to include the tracer _before_ the header which defines
23 // the check macros to avoid conflicts with folly's headers
24 #include <memcached/tracer.h>
25 
26 #include "ep_test_apis.h"
27 #include "ep_testsuite_common.h"
28 #include "mock/mock_dcp.h"
29 #include "programs/engine_testapp/mock_server.h"
30 
31 #include <platform/cb_malloc.h>
32 #include <platform/cbassert.h>
33 #include <platform/compress.h>
34 #include <platform/platform_thread.h>
35 #include <condition_variable>
36 #include <thread>
37 
38 using namespace std::string_literals;
39 
40 // Helper functions ///////////////////////////////////////////////////////////
41 
42 /**
43  * Converts the given engine to a DcpIface*. If engine doesn't implement
44  * DcpIface then throws.
45  * @returns non-null ptr to DcpIface.
46  */
requireDcpIface(EngineIface* engine)47 static gsl::not_null<DcpIface*> requireDcpIface(EngineIface* engine) {
48     return dynamic_cast<DcpIface*>(engine);
49 }
50 
dcp_step(EngineIface* h, const void* cookie, MockDcpMessageProducers& producers)51 static void dcp_step(EngineIface* h,
52                      const void* cookie,
53                      MockDcpMessageProducers& producers) {
54     auto dcp = requireDcpIface(h);
55     ENGINE_ERROR_CODE err = dcp->step(cookie, &producers);
56     check(err == ENGINE_SUCCESS || err == ENGINE_EWOULDBLOCK,
57           "Expected success or engine_ewouldblock");
58     if (err == ENGINE_EWOULDBLOCK) {
59         producers.clear_dcp_data();
60     }
61 }
62 
dcpHandleResponse(EngineIface* h, const void* cookie, protocol_binary_response_header* response, MockDcpMessageProducers& producers)63 static void dcpHandleResponse(EngineIface* h,
64                               const void* cookie,
65                               protocol_binary_response_header* response,
66                               MockDcpMessageProducers& producers) {
67     auto dcp = requireDcpIface(h);
68     auto erroCode = dcp->response_handler(cookie, response);
69     check(erroCode == ENGINE_SUCCESS || erroCode == ENGINE_EWOULDBLOCK,
70           "Expected 'success' or 'engine_ewouldblock'");
71     if (erroCode == ENGINE_EWOULDBLOCK) {
72         producers.clear_dcp_data();
73     }
74 }
75 
// File-local flag, initially false. It is not referenced in this part of the
// file; NOTE(review): presumably set by a test once a wait has begun - confirm.
static bool wait_started = false;
77 
/**
 * A range of sequence numbers, described by its first and last value.
 */
struct SeqnoRange {
    /* First sequence number of the range */
    uint64_t start;
    /* Last sequence number of the range */
    uint64_t end;
};
82 
/**
 * Selects between the two flavours of item removal handled by
 * TestDcpConsumer::deleteOrExpireCase(): the deletion path or the
 * expiration path.
 */
enum class DeletionOpcode : bool {
    Deletion = false,
    Expiration = true,
};
91 
92 class DcpStreamCtx {
93 /**
94  * This class represents all attributes required for
95  * a stream. Objects of this class type are to be fed
96  * to TestDcpConsumer.
97  */
98 public:
DcpStreamCtx()99     DcpStreamCtx()
100         : vbucket(0),
101           flags(0),
102           vb_uuid(0),
103           exp_mutations(0),
104           exp_deletions(0),
105           exp_expirations(0),
106           exp_markers(0),
107           extra_takeover_ops(0),
108           exp_disk_snapshot(false),
109           exp_conflict_res(0),
110           skip_estimate_check(false),
111           live_frontend_client(false),
112           skip_verification(false),
113           exp_err(ENGINE_SUCCESS),
114           exp_rollback(0),
115           expected_values(0),
116           opaque(0) {
117         seqno = {0, static_cast<uint64_t>(~0)};
118         snapshot = {0, static_cast<uint64_t>(~0)};
119     }
120 
121     /* Vbucket Id */
122     Vbid vbucket;
123     /* Stream flags */
124     uint32_t flags;
125     /* Vbucket UUID */
126     uint64_t vb_uuid;
127     /* Sequence number range */
128     SeqnoRange seqno;
129     /* Snapshot range */
130     SeqnoRange snapshot;
131     /* Number of mutations expected (for verification) */
132     size_t exp_mutations;
133     /* Number of deletions expected (for verification) */
134     size_t exp_deletions;
135     /* Number of expiries expected (for verification) */
136     size_t exp_expirations;
137     /* Number of snapshot markers expected (for verification) */
138     size_t exp_markers;
139     /* Extra front end mutations as part of takeover */
140     size_t extra_takeover_ops;
141     /* Flag - expect disk snapshot or not */
142     bool exp_disk_snapshot;
143     /* Expected conflict resolution flag */
144     uint8_t exp_conflict_res;
145     /* Skip estimate check during takeover */
146     bool skip_estimate_check;
147     /*
148        live_frontend_client to be set to true when streaming is done in parallel
149        with a client issuing writes to the vbucket. In this scenario, predicting
150        the number of snapshot markers received is difficult.
151     */
152     bool live_frontend_client;
153     /*
154        skip_verification to be set to true if verification of mutation count,
155        deletion count, marker count etc. is to be skipped at the end of
156        streaming.
157      */
158     bool skip_verification;
159     /* Expected error code on stream creation. We need this because rollback is
160        a valid operation and returns ENGINE_ROLLBACK (not ENGINE_SUCCESS) */
161     ENGINE_ERROR_CODE exp_err;
162     /* Expected rollback seqno */
163     uint64_t exp_rollback;
164     /* Expected number of values (from mutations or deleted_values) */
165     size_t expected_values;
166     /* stream opaque */
167     uint32_t opaque;
168 };
169 
170 class TestDcpConsumer {
171 /**
172  * This class represents a DcpConsumer which is responsible
173  * for spawning a DcpProducer at the server and receiving
174  * messages from it.
175  */
176 public:
TestDcpConsumer(const std::string& _name, const void* _cookie, EngineIface* h)177     TestDcpConsumer(const std::string& _name,
178                     const void* _cookie,
179                     EngineIface* h)
180         : producers(h),
181           name(_name),
182           cookie(_cookie),
183           opaque(0),
184           total_bytes(0),
185           simulate_cursor_dropping(false),
186           flow_control_buf_size(1024),
187           disable_ack(false),
188           h(h),
189           dcp(requireDcpIface(h)),
190           nruCounter(2) {
191     }
192 
getTotalBytes()193     uint64_t getTotalBytes() {
194         return total_bytes;
195     }
196 
simulateCursorDropping()197     void simulateCursorDropping() {
198         simulate_cursor_dropping = true;
199     }
200 
setFlowControlBufSize(uint64_t to)201     void setFlowControlBufSize(uint64_t to) {
202         flow_control_buf_size = to;
203     }
204 
disableAcking()205     void disableAcking() {
206         disable_ack = true;
207     }
208 
addStreamCtx(DcpStreamCtx &ctx)209     void addStreamCtx(DcpStreamCtx &ctx) {
210         stream_ctxs.push_back(ctx);
211     }
212 
213     void run(bool openConn = true);
214 
215     // Stop the thread if it is running. This is safe to be called from
216     // a different thread to the thread calling run().
217     void stop();
218 
219     /**
220      * This method just opens a DCP connection. Note it does not open a stream
221      * and does not call the dcp step function to get all the items from the
222      * producer.
223      * @param flags Flags to pass to DCP_OPEN.
224      */
225     void openConnection(
226             uint32_t flags = cb::mcbp::request::DcpOpenPayload::Producer);
227 
228     /* This method opens a stream on an existing DCP connection.
229        This does not call the dcp step function to get all the items from the
230        producer */
231     ENGINE_ERROR_CODE openStreams();
232 
233     /* if clear is true, it will also clear the stream vector */
234     ENGINE_ERROR_CODE closeStreams(bool fClear = false);
235 
236     ENGINE_ERROR_CODE sendControlMessage(const std::string& name,
237                                          const std::string& value);
238 
getNruCounters() const239     const std::vector<int>& getNruCounters() const {
240         return nruCounter;
241     }
242 
243     MockDcpMessageProducers producers;
244 
245 private:
246     /* Vbucket-level stream stats used in test */
247     struct VBStats {
VBStatsTestDcpConsumer::VBStats248         VBStats()
249             : num_mutations(0),
250               num_deletions(0),
251               num_expirations(0),
252               num_snapshot_markers(0),
253               num_set_vbucket_pending(0),
254               num_set_vbucket_active(0),
255               pending_marker_ack(false),
256               marker_end(0),
257               last_by_seqno(0),
258               extra_takeover_ops(0),
259               exp_disk_snapshot(false),
260               exp_conflict_res(0),
261               num_values(0) {
262         }
263 
264         size_t num_mutations;
265         size_t num_deletions;
266         size_t num_expirations;
267         size_t num_snapshot_markers;
268         size_t num_set_vbucket_pending;
269         size_t num_set_vbucket_active;
270         bool pending_marker_ack;
271         uint64_t marker_end;
272         uint64_t last_by_seqno;
273         size_t extra_takeover_ops;
274         bool exp_disk_snapshot;
275         uint8_t exp_conflict_res;
276         size_t num_values;
277     };
278 
279     /* Connection name */
280     const std::string name;
281     /* Connection cookie */
282     const void *cookie;
283     /* Vector containing information of streams */
284     std::vector<DcpStreamCtx> stream_ctxs;
285     /* Opaque value in the connection */
286     uint32_t opaque;
287     /* Total bytes received */
288     uint64_t total_bytes;
289     /* Flag to simulate cursor dropping */
290     bool simulate_cursor_dropping;
291     /* Flow control buffer size */
292     uint64_t flow_control_buf_size;
293     /* Flag to disable acking */
294     bool disable_ack;
295     /* map of vbstats */
296     std::map<Vbid, VBStats> vb_stats;
297     EngineIface* h;
298     gsl::not_null<DcpIface*> dcp;
299     std::vector<int> nruCounter;
300 
301     // Flag used by run() to check if it should continue to execute.
302     std::atomic<bool> done{false};
303 
304     /**
305      * Helper function to perform the very similar resolution of a deletion
306      * and an expiry, triggered inside the run() case switch where one of these
307      * operations is returned as the last_op.
308      * @param stats The vbstats that will be updated by this function.
309      * @param bytes_read The current no of bytes read which will be updated by
310      *                   this function.
311      * @param all_bytes The total no of bytes read which will be updated by
312      *                  this function.
313      * @param vbid The vBucket ID.
314      * @param delOrExpire Determines whether to take the deletion case or the
315      *                    expiration case.
316      */
317     void deleteOrExpireCase(TestDcpConsumer::VBStats& stats,
318                             uint32_t& bytes_read,
319                             uint64_t& all_bytes,
320                             Vbid vbid,
321                             DeletionOpcode delOrExpire);
322 };
323 
sendControlMessage( const std::string& name, const std::string& value)324 ENGINE_ERROR_CODE TestDcpConsumer::sendControlMessage(
325         const std::string& name, const std::string& value) {
326     auto dcp = requireDcpIface(h);
327     return dcp->control(cookie, ++opaque, name, value);
328 }
329 
deleteOrExpireCase(TestDcpConsumer::VBStats& stats, uint32_t& bytes_read, uint64_t& all_bytes, Vbid vbid, DeletionOpcode delOrExpire)330 void TestDcpConsumer::deleteOrExpireCase(TestDcpConsumer::VBStats& stats,
331                                          uint32_t& bytes_read,
332                                          uint64_t& all_bytes,
333                                          Vbid vbid,
334                                          DeletionOpcode delOrExpire) {
335     cb_assert(vbid != static_cast<Vbid>(-1));
336     checklt(stats.last_by_seqno,
337             producers.last_byseqno.load(),
338             "Expected bigger seqno");
339     stats.last_by_seqno = producers.last_byseqno;
340     if (delOrExpire == DeletionOpcode::Deletion) {
341         stats.num_deletions++;
342     } else {
343         stats.num_expirations++;
344     }
345     bytes_read += producers.last_packet_size;
346     all_bytes += producers.last_packet_size;
347     if (stats.pending_marker_ack &&
348         producers.last_byseqno == stats.marker_end) {
349         sendDcpAck(h,
350                    cookie,
351                    cb::mcbp::ClientOpcode::DcpSnapshotMarker,
352                    cb::mcbp::Status::Success,
353                    producers.last_opaque);
354     }
355 
356     if (!producers.last_value.empty()) {
357         stats.num_values++;
358     }
359     return;
360 }
361 
run(bool openConn)362 void TestDcpConsumer::run(bool openConn) {
363     checkle(size_t{1}, stream_ctxs.size(), "No dcp_stream arguments provided!");
364 
365     /* Open the connection with the DCP producer */
366     if (openConn) {
367         openConnection();
368     }
369 
370     /* Open streams in the above open connection */
371     openStreams();
372 
373     bool exp_all_items_streamed = true;
374     size_t num_stream_ends_received = 0;
375     uint32_t bytes_read = 0;
376     uint64_t all_bytes = 0;
377     uint64_t total_acked_bytes = 0;
378     uint64_t ack_limit = flow_control_buf_size / 2;
379 
380     bool delay_buffer_acking = false;
381     if (simulate_cursor_dropping) {
382         /**
383          * Simulates cursor dropping by slowing down the initial buffer
384          * acknowledgement from the consmer.
385          *
386          * Note that the cursor may not be dropped if the memory usage
387          * is not over the cursor_dropping_upper_threshold or if the
388          * checkpoint_remover sleep time is high.
389          */
390         delay_buffer_acking = true;
391     }
392 
393     do {
394         if (!disable_ack && (bytes_read > ack_limit)) {
395             if (delay_buffer_acking) {
396                 std::this_thread::sleep_for(std::chrono::seconds(2));
397                 delay_buffer_acking = false;
398             }
399             dcp->buffer_acknowledgement(cookie, ++opaque, Vbid(0), bytes_read);
400             total_acked_bytes += bytes_read;
401             bytes_read = 0;
402         }
403         ENGINE_ERROR_CODE err = dcp->step(cookie, &producers);
404         if (err == ENGINE_DISCONNECT) {
405             done = true;
406         } else {
407             const Vbid vbid = producers.last_vbucket;
408             auto &stats = vb_stats[vbid];
409             switch (producers.last_op) {
410             case cb::mcbp::ClientOpcode::DcpMutation:
411                 cb_assert(vbid != static_cast<Vbid>(-1));
412                 checklt(stats.last_by_seqno,
413                         producers.last_byseqno.load(),
414                         "Expected bigger seqno");
415                 stats.last_by_seqno = producers.last_byseqno;
416                 stats.num_mutations++;
417                 bytes_read += producers.last_packet_size;
418                 all_bytes += producers.last_packet_size;
419                 if (stats.pending_marker_ack &&
420                     producers.last_byseqno == stats.marker_end) {
421                     sendDcpAck(h,
422                                cookie,
423                                cb::mcbp::ClientOpcode::DcpSnapshotMarker,
424                                cb::mcbp::Status::Success,
425                                producers.last_opaque);
426                 }
427 
428                 if (producers.last_nru > 0) {
429                     nruCounter[1]++;
430                 } else {
431                     nruCounter[0]++;
432                 }
433                 if (!producers.last_value.empty()) {
434                     stats.num_values++;
435                 }
436 
437                 break;
438             case cb::mcbp::ClientOpcode::DcpDeletion:
439                 deleteOrExpireCase(stats,
440                                    bytes_read,
441                                    all_bytes,
442                                    vbid,
443                                    DeletionOpcode::Deletion);
444                 break;
445             case cb::mcbp::ClientOpcode::DcpExpiration:
446                 deleteOrExpireCase(stats,
447                                    bytes_read,
448                                    all_bytes,
449                                    vbid,
450                                    DeletionOpcode::Expiration);
451                 break;
452             case cb::mcbp::ClientOpcode::DcpStreamEnd:
453                 cb_assert(vbid != static_cast<Vbid>(-1));
454                 if (++num_stream_ends_received == stream_ctxs.size()) {
455                     done = true;
456                 }
457                 bytes_read += producers.last_packet_size;
458                 all_bytes += producers.last_packet_size;
459                 break;
460             case cb::mcbp::ClientOpcode::DcpSnapshotMarker:
461                 cb_assert(vbid != static_cast<Vbid>(-1));
462                 if (stats.exp_disk_snapshot &&
463                     stats.num_snapshot_markers == 0) {
464                     checkeq(uint32_t{1},
465                             producers.last_flags,
466                             "Expected disk snapshot");
467                 }
468 
469                 if (producers.last_flags & 8) {
470                     stats.pending_marker_ack = true;
471                     stats.marker_end = producers.last_snap_end_seqno;
472                 }
473 
474                 stats.num_snapshot_markers++;
475                 bytes_read += producers.last_packet_size;
476                 all_bytes += producers.last_packet_size;
477                 break;
478             case cb::mcbp::ClientOpcode::DcpSetVbucketState:
479                 cb_assert(vbid != static_cast<Vbid>(-1));
480                 if (producers.last_vbucket_state == vbucket_state_pending) {
481                     stats.num_set_vbucket_pending++;
482                     for (size_t j = 0; j < stats.extra_takeover_ops; ++j) {
483                         std::string key("key" + std::to_string(j));
484                         checkeq(ENGINE_SUCCESS,
485                                 store(h,
486                                       NULL,
487                                       OPERATION_SET,
488                                       key.c_str(),
489                                       "data",
490                                       nullptr,
491                                       0,
492                                       vbid),
493                                 "Failed to store a value");
494                     }
495                 } else if (producers.last_vbucket_state ==
496                            vbucket_state_active) {
497                     stats.num_set_vbucket_active++;
498                 }
499                 bytes_read += producers.last_packet_size;
500                 all_bytes += producers.last_packet_size;
501                 sendDcpAck(h,
502                            cookie,
503                            cb::mcbp::ClientOpcode::DcpSetVbucketState,
504                            cb::mcbp::Status::Success,
505                            producers.last_opaque);
506                 break;
507             case cb::mcbp::ClientOpcode::Invalid:
508                 if (disable_ack && flow_control_buf_size &&
509                     (bytes_read >= flow_control_buf_size)) {
510                     /* If there is no acking and if flow control is enabled
511                        we are done because producer should not send us any
512                        more items. We need this to test that producer stops
513                        sending items correctly when there are no acks while
514                        flow control is enabled */
515                     done = true;
516                     exp_all_items_streamed = false;
517                 } else {
518                     /* No messages were ready on the last step call, so we
519                      * wait till the conn is notified of new item.
520                      * Note that we check for 0 because we clear the
521                      * producers.last_op value below.
522                      */
523                     testHarness->lock_cookie(cookie);
524                     /* waitfor_cookie() waits on a condition variable. But
525                        the api expects the cookie to be locked before
526                        calling it */
527                     testHarness->waitfor_cookie(cookie);
528                     testHarness->unlock_cookie(cookie);
529                 }
530                 break;
531             default:
532                 // Aborting ...
533                 std::stringstream ss;
534                 ss << "Unknown DCP operation: " << to_string(producers.last_op);
535                 check(false, ss.str().c_str());
536             }
537             producers.last_op = cb::mcbp::ClientOpcode::Invalid;
538             producers.last_nru = 0;
539             producers.last_vbucket = Vbid(-1);
540         }
541     } while (!done);
542 
543     total_bytes += all_bytes;
544 
545     for (const auto& ctx : stream_ctxs) {
546         if (!ctx.skip_verification) {
547             auto &stats = vb_stats[ctx.vbucket];
548             if (simulate_cursor_dropping) {
549                 if (stats.num_snapshot_markers == 0) {
550                     cb_assert(stats.num_mutations == 0 &&
551                               stats.num_deletions == 0);
552                 } else {
553                     checkge(ctx.exp_mutations, stats.num_mutations,
554                           "Invalid number of mutations");
555                     checkge(ctx.exp_deletions, stats.num_deletions,
556                           "Invalid number of deletes");
557                     checkge(ctx.exp_expirations,
558                             stats.num_expirations,
559                             "Invalid number of expirations");
560                 }
561             } else {
562                 // Account for cursors that may have been dropped because
563                 // of high memory usage
564                 if (get_int_stat(h, "ep_cursors_dropped") > 0) {
565                     // Hard to predict exact number of markers to be received
566                     // if in case of a live parallel front end load
567                     if (!ctx.live_frontend_client) {
568                         checkle(stats.num_snapshot_markers, ctx.exp_markers,
569                                 "Invalid number of markers");
570                     }
571                     checkle(stats.num_mutations, ctx.exp_mutations,
572                             "Invalid number of mutations");
573                     checkle(stats.num_deletions, ctx.exp_deletions,
574                             "Invalid number of deletions");
575                     checkle(stats.num_expirations,
576                             ctx.exp_expirations,
577                             "Invalid number of expirations");
578                 } else {
579                     checkeq(ctx.exp_mutations, stats.num_mutations,
580                             "Invalid number of mutations");
581                     checkeq(ctx.exp_deletions, stats.num_deletions,
582                             "Invalid number of deletes");
583                     checkeq(ctx.exp_expirations,
584                             stats.num_expirations,
585                             "Invalid number of expirations");
586                     if (ctx.live_frontend_client) {
587                         // Hard to predict exact number of markers to be received
588                         // if in case of a live parallel front end load
589                         if (ctx.exp_mutations > 0 || ctx.exp_deletions > 0 ||
590                             ctx.exp_expirations > 0) {
591                             checkle(size_t{1},
592                                     stats.num_snapshot_markers,
593                                     "Snapshot marker count can't be zero");
594                         }
595                     } else {
596                         checkeq(ctx.exp_markers, stats.num_snapshot_markers,
597                                 "Unexpected number of snapshot markers");
598                     }
599                 }
600             }
601 
602             if (ctx.flags & DCP_ADD_STREAM_FLAG_TAKEOVER) {
603                 checkeq(size_t{1},
604                         stats.num_set_vbucket_pending,
605                         "Didn't receive pending set state");
606                 checkeq(size_t{1},
607                         stats.num_set_vbucket_active,
608                         "Didn't receive active set state");
609             }
610 
611             /* Check if the readyQ size goes to zero after all items are streamed */
612             if (exp_all_items_streamed) {
613                 std::stringstream stats_ready_queue_memory;
614                 stats_ready_queue_memory << "eq_dcpq:" << name.c_str()
615                                          << ":stream_" << ctx.vbucket.get()
616                                          << "_ready_queue_memory";
617                 checkeq(uint64_t{0},
618                         get_ull_stat(h,
619                                      stats_ready_queue_memory.str().c_str(),
620                                      "dcp"),
621                         "readyQ size did not go to zero");
622 
623                 std::string stats_backfill_buffer_items(
624                         "eq_dcpq:" + name + ":stream_" +
625                         std::to_string(ctx.vbucket.get()) +
626                         "_backfill_buffer_items");
627                 checkeq(uint64_t{0},
628                         get_ull_stat(
629                                 h, stats_backfill_buffer_items.c_str(), "dcp"),
630                         "backfill buffer items did not go to zero");
631             }
632             if (ctx.expected_values) {
633                 checkeq(ctx.expected_values,
634                         stats.num_values,
635                         "Expected values didn't match");
636             }
637         }
638     }
639 
640     /* Check if the producer has updated flow control stat correctly */
641     if (flow_control_buf_size) {
642         char stats_buffer[50] = {0};
643         snprintf(stats_buffer, sizeof(stats_buffer), "eq_dcpq:%s:unacked_bytes",
644                  name.c_str());
645         checkeq((all_bytes - total_acked_bytes),
646                 get_ull_stat(h, stats_buffer, "dcp"),
647                 "Buffer Size did not get set correctly");
648     }
649 }
650 
stop()651 void TestDcpConsumer::stop() {
652     this->done = true;
653 }
654 
openConnection(uint32_t flags)655 void TestDcpConsumer::openConnection(uint32_t flags) {
656     /* Reset any stale dcp data */
657     producers.clear_dcp_data();
658 
659     opaque = 1;
660 
661     /* Set up Producer at server */
662     checkeq(ENGINE_SUCCESS,
663             dcp->open(cookie,
664                       ++opaque,
665                       0,
666                       flags,
667                       name,
668                       R"({"consumer_name":"replica1"})"),
669             "Failed dcp producer open connection.");
670 
671     /* Set flow control buffer size */
672     std::string flow_control_buf_sz(std::to_string(flow_control_buf_size));
673     checkeq(ENGINE_SUCCESS,
674             dcp->control(cookie,
675                          ++opaque,
676                          "connection_buffer_size",
677                          flow_control_buf_sz),
678             "Failed to establish connection buffer");
679     char stats_buffer[50] = {0};
680     if (flow_control_buf_size) {
681         snprintf(stats_buffer, sizeof(stats_buffer),
682                  "eq_dcpq:%s:max_buffer_bytes", name.c_str());
683         checkeq(static_cast<int>(flow_control_buf_size),
684                 get_int_stat(h, stats_buffer, "dcp"),
685                 "TestDcpConsumer::openConnection() : "
686                 "Buffer Size did not get set correctly");
687     } else {
688         snprintf(stats_buffer, sizeof(stats_buffer),
689                  "eq_dcpq:%s:flow_control", name.c_str());
690         std::string status = get_str_stat(h, stats_buffer, "dcp");
691         checkeq(status, std::string("disabled"), "Flow control enabled!");
692     }
693 
694     checkeq(ENGINE_SUCCESS,
695             dcp->control(cookie, ++opaque, "enable_ext_metadata", "true"),
696             "Failed to enable xdcr extras");
697 }
698 
openStreams()699 ENGINE_ERROR_CODE TestDcpConsumer::openStreams() {
700     for (auto& ctx : stream_ctxs) {
701         /* Different opaque for every stream created */
702         ++opaque;
703 
704         /* Initiate stream request */
705         uint64_t rollback = 0;
706         ENGINE_ERROR_CODE rv = dcp->stream_req(cookie,
707                                                ctx.flags,
708                                                opaque,
709                                                ctx.vbucket,
710                                                ctx.seqno.start,
711                                                ctx.seqno.end,
712                                                ctx.vb_uuid,
713                                                ctx.snapshot.start,
714                                                ctx.snapshot.end,
715                                                &rollback,
716                                                mock_dcp_add_failover_log,
717                                                {});
718 
719         checkeq(ctx.exp_err, rv, "Failed to initiate stream request");
720 
721         if (rv == ENGINE_NOT_MY_VBUCKET || rv == ENGINE_ENOTSUP) {
722             return rv;
723         }
724 
725         if (rv == ENGINE_ROLLBACK || rv == ENGINE_KEY_ENOENT) {
726             checkeq(ctx.exp_rollback, rollback,
727                     "Rollback didn't match expected value");
728             return rv;
729         }
730 
731         if (ctx.flags & DCP_ADD_STREAM_FLAG_TAKEOVER) {
732             ctx.seqno.end  = std::numeric_limits<uint64_t>::max();
733         } else if (ctx.flags & DCP_ADD_STREAM_FLAG_LATEST ||
734                    ctx.flags & DCP_ADD_STREAM_FLAG_DISKONLY) {
735             std::string high_seqno("vb_" + std::to_string(ctx.vbucket.get()) +
736                                    ":high_seqno");
737             ctx.seqno.end =
738                     get_ull_stat(h, high_seqno.c_str(), "vbucket-seqno");
739         }
740 
741         std::stringstream stats_flags;
742         stats_flags << "eq_dcpq:" << name.c_str() << ":stream_"
743                     << ctx.vbucket.get() << "_flags";
744         checkeq(ctx.flags,
745                 (uint32_t)get_int_stat(h, stats_flags.str().c_str(), "dcp"),
746                 "Flags didn't match");
747 
748         std::stringstream stats_opaque;
749         stats_opaque << "eq_dcpq:" << name.c_str() << ":stream_"
750                      << ctx.vbucket.get() << "_opaque";
751         checkeq(opaque,
752                 (uint32_t)get_int_stat(h, stats_opaque.str().c_str(), "dcp"),
753                 "Opaque didn't match");
754         ctx.opaque = opaque;
755 
756         std::stringstream stats_start_seqno;
757         stats_start_seqno << "eq_dcpq:" << name.c_str() << ":stream_"
758                           << ctx.vbucket.get() << "_start_seqno";
759         checkeq(ctx.seqno.start,
760                 (uint64_t)get_ull_stat(
761                         h, stats_start_seqno.str().c_str(), "dcp"),
762                 "Start Seqno Didn't match");
763 
764         std::stringstream stats_end_seqno;
765         stats_end_seqno << "eq_dcpq:" << name.c_str() << ":stream_"
766                         << ctx.vbucket.get() << "_end_seqno";
767         checkeq(ctx.seqno.end,
768                 (uint64_t)get_ull_stat(h, stats_end_seqno.str().c_str(), "dcp"),
769                 "End Seqno didn't match");
770 
771         std::stringstream stats_vb_uuid;
772         stats_vb_uuid << "eq_dcpq:" << name.c_str() << ":stream_"
773                       << ctx.vbucket.get() << "_vb_uuid";
774         checkeq(ctx.vb_uuid,
775                 (uint64_t)get_ull_stat(h, stats_vb_uuid.str().c_str(), "dcp"),
776                 "VBucket UUID didn't match");
777 
778         std::stringstream stats_snap_seqno;
779         stats_snap_seqno << "eq_dcpq:" << name.c_str() << ":stream_"
780                          << ctx.vbucket.get() << "_snap_start_seqno";
781         checkeq(ctx.snapshot.start,
782                 (uint64_t)get_ull_stat(
783                         h, stats_snap_seqno.str().c_str(), "dcp"),
784                 "snap start seqno didn't match");
785 
786         if ((ctx.flags & DCP_ADD_STREAM_FLAG_TAKEOVER) &&
787             !ctx.skip_estimate_check) {
788             std::string high_seqno_str(
789                     "vb_" + std::to_string(ctx.vbucket.get()) + ":high_seqno");
790             uint64_t vb_high_seqno =
791                     get_ull_stat(h, high_seqno_str.c_str(), "vbucket-seqno");
792             uint64_t est = vb_high_seqno - ctx.seqno.start;
793             std::stringstream stats_takeover;
794             stats_takeover << "dcp-vbtakeover " << ctx.vbucket.get() << " "
795                            << name.c_str();
796             wait_for_stat_to_be_lte(
797                     h, "estimate", est, stats_takeover.str().c_str());
798         }
799 
800         if (ctx.flags & DCP_ADD_STREAM_FLAG_DISKONLY) {
801             /* Wait for backfill to start */
802             std::string stats_backfill_read_bytes("eq_dcpq:" + name +
803                                                   ":backfill_buffer_bytes_read");
804             wait_for_stat_to_be_gte(
805                     h, stats_backfill_read_bytes.c_str(), 0, "dcp");
806             /* Verify that we have no dcp cursors in the checkpoint. (There will
807              just be one persistence cursor) */
808             std::string stats_num_conn_cursors(
809                     "vb_" + std::to_string(ctx.vbucket.get()) +
810                     ":num_conn_cursors");
811             /* In case of persistent buckets there will be 1 persistent cursor,
812                in case of ephemeral buckets there will be no cursor */
813             checkge(1,
814                     get_int_stat(h, stats_num_conn_cursors.c_str(),
815                             "checkpoint"),
816                     "DCP cursors not expected to be registered");
817         }
818 
819         // Init stats used in test
820         VBStats stats;
821         stats.extra_takeover_ops = ctx.extra_takeover_ops;
822         stats.exp_disk_snapshot = ctx.exp_disk_snapshot;
823         stats.exp_conflict_res = ctx.exp_conflict_res;
824 
825         vb_stats[ctx.vbucket] = stats;
826     }
827     return ENGINE_SUCCESS;
828 }
829 
closeStreams(bool fClear)830 ENGINE_ERROR_CODE TestDcpConsumer::closeStreams(bool fClear) {
831     ENGINE_ERROR_CODE err = ENGINE_SUCCESS;
832     for (auto& ctx : stream_ctxs) {
833         if (ctx.opaque > 0) {
834             err = dcp->close_stream(cookie, ctx.opaque, Vbid(0), {});
835             if (ENGINE_SUCCESS != err) {
836                 break;
837             }
838         }
839     }
840 
841     if (fClear) {
842         stream_ctxs.clear();
843     }
844     return err;
845 }
846 
notifier_request(EngineIface* h, const void* cookie, uint32_t opaque, Vbid vbucket, uint64_t start, bool shouldSucceed)847 static void notifier_request(EngineIface* h,
848                              const void* cookie,
849                              uint32_t opaque,
850                              Vbid vbucket,
851                              uint64_t start,
852                              bool shouldSucceed) {
853     uint32_t flags = 0;
854     uint64_t rollback = 0;
855     uint64_t vb_uuid = get_ull_stat(h, "vb_0:0:id", "failovers");
856     uint64_t snap_start_seqno = get_ull_stat(h, "vb_0:0:seq", "failovers");
857     uint64_t snap_end_seqno = snap_start_seqno;
858     auto dcp = requireDcpIface(h);
859 
860     ENGINE_ERROR_CODE err = dcp->stream_req(cookie,
861                                             flags,
862                                             opaque,
863                                             vbucket,
864                                             start,
865                                             0,
866                                             vb_uuid,
867                                             snap_start_seqno,
868                                             snap_end_seqno,
869                                             &rollback,
870                                             mock_dcp_add_failover_log,
871                                             {});
872     checkeq(ENGINE_SUCCESS, err, "Failed to initiate stream request");
873 
874     std::string type = get_str_stat(h, "eq_dcpq:unittest:type", "dcp");
875     checkeq("notifier"s, type, "Consumer not found");
876 
877     checkeq(flags,
878             static_cast<uint32_t>(
879                     get_int_stat(h, "eq_dcpq:unittest:stream_0_flags", "dcp")),
880             "Flags didn't match");
881     checkeq(opaque,
882             static_cast<uint32_t>(
883                     get_int_stat(h, "eq_dcpq:unittest:stream_0_opaque", "dcp")),
884             "Opaque didn't match");
885     checkeq(start,
886             get_ull_stat(h, "eq_dcpq:unittest:stream_0_start_seqno", "dcp"),
887             "Start Seqno Didn't match");
888     checkeq(uint64_t{0},
889             get_ull_stat(h, "eq_dcpq:unittest:stream_0_end_seqno", "dcp"),
890             "End Seqno didn't match");
891     checkeq(vb_uuid,
892             get_ull_stat(h, "eq_dcpq:unittest:stream_0_vb_uuid", "dcp"),
893             "VBucket UUID didn't match");
894     checkeq(snap_start_seqno,
895             get_ull_stat(h,"eq_dcpq:unittest:stream_0_snap_start_seqno", "dcp"),
896             "snap start seqno didn't match");
897 }
898 
dcp_stream_to_replica(EngineIface* h, const void* cookie, uint32_t opaque, Vbid vbucket, uint32_t flags, uint64_t start, uint64_t end, uint64_t snap_start_seqno, uint64_t snap_end_seqno, uint8_t cas = 0x1, uint8_t datatype = 1, uint32_t exprtime = 0, uint32_t lockTime = 0, uint64_t revSeqno = 0)899 static void dcp_stream_to_replica(EngineIface* h,
900                                   const void* cookie,
901                                   uint32_t opaque,
902                                   Vbid vbucket,
903                                   uint32_t flags,
904                                   uint64_t start,
905                                   uint64_t end,
906                                   uint64_t snap_start_seqno,
907                                   uint64_t snap_end_seqno,
908                                   uint8_t cas = 0x1,
909                                   uint8_t datatype = 1,
910                                   uint32_t exprtime = 0,
911                                   uint32_t lockTime = 0,
912                                   uint64_t revSeqno = 0) {
913     /* Send snapshot marker */
914     auto dcp = requireDcpIface(h);
915     checkeq(ENGINE_SUCCESS,
916             dcp->snapshot_marker(cookie,
917                                  opaque,
918                                  vbucket,
919                                  snap_start_seqno,
920                                  snap_end_seqno,
921                                  flags,
922                                  0 /*HCS*/,
923                                  {} /*maxVisibleSeqno*/),
924             "Failed to send marker!");
925     const std::string data("data");
926     /* Send DCP mutations */
927     for (uint64_t i = start; i <= end; i++) {
928         const std::string key{"key" + std::to_string(i)};
929         const DocKey docKey{key, DocKeyEncodesCollectionId::No};
930         const cb::const_byte_buffer value{(uint8_t*)data.data(), data.size()};
931         checkeq(ENGINE_SUCCESS,
932                 dcp->mutation(cookie,
933                               opaque,
934                               docKey,
935                               value,
936                               0, // priv bytes
937                               PROTOCOL_BINARY_RAW_BYTES,
938                               cas,
939                               vbucket,
940                               flags,
941                               i, // by seqno
942                               revSeqno,
943                               exprtime,
944                               lockTime,
945                               {},
946                               INITIAL_NRU_VALUE),
947                 "Failed dcp mutate.");
948     }
949 }
950 
951 /* This is a helper function to read items from an existing DCP Producer. It
952    reads items from start to end on the connection. (Note: this can work
953    correctly only in case there is one vbucket)
954    Currently this supports only streaming mutations, but can be extend to stream
955    deletion etc */
dcp_stream_from_producer_conn(EngineIface* h, const void* cookie, uint32_t opaque, uint64_t start, uint64_t end, uint64_t expSnapStart, MockDcpMessageProducers& producers)956 static void dcp_stream_from_producer_conn(EngineIface* h,
957                                           const void* cookie,
958                                           uint32_t opaque,
959                                           uint64_t start,
960                                           uint64_t end,
961                                           uint64_t expSnapStart,
962                                           MockDcpMessageProducers& producers) {
963     bool done = false;
964     size_t bytes_read = 0;
965     bool pending_marker_ack = false;
966     uint64_t marker_end = 0;
967     uint64_t num_mutations = 0;
968     uint64_t last_snap_start_seqno = 0;
969     auto dcp = requireDcpIface(h);
970 
971     do {
972         if (bytes_read > 512) {
973             checkeq(ENGINE_SUCCESS,
974                     dcp->buffer_acknowledgement(
975                             cookie, ++opaque, Vbid(0), bytes_read),
976                     "Failed to get dcp buffer ack");
977             bytes_read = 0;
978         }
979         ENGINE_ERROR_CODE err = dcp->step(cookie, &producers);
980         if (err == ENGINE_DISCONNECT) {
981             done = true;
982         } else {
983             switch (producers.last_op) {
984             case cb::mcbp::ClientOpcode::DcpMutation:
985                 bytes_read += producers.last_packet_size;
986                 if (pending_marker_ack &&
987                     producers.last_byseqno == marker_end) {
988                     sendDcpAck(h,
989                                cookie,
990                                cb::mcbp::ClientOpcode::DcpSnapshotMarker,
991                                cb::mcbp::Status::Success,
992                                producers.last_opaque);
993                 }
994                 num_mutations++;
995                 break;
996             case cb::mcbp::ClientOpcode::DcpStreamEnd:
997                 done = true;
998                 bytes_read += producers.last_packet_size;
999                 break;
1000             case cb::mcbp::ClientOpcode::DcpSnapshotMarker:
1001                 if (producers.last_flags & 8) {
1002                     pending_marker_ack = true;
1003                     marker_end = producers.last_snap_end_seqno;
1004                 }
1005                 bytes_read += producers.last_packet_size;
1006                 last_snap_start_seqno = producers.last_snap_start_seqno;
1007                 break;
1008             case cb::mcbp::ClientOpcode::Invalid:
1009                 break;
1010             default:
1011                 // Aborting ...
1012                 std::string err_string(
1013                         "Unexpected DCP operation: " +
1014                         to_string(producers.last_op) + " last_byseqno: " +
1015                         std::to_string(producers.last_byseqno.load()) +
1016                         " last_key: " + producers.last_key + " last_value: " +
1017                         producers.last_value + " last_flags: " +
1018                         std::to_string(producers.last_flags));
1019                 check(false, err_string.c_str());
1020             }
1021             if (producers.last_byseqno >= end) {
1022                 done = true;
1023             }
1024             producers.last_op = cb::mcbp::ClientOpcode::Invalid;
1025         }
1026     } while (!done);
1027 
1028     /* Do buffer ack of the outstanding bytes */
1029     dcp->buffer_acknowledgement(cookie, ++opaque, Vbid(0), bytes_read);
1030     checkeq((end - start + 1), num_mutations, "Invalid number of mutations");
1031     if (expSnapStart) {
1032         checkge(last_snap_start_seqno,
1033                 expSnapStart,
1034                 "Incorrect snap start seqno");
1035     }
1036 }
1037 
dcp_stream_expiries_to_replica(EngineIface* h, const void* cookie, uint32_t opaque, Vbid vbucket, uint32_t flags, uint64_t start, uint64_t end, uint64_t snap_start_seqno, uint64_t snap_end_seqno, uint32_t delTime, uint64_t revSeqno = 0, uint8_t cas = 0x1)1038 static void dcp_stream_expiries_to_replica(EngineIface* h,
1039                                            const void* cookie,
1040                                            uint32_t opaque,
1041                                            Vbid vbucket,
1042                                            uint32_t flags,
1043                                            uint64_t start,
1044                                            uint64_t end,
1045                                            uint64_t snap_start_seqno,
1046                                            uint64_t snap_end_seqno,
1047                                            uint32_t delTime,
1048                                            uint64_t revSeqno = 0,
1049                                            uint8_t cas = 0x1) {
1050     auto dcp = requireDcpIface(h);
1051     checkeq(ENGINE_SUCCESS,
1052             dcp->snapshot_marker(cookie,
1053                                  opaque,
1054                                  vbucket,
1055                                  snap_start_seqno,
1056                                  snap_end_seqno,
1057                                  flags,
1058                                  0 /*HCS*/,
1059                                  {} /*maxVisibleSeqno*/),
1060             "Failed to send marker!");
1061     const std::string data("data");
1062     /* Stream Expiries */
1063     for (uint64_t i = start; i <= end; i++) {
1064         const std::string key{"key" + std::to_string(i)};
1065         const DocKey docKey{key, DocKeyEncodesCollectionId::No};
1066         checkeq(ENGINE_SUCCESS,
1067                 dcp->expiration(cookie,
1068                                 opaque,
1069                                 docKey,
1070                                 {},
1071                                 0, // priv bytes
1072                                 PROTOCOL_BINARY_RAW_BYTES,
1073                                 cas,
1074                                 vbucket,
1075                                 i,
1076                                 revSeqno,
1077                                 delTime),
1078                 "Failed dcp expiry");
1079     }
1080 }
1081 
1082 struct mb16357_ctx {
mb16357_ctxmb16357_ctx1083     mb16357_ctx(EngineIface* _h, int _items)
1084         : h(_h), dcp(requireDcpIface(h)), items(_items) {
1085     }
1086 
1087     EngineIface* h;
1088     gsl::not_null<DcpIface*> dcp;
1089     int items;
1090     std::mutex mutex;
1091     std::condition_variable cv;
1092     bool compactor_waiting{false};
1093     bool compaction_start{false};
1094 };
1095 
/// Arguments handed to writer_thread() through its void* parameter.
struct writer_thread_ctx {
    // Engine the items are stored into.
    EngineIface* h;
    // Number of keys to store ("key_0" .. "key_<items - 1>").
    int items;
    // VBucket the items are stored into.
    Vbid vbid;
};
1101 
/// Arguments handed to continuous_dcp_thread() through its void* parameter.
struct continuous_dcp_ctx {
    // Engine queried for the vbucket's failover stats.
    EngineIface* h;
    // Not read by continuous_dcp_thread() itself.
    const void *cookie;
    // VBucket the stream is requested for.
    Vbid vbid;
    // Held by reference, so the referenced string must outlive this struct.
    // Not read by continuous_dcp_thread() itself.
    const std::string &name;
    // Used as the stream's start seqno and as both snapshot bounds.
    uint64_t start_seqno;
    // Consumer the stream context is added to and which is then run.
    std::unique_ptr<TestDcpConsumer> dcpConsumer;
};
1110 
// Forward declaration: dcp_thread_func() below calls this function before its
// definition appears. The default arguments for exp_snap_start and
// exp_snap_end are supplied by this declaration.
static uint32_t add_stream_for_consumer(EngineIface* h,
                                        const void* cookie,
                                        uint32_t opaque,
                                        Vbid vbucket,
                                        uint32_t flags,
                                        cb::mcbp::Status response,
                                        uint64_t exp_snap_start = 0,
                                        uint64_t exp_snap_end = 0);
1120 
1121 extern "C" {
dcp_thread_func(void *args)1122     static void dcp_thread_func(void *args) {
1123         struct mb16357_ctx *ctx = static_cast<mb16357_ctx *>(args);
1124 
1125         const void* cookie = testHarness->create_cookie();
1126         uint32_t opaque = 0xFFFF0000;
1127         uint32_t flags = 0;
1128         std::string name = "unittest";
1129 
1130         // Wait for compaction thread to to ready (and waiting on cv) - as
1131         // we don't want the nofify_one() to be lost.
1132         for (;;) {
1133             std::lock_guard<std::mutex> lh(ctx->mutex);
1134             if (ctx->compactor_waiting) {
1135                 break;
1136             }
1137         };
1138         // Now compactor is waiting to run (and we are just about to start DCP
1139         // stream, activate compaction.
1140         {
1141             std::lock_guard<std::mutex> lh(ctx->mutex);
1142             ctx->compaction_start = true;
1143         }
1144         ctx->cv.notify_one();
1145 
1146         // Switch to replica
1147         check(set_vbucket_state(ctx->h, Vbid(0), vbucket_state_replica),
1148               "Failed to set vbucket state.");
1149 
1150         // Open consumer connection
1151         checkeq(ctx->dcp->open(cookie,
1152                                opaque,
1153                                0,
1154                                flags,
1155                                name,
1156                                R"({"consumer_name":"replica1"})"),
1157                 ENGINE_SUCCESS,
1158                 "Failed dcp Consumer open connection.");
1159 
1160         add_stream_for_consumer(ctx->h,
1161                                 cookie,
1162                                 opaque++,
1163                                 Vbid(0),
1164                                 0,
1165                                 cb::mcbp::Status::Success);
1166 
1167         uint32_t stream_opaque =
1168                 get_int_stat(ctx->h, "eq_dcpq:unittest:stream_0_opaque", "dcp");
1169 
1170         for (int i = 1; i <= ctx->items; i++) {
1171             std::stringstream ss;
1172             ss << "kamakeey-" << i;
1173 
1174             // send mutations in single mutation snapshots to race more with compaction
1175             ctx->dcp->snapshot_marker(cookie,
1176                                       stream_opaque,
1177                                       Vbid(0),
1178                                       ctx->items,
1179                                       ctx->items + i,
1180                                       2,
1181                                       0 /*HCS*/,
1182                                       {} /*maxVisibleSeqno*/);
1183 
1184             const std::string key = ss.str();
1185             const DocKey docKey{key, DocKeyEncodesCollectionId::No};
1186             ctx->dcp->mutation(cookie,
1187                                stream_opaque,
1188                                docKey,
1189                                {(const uint8_t*)"value", 5},
1190                                0, // priv bytes
1191                                PROTOCOL_BINARY_RAW_BYTES,
1192                                i * 3, // cas
1193                                Vbid(0),
1194                                0, // flags
1195                                i + ctx->items, // by_seqno
1196                                i + ctx->items, // rev_seqno
1197                                0, // exptime
1198                                0, // locktime
1199                                {}, // meta
1200                                INITIAL_NRU_VALUE);
1201         }
1202 
1203         testHarness->destroy_cookie(cookie);
1204     }
1205 
compact_thread_func(void *args)1206     static void compact_thread_func(void *args) {
1207         struct mb16357_ctx *ctx = static_cast<mb16357_ctx *>(args);
1208         std::unique_lock<std::mutex> lk(ctx->mutex);
1209         ctx->compactor_waiting = true;
1210         ctx->cv.wait(lk, [ctx]{return ctx->compaction_start;});
1211         compact_db(ctx->h, Vbid(0), Vbid(0), 99, ctx->items, 1);
1212     }
1213 
writer_thread(void *args)1214     static void writer_thread(void *args) {
1215         struct writer_thread_ctx *wtc = static_cast<writer_thread_ctx *>(args);
1216 
1217         for (int i = 0; i < wtc->items; ++i) {
1218             std::string key("key_" + std::to_string(i));
1219             checkeq(ENGINE_SUCCESS,
1220                     store(wtc->h,
1221                           nullptr,
1222                           OPERATION_SET,
1223                           key.c_str(),
1224                           "somevalue",
1225                           nullptr,
1226                           0,
1227                           wtc->vbid),
1228                     "Failed to store value");
1229         }
1230     }
1231 
continuous_dcp_thread(void *args)1232     static void continuous_dcp_thread(void *args) {
1233         struct continuous_dcp_ctx *cdc = static_cast<continuous_dcp_ctx *>(args);
1234 
1235         DcpStreamCtx ctx;
1236         ctx.vbucket = cdc->vbid;
1237         std::string vbuuid_entry("vb_" + std::to_string(cdc->vbid.get()) +
1238                                  ":0:id");
1239         ctx.vb_uuid = get_ull_stat(cdc->h, vbuuid_entry.c_str(), "failovers");
1240         ctx.seqno = {cdc->start_seqno, std::numeric_limits<uint64_t>::max()};
1241         ctx.snapshot = {cdc->start_seqno, cdc->start_seqno};
1242         ctx.skip_verification = true;
1243 
1244         cdc->dcpConsumer->addStreamCtx(ctx);
1245         cdc->dcpConsumer->run();
1246     }
1247 }
1248 
1249 /* DCP step thread that keeps running till it reads upto 'exp_mutations'.
1250    Note: the exp_mutations is cumulative across all streams in the DCP
1251          connection */
dcp_waiting_step(EngineIface* h, const void* cookie, uint32_t opaque, uint64_t exp_mutations, MockDcpMessageProducers& producers)1252 static void dcp_waiting_step(EngineIface* h,
1253                              const void* cookie,
1254                              uint32_t opaque,
1255                              uint64_t exp_mutations,
1256                              MockDcpMessageProducers& producers) {
1257     bool done = false;
1258     size_t bytes_read = 0;
1259     bool pending_marker_ack = false;
1260     uint64_t marker_end = 0;
1261     uint64_t num_mutations = 0;
1262 
1263     auto dcp = requireDcpIface(h);
1264 
1265     do {
1266         if (bytes_read > 512) {
1267             checkeq(ENGINE_SUCCESS,
1268                     dcp->buffer_acknowledgement(
1269                             cookie, ++opaque, Vbid(0), bytes_read),
1270                     "Failed to get dcp buffer ack");
1271             bytes_read = 0;
1272         }
1273         ENGINE_ERROR_CODE err = dcp->step(cookie, &producers);
1274         if (err == ENGINE_DISCONNECT) {
1275             done = true;
1276         } else {
1277             switch (producers.last_op) {
1278             case cb::mcbp::ClientOpcode::DcpMutation:
1279                 bytes_read += producers.last_packet_size;
1280                 if (pending_marker_ack &&
1281                     producers.last_byseqno == marker_end) {
1282                     sendDcpAck(h,
1283                                cookie,
1284                                cb::mcbp::ClientOpcode::DcpSnapshotMarker,
1285                                cb::mcbp::Status::Success,
1286                                producers.last_opaque);
1287                 }
1288                 ++num_mutations;
1289                 break;
1290             case cb::mcbp::ClientOpcode::DcpStreamEnd:
1291                 done = true;
1292                 bytes_read += producers.last_packet_size;
1293                 break;
1294             case cb::mcbp::ClientOpcode::DcpSnapshotMarker:
1295                 if (producers.last_flags & 8) {
1296                     pending_marker_ack = true;
1297                     marker_end = producers.last_snap_end_seqno;
1298                 }
1299                 bytes_read += producers.last_packet_size;
1300                 break;
1301             case cb::mcbp::ClientOpcode::Invalid:
1302                 /* No messages were ready on the last step call, so we
1303                  * wait till the conn is notified of new item.
1304                  * Note that we check for 0 because we clear the
1305                  * producers.last_op value below.
1306                  */
1307                 testHarness->lock_cookie(cookie);
1308                 /* waitfor_cookie() waits on a condition variable. But
1309                    the api expects the cookie to be locked before
1310                    calling it */
1311                 wait_started = true;
1312                 testHarness->waitfor_cookie(cookie);
1313                 testHarness->unlock_cookie(cookie);
1314                 break;
1315             default:
1316                 // Aborting ...
1317                 std::string err_string("Unexpected DCP operation: " +
1318                                        to_string(producers.last_op));
1319                 check(false, err_string.c_str());
1320             }
1321             if (num_mutations >= exp_mutations) {
1322                 done = true;
1323             }
1324             producers.last_op = cb::mcbp::ClientOpcode::Invalid;
1325         }
1326     } while (!done);
1327 
1328     /* Do buffer ack of the outstanding bytes */
1329     dcp->buffer_acknowledgement(cookie, ++opaque, Vbid(0), bytes_read);
1330 }
1331 
1332 // Testcases //////////////////////////////////////////////////////////////////
1333 
test_dcp_vbtakeover_no_stream(EngineIface* h)1334 static enum test_result test_dcp_vbtakeover_no_stream(EngineIface* h) {
1335     write_items(h, 10);
1336     if (isPersistentBucket(h) && is_full_eviction(h)) {
1337         // MB-21646: FE mode - curr_items (which is part of "estimate") is
1338         // updated as part of flush, and thus if the writes are flushed in
1339         // blocks < 10 we may see an estimate < 10
1340         wait_for_flusher_to_settle(h);
1341     }
1342 
1343     const auto est = get_int_stat(h, "estimate", "dcp-vbtakeover 0");
1344     checkeq(10, est, "Invalid estimate for non-existent stream");
1345     checkeq(ENGINE_NOT_MY_VBUCKET,
1346             get_stats(h, "dcp-vbtakeover 1"_ccb, {}, add_stats),
1347             "Expected not my vbucket");
1348 
1349     return SUCCESS;
1350 }
1351 
1352 /*
1353  * The following test is similar to the test_dcp_consumer_open test and
1354  * test_dcp_producer_open test, in that it opens a connections, then
1355  * immediately closes it.
1356  * It then moves time forward and repeats the creation of a connection and
1357  * checks that the new connection was created after the previous connection.
1358  */
1359 
test_dcp_notifier_open(EngineIface* h)1360 static enum test_result test_dcp_notifier_open(EngineIface* h) {
1361     const auto* cookie1 = testHarness->create_cookie();
1362     const std::string name("unittest");
1363     const uint32_t seqno = 0;
1364     uint32_t opaque = 0;
1365     auto dcp = requireDcpIface(h);
1366 
1367     checkeq(ENGINE_SUCCESS,
1368             dcp->open(cookie1,
1369                       opaque,
1370                       seqno,
1371                       cb::mcbp::request::DcpOpenPayload::Notifier,
1372                       name,
1373                       R"({"consumer_name":"replica1"})"),
1374             "Failed dcp consumer open connection.");
1375 
1376     const std::string stat_type("eq_dcpq:" + name + ":type");
1377     auto type = get_str_stat(h, stat_type.c_str(), "dcp");
1378     const std::string stat_created("eq_dcpq:" + name + ":created");
1379     const auto created = get_int_stat(h, stat_created.c_str(), "dcp");
1380     checkeq(0, type.compare("notifier"), "Notifier not found");
1381     testHarness->destroy_cookie(cookie1);
1382 
1383     testHarness->time_travel(600);
1384 
1385     const auto* cookie2 = testHarness->create_cookie();
1386     checkeq(ENGINE_SUCCESS,
1387             dcp->open(cookie2,
1388                       opaque,
1389                       seqno,
1390                       cb::mcbp::request::DcpOpenPayload::Notifier,
1391                       name,
1392                       R"({"consumer_name":"replica1"})"),
1393             "Failed dcp consumer open connection.");
1394 
1395     type = get_str_stat(h, stat_type.c_str(), "dcp");
1396     checkeq(0, type.compare("notifier"), "Notifier not found");
1397     checkle((created + 600), get_int_stat(h, stat_created.c_str(), "dcp"),
1398             "New dcp stream is not newer");
1399     testHarness->destroy_cookie(cookie2);
1400 
1401     return SUCCESS;
1402 }
1403 
test_dcp_notifier(EngineIface* h)1404 static enum test_result test_dcp_notifier(EngineIface* h) {
1405     write_items(h, 10);
1406     const auto* cookie = testHarness->create_cookie();
1407     const std::string name("unittest");
1408     const uint32_t seqno = 0;
1409     const Vbid vbucket = Vbid(0);
1410     uint32_t opaque = 0;
1411     uint64_t start = 0;
1412     auto dcp = requireDcpIface(h);
1413     MockDcpMessageProducers producers(h);
1414 
1415     checkeq(ENGINE_SUCCESS,
1416             dcp->open(cookie,
1417                       opaque,
1418                       seqno,
1419                       cb::mcbp::request::DcpOpenPayload::Notifier,
1420                       name,
1421                       R"({"consumer_name":"replica1"})"),
1422             "Failed dcp notifier open connection.");
1423     // Get notification for an old item
1424     notifier_request(h, cookie, ++opaque, vbucket, start, true);
1425     dcp_step(h, cookie, producers);
1426     checkeq(cb::mcbp::ClientOpcode::DcpStreamEnd,
1427             producers.last_op,
1428             "Expected stream end");
1429     // Get notification when we're slightly behind
1430     start += 9;
1431     notifier_request(h, cookie, ++opaque, vbucket, start, true);
1432     dcp_step(h, cookie, producers);
1433     checkeq(cb::mcbp::ClientOpcode::DcpStreamEnd,
1434             producers.last_op,
1435             "Expected stream end");
1436     // Wait for notification of a future item
1437     start += 11;
1438     notifier_request(h, cookie, ++opaque, vbucket, start, true);
1439     dcp_step(h, cookie, producers);
1440     for (auto j = 0; j < 5; ++j) {
1441         const auto key = "key" + std::to_string(j);
1442         checkeq(ENGINE_SUCCESS,
1443                 store(h,
1444                       nullptr,
1445                       OPERATION_SET,
1446                       key.c_str(),
1447                       "data",
1448                       nullptr),
1449                 "Failed to store a value");
1450     }
1451     // Shouldn't get a stream end yet
1452     dcp_step(h, cookie, producers);
1453     checkne(cb::mcbp::ClientOpcode::DcpStreamEnd,
1454             producers.last_op,
1455             "Wasn't expecting a stream end");
1456     for (auto j = 0; j < 6; ++j) {
1457         const auto key = "key" + std::to_string(j);
1458         checkeq(ENGINE_SUCCESS,
1459                 store(h,
1460                       nullptr,
1461                       OPERATION_SET,
1462                       key.c_str(),
1463                       "data",
1464                       nullptr),
1465                 "Failed to store a value");
1466     }
1467     // Should get a stream end
1468     dcp_step(h, cookie, producers);
1469     checkeq(cb::mcbp::ClientOpcode::DcpStreamEnd,
1470             producers.last_op,
1471             "Expected stream end");
1472     testHarness->destroy_cookie(cookie);
1473 
1474     return SUCCESS;
1475 }
1476 
/*
 * The following test is similar to the previous test
 * (test_dcp_notifier), whereby a notifier connection is opened and
 * notifier_requests are made checking for the occurrence of stream
 * end commands.
 *
 * In the following test we make a notifier_request equal to
 * the number of operations performed (in this case one).  The test is
 * to ensure that a stream end is not received.  A second operation is
 * then performed and this time we check for the occurrence of a
 * stream end command.
 */
1489 
test_dcp_notifier_equal_to_number_of_items( EngineIface* h)1490 static enum test_result test_dcp_notifier_equal_to_number_of_items(
1491         EngineIface* h) {
1492     const std::string key("key0");
1493     checkeq(ENGINE_SUCCESS,
1494             store(h, nullptr, OPERATION_SET, key.c_str(), "data", nullptr),
1495             "Failed to store a value");
1496 
1497     const auto* cookie = testHarness->create_cookie();
1498     const std::string name("unittest");
1499     const uint32_t seqno = 0;
1500     const Vbid vbucket = Vbid(0);
1501     const uint64_t start = 1;
1502     uint32_t opaque = 0;
1503     auto dcp = requireDcpIface(h);
1504     MockDcpMessageProducers producers(h);
1505 
1506     checkeq(ENGINE_SUCCESS,
1507             dcp->open(cookie,
1508                       opaque,
1509                       seqno,
1510                       cb::mcbp::request::DcpOpenPayload::Notifier,
1511                       name,
1512                       R"({"consumer_name":"replica1"})"),
1513             "Failed dcp notifier open connection.");
1514     // Should not get a stream end
1515     notifier_request(h, cookie, ++opaque, vbucket, start, true);
1516     dcp_step(h, cookie, producers);
1517     checkne(cb::mcbp::ClientOpcode::DcpStreamEnd,
1518             producers.last_op,
1519             "Wasn't expecting a stream end");
1520     checkeq(ENGINE_SUCCESS,
1521             store(h, nullptr, OPERATION_SET, "key0", "data"),
1522             "Failed to store a value");
1523     dcp_step(h, cookie, producers);
1524     checkeq(cb::mcbp::ClientOpcode::DcpStreamEnd,
1525             producers.last_op,
1526             "Expected stream end");
1527     testHarness->destroy_cookie(cookie);
1528 
1529     return SUCCESS;
1530 }
1531 
test_dcp_consumer_open(EngineIface* h)1532 static enum test_result test_dcp_consumer_open(EngineIface* h) {
1533     const auto* cookie1 = testHarness->create_cookie();
1534     const std::string name("unittest");
1535     const uint32_t opaque = 0;
1536     const uint32_t seqno = 0;
1537     const uint32_t flags = 0;
1538     auto dcp = requireDcpIface(h);
1539 
1540     checkeq(ENGINE_SUCCESS,
1541             dcp->open(cookie1,
1542                       opaque,
1543                       seqno,
1544                       flags,
1545                       name,
1546                       R"({"consumer_name":"replica1"})"),
1547             "Failed dcp consumer open connection.");
1548 
1549     const auto stat_type("eq_dcpq:" + name + ":type");
1550     auto type = get_str_stat(h, stat_type.c_str(), "dcp");
1551     const auto stat_created("eq_dcpq:" + name + ":created");
1552     const auto created = get_int_stat(h, stat_created.c_str(), "dcp");
1553     checkeq(0, type.compare("consumer"), "Consumer not found");
1554     testHarness->destroy_cookie(cookie1);
1555 
1556     testHarness->time_travel(600);
1557 
1558     const auto* cookie2 = testHarness->create_cookie();
1559     checkeq(ENGINE_SUCCESS,
1560             dcp->open(cookie2,
1561                       opaque,
1562                       seqno,
1563                       flags,
1564                       name,
1565                       R"({"consumer_name":"replica1"})"),
1566             "Failed dcp consumer open connection.");
1567 
1568     type = get_str_stat(h, stat_type.c_str(), "dcp");
1569     checkeq(0, type.compare("consumer"), "Consumer not found");
1570     checklt(created, get_int_stat(h, stat_created.c_str(), "dcp"),
1571             "New dcp stream is not newer");
1572     testHarness->destroy_cookie(cookie2);
1573 
1574     return SUCCESS;
1575 }
1576 
test_dcp_consumer_flow_control_none(EngineIface* h)1577 static enum test_result test_dcp_consumer_flow_control_none(EngineIface* h) {
1578     const auto* cookie1 = testHarness->create_cookie();
1579     const std::string name("unittest");
1580     const uint32_t opaque = 0;
1581     const uint32_t seqno = 0;
1582     const uint32_t flags = 0;
1583     auto dcp = requireDcpIface(h);
1584 
1585     checkeq(ENGINE_SUCCESS,
1586             dcp->open(cookie1,
1587                       opaque,
1588                       seqno,
1589                       flags,
1590                       name,
1591                       R"({"consumer_name":"replica1"})"),
1592             "Failed dcp consumer open connection.");
1593 
1594     const auto stat_name("eq_dcpq:" + name + ":max_buffer_bytes");
1595     checkeq(0,
1596             get_int_stat(h, stat_name.c_str(), "dcp"),
1597             "Flow Control Buffer Size not zero");
1598     testHarness->destroy_cookie(cookie1);
1599 
1600     return SUCCESS;
1601 }
1602 
test_dcp_consumer_flow_control_static(EngineIface* h)1603 static enum test_result test_dcp_consumer_flow_control_static(EngineIface* h) {
1604     const auto* cookie1 = testHarness->create_cookie();
1605     const std::string name("unittest");
1606     const uint32_t opaque = 0;
1607     const uint32_t seqno = 0;
1608     const uint32_t flags = 0;
1609     const auto flow_ctl_buf_def_size = 10485760;
1610     auto dcp = requireDcpIface(h);
1611 
1612     checkeq(ENGINE_SUCCESS,
1613             dcp->open(cookie1,
1614                       opaque,
1615                       seqno,
1616                       flags,
1617                       name,
1618                       R"({"consumer_name":"replica1"})"),
1619             "Failed dcp consumer open connection.");
1620 
1621     const auto stat_name("eq_dcpq:" + name + ":max_buffer_bytes");
1622     checkeq(flow_ctl_buf_def_size,
1623             get_int_stat(h, stat_name.c_str(), "dcp"),
1624             "Flow Control Buffer Size not equal to default value");
1625     testHarness->destroy_cookie(cookie1);
1626 
1627     return SUCCESS;
1628 }
1629 
test_dcp_consumer_flow_control_dynamic(EngineIface* h)1630 static enum test_result test_dcp_consumer_flow_control_dynamic(EngineIface* h) {
1631     const auto* cookie1 = testHarness->create_cookie();
1632     const std::string name("unittest");
1633     const uint32_t opaque = 0;
1634     const uint32_t seqno = 0;
1635     const uint32_t flags = 0;
1636     /* Check the min limit */
1637     set_param(h,
1638               cb::mcbp::request::SetParamPayload::Type::Flush,
1639               "max_size",
1640               "500000000");
1641     checkeq(500000000, get_int_stat(h, "ep_max_size"), "Incorrect new size.");
1642 
1643     auto dcp = requireDcpIface(h);
1644     checkeq(ENGINE_SUCCESS,
1645             dcp->open(cookie1,
1646                       opaque,
1647                       seqno,
1648                       flags,
1649                       name,
1650                       R"({"consumer_name":"replica1"})"),
1651             "Failed dcp consumer open connection.");
1652 
1653     const auto stat_name("eq_dcpq:" + name + ":max_buffer_bytes");
1654     checkeq(10485760,
1655             get_int_stat(h, stat_name.c_str(), "dcp"),
1656             "Flow Control Buffer Size not equal to min");
1657     testHarness->destroy_cookie(cookie1);
1658 
1659     /* Check the size as percentage of the bucket memory */
1660     const auto* cookie2 = testHarness->create_cookie();
1661     set_param(h,
1662               cb::mcbp::request::SetParamPayload::Type::Flush,
1663               "max_size",
1664               "2000000000");
1665     checkeq(2000000000, get_int_stat(h, "ep_max_size"), "Incorrect new size.");
1666 
1667     checkeq(ENGINE_SUCCESS,
1668             dcp->open(cookie2,
1669                       opaque,
1670                       seqno,
1671                       flags,
1672                       name,
1673                       R"({"consumer_name":"replica1"})"),
1674             "Failed dcp consumer open connection.");
1675 
1676     checkeq(20000000,
1677             get_int_stat(h, stat_name.c_str(), "dcp"),
1678             "Flow Control Buffer Size not equal to 1% of mem size");
1679     testHarness->destroy_cookie(cookie2);
1680 
1681     /* Check the case when mem used by flow control bufs hit the threshold */
1682     /* Create around 10 more connections to use more than 10% of the total
1683        memory */
1684     for (auto count = 0; count < 10; count++) {
1685         const auto* cookie = testHarness->create_cookie();
1686         checkeq(ENGINE_SUCCESS,
1687                 dcp->open(cookie,
1688                           opaque,
1689                           seqno,
1690                           flags,
1691                           name,
1692                           R"({"consumer_name":"replica1"})"),
1693                 "Failed dcp consumer open connection.");
1694         testHarness->destroy_cookie(cookie);
1695     }
1696     /* By now mem used by flow control bufs would have crossed the threshold */
1697     const auto* cookie3 = testHarness->create_cookie();
1698     checkeq(ENGINE_SUCCESS,
1699             dcp->open(cookie3,
1700                       opaque,
1701                       seqno,
1702                       flags,
1703                       name,
1704                       R"({"consumer_name":"replica1"})"),
1705             "Failed dcp consumer open connection.");
1706 
1707     checkeq(10485760,
1708             get_int_stat(h, stat_name.c_str(), "dcp"),
1709             "Flow Control Buffer Size not equal to min after threshold is hit");
1710     testHarness->destroy_cookie(cookie3);
1711 
1712     /* Check the max limit */
1713     const auto* cookie4 = testHarness->create_cookie();
1714     set_param(h,
1715               cb::mcbp::request::SetParamPayload::Type::Flush,
1716               "max_size",
1717               "7000000000");
1718     checkeq(static_cast<uint64_t>(7000000000),
1719             get_ull_stat(h, "ep_max_size"),
1720             "Incorrect new size.");
1721 
1722     checkeq(ENGINE_SUCCESS,
1723             dcp->open(cookie4,
1724                       opaque,
1725                       seqno,
1726                       flags,
1727                       name,
1728                       R"({"consumer_name":"replica1"})"),
1729             "Failed dcp consumer open connection.");
1730 
1731     checkeq(52428800,
1732             get_int_stat(h, stat_name.c_str(), "dcp"),
1733             "Flow Control Buffer Size beyond max");
1734     testHarness->destroy_cookie(cookie4);
1735 
1736     return SUCCESS;
1737 }
1738 
test_dcp_consumer_flow_control_aggressive( EngineIface* h)1739 static enum test_result test_dcp_consumer_flow_control_aggressive(
1740         EngineIface* h) {
1741     const auto max_conns = 6;
1742     const void *cookie[max_conns];
1743     const auto flow_ctl_buf_max = 52428800;
1744     const auto flow_ctl_buf_min = 10485760;
1745     const auto ep_max_size = 1200000000;
1746     const auto bucketMemQuotaFraction = 0.05;
1747     set_param(h,
1748               cb::mcbp::request::SetParamPayload::Type::Flush,
1749               "max_size",
1750               std::to_string(ep_max_size).c_str());
1751     checkeq(ep_max_size, get_int_stat(h, "ep_max_size"), "Incorrect new size.");
1752 
1753     std::vector<Vbid> vbuckets = {
1754             {Vbid{1}, Vbid{2}, Vbid{3}, Vbid{4}, Vbid{5}, Vbid{6}}};
1755     checkeq(std::size_t(max_conns),
1756             vbuckets.size(),
1757             "I need one vbucket per cookie");
1758     for (const auto& vb : vbuckets) {
1759         check(set_vbucket_state(h, vb, vbucket_state_replica),
1760               "Failed to set VBucket state.");
1761     }
1762 
1763     /* Create first connection */
1764     const std::string name("unittest_");
1765     const auto name1(name + "0");
1766     const uint32_t opaque = 0;
1767     const uint32_t seqno = 0;
1768     const uint32_t flags = 0;
1769     cookie[0] = testHarness->create_cookie();
1770     auto dcp = requireDcpIface(h);
1771 
1772     checkeq(ENGINE_SUCCESS,
1773             dcp->open(cookie[0],
1774                       opaque,
1775                       seqno,
1776                       flags,
1777                       name1,
1778                       R"({"consumer_name":"replica1"})"),
1779             "Failed dcp consumer open connection.");
1780 
1781     checkeq(ENGINE_SUCCESS,
1782             dcp->add_stream(cookie[0], 0, vbuckets[0], 0),
1783             "Failed to set up stream");
1784 
1785     /* Check the max limit */
1786     auto stat_name = "eq_dcpq:" + name1 + ":max_buffer_bytes";
1787     checkeq(flow_ctl_buf_max,
1788             get_int_stat(h, stat_name.c_str(), "dcp"),
1789             "Flow Control Buffer Size not equal to max");
1790 
1791     /* Create at least 4 more connections */
1792     for (auto count = 1; count < max_conns - 1; count++) {
1793         cookie[count] = testHarness->create_cookie();
1794         const auto name2(name + std::to_string(count));
1795         checkeq(ENGINE_SUCCESS,
1796                 dcp->open(cookie[count],
1797                           opaque,
1798                           seqno,
1799                           flags,
1800                           name2,
1801                           R"({"consumer_name":"replica1"})"),
1802                 "Failed dcp consumer open connection.");
1803 
1804         checkeq(ENGINE_SUCCESS,
1805                 dcp->add_stream(cookie[count], 0, vbuckets[count], 0),
1806                 "Failed to set up stream");
1807 
1808         for (auto i = 0; i <= count; i++) {
1809             /* Check if the buffer size of all connections has changed */
1810             const auto stat_name("eq_dcpq:" + name + std::to_string(i) +
1811                                ":max_buffer_bytes");
1812             checkeq((int)((ep_max_size * bucketMemQuotaFraction) / (count + 1)),
1813                     get_int_stat(h, stat_name.c_str(), "dcp"),
1814                     "Flow Control Buffer Size not correct");
1815         }
1816     }
1817 
1818     /* Opening another connection should set the buffer size to min value */
1819     cookie[max_conns - 1] = testHarness->create_cookie();
1820     const auto name3(name + std::to_string(max_conns - 1));
1821     const auto stat_name2("eq_dcpq:" + name3 + ":max_buffer_bytes");
1822     checkeq(ENGINE_SUCCESS,
1823             dcp->open(cookie[max_conns - 1],
1824                       opaque,
1825                       seqno,
1826                       flags,
1827                       name3,
1828                       R"({"consumer_name":"replica1"})"),
1829             "Failed dcp consumer open connection.");
1830 
1831     checkeq(ENGINE_SUCCESS,
1832             dcp->add_stream(
1833                     cookie[max_conns - 1], 0, vbuckets[max_conns - 1], 0),
1834             "Failed to set up stream");
1835 
1836     checkeq(flow_ctl_buf_min,
1837             get_int_stat(h, stat_name2.c_str(), "dcp"),
1838             "Flow Control Buffer Size not equal to min");
1839 
1840     /* Disconnect connections and see if flow control
1841      * buffer size of existing conns increase
1842      */
1843     for (auto count = 0; count < max_conns / 2; count++) {
1844         testHarness->destroy_cookie(cookie[count]);
1845     }
1846     /* Wait for disconnected connections to be deleted */
1847     wait_for_stat_to_be(h, "ep_dcp_dead_conn_count", 0, "dcp");
1848 
1849     /* Check if the buffer size of all connections has increased */
1850     const auto exp_buf_size = (int)((ep_max_size * bucketMemQuotaFraction) /
1851                               (max_conns - (max_conns / 2)));
1852 
1853     /* Also check if we get control message indicating the flow control buffer
1854        size change from the consumer connections */
1855     MockDcpMessageProducers producers(h);
1856 
1857     for (auto i = max_conns / 2; i < max_conns; i++) {
1858         /* Check if the buffer size of all connections has changed */
1859         const auto name4(name + std::to_string(i));
1860         const auto stat_name3("eq_dcpq:" + name4 + ":max_buffer_bytes");
1861         checkeq(exp_buf_size,
1862                 get_int_stat(h, stat_name3.c_str(), "dcp"),
1863                 "Flow Control Buffer Size not correct");
1864         checkeq(ENGINE_SUCCESS,
1865                 dcp->step(cookie[i], &producers),
1866                 "Pending flow control buffer change not processed");
1867         checkeq(cb::mcbp::ClientOpcode::DcpControl,
1868                 producers.last_op,
1869                 "Flow ctl buf size change control message not received");
1870         checkeq(0,
1871                 producers.last_key.compare("connection_buffer_size"),
1872                 "Flow ctl buf size change control message key error");
1873         checkeq(exp_buf_size,
1874                 atoi(producers.last_value.c_str()),
1875                 "Flow ctl buf size in control message not correct");
1876     }
1877     /* Disconnect remaining connections */
1878     for (auto count = max_conns / 2; count < max_conns; count++) {
1879         testHarness->destroy_cookie(cookie[count]);
1880     }
1881 
1882     return SUCCESS;
1883 }
1884 
test_dcp_producer_open(EngineIface* h)1885 static enum test_result test_dcp_producer_open(EngineIface* h) {
1886     const auto* cookie1 = testHarness->create_cookie();
1887     const std::string name("unittest");
1888     const uint32_t opaque = 0;
1889     const uint32_t seqno = 0;
1890     auto dcp = requireDcpIface(h);
1891 
1892     checkeq(ENGINE_SUCCESS,
1893             dcp->open(cookie1,
1894                       opaque,
1895                       seqno,
1896                       cb::mcbp::request::DcpOpenPayload::Producer,
1897                       name,
1898                       R"({"consumer_name":"replica1"})"),
1899             "Failed dcp producer open connection.");
1900     const auto stat_type("eq_dcpq:" + name + ":type");
1901     auto type = get_str_stat(h, stat_type.c_str(), "dcp");
1902     const auto stat_created("eq_dcpq:" + name + ":created");
1903     const auto created = get_int_stat(h, stat_created.c_str(), "dcp");
1904     checkeq(0, type.compare("producer"), "Producer not found");
1905     testHarness->destroy_cookie(cookie1);
1906 
1907     testHarness->time_travel(600);
1908 
1909     const auto* cookie2 = testHarness->create_cookie();
1910     checkeq(ENGINE_SUCCESS,
1911             dcp->open(cookie2,
1912                       opaque,
1913                       seqno,
1914                       cb::mcbp::request::DcpOpenPayload::Producer,
1915                       name,
1916                       R"({"consumer_name":"replica1"})"),
1917             "Failed dcp producer open connection.");
1918     type = get_str_stat(h, stat_type.c_str(), "dcp");
1919     checkeq(0, type.compare("producer"), "Producer not found");
1920     checklt(created, get_int_stat(h, stat_created.c_str(), "dcp"),
1921             "New dcp stream is not newer");
1922     testHarness->destroy_cookie(cookie2);
1923 
1924     return SUCCESS;
1925 }
1926 
test_dcp_producer_open_same_cookie(EngineIface* h)1927 static enum test_result test_dcp_producer_open_same_cookie(EngineIface* h) {
1928     const auto* cookie = testHarness->create_cookie();
1929     const std::string name("unittest");
1930     uint32_t opaque = 0;
1931     const uint32_t seqno = 0;
1932     auto dcp = requireDcpIface(h);
1933 
1934     checkeq(ENGINE_SUCCESS,
1935             dcp->open(cookie,
1936                       opaque,
1937                       seqno,
1938                       cb::mcbp::request::DcpOpenPayload::Producer,
1939                       name,
1940                       R"({"consumer_name":"replica1"})"),
1941             "Failed dcp producer open connection.");
1942 
1943     const auto stat_type("eq_dcpq:" + name + ":type");
1944     auto type = get_str_stat(h, stat_type.c_str(), "dcp");
1945     checkeq(0, type.compare("producer"), "Producer not found");
1946     /*
1947      * Number of references is 2 (as opposed to 1) because a
1948      * mock_connstuct is initialised to having 1 reference
1949      * to represent a client being connected to it.
1950      */
1951     checkeq(2,
1952             testHarness->get_number_of_mock_cookie_references(cookie),
1953             "Number of cookie references is not two");
1954     /*
1955      * engine_data needs to be reset so that it passes the check that
1956      * a connection does not already exist on the same socket.
1957      */
1958     testHarness->store_engine_specific(cookie, nullptr);
1959 
1960     checkeq(ENGINE_DISCONNECT,
1961             dcp->open(cookie,
1962                       opaque++,
1963                       seqno,
1964                       cb::mcbp::request::DcpOpenPayload::Producer,
1965                       name,
1966                       R"({"consumer_name":"replica1"})"),
1967             "Failed to return ENGINE_DISCONNECT");
1968 
1969     checkeq(2,
1970             testHarness->get_number_of_mock_cookie_references(cookie),
1971             "Number of cookie references is not two");
1972 
1973     testHarness->destroy_cookie(cookie);
1974 
1975     checkeq(1,
1976             testHarness->get_number_of_mock_cookie_references(cookie),
1977             "Number of cookie references is not one");
1978 
1979     return SUCCESS;
1980 }
1981 
test_dcp_noop(EngineIface* h)1982 static enum test_result test_dcp_noop(EngineIface* h) {
1983     const auto* cookie = testHarness->create_cookie();
1984     const std::string name("unittest");
1985     const uint32_t seqno = 0;
1986     uint32_t opaque = 0;
1987     auto dcp = requireDcpIface(h);
1988     MockDcpMessageProducers producers(h);
1989 
1990     checkeq(ENGINE_SUCCESS,
1991             dcp->open(cookie,
1992                       opaque,
1993                       seqno,
1994                       cb::mcbp::request::DcpOpenPayload::Producer,
1995                       name,
1996                       R"({"consumer_name":"replica1"})"),
1997             "Failed dcp producer open connection.");
1998     const std::string param1_name("connection_buffer_size");
1999     const std::string param1_value("1024");
2000     checkeq(ENGINE_SUCCESS,
2001             dcp->control(cookie, ++opaque, param1_name, param1_value),
2002             "Failed to establish connection buffer");
2003     const std::string param2_name("enable_noop");
2004     const std::string param2_value("true");
2005     checkeq(ENGINE_SUCCESS,
2006             dcp->control(cookie, ++opaque, param2_name, param2_value),
2007             "Failed to enable no-ops");
2008 
2009     testHarness->time_travel(201);
2010 
2011     auto done = false;
2012     while (!done) {
2013         if (dcp->step(cookie, &producers) == ENGINE_DISCONNECT) {
2014             done = true;
2015         } else if (producers.last_op == cb::mcbp::ClientOpcode::DcpNoop) {
2016             done = true;
2017             // Producer opaques are hard coded to start from 10M
2018             checkeq(10000001,
2019                     (int)producers.last_opaque,
2020                     "last_opaque != 10,000,001");
2021             const auto stat_name("eq_dcpq:" + name + ":noop_wait");
2022             checkeq(1,
2023                     get_int_stat(h, stat_name.c_str(), "dcp"),
2024                     "Didn't send noop");
2025             sendDcpAck(h,
2026                        cookie,
2027                        cb::mcbp::ClientOpcode::DcpNoop,
2028                        cb::mcbp::Status::Success,
2029                        producers.last_opaque);
2030             checkeq(0,
2031                     get_int_stat(h, stat_name.c_str(), "dcp"),
2032                     "Didn't ack noop");
2033         } else if (producers.last_op != cb::mcbp::ClientOpcode::Invalid) {
2034             abort();
2035         }
2036         producers.last_op = cb::mcbp::ClientOpcode::Invalid;
2037     }
2038     testHarness->destroy_cookie(cookie);
2039 
2040     return SUCCESS;
2041 }
2042 
test_dcp_noop_fail(EngineIface* h)2043 static enum test_result test_dcp_noop_fail(EngineIface* h) {
2044     const auto* cookie = testHarness->create_cookie();
2045     const std::string name("unittest");
2046     const uint32_t seqno = 0;
2047     uint32_t opaque = 0;
2048     auto dcp = requireDcpIface(h);
2049 
2050     checkeq(ENGINE_SUCCESS,
2051             dcp->open(cookie,
2052                       opaque,
2053                       seqno,
2054                       cb::mcbp::request::DcpOpenPayload::Producer,
2055                       name,
2056                       R"({"consumer_name":"replica1"})"),
2057             "Failed dcp producer open connection.");
2058     const std::string param1_name("connection_buffer_size");
2059     const std::string param1_value("1024");
2060     checkeq(ENGINE_SUCCESS,
2061             dcp->control(cookie, ++opaque, param1_name, param1_value),
2062             "Failed to establish connection buffer");
2063     const std::string param2_name("enable_noop");
2064     const std::string param2_value("true");
2065     checkeq(ENGINE_SUCCESS,
2066             dcp->control(cookie, ++opaque, param2_name, param2_value),
2067             "Failed to enable no-ops");
2068 
2069     testHarness->time_travel(201);
2070 
2071     MockDcpMessageProducers producers(h);
2072     while (dcp->step(cookie, &producers) != ENGINE_DISCONNECT) {
2073         if (producers.last_op == cb::mcbp::ClientOpcode::DcpNoop) {
2074             // Producer opaques are hard coded to start from 10M
2075             checkeq(10000001,
2076                     (int)producers.last_opaque,
2077                     "last_opaque != 10,000,001");
2078             const auto stat_name("eq_dcpq:" + name + ":noop_wait");
2079             checkeq(1,
2080                     get_int_stat(h, stat_name.c_str(), "dcp"),
2081                     "Didn't send noop");
2082             testHarness->time_travel(201);
2083         } else if (producers.last_op != cb::mcbp::ClientOpcode::Invalid) {
2084             abort();
2085         }
2086     }
2087     testHarness->destroy_cookie(cookie);
2088 
2089     return SUCCESS;
2090 }
2091 
test_dcp_consumer_noop(EngineIface* h)2092 static enum test_result test_dcp_consumer_noop(EngineIface* h) {
2093     check(set_vbucket_state(h, Vbid(0), vbucket_state_replica),
2094           "Failed to set vbucket state.");
2095     const auto* cookie = testHarness->create_cookie();
2096     const std::string name("unittest");
2097     const uint32_t seqno = 0;
2098     const uint32_t flags = 0;
2099     const Vbid vbucket = Vbid(0);
2100     uint32_t opaque = 0;
2101     auto dcp = requireDcpIface(h);
2102 
2103     // Open consumer connection
2104     checkeq(ENGINE_SUCCESS,
2105             dcp->open(cookie,
2106                       opaque,
2107                       seqno,
2108                       flags,
2109                       name,
2110                       R"({"consumer_name":"replica1"})"),
2111             "Failed dcp Consumer open connection.");
2112     add_stream_for_consumer(
2113             h, cookie, opaque, vbucket, flags, cb::mcbp::Status::Success);
2114     testHarness->time_travel(201);
2115     // No-op not recieved for 201 seconds. Should be ok.
2116     MockDcpMessageProducers producers(h);
2117     checkeq(ENGINE_EWOULDBLOCK,
2118             dcp->step(cookie, &producers),
2119             "Expected engine would block");
2120 
2121     testHarness->time_travel(200);
2122 
2123     // Message not recieved for over 400 seconds. Should disconnect.
2124     checkeq(ENGINE_DISCONNECT,
2125             dcp->step(cookie, &producers),
2126             "Expected engine disconnect");
2127     testHarness->destroy_cookie(cookie);
2128 
2129     return SUCCESS;
2130 }
2131 
2132 /**
2133  * Creates a DCP producer stream with the specified values for noop_manditory,
2134  * noop_enabled, and XATTRs enabled, and then attempts to open a stream,
2135  * checking for the expectedResult code.
2136  */
test_dcp_noop_mandatory_combo(EngineIface* h, bool noopManditory, bool enableNoop, bool enableXAttrs, ENGINE_ERROR_CODE expectedResult)2137 static void test_dcp_noop_mandatory_combo(EngineIface* h,
2138                                           bool noopManditory,
2139                                           bool enableNoop,
2140                                           bool enableXAttrs,
2141                                           ENGINE_ERROR_CODE expectedResult) {
2142     const void* cookie = testHarness->create_cookie();
2143 
2144     // Configure manditory noop as requested.
2145     set_param(h,
2146               cb::mcbp::request::SetParamPayload::Type::Flush,
2147               "dcp_noop_mandatory_for_v5_features",
2148               noopManditory ? "true" : "false");
2149     checkeq(noopManditory,
2150             get_bool_stat(h, "ep_dcp_noop_mandatory_for_v5_features"),
2151             "Incorrect value for dcp_noop_mandatory_for_v5_features");
2152 
2153     // Create DCP consumer with requested flags.
2154     TestDcpConsumer tdc("dcp_noop_manditory_test", cookie, h);
2155     uint32_t flags = cb::mcbp::request::DcpOpenPayload::Producer;
2156     if (enableXAttrs) {
2157         flags |= cb::mcbp::request::DcpOpenPayload::IncludeXattrs;
2158     }
2159     tdc.openConnection(flags);
2160 
2161     // Setup noop on the DCP connection.
2162     checkeq(ENGINE_SUCCESS,
2163             tdc.sendControlMessage("enable_noop",
2164                                    enableNoop ? "true" : "false"),
2165             "Failed to configure noop");
2166 
2167     // Finally, attempt to create the stream and verify we get the expeced
2168     // response.
2169     DcpStreamCtx ctx;
2170     ctx.vb_uuid = get_ull_stat(h, "vb_0:0:id", "failovers");
2171     ctx.seqno = {0, static_cast<uint64_t>(-1)};
2172     ctx.exp_err = expectedResult;
2173     tdc.addStreamCtx(ctx);
2174 
2175     tdc.openStreams();
2176 
2177     testHarness->destroy_cookie(cookie);
2178 }
2179 
test_dcp_noop_mandatory(EngineIface* h)2180 static enum test_result test_dcp_noop_mandatory(EngineIface* h) {
2181     // Test all combinations of {manditoryNoop, enable_noop, includeXAttr}
2182     test_dcp_noop_mandatory_combo(h, false, false, false, ENGINE_SUCCESS);
2183     test_dcp_noop_mandatory_combo(h, false, false, true, ENGINE_SUCCESS);
2184     test_dcp_noop_mandatory_combo(h, false, true, false, ENGINE_SUCCESS);
2185     test_dcp_noop_mandatory_combo(h, false, true, true, ENGINE_SUCCESS);
2186     test_dcp_noop_mandatory_combo(h, true, false, false, ENGINE_SUCCESS);
2187     test_dcp_noop_mandatory_combo(h, true, false, true, ENGINE_ENOTSUP);
2188     test_dcp_noop_mandatory_combo(h, true, true, false, ENGINE_SUCCESS);
2189     test_dcp_noop_mandatory_combo(h, true, true, true, ENGINE_SUCCESS);
2190 
2191     return SUCCESS;
2192 }
2193 
test_dcp_producer_stream_req_open(EngineIface* h)2194 static enum test_result test_dcp_producer_stream_req_open(EngineIface* h) {
2195     const void* cookie = testHarness->create_cookie();
2196     const int num_items = 3;
2197 
2198     DcpStreamCtx ctx;
2199     ctx.vb_uuid = get_ull_stat(h, "vb_0:0:id", "failovers");
2200     ctx.seqno = {0, static_cast<uint64_t>(-1)};
2201 
2202     std::string name("unittest");
2203     TestDcpConsumer tdc(name.c_str(), cookie, h);
2204     tdc.addStreamCtx(ctx);
2205 
2206     tdc.openConnection();
2207 
2208     /* Create a separate thread that does tries to get any DCP items */
2209     std::thread dcp_step_thread(
2210             dcp_waiting_step, h, cookie, 0, num_items, std::ref(tdc.producers));
2211 
2212     /* We need to wait till the 'dcp_waiting_step' thread begins its wait */
2213     while (1) {
2214         /* Busy wait is ok here. To do a non busy wait we must use
2215          another condition variable which is an overkill here */
2216         testHarness->lock_cookie(cookie);
2217         if (wait_started) {
2218             testHarness->unlock_cookie(cookie);
2219             break;
2220         }
2221         testHarness->unlock_cookie(cookie);
2222     }
2223 
2224     /* Now create a stream */
2225     tdc.openStreams();
2226 
2227     /* Write items */
2228     write_items(h, num_items, 0);
2229     wait_for_flusher_to_settle(h);
2230     verify_curr_items(h, num_items, "Wrong amount of items");
2231 
2232     /* If the notification (to 'dcp_waiting_step' thread upon writing an item)
2233      mechanism is efficient, we must see the 'dcp_waiting_step' finish before
2234      test time out */
2235     dcp_step_thread.join();
2236 
2237     testHarness->destroy_cookie(cookie);
2238 
2239     return SUCCESS;
2240 }
2241 
test_dcp_producer_stream_req_partial(EngineIface* h)2242 static enum test_result test_dcp_producer_stream_req_partial(EngineIface* h) {
2243     // Should start at checkpoint_id 2
2244     const auto initial_ckpt_id =
2245             get_int_stat(h, "vb_0:open_checkpoint_id", "checkpoint");
2246     checkeq(2, initial_ckpt_id, "Expected to start at checkpoint ID 2");
2247 
2248     // Create two 'full' checkpoints by storing exactly 2 x 'chk_max_items'
2249     // into the VBucket.
2250     const auto max_ckpt_items = get_int_stat(h, "ep_chk_max_items");
2251 
2252     write_items(h, max_ckpt_items);
2253     wait_for_flusher_to_settle(h);
2254     wait_for_stat_to_be(h, "ep_items_rm_from_checkpoints", max_ckpt_items);
2255     checkeq(initial_ckpt_id + 1,
2256             get_int_stat(h, "vb_0:open_checkpoint_id", "checkpoint"),
2257             "Expected #checkpoints to increase by 1 after storing items");
2258 
2259     write_items(h, max_ckpt_items, max_ckpt_items);
2260     wait_for_flusher_to_settle(h);
2261     wait_for_stat_to_be(h, "ep_items_rm_from_checkpoints", max_ckpt_items * 2);
2262     checkeq(initial_ckpt_id + 2,
2263             get_int_stat(h, "vb_0:open_checkpoint_id", "checkpoint"),
2264             "Expected #checkpoints to increase by 2 after storing 2x "
2265             "max_ckpt_items");
2266 
2267     // Stop persistece (so the persistence cursor cannot advance into the
2268     // deletions below, and hence de-dupe them with respect to the
2269     // additions we just did).
2270     stop_persistence(h);
2271 
2272     // Now delete half of the keys. Given that we have reached the
2273     // maximum checkpoint size above, all the deletes should be in a
2274     // subsequent checkpoint.
2275     for (int j = 0; j < max_ckpt_items; ++j) {
2276         std::stringstream ss;
2277         ss << "key" << j;
2278         checkeq(ENGINE_SUCCESS,
2279                 del(h, ss.str().c_str(), 0, Vbid(0)),
2280                 "Expected delete to succeed");
2281     }
2282 
2283     const void* cookie = testHarness->create_cookie();
2284 
2285     // Verify that we recieve full checkpoints when we only ask for
2286     // sequence numbers which lie within partial checkpoints.  We
2287     // should have the following Checkpoints in existence:
2288     //
2289     //   {  1,100} - MUTATE(key0..key99), from disk.
2290     //   {101,200} - MUTATE(key100.key199), from disk.
2291     //   {201,300} - DELETE(key0..key99), in memory (as persistence has been stopped).
2292     //
2293     // We request a start and end which lie in the middle of checkpoints -
2294     // start at 105 and end at 209. We should recieve to the end of
2295     // complete checkpoints, i.e. from 105 all the way to 300.
2296     DcpStreamCtx ctx;
2297     ctx.vb_uuid = get_ull_stat(h, "vb_0:0:id", "failovers");
2298     ctx.seqno = {105, 209};
2299     ctx.snapshot = {105, 105};
2300     ctx.exp_mutations = 95; // 105 to 200
2301     ctx.exp_deletions = 100; // 201 to 300
2302 
2303     if (isPersistentBucket(h)) {
2304         ctx.exp_markers = 2;
2305     } else {
2306         // the ephemeral stream request won't be broken into two snapshots of
2307         // backfill from disk vs the checkpoint in memory
2308         ctx.exp_markers = 1;
2309     }
2310 
2311     TestDcpConsumer tdc("unittest", cookie, h);
2312     tdc.addStreamCtx(ctx);
2313     tdc.run();
2314 
2315     testHarness->destroy_cookie(cookie);
2316 
2317     return SUCCESS;
2318 }
2319 
test_dcp_producer_stream_req_full_merged_snapshots( EngineIface* h)2320 static enum test_result test_dcp_producer_stream_req_full_merged_snapshots(
2321         EngineIface* h) {
2322     const int num_items = 300, batch_items = 100;
2323     for (int start_seqno = 0; start_seqno < num_items;
2324          start_seqno += batch_items) {
2325         wait_for_flusher_to_settle(h);
2326         write_items(h, batch_items, start_seqno);
2327     }
2328 
2329     wait_for_flusher_to_settle(h);
2330     verify_curr_items(h, num_items, "Wrong amount of items");
2331     wait_for_stat_to_be(h, "vb_0:num_checkpoints", 1, "checkpoint");
2332 
2333     const void* cookie = testHarness->create_cookie();
2334 
2335     DcpStreamCtx ctx;
2336     ctx.vb_uuid = get_ull_stat(h, "vb_0:0:id", "failovers");
2337     ctx.seqno = {0, get_ull_stat(h, "vb_0:high_seqno", "vbucket-seqno")};
2338     ctx.exp_mutations = num_items;
2339     /* Disk backfill sends all items in disk as one snapshot */
2340     ctx.exp_markers = 1;
2341 
2342     TestDcpConsumer tdc("unittest", cookie, h);
2343     tdc.addStreamCtx(ctx);
2344     tdc.run();
2345 
2346     testHarness->destroy_cookie(cookie);
2347 
2348     return SUCCESS;
2349 }
2350 
test_dcp_producer_stream_req_full(EngineIface* h)2351 static enum test_result test_dcp_producer_stream_req_full(EngineIface* h) {
2352     const int num_items = 300, batch_items = 100;
2353     for (int start_seqno = 0; start_seqno < num_items;
2354          start_seqno += batch_items) {
2355         wait_for_flusher_to_settle(h);
2356         write_items(h, batch_items, start_seqno);
2357     }
2358 
2359     wait_for_flusher_to_settle(h);
2360     verify_curr_items(h, num_items, "Wrong amount of items");
2361     wait_for_stat_to_be(h, "vb_0:num_checkpoints", 1, "checkpoint");
2362 
2363     checkne(num_items - get_stat<uint64_t>(h, "ep_items_rm_from_checkpoints"),
2364             uint64_t{0},
2365             "Require a non-zero number of items to still be present in "
2366             "CheckpointManager to be able to get 2x snapshot markers "
2367             "(1x disk, 1x memory)");
2368 
2369     const void* cookie = testHarness->create_cookie();
2370 
2371     DcpStreamCtx ctx;
2372     ctx.vb_uuid = get_ull_stat(h, "vb_0:0:id", "failovers");
2373     ctx.seqno = {0, get_ull_stat(h, "vb_0:high_seqno", "vbucket-seqno")};
2374     ctx.exp_mutations = num_items;
2375     /* Memory backfill sends items from checkpoint snapshots as much as possible
2376        Relies on backfill only when checkpoint snapshot is cleaned up */
2377     ctx.exp_markers = 2;
2378 
2379     TestDcpConsumer tdc("unittest", cookie, h);
2380     tdc.addStreamCtx(ctx);
2381     tdc.run();
2382 
2383     testHarness->destroy_cookie(cookie);
2384 
2385     return SUCCESS;
2386 }
2387 
2388 /*
2389  * Test that deleted items (with values) backfill correctly
2390  */
test_dcp_producer_deleted_item_backfill( EngineIface* h)2391 static enum test_result test_dcp_producer_deleted_item_backfill(
2392         EngineIface* h) {
2393     const int deletions = 10;
2394     write_items(h,
2395                 deletions,
2396                 0,
2397                 "del",
2398                 "value",
2399                 0 /*exp*/,
2400                 Vbid(0),
2401                 DocumentState::Deleted);
2402     wait_for_flusher_to_settle(h);
2403 
2404     const void* cookie = testHarness->create_cookie();
2405 
2406     DcpStreamCtx ctx;
2407     ctx.vb_uuid = get_ull_stat(h, "vb_0:0:id", "failovers");
2408     ctx.seqno = {0, deletions};
2409     ctx.exp_deletions = deletions;
2410     ctx.expected_values = deletions;
2411     ctx.flags |= DCP_ADD_STREAM_FLAG_DISKONLY;
2412     ctx.exp_markers = 1;
2413 
2414     TestDcpConsumer tdc("unittest", cookie, h);
2415     tdc.addStreamCtx(ctx);
2416     tdc.run();
2417 
2418     testHarness->destroy_cookie(cookie);
2419 
2420     return SUCCESS;
2421 }
2422 
2423 // Function to parameterize whether expiries should be outputted or not.
testDcpProducerExpiredItemBackfill( EngineIface* h, EnableExpiryOutput enableExpiryOutput)2424 static test_result testDcpProducerExpiredItemBackfill(
2425         EngineIface* h, EnableExpiryOutput enableExpiryOutput) {
2426     const int expiries = 5;
2427     const int start_seqno = 0;
2428     write_items(h,
2429                 expiries,
2430                 start_seqno,
2431                 "exp",
2432                 "value",
2433                 5 /*exp*/,
2434                 Vbid(0),
2435                 DocumentState::Alive);
2436 
2437     wait_for_flusher_to_settle(h);
2438     verify_curr_items(h, expiries, "Wrong number of items");
2439 
2440     testHarness->time_travel(256);
2441     const void* cookie1 = testHarness->create_cookie();
2442     for (int i = 0; i < expiries; ++i) {
2443         std::string key("exp" + std::to_string(i + start_seqno));
2444         cb::EngineErrorItemPair ret =
2445                 get(h, cookie1, key.c_str(), Vbid(0), DocStateFilter::Alive);
2446         checkeq(cb::engine_errc::no_such_key,
2447                 ret.first,
2448                 "Expected get to return 'no_such_key'");
2449     }
2450     testHarness->destroy_cookie(cookie1);
2451 
2452     wait_for_flusher_to_settle(h);
2453     checkeq(get_stat<int>(h, "vb_active_expired"),
2454             expiries,
2455             "Expected vb_active_expired to contain correct number of expiries");
2456 
2457     DcpStreamCtx ctx;
2458     ctx.vb_uuid = get_ull_stat(h, "vb_0:0:id", "failovers");
2459     ctx.seqno = {0, expiries * 2}; // doubled as each will cause a mutation,
2460     // as well as an expiry
2461 
2462     if (enableExpiryOutput == EnableExpiryOutput::Yes) {
2463         ctx.exp_expirations = expiries;
2464     } else {
2465         ctx.exp_deletions = expiries;
2466     }
2467 
2468     ctx.flags |= DCP_ADD_STREAM_FLAG_DISKONLY;
2469     ctx.exp_markers = 1;
2470 
2471     const void* cookie = testHarness->create_cookie();
2472     TestDcpConsumer tdc("unittest", cookie, h);
2473     uint32_t flags = cb::mcbp::request::DcpOpenPayload::Producer;
2474 
2475     if (enableExpiryOutput == EnableExpiryOutput::Yes) {
2476         // dcp expiry requires the connection to opt in to delete times
2477         flags |= cb::mcbp::request::DcpOpenPayload::IncludeDeleteTimes;
2478     }
2479     tdc.openConnection(flags);
2480 
2481     if (enableExpiryOutput == EnableExpiryOutput::Yes) {
2482         checkeq(ENGINE_SUCCESS,
2483                 tdc.sendControlMessage("enable_expiry_opcode", "true"),
2484                 "Failed to enable_expiry_opcode");
2485     }
2486 
2487     tdc.addStreamCtx(ctx);
2488 
2489     tdc.run(false);
2490 
2491     testHarness->destroy_cookie(cookie);
2492 
2493     return SUCCESS;
2494 }
2495 
test_dcp_producer_stream_req_backfill(EngineIface* h)2496 static enum test_result test_dcp_producer_stream_req_backfill(EngineIface* h) {
2497     const int num_items = 400, batch_items = 200;
2498     for (int start_seqno = 0; start_seqno < num_items;
2499          start_seqno += batch_items) {
2500         if (200 == start_seqno) {
2501             wait_for_flusher_to_settle(h);
2502             wait_for_stat_to_be(h, "ep_items_rm_from_checkpoints", 200);
2503             stop_persistence(h);
2504         }
2505         write_items(h, batch_items, start_seqno);
2506     }
2507 
2508     wait_for_stat_to_be_gte(h, "vb_0:num_checkpoints", 2, "checkpoint");
2509 
2510     const void* cookie = testHarness->create_cookie();
2511 
2512     DcpStreamCtx ctx;
2513     ctx.vb_uuid = get_ull_stat(h, "vb_0:0:id", "failovers");
2514     ctx.seqno = {0, 200};
2515     ctx.exp_mutations = 200;
2516     ctx.exp_markers = 1;
2517 
2518     TestDcpConsumer tdc("unittest", cookie, h);
2519     tdc.addStreamCtx(ctx);
2520     tdc.run();
2521 
2522     testHarness->destroy_cookie(cookie);
2523 
2524     return SUCCESS;
2525 }
2526 
2527 /*
2528  * Test that expired items (with values) backfill correctly with Expiry Output
2529  * disabled
2530  */
test_dcp_producer_expired_item_backfill_delete( EngineIface* h)2531 static enum test_result test_dcp_producer_expired_item_backfill_delete(
2532         EngineIface* h) {
2533     return testDcpProducerExpiredItemBackfill(h, EnableExpiryOutput::No);
2534 }
2535 
2536 /*
2537  * Test that expired items (with values) backfill correctly with Expiry Output
2538  * enabled
2539  */
test_dcp_producer_expired_item_backfill_expire( EngineIface* h)2540 static enum test_result test_dcp_producer_expired_item_backfill_expire(
2541         EngineIface* h) {
2542     return testDcpProducerExpiredItemBackfill(h, EnableExpiryOutput::Yes);
2543 }
2544 
test_dcp_producer_stream_req_diskonly(EngineIface* h)2545 static enum test_result test_dcp_producer_stream_req_diskonly(EngineIface* h) {
2546     const int num_items = 300, batch_items = 100;
2547     for (int start_seqno = 0; start_seqno < num_items;
2548          start_seqno += batch_items) {
2549         wait_for_flusher_to_settle(h);
2550         write_items(h, batch_items, start_seqno);
2551     }
2552 
2553     wait_for_flusher_to_settle(h);
2554     verify_curr_items(h, num_items, "Wrong amount of items");
2555     wait_for_stat_to_be(h, "vb_0:num_checkpoints", 1, "checkpoint");
2556 
2557     const void* cookie = testHarness->create_cookie();
2558 
2559     DcpStreamCtx ctx;
2560     ctx.flags = DCP_ADD_STREAM_FLAG_DISKONLY;
2561     ctx.vb_uuid = get_ull_stat(h, "vb_0:0:id", "failovers");
2562     ctx.seqno = {0, static_cast<uint64_t>(-1)};
2563     ctx.exp_mutations = 300;
2564     ctx.exp_markers = 1;
2565 
2566     TestDcpConsumer tdc("unittest", cookie, h);
2567     tdc.addStreamCtx(ctx);
2568     tdc.run();
2569 
2570     testHarness->destroy_cookie(cookie);
2571 
2572     return SUCCESS;
2573 }
2574 
test_dcp_producer_disk_backfill_limits(EngineIface* h)2575 static enum test_result test_dcp_producer_disk_backfill_limits(EngineIface* h) {
2576     const int num_items = 3;
2577     write_items(h, num_items);
2578 
2579     wait_for_flusher_to_settle(h);
2580     verify_curr_items(h, num_items, "Wrong amount of items");
2581     wait_for_stat_to_be(h, "vb_0:num_checkpoints", 1, "checkpoint");
2582 
2583     const void* cookie = testHarness->create_cookie();
2584 
2585     DcpStreamCtx ctx;
2586     ctx.flags = DCP_ADD_STREAM_FLAG_DISKONLY;
2587     ctx.vb_uuid = get_ull_stat(h, "vb_0:0:id", "failovers");
2588     ctx.seqno = {0, static_cast<uint64_t>(-1)};
2589     ctx.exp_mutations = 3;
2590     ctx.exp_markers = 1;
2591 
2592     TestDcpConsumer tdc("unittest", cookie, h);
2593     tdc.addStreamCtx(ctx);
2594     tdc.run();
2595 
2596     uint64_t exp_backfill_task_runs;
2597     if (isPersistentBucket(h)) {
2598         /* Backfill task runs are expected as below:
2599            once for backfill_state_init + once for backfill_state_completing +
2600            once post all backfills are run finished. Here since we have
2601            dcp_scan_byte_limit = 100, we expect the backfill task to run
2602            additional 'num_items' during backfill_state_scanning state. */
2603         exp_backfill_task_runs = 3 + num_items;
2604     } else {
2605         /* Backfill task runs are expected as below:
2606            once for backfill_state_init + once for backfill_state_completing.
2607            Here since we have dcp_scan_byte_limit = 100, we expect the backfill
2608            task to run additional 'num_items' during BackfillState::scanning
2609            state. */
2610         exp_backfill_task_runs = 2 + num_items;
2611     }
2612     checkeq(exp_backfill_task_runs,
2613             get_histo_stat(h,
2614                            "BackfillManagerTask[auxIO]",
2615                            "runtimes",
2616                            Histo_stat_info::TOTAL_COUNT),
2617             "backfill_tasks did not run expected number of times");
2618 
2619     testHarness->destroy_cookie(cookie);
2620 
2621     return SUCCESS;
2622 }
2623 
test_dcp_producer_disk_backfill_buffer_limits( EngineIface* h)2624 static enum test_result test_dcp_producer_disk_backfill_buffer_limits(
2625         EngineIface* h) {
2626     const int num_items = 3;
2627     write_items(h, num_items);
2628 
2629     wait_for_flusher_to_settle(h);
2630     verify_curr_items(h, num_items, "Wrong amount of items");
2631 
2632     /* Wait for the checkpoint to be removed so that upon DCP connection
2633        backfill is scheduled */
2634     wait_for_stat_to_be(h, "ep_items_rm_from_checkpoints", num_items);
2635 
2636     const void* cookie = testHarness->create_cookie();
2637 
2638     DcpStreamCtx ctx;
2639     ctx.vb_uuid = get_ull_stat(h, "vb_0:0:id", "failovers");
2640     ctx.seqno = {0, num_items};
2641     ctx.exp_mutations = 3;
2642     ctx.exp_markers = 1;
2643 
2644     TestDcpConsumer tdc("unittest", cookie, h);
2645     tdc.addStreamCtx(ctx);
2646     tdc.run();
2647 
2648     testHarness->destroy_cookie(cookie);
2649 
2650     return SUCCESS;
2651 }
2652 
test_dcp_producer_stream_req_mem(EngineIface* h)2653 static enum test_result test_dcp_producer_stream_req_mem(EngineIface* h) {
2654     const int num_items = 300, batch_items = 100;
2655     for (int start_seqno = 0; start_seqno < num_items;
2656          start_seqno += batch_items) {
2657         wait_for_flusher_to_settle(h);
2658         write_items(h, batch_items, start_seqno);
2659     }
2660 
2661     wait_for_flusher_to_settle(h);
2662     verify_curr_items(h, num_items, "Wrong amount of items");
2663 
2664     const void* cookie = testHarness->create_cookie();
2665 
2666     DcpStreamCtx ctx;
2667     ctx.vb_uuid = get_ull_stat(h, "vb_0:0:id", "failovers");
2668     ctx.seqno = {200, 300};
2669     ctx.snapshot = {200, 200};
2670     ctx.exp_mutations = 100;
2671     ctx.exp_markers = 1;
2672 
2673     TestDcpConsumer tdc("unittest", cookie, h);
2674     tdc.addStreamCtx(ctx);
2675     tdc.run();
2676 
2677     testHarness->destroy_cookie(cookie);
2678 
2679     return SUCCESS;
2680 }
2681 
/**
 * Test that a DCP stream request in DGM scenarios correctly receives items
 * from both memory and disk.
 *
 * NOTE: the test is currently disabled - it returns SUCCESS straight away
 * and none of the code after that return is executed (see the @todo below).
 */
static enum test_result test_dcp_producer_stream_req_dgm(EngineIface* h) {
    // Test only works for the now removed 2-bit LRU eviction algorithm as it
    // relies on looking at the LRU state.
    // @todo Investigate converting the test to work with the new hifi_mfu
    // eviction algorithm.
    return SUCCESS;

    // ---- Unreachable from here on: original test body, kept for the @todo.

    const void* cookie = testHarness->create_cookie();

    // Keep storing items until less than 50% of the active items are
    // resident in memory.
    int i = 0;  // Item count
    while (true) {
        // Gathering stats on every store is expensive, just check every 100
        // iterations
        if ((i % 100) == 0) {
            if (get_int_stat(h, "vb_active_perc_mem_resident") < 50) {
                break;
            }
        }

        std::stringstream ss;
        ss << "key" << i;
        ENGINE_ERROR_CODE ret =
                store(h, cookie, OPERATION_SET, ss.str().c_str(), "somevalue");
        // Only successful stores are counted; a store which fails is simply
        // attempted again with the same key on the next iteration.
        if (ret == ENGINE_SUCCESS) {
            i++;
        }
    }

    // Sanity check - ensure we have enough vBucket quota (max_size)
    // such that we have 1000 items - enough to give us 0.1%
    // granularity in any residency calculations.
    checkge(i, 1000,
            "Does not have expected min items; Check max_size setting");

    wait_for_flusher_to_settle(h);

    verify_curr_items(h, i, "Wrong number of items");
    // Held as a double so that it is compared against 'i * 0.5' (also a
    // double) below.
    double num_non_resident = get_int_stat(h, "vb_active_num_non_resident");
    checkge(num_non_resident,
            i * 0.5,
            "Expected at least 50% of items to be non-resident");

    // Reduce max_size from 6291456 to 6000000
    set_param(h,
              cb::mcbp::request::SetParamPayload::Type::Flush,
              "max_size",
              "6000000");
    checkgt(50,
            get_int_stat(h, "vb_active_perc_mem_resident"),
            "Too high percentage of memory resident");

    // Stream everything from seqno 0 up to the vBucket's high seqno; every
    // stored item is expected as a mutation, inside a single snapshot.
    DcpStreamCtx ctx;
    ctx.vb_uuid = get_ull_stat(h, "vb_0:0:id", "failovers");
    ctx.seqno = {0, get_ull_stat(h, "vb_0:high_seqno", "vbucket-seqno")};
    ctx.exp_mutations = i;
    ctx.exp_markers = 1;

    TestDcpConsumer tdc("unittest", cookie, h);
    tdc.addStreamCtx(ctx);
    tdc.run();

    testHarness->destroy_cookie(cookie);

    return SUCCESS;
}
2750 
2751 /**
2752  * Test that eviction hotness data is passed in DCP stream.
2753  */
test_dcp_producer_stream_req_coldness(EngineIface* h)2754 static enum test_result test_dcp_producer_stream_req_coldness(EngineIface* h) {
2755     const void* cookie = testHarness->create_cookie();
2756 
2757     for (int ii = 0; ii < 10; ii++) {
2758         std::stringstream ss;
2759         ss << "key" << ii;
2760         store(h, cookie, OPERATION_SET, ss.str().c_str(), "somevalue");
2761     }
2762 
2763     wait_for_flusher_to_settle(h);
2764     verify_curr_items(h, 10, "Wrong number of items");
2765 
2766     for (int ii = 0; ii < 5; ii++) {
2767         std::stringstream ss;
2768         ss << "key" << ii;
2769         evict_key(h, ss.str().c_str(), Vbid(0), "Ejected.");
2770     }
2771     wait_for_flusher_to_settle(h);
2772     wait_for_stat_to_be(h, "ep_num_value_ejects", 5);
2773 
2774     TestDcpConsumer tdc("unittest", cookie, h);
2775     uint32_t flags = cb::mcbp::request::DcpOpenPayload::Producer;
2776 
2777     tdc.openConnection(flags);
2778 
2779     checkeq(ENGINE_SUCCESS,
2780             tdc.sendControlMessage("supports_hifi_MFU", "true"),
2781             "Failed to configure MFU");
2782 
2783     DcpStreamCtx ctx;
2784     ctx.vb_uuid = get_ull_stat(h, "vb_0:0:id", "failovers");
2785     ctx.seqno = {0, get_ull_stat(h, "vb_0:high_seqno", "vbucket-seqno")};
2786     ctx.exp_mutations = 10;
2787     ctx.exp_markers = 1;
2788 
2789     // Only stream from disk to ensure that we only ever get a single snapshot.
2790     // If we got unlucky we could see 2 snapshots due to creation of a second
2791     // checkpoint if we were streaming from the checkpoint manager.
2792     ctx.flags |= DCP_ADD_STREAM_FLAG_DISKONLY;
2793 
2794     tdc.addStreamCtx(ctx);
2795     tdc.run(false);
2796 
2797     checkeq(tdc.getNruCounters()[1],
2798             5, "unexpected number of hot items");
2799     checkeq(tdc.getNruCounters()[0],
2800             5, "unexpected number of cold items");
2801     testHarness->destroy_cookie(cookie);
2802 
2803     return SUCCESS;
2804 }
2805 
2806 /**
2807  * Test that eviction hotness data is picked up by the DCP consumer
2808  */
test_dcp_consumer_hotness_data(EngineIface* h)2809 static enum test_result test_dcp_consumer_hotness_data(EngineIface* h) {
2810     const void* cookie = testHarness->create_cookie();
2811     uint32_t opaque = 0xFFFF0000;
2812     uint32_t flags = 0;
2813     Vbid vbid = Vbid(0);
2814     const char* name = "unittest";
2815 
2816     // Set vbucket 0 to a replica so we can consume a mutation over DCP
2817     check(set_vbucket_state(h, vbid, vbucket_state_replica),
2818           "Failed to set vbucket state.");
2819 
2820     // Open consumer connection
2821     auto dcp = requireDcpIface(h);
2822     checkeq(ENGINE_SUCCESS,
2823             dcp->open(cookie,
2824                       opaque,
2825                       0,
2826                       flags,
2827                       name,
2828                       R"({"consumer_name":"replica1"})"),
2829             "Failed dcp Consumer open connection.");
2830 
2831     add_stream_for_consumer(h,
2832                             cookie,
2833                             opaque++,
2834                             vbid,
2835                             DCP_ADD_STREAM_FLAG_TAKEOVER,
2836                             cb::mcbp::Status::Success);
2837 
2838     uint32_t stream_opaque =
2839             get_int_stat(h, "eq_dcpq:unittest:stream_0_opaque", "dcp");
2840 
2841     // Snapshot marker indicating a mutation will follow
2842     checkeq(ENGINE_SUCCESS,
2843             dcp->snapshot_marker(cookie,
2844                                  stream_opaque,
2845                                  vbid,
2846                                  0,
2847                                  1,
2848                                  0,
2849                                  {} /*HCS*/,
2850                                  {} /*maxVisibleSeqno*/),
2851             "Failed to send marker!");
2852 
2853     const DocKey docKey("key", DocKeyEncodesCollectionId::No);
2854     checkeq(ENGINE_SUCCESS,
2855             dcp->mutation(cookie,
2856                           stream_opaque,
2857                           docKey,
2858                           {(const uint8_t*)"value", 5},
2859                           0, // privileged bytes
2860                           PROTOCOL_BINARY_RAW_BYTES,
2861                           0, // cas
2862                           vbid,
2863                           0, // flags
2864                           1, // by_seqno
2865                           0, // rev_seqno
2866                           0, // expiration
2867                           0, // lock_time
2868                           {}, // meta
2869                           128 // frequency value
2870                           ),
2871             "Failed to send dcp mutation");
2872 
2873     // Set vbucket 0 to active so we can perform a get
2874     check(set_vbucket_state(h, vbid, vbucket_state_active),
2875           "Failed to set vbucket state.");
2876 
2877     // Perform a get to retrieve the frequency counter value
2878     auto rv = get(h, cookie, "key", vbid, DocStateFilter::AliveOrDeleted);
2879     checkeq(cb::engine_errc::success, rv.first, "Failed to fetch");
2880     const Item* it = reinterpret_cast<const Item*>(rv.second.get());
2881 
2882     // Confirm that the item that was consumed over DCP has picked up
2883     // the correct hotness data value.
2884     // Performing the get may increase the hotness value by 1 and therefore
2885     // it is valid for the value to be 128 or 129.
2886     checkle(128,
2887             int(it->getFreqCounterValue()),
2888             "Failed to set the hotness data to the correct value");
2889     checkge(129,
2890             int(it->getFreqCounterValue()),
2891             "Failed to set the hotness data to the correct value");
2892 
2893     testHarness->destroy_cookie(cookie);
2894 
2895     return SUCCESS;
2896 }
2897 
test_dcp_producer_stream_latest(EngineIface* h)2898 static enum test_result test_dcp_producer_stream_latest(EngineIface* h) {
2899     const int num_items = 300, batch_items = 100;
2900     for (int start_seqno = 0; start_seqno < num_items;
2901          start_seqno += batch_items) {
2902         wait_for_flusher_to_settle(h);
2903         write_items(h, batch_items, start_seqno);
2904     }
2905 
2906     wait_for_flusher_to_settle(h);
2907     verify_curr_items(h, num_items, "Wrong amount of items");
2908 
2909     const void* cookie = testHarness->create_cookie();
2910 
2911     DcpStreamCtx ctx;
2912     ctx.flags = DCP_ADD_STREAM_FLAG_LATEST;
2913     ctx.vb_uuid = get_ull_stat(h, "vb_0:0:id", "failovers");
2914     ctx.seqno = {200, 205};
2915     ctx.snapshot = {200, 200};
2916     ctx.exp_mutations = 100;
2917     ctx.exp_markers = 1;
2918 
2919     TestDcpConsumer tdc("unittest", cookie, h);
2920     tdc.addStreamCtx(ctx);
2921     tdc.run();
2922 
2923     testHarness->destroy_cookie(cookie);
2924 
2925     return SUCCESS;
2926 }
2927 
test_dcp_producer_keep_stream_open(EngineIface* h)2928 static enum test_result test_dcp_producer_keep_stream_open(EngineIface* h) {
2929     const std::string conn_name("unittest");
2930     const int num_items = 2, vb = 0;
2931 
2932     write_items(h, num_items);
2933 
2934     wait_for_flusher_to_settle(h);
2935     verify_curr_items(h, num_items, "Wrong amount of items");
2936 
2937     const void* cookie = testHarness->create_cookie();
2938 
2939     /* We want to stream items till end and keep the stream open. Then we want
2940        to verify the stream is still open */
2941     struct continuous_dcp_ctx cdc = {
2942             h,
2943             cookie,
2944             Vbid(0),
2945             conn_name,
2946             0,
2947             std::make_unique<TestDcpConsumer>(conn_name, cookie, h)};
2948     cb_thread_t dcp_thread;
2949     cb_assert(cb_create_thread(&dcp_thread, continuous_dcp_thread, &cdc, 0)
2950               == 0);
2951 
2952     /* Wait for producer to be created */
2953     wait_for_stat_to_be(h, "ep_dcp_producer_count", 1, "dcp");
2954 
2955     /* Wait for an active stream to be created */
2956     const std::string stat_stream_count("eq_dcpq:" + conn_name +
2957                                         ":num_streams");
2958     wait_for_stat_to_be(h, stat_stream_count.c_str(), 1, "dcp");
2959 
2960     /* Wait for the dcp test client to receive upto highest seqno we have */
2961     cb::RelaxedAtomic<uint64_t> exp_items(num_items);
2962     wait_for_val_to_be("last_sent_seqno",
2963                        cdc.dcpConsumer->producers.last_byseqno,
2964                        exp_items);
2965 
2966     /* Check if the stream is still open after sending out latest items */
2967     std::string stat_stream_state("eq_dcpq:" + conn_name + ":stream_" +
2968                              std::to_string(vb) + "_state");
2969     std::string state = get_str_stat(h, stat_stream_state.c_str(), "dcp");
2970     checkeq(state.compare("in-memory"), 0, "Stream is not open");
2971 
2972     /* Before closing the connection stop the thread that continuously polls
2973        for dcp data */
2974     cdc.dcpConsumer->stop();
2975     testHarness->notify_io_complete(cookie, ENGINE_SUCCESS);
2976     cb_assert(cb_join_thread(dcp_thread) == 0);
2977     testHarness->destroy_cookie(cookie);
2978 
2979     return SUCCESS;
2980 }
2981 
test_dcp_producer_keep_stream_open_replica( EngineIface* h)2982 static enum test_result test_dcp_producer_keep_stream_open_replica(
2983         EngineIface* h) {
2984     /* This test case validates if a replica vbucket correctly sends items
2985        and snapshot end seqno when a stream requests for items till end of time
2986        (end_seqno in req is 2^64 - 1).
2987        The test has 2 parts.
2988        (i) Set up replica vbucket such that it has items to be streamed from
2989            backfill and memory.
2990        (ii) Open a stream (in a DCP conn) and see if all the items are received
2991             correctly */
2992 
2993     /* Part (i):  Set up replica vbucket such that it has items to be streamed
2994                   from backfill and memory. */
2995     check(set_vbucket_state(h, Vbid(0), vbucket_state_replica),
2996           "Failed to set vbucket state.");
2997 
2998     const void* cookie = testHarness->create_cookie();
2999     uint32_t opaque = 0xFFFF0000;
3000     uint32_t seqno = 0;
3001     uint32_t flags = 0;
3002     const int num_items = 10;
3003     const std::string conn_name("unittest");
3004     int vb = 0;
3005     auto dcp = requireDcpIface(h);
3006 
3007     /* Open an DCP consumer connection */
3008     checkeq(ENGINE_SUCCESS,
3009             dcp->open(cookie,
3010                       opaque,
3011                       seqno,
3012                       flags,
3013                       conn_name,
3014                       R"({"consumer_name":"replica1"})"),
3015             "Failed dcp producer open connection.");
3016 
3017     std::string type = get_str_stat(h, "eq_dcpq:unittest:type", "dcp");
3018     checkeq(0, type.compare("consumer"), "Consumer not found");
3019 
3020     opaque = add_stream_for_consumer(
3021             h, cookie, opaque, Vbid(0), 0, cb::mcbp::Status::Success);
3022 
3023     /* Send DCP mutations with in memory flag (0x01) */
3024     dcp_stream_to_replica(
3025             h, cookie, opaque, Vbid(0), 0x01, 1, num_items, 0, num_items);
3026 
3027     /* Send 10 more DCP mutations with checkpoint creation flag (0x04) */
3028     uint64_t start = num_items;
3029     dcp_stream_to_replica(h,
3030                           cookie,
3031                           opaque,
3032                           Vbid(0),
3033                           0x04,
3034                           start + 1,
3035                           start + 10,
3036                           start,
3037                           start + 10);
3038 
3039     wait_for_flusher_to_settle(h);
3040     stop_persistence(h);
3041     checkeq(2 * num_items,
3042             get_int_stat(h, "vb_replica_curr_items"),
3043             "wrong number of items in replica vbucket");
3044 
3045     /* Add 10 more items to the replica node on a new checkpoint.
3046        Send with flag (0x04) indicating checkpoint creation */
3047     start = 2 * num_items;
3048     dcp_stream_to_replica(h,
3049                           cookie,
3050                           opaque,
3051                           Vbid(0),
3052                           0x04,
3053                           start + 1,
3054                           start + 10,
3055                           start,
3056                           start + 10);
3057 
3058     /* Expecting for Disk backfill + in memory snapshot merge.
3059        Wait for a checkpoint to be removed */
3060     wait_for_stat_to_be_lte(h, "vb_0:num_checkpoints", 2, "checkpoint");
3061 
3062     /* Part (ii): Open a stream (in a DCP conn) and see if all the items are
3063                   received correctly */
3064     /* We want to stream items till end and keep the stream open. Then we want
3065        to verify the stream is still open */
3066     const void* cookie1 = testHarness->create_cookie();
3067     const std::string conn_name1("unittest1");
3068     struct continuous_dcp_ctx cdc = {
3069             h,
3070             cookie1,
3071             Vbid(0),
3072             conn_name1,
3073             0,
3074             std::make_unique<TestDcpConsumer>(conn_name1, cookie1, h)};
3075     cb_thread_t dcp_thread;
3076     cb_assert(cb_create_thread(&dcp_thread, continuous_dcp_thread, &cdc, 0)
3077               == 0);
3078 
3079     /* Wait for producer to be created */
3080     wait_for_stat_to_be(h, "ep_dcp_producer_count", 1, "dcp");
3081 
3082     /* Wait for an active stream to be created */
3083     const std::string stat_stream_count("eq_dcpq:" + conn_name1 +
3084                                         ":num_streams");
3085     wait_for_stat_to_be(h, stat_stream_count.c_str(), 1, "dcp");
3086 
3087     /* Wait for the dcp test client to receive upto highest seqno we have */
3088     cb::RelaxedAtomic<uint64_t> exp_items(3 * num_items);
3089     wait_for_val_to_be("last_sent_seqno",
3090                        cdc.dcpConsumer->producers.last_byseqno,
3091                        exp_items);
3092 
3093     /* Check if correct snap end seqno is sent */
3094     std::string stat_stream_last_sent_snap_end_seqno("eq_dcpq:" + conn_name1 +
3095                                                      ":stream_" +
3096                                                      std::to_string(vb) +
3097                                                      "_last_sent_snap_end_seqno");
3098     wait_for_stat_to_be(h,
3099                         stat_stream_last_sent_snap_end_seqno.c_str(),
3100                         3 * num_items,
3101                         "dcp");
3102 
3103     /* Check if the stream is still open after sending out latest items */
3104     std::string stat_stream_state("eq_dcpq:" + conn_name1 + ":stream_" +
3105                                   std::to_string(vb) + "_state");
3106     std::string state = get_str_stat(h, stat_stream_state.c_str(), "dcp");
3107     checkeq(state.compare("in-memory"), 0, "Stream is not open");
3108 
3109     /* Before closing the connection stop the thread that continuously polls
3110        for dcp data */
3111     cdc.dcpConsumer->stop();
3112     testHarness->notify_io_complete(cookie1, ENGINE_SUCCESS);
3113     cb_assert(cb_join_thread(dcp_thread) == 0);
3114 
3115     testHarness->destroy_cookie(cookie);
3116     testHarness->destroy_cookie(cookie1);
3117 
3118     return SUCCESS;
3119 }
3120 
test_dcp_producer_stream_cursor_movement( EngineIface* h)3121 static enum test_result test_dcp_producer_stream_cursor_movement(
3122         EngineIface* h) {
3123     const std::string conn_name("unittest");
3124     const int num_items = 30;
3125     uint64_t curr_chkpt_id = 0;
3126     for (int j = 0; j < num_items; ++j) {
3127         if (j % 10 == 0) {
3128             wait_for_flusher_to_settle(h);
3129         }
3130         if (j == (num_items - 1)) {
3131             /* Since checkpoint max items is set to 10 and we are going to
3132                write 30th item, a new checkpoint could be added after
3133                writing and persisting the 30th item. I mean, if the checkpoint
3134                id is got outside the while loop, there could be an error due to
3135                race (flusher and checkpoint remover could run before getting
3136                this stat) */
3137             curr_chkpt_id =
3138                     get_ull_stat(h, "vb_0:open_checkpoint_id", "checkpoint");
3139         }
3140         std::string key("key" + std::to_string(j));
3141         checkeq(ENGINE_SUCCESS,
3142                 store(h, NULL, OPERATION_SET, key.c_str(), "data"),
3143                 "Failed to store a value");
3144     }
3145 
3146     wait_for_flusher_to_settle(h);
3147     verify_curr_items(h, num_items, "Wrong amount of items");
3148 
3149     const void* cookie = testHarness->create_cookie();
3150 
3151     /* We want to stream items till end and keep the stream open. We want to
3152        verify if the DCP cursor has moved to new open checkpoint */
3153     struct continuous_dcp_ctx cdc = {
3154             h,
3155             cookie,
3156             Vbid(0),
3157             conn_name,
3158             20,
3159             std::make_unique<TestDcpConsumer>(conn_name, cookie, h)};
3160     cb_thread_t dcp_thread;
3161     cb_assert(cb_create_thread(&dcp_thread, continuous_dcp_thread, &cdc, 0)
3162               == 0);
3163 
3164     /* Wait for producer to be created */
3165     wait_for_stat_to_be(h, "ep_dcp_producer_count", 1, "dcp");
3166 
3167     /* Wait for an active stream to be created */
3168     const std::string stat_stream_count("eq_dcpq:" + conn_name +
3169                                         ":num_streams");
3170     wait_for_stat_to_be(h, stat_stream_count.c_str(), 1, "dcp");
3171 
3172     /* Wait for the dcp test client to receive upto highest seqno we have */
3173     cb::RelaxedAtomic<uint64_t> exp_items(num_items);
3174     wait_for_val_to_be("last_sent_seqno",
3175                        cdc.dcpConsumer->producers.last_byseqno,
3176                        exp_items);
3177 
3178     /* Wait for new open (empty) checkpoint to be added */
3179     wait_for_stat_to_be(
3180             h, "vb_0:open_checkpoint_id", curr_chkpt_id + 1, "checkpoint");
3181 
3182     /* We want to make sure that no cursors are lingering on any of the previous
3183        checkpoints. For that we wait for checkpoint remover to remove all but
3184        the latest open checkpoint cursor */
3185     wait_for_stat_to_be(h, "vb_0:num_checkpoints", 1, "checkpoint");
3186 
3187     /* Before closing the connection stop the thread that continuously polls
3188        for dcp data */
3189     cdc.dcpConsumer->stop();
3190     testHarness->notify_io_complete(cookie, ENGINE_SUCCESS);
3191     cb_assert(cb_join_thread(dcp_thread) == 0);
3192     testHarness->destroy_cookie(cookie);
3193 
3194     return SUCCESS;
3195 }
3196 
test_dcp_producer_stream_req_nmvb(EngineIface* h)3197 static test_result test_dcp_producer_stream_req_nmvb(EngineIface* h) {
3198     const void* cookie1 = testHarness->create_cookie();
3199     uint32_t opaque = 0;
3200     uint32_t seqno = 0;
3201     uint32_t flags = cb::mcbp::request::DcpOpenPayload::Producer;
3202     const char *name = "unittest";
3203     auto dcp = requireDcpIface(h);
3204 
3205     checkeq(ENGINE_SUCCESS,
3206             dcp->open(cookie1,
3207                       opaque,
3208                       seqno,
3209                       flags,
3210                       name,
3211                       R"({"consumer_name":"replica1"})"),
3212             "Failed dcp producer open connection.");
3213 
3214     Vbid req_vbucket = Vbid(1);
3215     uint64_t rollback = 0;
3216 
3217     checkeq(ENGINE_NOT_MY_VBUCKET,
3218             dcp->stream_req(cookie1,
3219                             0,
3220                             0,
3221                             req_vbucket,
3222                             0,
3223                             0,
3224                             0,
3225                             0,
3226                             0,
3227                             &rollback,
3228                             mock_dcp_add_failover_log,
3229                             {}),
3230             "Expected not my vbucket");
3231     testHarness->destroy_cookie(cookie1);
3232 
3233     return SUCCESS;
3234 }
3235 
test_dcp_agg_stats(EngineIface* h)3236 static test_result test_dcp_agg_stats(EngineIface* h) {
3237     const int num_items = 300, batch_items = 100;
3238     for (int start_seqno = 0; start_seqno < num_items;
3239          start_seqno += batch_items) {
3240         wait_for_flusher_to_settle(h);
3241         write_items(h, batch_items, start_seqno);
3242     }
3243 
3244     wait_for_flusher_to_settle(h);
3245     verify_curr_items(h, num_items, "Wrong amount of items");
3246 
3247     const void *cookie[5];
3248 
3249     uint64_t total_bytes = 0;
3250     for (int j = 0; j < 5; ++j) {
3251         std::string name("unittest_" + std::to_string(j));
3252         cookie[j] = testHarness->create_cookie();
3253 
3254         DcpStreamCtx ctx;
3255         ctx.vb_uuid = get_ull_stat(h, "vb_0:0:id", "failovers");
3256         ctx.seqno = {200, 300};
3257         ctx.snapshot = {200, 200};
3258         ctx.exp_mutations = 100;
3259         ctx.exp_markers = 1;
3260 
3261         TestDcpConsumer tdc(name, cookie[j], h);
3262