1/* -*- MODE: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2016 Couchbase, Inc
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
18/*
19 * Testsuite for 'dcp' functionality in ep-engine.
20 */
21
22// We need to include the tracer _before_ the header which defines
23// the check macros to avoid conflicts with folly's headers
24#include <memcached/tracer.h>
25
26#include "ep_test_apis.h"
27#include "ep_testsuite_common.h"
28#include "mock/mock_dcp.h"
29#include "programs/engine_testapp/mock_server.h"
30
31#include <platform/cb_malloc.h>
32#include <platform/cbassert.h>
33#include <platform/compress.h>
34#include <platform/platform_thread.h>
35#include <condition_variable>
36#include <thread>
37
38using namespace std::string_literals;
39
40// Helper functions ///////////////////////////////////////////////////////////
41
42/**
43 * Converts the given engine to a DcpIface*. If engine doesn't implement
44 * DcpIface then throws.
45 * @returns non-null ptr to DcpIface.
46 */
47static gsl::not_null<DcpIface*> requireDcpIface(EngineIface* engine) {
48    return dynamic_cast<DcpIface*>(engine);
49}
50
51static void dcp_step(EngineIface* h,
52                     const void* cookie,
53                     MockDcpMessageProducers& producers) {
54    auto dcp = requireDcpIface(h);
55    ENGINE_ERROR_CODE err = dcp->step(cookie, &producers);
56    check(err == ENGINE_SUCCESS || err == ENGINE_EWOULDBLOCK,
57          "Expected success or engine_ewouldblock");
58    if (err == ENGINE_EWOULDBLOCK) {
59        producers.clear_dcp_data();
60    }
61}
62
63static void dcpHandleResponse(EngineIface* h,
64                              const void* cookie,
65                              protocol_binary_response_header* response,
66                              MockDcpMessageProducers& producers) {
67    auto dcp = requireDcpIface(h);
68    auto erroCode = dcp->response_handler(cookie, response);
69    check(erroCode == ENGINE_SUCCESS || erroCode == ENGINE_EWOULDBLOCK,
70          "Expected 'success' or 'engine_ewouldblock'");
71    if (erroCode == ENGINE_EWOULDBLOCK) {
72        producers.clear_dcp_data();
73    }
74}
75
/* File-local flag, initially false. */
static bool wait_started = false;
77
/**
 * A pair of sequence numbers delimiting a range. DcpStreamCtx uses it for
 * both the requested seqno range and the snapshot range of a stream.
 *
 * Both members default to zero so that a default-constructed range never
 * carries indeterminate values.
 */
struct SeqnoRange {
    /* First seqno of the range */
    uint64_t start = 0;
    /* Last seqno of the range */
    uint64_t end = 0;
};
82
/**
 * Selects which of two closely related paths a helper should take: the
 * deletion path or the expiration path.
 */
enum class DeletionOpcode : bool {
    Deletion = false,
    Expiration = true,
};
91
92class DcpStreamCtx {
93/**
94 * This class represents all attributes required for
95 * a stream. Objects of this class type are to be fed
96 * to TestDcpConsumer.
97 */
98public:
99    DcpStreamCtx()
100        : vbucket(0),
101          flags(0),
102          vb_uuid(0),
103          exp_mutations(0),
104          exp_deletions(0),
105          exp_expirations(0),
106          exp_markers(0),
107          extra_takeover_ops(0),
108          exp_disk_snapshot(false),
109          exp_conflict_res(0),
110          skip_estimate_check(false),
111          live_frontend_client(false),
112          skip_verification(false),
113          exp_err(ENGINE_SUCCESS),
114          exp_rollback(0),
115          expected_values(0),
116          opaque(0) {
117        seqno = {0, static_cast<uint64_t>(~0)};
118        snapshot = {0, static_cast<uint64_t>(~0)};
119    }
120
121    /* Vbucket Id */
122    Vbid vbucket;
123    /* Stream flags */
124    uint32_t flags;
125    /* Vbucket UUID */
126    uint64_t vb_uuid;
127    /* Sequence number range */
128    SeqnoRange seqno;
129    /* Snapshot range */
130    SeqnoRange snapshot;
131    /* Number of mutations expected (for verification) */
132    size_t exp_mutations;
133    /* Number of deletions expected (for verification) */
134    size_t exp_deletions;
135    /* Number of expiries expected (for verification) */
136    size_t exp_expirations;
137    /* Number of snapshot markers expected (for verification) */
138    size_t exp_markers;
139    /* Extra front end mutations as part of takeover */
140    size_t extra_takeover_ops;
141    /* Flag - expect disk snapshot or not */
142    bool exp_disk_snapshot;
143    /* Expected conflict resolution flag */
144    uint8_t exp_conflict_res;
145    /* Skip estimate check during takeover */
146    bool skip_estimate_check;
147    /*
148       live_frontend_client to be set to true when streaming is done in parallel
149       with a client issuing writes to the vbucket. In this scenario, predicting
150       the number of snapshot markers received is difficult.
151    */
152    bool live_frontend_client;
153    /*
154       skip_verification to be set to true if verification of mutation count,
155       deletion count, marker count etc. is to be skipped at the end of
156       streaming.
157     */
158    bool skip_verification;
159    /* Expected error code on stream creation. We need this because rollback is
160       a valid operation and returns ENGINE_ROLLBACK (not ENGINE_SUCCESS) */
161    ENGINE_ERROR_CODE exp_err;
162    /* Expected rollback seqno */
163    uint64_t exp_rollback;
164    /* Expected number of values (from mutations or deleted_values) */
165    size_t expected_values;
166    /* stream opaque */
167    uint32_t opaque;
168};
169
170class TestDcpConsumer {
171/**
172 * This class represents a DcpConsumer which is responsible
173 * for spawning a DcpProducer at the server and receiving
174 * messages from it.
175 */
176public:
177    TestDcpConsumer(const std::string& _name,
178                    const void* _cookie,
179                    EngineIface* h)
180        : producers(h),
181          name(_name),
182          cookie(_cookie),
183          opaque(0),
184          total_bytes(0),
185          simulate_cursor_dropping(false),
186          flow_control_buf_size(1024),
187          disable_ack(false),
188          h(h),
189          dcp(requireDcpIface(h)),
190          nruCounter(2) {
191    }
192
193    uint64_t getTotalBytes() {
194        return total_bytes;
195    }
196
197    void simulateCursorDropping() {
198        simulate_cursor_dropping = true;
199    }
200
201    void setFlowControlBufSize(uint64_t to) {
202        flow_control_buf_size = to;
203    }
204
205    void disableAcking() {
206        disable_ack = true;
207    }
208
209    void addStreamCtx(DcpStreamCtx &ctx) {
210        stream_ctxs.push_back(ctx);
211    }
212
213    void run(bool openConn = true);
214
215    // Stop the thread if it is running. This is safe to be called from
216    // a different thread to the thread calling run().
217    void stop();
218
219    /**
220     * This method just opens a DCP connection. Note it does not open a stream
221     * and does not call the dcp step function to get all the items from the
222     * producer.
223     * @param flags Flags to pass to DCP_OPEN.
224     */
225    void openConnection(
226            uint32_t flags = cb::mcbp::request::DcpOpenPayload::Producer);
227
228    /* This method opens a stream on an existing DCP connection.
229       This does not call the dcp step function to get all the items from the
230       producer */
231    ENGINE_ERROR_CODE openStreams();
232
233    /* if clear is true, it will also clear the stream vector */
234    ENGINE_ERROR_CODE closeStreams(bool fClear = false);
235
236    ENGINE_ERROR_CODE sendControlMessage(const std::string& name,
237                                         const std::string& value);
238
239    const std::vector<int>& getNruCounters() const {
240        return nruCounter;
241    }
242
243    MockDcpMessageProducers producers;
244
245private:
246    /* Vbucket-level stream stats used in test */
247    struct VBStats {
248        VBStats()
249            : num_mutations(0),
250              num_deletions(0),
251              num_expirations(0),
252              num_snapshot_markers(0),
253              num_set_vbucket_pending(0),
254              num_set_vbucket_active(0),
255              pending_marker_ack(false),
256              marker_end(0),
257              last_by_seqno(0),
258              extra_takeover_ops(0),
259              exp_disk_snapshot(false),
260              exp_conflict_res(0),
261              num_values(0) {
262        }
263
264        size_t num_mutations;
265        size_t num_deletions;
266        size_t num_expirations;
267        size_t num_snapshot_markers;
268        size_t num_set_vbucket_pending;
269        size_t num_set_vbucket_active;
270        bool pending_marker_ack;
271        uint64_t marker_end;
272        uint64_t last_by_seqno;
273        size_t extra_takeover_ops;
274        bool exp_disk_snapshot;
275        uint8_t exp_conflict_res;
276        size_t num_values;
277    };
278
279    /* Connection name */
280    const std::string name;
281    /* Connection cookie */
282    const void *cookie;
283    /* Vector containing information of streams */
284    std::vector<DcpStreamCtx> stream_ctxs;
285    /* Opaque value in the connection */
286    uint32_t opaque;
287    /* Total bytes received */
288    uint64_t total_bytes;
289    /* Flag to simulate cursor dropping */
290    bool simulate_cursor_dropping;
291    /* Flow control buffer size */
292    uint64_t flow_control_buf_size;
293    /* Flag to disable acking */
294    bool disable_ack;
295    /* map of vbstats */
296    std::map<Vbid, VBStats> vb_stats;
297    EngineIface* h;
298    gsl::not_null<DcpIface*> dcp;
299    std::vector<int> nruCounter;
300
301    // Flag used by run() to check if it should continue to execute.
302    std::atomic<bool> done{false};
303
304    /**
305     * Helper function to perform the very similar resolution of a deletion
306     * and an expiry, triggered inside the run() case switch where one of these
307     * operations is returned as the last_op.
308     * @param stats The vbstats that will be updated by this function.
309     * @param bytes_read The current no of bytes read which will be updated by
310     *                   this function.
311     * @param all_bytes The total no of bytes read which will be updated by
312     *                  this function.
313     * @param vbid The vBucket ID.
314     * @param delOrExpire Determines whether to take the deletion case or the
315     *                    expiration case.
316     */
317    void deleteOrExpireCase(TestDcpConsumer::VBStats& stats,
318                            uint32_t& bytes_read,
319                            uint64_t& all_bytes,
320                            Vbid vbid,
321                            DeletionOpcode delOrExpire);
322};
323
324ENGINE_ERROR_CODE TestDcpConsumer::sendControlMessage(
325        const std::string& name, const std::string& value) {
326    auto dcp = requireDcpIface(h);
327    return dcp->control(cookie, ++opaque, name, value);
328}
329
330void TestDcpConsumer::deleteOrExpireCase(TestDcpConsumer::VBStats& stats,
331                                         uint32_t& bytes_read,
332                                         uint64_t& all_bytes,
333                                         Vbid vbid,
334                                         DeletionOpcode delOrExpire) {
335    cb_assert(vbid != static_cast<Vbid>(-1));
336    checklt(stats.last_by_seqno,
337            producers.last_byseqno.load(),
338            "Expected bigger seqno");
339    stats.last_by_seqno = producers.last_byseqno;
340    if (delOrExpire == DeletionOpcode::Deletion) {
341        stats.num_deletions++;
342    } else {
343        stats.num_expirations++;
344    }
345    bytes_read += producers.last_packet_size;
346    all_bytes += producers.last_packet_size;
347    if (stats.pending_marker_ack &&
348        producers.last_byseqno == stats.marker_end) {
349        sendDcpAck(h,
350                   cookie,
351                   cb::mcbp::ClientOpcode::DcpSnapshotMarker,
352                   cb::mcbp::Status::Success,
353                   producers.last_opaque);
354    }
355
356    if (!producers.last_value.empty()) {
357        stats.num_values++;
358    }
359    return;
360}
361
362void TestDcpConsumer::run(bool openConn) {
363    checkle(size_t{1}, stream_ctxs.size(), "No dcp_stream arguments provided!");
364
365    /* Open the connection with the DCP producer */
366    if (openConn) {
367        openConnection();
368    }
369
370    /* Open streams in the above open connection */
371    openStreams();
372
373    bool exp_all_items_streamed = true;
374    size_t num_stream_ends_received = 0;
375    uint32_t bytes_read = 0;
376    uint64_t all_bytes = 0;
377    uint64_t total_acked_bytes = 0;
378    uint64_t ack_limit = flow_control_buf_size / 2;
379
380    bool delay_buffer_acking = false;
381    if (simulate_cursor_dropping) {
382        /**
383         * Simulates cursor dropping by slowing down the initial buffer
384         * acknowledgement from the consmer.
385         *
386         * Note that the cursor may not be dropped if the memory usage
387         * is not over the cursor_dropping_upper_threshold or if the
388         * checkpoint_remover sleep time is high.
389         */
390        delay_buffer_acking = true;
391    }
392
393    do {
394        if (!disable_ack && (bytes_read > ack_limit)) {
395            if (delay_buffer_acking) {
396                std::this_thread::sleep_for(std::chrono::seconds(2));
397                delay_buffer_acking = false;
398            }
399            dcp->buffer_acknowledgement(cookie, ++opaque, Vbid(0), bytes_read);
400            total_acked_bytes += bytes_read;
401            bytes_read = 0;
402        }
403        ENGINE_ERROR_CODE err = dcp->step(cookie, &producers);
404        if (err == ENGINE_DISCONNECT) {
405            done = true;
406        } else {
407            const Vbid vbid = producers.last_vbucket;
408            auto &stats = vb_stats[vbid];
409            switch (producers.last_op) {
410            case cb::mcbp::ClientOpcode::DcpMutation:
411                cb_assert(vbid != static_cast<Vbid>(-1));
412                checklt(stats.last_by_seqno,
413                        producers.last_byseqno.load(),
414                        "Expected bigger seqno");
415                stats.last_by_seqno = producers.last_byseqno;
416                stats.num_mutations++;
417                bytes_read += producers.last_packet_size;
418                all_bytes += producers.last_packet_size;
419                if (stats.pending_marker_ack &&
420                    producers.last_byseqno == stats.marker_end) {
421                    sendDcpAck(h,
422                               cookie,
423                               cb::mcbp::ClientOpcode::DcpSnapshotMarker,
424                               cb::mcbp::Status::Success,
425                               producers.last_opaque);
426                }
427
428                if (producers.last_nru > 0) {
429                    nruCounter[1]++;
430                } else {
431                    nruCounter[0]++;
432                }
433                if (!producers.last_value.empty()) {
434                    stats.num_values++;
435                }
436
437                break;
438            case cb::mcbp::ClientOpcode::DcpDeletion:
439                deleteOrExpireCase(stats,
440                                   bytes_read,
441                                   all_bytes,
442                                   vbid,
443                                   DeletionOpcode::Deletion);
444                break;
445            case cb::mcbp::ClientOpcode::DcpExpiration:
446                deleteOrExpireCase(stats,
447                                   bytes_read,
448                                   all_bytes,
449                                   vbid,
450                                   DeletionOpcode::Expiration);
451                break;
452            case cb::mcbp::ClientOpcode::DcpStreamEnd:
453                cb_assert(vbid != static_cast<Vbid>(-1));
454                if (++num_stream_ends_received == stream_ctxs.size()) {
455                    done = true;
456                }
457                bytes_read += producers.last_packet_size;
458                all_bytes += producers.last_packet_size;
459                break;
460            case cb::mcbp::ClientOpcode::DcpSnapshotMarker:
461                cb_assert(vbid != static_cast<Vbid>(-1));
462                if (stats.exp_disk_snapshot &&
463                    stats.num_snapshot_markers == 0) {
464                    checkeq(uint32_t{1},
465                            producers.last_flags,
466                            "Expected disk snapshot");
467                }
468
469                if (producers.last_flags & 8) {
470                    stats.pending_marker_ack = true;
471                    stats.marker_end = producers.last_snap_end_seqno;
472                }
473
474                stats.num_snapshot_markers++;
475                bytes_read += producers.last_packet_size;
476                all_bytes += producers.last_packet_size;
477                break;
478            case cb::mcbp::ClientOpcode::DcpSetVbucketState:
479                cb_assert(vbid != static_cast<Vbid>(-1));
480                if (producers.last_vbucket_state == vbucket_state_pending) {
481                    stats.num_set_vbucket_pending++;
482                    for (size_t j = 0; j < stats.extra_takeover_ops; ++j) {
483                        std::string key("key" + std::to_string(j));
484                        checkeq(ENGINE_SUCCESS,
485                                store(h,
486                                      NULL,
487                                      OPERATION_SET,
488                                      key.c_str(),
489                                      "data",
490                                      nullptr,
491                                      0,
492                                      vbid),
493                                "Failed to store a value");
494                    }
495                } else if (producers.last_vbucket_state ==
496                           vbucket_state_active) {
497                    stats.num_set_vbucket_active++;
498                }
499                bytes_read += producers.last_packet_size;
500                all_bytes += producers.last_packet_size;
501                sendDcpAck(h,
502                           cookie,
503                           cb::mcbp::ClientOpcode::DcpSetVbucketState,
504                           cb::mcbp::Status::Success,
505                           producers.last_opaque);
506                break;
507            case cb::mcbp::ClientOpcode::Invalid:
508                if (disable_ack && flow_control_buf_size &&
509                    (bytes_read >= flow_control_buf_size)) {
510                    /* If there is no acking and if flow control is enabled
511                       we are done because producer should not send us any
512                       more items. We need this to test that producer stops
513                       sending items correctly when there are no acks while
514                       flow control is enabled */
515                    done = true;
516                    exp_all_items_streamed = false;
517                } else {
518                    /* No messages were ready on the last step call, so we
519                     * wait till the conn is notified of new item.
520                     * Note that we check for 0 because we clear the
521                     * producers.last_op value below.
522                     */
523                    testHarness->lock_cookie(cookie);
524                    /* waitfor_cookie() waits on a condition variable. But
525                       the api expects the cookie to be locked before
526                       calling it */
527                    testHarness->waitfor_cookie(cookie);
528                    testHarness->unlock_cookie(cookie);
529                }
530                break;
531            default:
532                // Aborting ...
533                std::stringstream ss;
534                ss << "Unknown DCP operation: " << to_string(producers.last_op);
535                check(false, ss.str().c_str());
536            }
537            producers.last_op = cb::mcbp::ClientOpcode::Invalid;
538            producers.last_nru = 0;
539            producers.last_vbucket = Vbid(-1);
540        }
541    } while (!done);
542
543    total_bytes += all_bytes;
544
545    for (const auto& ctx : stream_ctxs) {
546        if (!ctx.skip_verification) {
547            auto &stats = vb_stats[ctx.vbucket];
548            if (simulate_cursor_dropping) {
549                if (stats.num_snapshot_markers == 0) {
550                    cb_assert(stats.num_mutations == 0 &&
551                              stats.num_deletions == 0);
552                } else {
553                    checkge(ctx.exp_mutations, stats.num_mutations,
554                          "Invalid number of mutations");
555                    checkge(ctx.exp_deletions, stats.num_deletions,
556                          "Invalid number of deletes");
557                    checkge(ctx.exp_expirations,
558                            stats.num_expirations,
559                            "Invalid number of expirations");
560                }
561            } else {
562                // Account for cursors that may have been dropped because
563                // of high memory usage
564                if (get_int_stat(h, "ep_cursors_dropped") > 0) {
565                    // Hard to predict exact number of markers to be received
566                    // if in case of a live parallel front end load
567                    if (!ctx.live_frontend_client) {
568                        checkle(stats.num_snapshot_markers, ctx.exp_markers,
569                                "Invalid number of markers");
570                    }
571                    checkle(stats.num_mutations, ctx.exp_mutations,
572                            "Invalid number of mutations");
573                    checkle(stats.num_deletions, ctx.exp_deletions,
574                            "Invalid number of deletions");
575                    checkle(stats.num_expirations,
576                            ctx.exp_expirations,
577                            "Invalid number of expirations");
578                } else {
579                    checkeq(ctx.exp_mutations, stats.num_mutations,
580                            "Invalid number of mutations");
581                    checkeq(ctx.exp_deletions, stats.num_deletions,
582                            "Invalid number of deletes");
583                    checkeq(ctx.exp_expirations,
584                            stats.num_expirations,
585                            "Invalid number of expirations");
586                    if (ctx.live_frontend_client) {
587                        // Hard to predict exact number of markers to be received
588                        // if in case of a live parallel front end load
589                        if (ctx.exp_mutations > 0 || ctx.exp_deletions > 0 ||
590                            ctx.exp_expirations > 0) {
591                            checkle(size_t{1},
592                                    stats.num_snapshot_markers,
593                                    "Snapshot marker count can't be zero");
594                        }
595                    } else {
596                        checkeq(ctx.exp_markers, stats.num_snapshot_markers,
597                                "Unexpected number of snapshot markers");
598                    }
599                }
600            }
601
602            if (ctx.flags & DCP_ADD_STREAM_FLAG_TAKEOVER) {
603                checkeq(size_t{1},
604                        stats.num_set_vbucket_pending,
605                        "Didn't receive pending set state");
606                checkeq(size_t{1},
607                        stats.num_set_vbucket_active,
608                        "Didn't receive active set state");
609            }
610
611            /* Check if the readyQ size goes to zero after all items are streamed */
612            if (exp_all_items_streamed) {
613                std::stringstream stats_ready_queue_memory;
614                stats_ready_queue_memory << "eq_dcpq:" << name.c_str()
615                                         << ":stream_" << ctx.vbucket.get()
616                                         << "_ready_queue_memory";
617                checkeq(uint64_t{0},
618                        get_ull_stat(h,
619                                     stats_ready_queue_memory.str().c_str(),
620                                     "dcp"),
621                        "readyQ size did not go to zero");
622
623                std::string stats_backfill_buffer_items(
624                        "eq_dcpq:" + name + ":stream_" +
625                        std::to_string(ctx.vbucket.get()) +
626                        "_backfill_buffer_items");
627                checkeq(uint64_t{0},
628                        get_ull_stat(
629                                h, stats_backfill_buffer_items.c_str(), "dcp"),
630                        "backfill buffer items did not go to zero");
631            }
632            if (ctx.expected_values) {
633                checkeq(ctx.expected_values,
634                        stats.num_values,
635                        "Expected values didn't match");
636            }
637        }
638    }
639
640    /* Check if the producer has updated flow control stat correctly */
641    if (flow_control_buf_size) {
642        char stats_buffer[50] = {0};
643        snprintf(stats_buffer, sizeof(stats_buffer), "eq_dcpq:%s:unacked_bytes",
644                 name.c_str());
645        checkeq((all_bytes - total_acked_bytes),
646                get_ull_stat(h, stats_buffer, "dcp"),
647                "Buffer Size did not get set correctly");
648    }
649}
650
651void TestDcpConsumer::stop() {
652    this->done = true;
653}
654
655void TestDcpConsumer::openConnection(uint32_t flags) {
656    /* Reset any stale dcp data */
657    producers.clear_dcp_data();
658
659    opaque = 1;
660
661    /* Set up Producer at server */
662    checkeq(ENGINE_SUCCESS,
663            dcp->open(cookie,
664                      ++opaque,
665                      0,
666                      flags,
667                      name,
668                      R"({"consumer_name":"replica1"})"),
669            "Failed dcp producer open connection.");
670
671    /* Set flow control buffer size */
672    std::string flow_control_buf_sz(std::to_string(flow_control_buf_size));
673    checkeq(ENGINE_SUCCESS,
674            dcp->control(cookie,
675                         ++opaque,
676                         "connection_buffer_size",
677                         flow_control_buf_sz),
678            "Failed to establish connection buffer");
679    char stats_buffer[50] = {0};
680    if (flow_control_buf_size) {
681        snprintf(stats_buffer, sizeof(stats_buffer),
682                 "eq_dcpq:%s:max_buffer_bytes", name.c_str());
683        checkeq(static_cast<int>(flow_control_buf_size),
684                get_int_stat(h, stats_buffer, "dcp"),
685                "TestDcpConsumer::openConnection() : "
686                "Buffer Size did not get set correctly");
687    } else {
688        snprintf(stats_buffer, sizeof(stats_buffer),
689                 "eq_dcpq:%s:flow_control", name.c_str());
690        std::string status = get_str_stat(h, stats_buffer, "dcp");
691        checkeq(status, std::string("disabled"), "Flow control enabled!");
692    }
693
694    checkeq(ENGINE_SUCCESS,
695            dcp->control(cookie, ++opaque, "enable_ext_metadata", "true"),
696            "Failed to enable xdcr extras");
697}
698
699ENGINE_ERROR_CODE TestDcpConsumer::openStreams() {
700    for (auto& ctx : stream_ctxs) {
701        /* Different opaque for every stream created */
702        ++opaque;
703
704        /* Initiate stream request */
705        uint64_t rollback = 0;
706        ENGINE_ERROR_CODE rv = dcp->stream_req(cookie,
707                                               ctx.flags,
708                                               opaque,
709                                               ctx.vbucket,
710                                               ctx.seqno.start,
711                                               ctx.seqno.end,
712                                               ctx.vb_uuid,
713                                               ctx.snapshot.start,
714                                               ctx.snapshot.end,
715                                               &rollback,
716                                               mock_dcp_add_failover_log,
717                                               {});
718
719        checkeq(ctx.exp_err, rv, "Failed to initiate stream request");
720
721        if (rv == ENGINE_NOT_MY_VBUCKET || rv == ENGINE_ENOTSUP) {
722            return rv;
723        }
724
725        if (rv == ENGINE_ROLLBACK || rv == ENGINE_KEY_ENOENT) {
726            checkeq(ctx.exp_rollback, rollback,
727                    "Rollback didn't match expected value");
728            return rv;
729        }
730
731        if (ctx.flags & DCP_ADD_STREAM_FLAG_TAKEOVER) {
732            ctx.seqno.end  = std::numeric_limits<uint64_t>::max();
733        } else if (ctx.flags & DCP_ADD_STREAM_FLAG_LATEST ||
734                   ctx.flags & DCP_ADD_STREAM_FLAG_DISKONLY) {
735            std::string high_seqno("vb_" + std::to_string(ctx.vbucket.get()) +
736                                   ":high_seqno");
737            ctx.seqno.end =
738                    get_ull_stat(h, high_seqno.c_str(), "vbucket-seqno");
739        }
740
741        std::stringstream stats_flags;
742        stats_flags << "eq_dcpq:" << name.c_str() << ":stream_"
743                    << ctx.vbucket.get() << "_flags";
744        checkeq(ctx.flags,
745                (uint32_t)get_int_stat(h, stats_flags.str().c_str(), "dcp"),
746                "Flags didn't match");
747
748        std::stringstream stats_opaque;
749        stats_opaque << "eq_dcpq:" << name.c_str() << ":stream_"
750                     << ctx.vbucket.get() << "_opaque";
751        checkeq(opaque,
752                (uint32_t)get_int_stat(h, stats_opaque.str().c_str(), "dcp"),
753                "Opaque didn't match");
754        ctx.opaque = opaque;
755
756        std::stringstream stats_start_seqno;
757        stats_start_seqno << "eq_dcpq:" << name.c_str() << ":stream_"
758                          << ctx.vbucket.get() << "_start_seqno";
759        checkeq(ctx.seqno.start,
760                (uint64_t)get_ull_stat(
761                        h, stats_start_seqno.str().c_str(), "dcp"),
762                "Start Seqno Didn't match");
763
764        std::stringstream stats_end_seqno;
765        stats_end_seqno << "eq_dcpq:" << name.c_str() << ":stream_"
766                        << ctx.vbucket.get() << "_end_seqno";
767        checkeq(ctx.seqno.end,
768                (uint64_t)get_ull_stat(h, stats_end_seqno.str().c_str(), "dcp"),
769                "End Seqno didn't match");
770
771        std::stringstream stats_vb_uuid;
772        stats_vb_uuid << "eq_dcpq:" << name.c_str() << ":stream_"
773                      << ctx.vbucket.get() << "_vb_uuid";
774        checkeq(ctx.vb_uuid,
775                (uint64_t)get_ull_stat(h, stats_vb_uuid.str().c_str(), "dcp"),
776                "VBucket UUID didn't match");
777
778        std::stringstream stats_snap_seqno;
779        stats_snap_seqno << "eq_dcpq:" << name.c_str() << ":stream_"
780                         << ctx.vbucket.get() << "_snap_start_seqno";
781        checkeq(ctx.snapshot.start,
782                (uint64_t)get_ull_stat(
783                        h, stats_snap_seqno.str().c_str(), "dcp"),
784                "snap start seqno didn't match");
785
786        if ((ctx.flags & DCP_ADD_STREAM_FLAG_TAKEOVER) &&
787            !ctx.skip_estimate_check) {
788            std::string high_seqno_str(
789                    "vb_" + std::to_string(ctx.vbucket.get()) + ":high_seqno");
790            uint64_t vb_high_seqno =
791                    get_ull_stat(h, high_seqno_str.c_str(), "vbucket-seqno");
792            uint64_t est = vb_high_seqno - ctx.seqno.start;
793            std::stringstream stats_takeover;
794            stats_takeover << "dcp-vbtakeover " << ctx.vbucket.get() << " "
795                           << name.c_str();
796            wait_for_stat_to_be_lte(
797                    h, "estimate", est, stats_takeover.str().c_str());
798        }
799
800        if (ctx.flags & DCP_ADD_STREAM_FLAG_DISKONLY) {
801            /* Wait for backfill to start */
802            std::string stats_backfill_read_bytes("eq_dcpq:" + name +
803                                                  ":backfill_buffer_bytes_read");
804            wait_for_stat_to_be_gte(
805                    h, stats_backfill_read_bytes.c_str(), 0, "dcp");
806            /* Verify that we have no dcp cursors in the checkpoint. (There will
807             just be one persistence cursor) */
808            std::string stats_num_conn_cursors(
809                    "vb_" + std::to_string(ctx.vbucket.get()) +
810                    ":num_conn_cursors");
811            /* In case of persistent buckets there will be 1 persistent cursor,
812               in case of ephemeral buckets there will be no cursor */
813            checkge(1,
814                    get_int_stat(h, stats_num_conn_cursors.c_str(),
815                            "checkpoint"),
816                    "DCP cursors not expected to be registered");
817        }
818
819        // Init stats used in test
820        VBStats stats;
821        stats.extra_takeover_ops = ctx.extra_takeover_ops;
822        stats.exp_disk_snapshot = ctx.exp_disk_snapshot;
823        stats.exp_conflict_res = ctx.exp_conflict_res;
824
825        vb_stats[ctx.vbucket] = stats;
826    }
827    return ENGINE_SUCCESS;
828}
829
830ENGINE_ERROR_CODE TestDcpConsumer::closeStreams(bool fClear) {
831    ENGINE_ERROR_CODE err = ENGINE_SUCCESS;
832    for (auto& ctx : stream_ctxs) {
833        if (ctx.opaque > 0) {
834            err = dcp->close_stream(cookie, ctx.opaque, Vbid(0), {});
835            if (ENGINE_SUCCESS != err) {
836                break;
837            }
838        }
839    }
840
841    if (fClear) {
842        stream_ctxs.clear();
843    }
844    return err;
845}
846
847static void notifier_request(EngineIface* h,
848                             const void* cookie,
849                             uint32_t opaque,
850                             Vbid vbucket,
851                             uint64_t start,
852                             bool shouldSucceed) {
853    uint32_t flags = 0;
854    uint64_t rollback = 0;
855    uint64_t vb_uuid = get_ull_stat(h, "vb_0:0:id", "failovers");
856    uint64_t snap_start_seqno = get_ull_stat(h, "vb_0:0:seq", "failovers");
857    uint64_t snap_end_seqno = snap_start_seqno;
858    auto dcp = requireDcpIface(h);
859
860    ENGINE_ERROR_CODE err = dcp->stream_req(cookie,
861                                            flags,
862                                            opaque,
863                                            vbucket,
864                                            start,
865                                            0,
866                                            vb_uuid,
867                                            snap_start_seqno,
868                                            snap_end_seqno,
869                                            &rollback,
870                                            mock_dcp_add_failover_log,
871                                            {});
872    checkeq(ENGINE_SUCCESS, err, "Failed to initiate stream request");
873
874    std::string type = get_str_stat(h, "eq_dcpq:unittest:type", "dcp");
875    checkeq("notifier"s, type, "Consumer not found");
876
877    checkeq(flags,
878            static_cast<uint32_t>(
879                    get_int_stat(h, "eq_dcpq:unittest:stream_0_flags", "dcp")),
880            "Flags didn't match");
881    checkeq(opaque,
882            static_cast<uint32_t>(
883                    get_int_stat(h, "eq_dcpq:unittest:stream_0_opaque", "dcp")),
884            "Opaque didn't match");
885    checkeq(start,
886            get_ull_stat(h, "eq_dcpq:unittest:stream_0_start_seqno", "dcp"),
887            "Start Seqno Didn't match");
888    checkeq(uint64_t{0},
889            get_ull_stat(h, "eq_dcpq:unittest:stream_0_end_seqno", "dcp"),
890            "End Seqno didn't match");
891    checkeq(vb_uuid,
892            get_ull_stat(h, "eq_dcpq:unittest:stream_0_vb_uuid", "dcp"),
893            "VBucket UUID didn't match");
894    checkeq(snap_start_seqno,
895            get_ull_stat(h,"eq_dcpq:unittest:stream_0_snap_start_seqno", "dcp"),
896            "snap start seqno didn't match");
897}
898
899static void dcp_stream_to_replica(EngineIface* h,
900                                  const void* cookie,
901                                  uint32_t opaque,
902                                  Vbid vbucket,
903                                  uint32_t flags,
904                                  uint64_t start,
905                                  uint64_t end,
906                                  uint64_t snap_start_seqno,
907                                  uint64_t snap_end_seqno,
908                                  uint8_t cas = 0x1,
909                                  uint8_t datatype = 1,
910                                  uint32_t exprtime = 0,
911                                  uint32_t lockTime = 0,
912                                  uint64_t revSeqno = 0) {
913    /* Send snapshot marker */
914    auto dcp = requireDcpIface(h);
915    checkeq(ENGINE_SUCCESS,
916            dcp->snapshot_marker(cookie,
917                                 opaque,
918                                 vbucket,
919                                 snap_start_seqno,
920                                 snap_end_seqno,
921                                 flags,
922                                 0 /*HCS*/,
923                                 {} /*maxVisibleSeqno*/),
924            "Failed to send marker!");
925    const std::string data("data");
926    /* Send DCP mutations */
927    for (uint64_t i = start; i <= end; i++) {
928        const std::string key{"key" + std::to_string(i)};
929        const DocKey docKey{key, DocKeyEncodesCollectionId::No};
930        const cb::const_byte_buffer value{(uint8_t*)data.data(), data.size()};
931        checkeq(ENGINE_SUCCESS,
932                dcp->mutation(cookie,
933                              opaque,
934                              docKey,
935                              value,
936                              0, // priv bytes
937                              PROTOCOL_BINARY_RAW_BYTES,
938                              cas,
939                              vbucket,
940                              flags,
941                              i, // by seqno
942                              revSeqno,
943                              exprtime,
944                              lockTime,
945                              {},
946                              INITIAL_NRU_VALUE),
947                "Failed dcp mutate.");
948    }
949}
950
951/* This is a helper function to read items from an existing DCP Producer. It
952   reads items from start to end on the connection. (Note: this can work
953   correctly only in case there is one vbucket)
954   Currently this supports only streaming mutations, but can be extend to stream
955   deletion etc */
956static void dcp_stream_from_producer_conn(EngineIface* h,
957                                          const void* cookie,
958                                          uint32_t opaque,
959                                          uint64_t start,
960                                          uint64_t end,
961                                          uint64_t expSnapStart,
962                                          MockDcpMessageProducers& producers) {
963    bool done = false;
964    size_t bytes_read = 0;
965    bool pending_marker_ack = false;
966    uint64_t marker_end = 0;
967    uint64_t num_mutations = 0;
968    uint64_t last_snap_start_seqno = 0;
969    auto dcp = requireDcpIface(h);
970
971    do {
972        if (bytes_read > 512) {
973            checkeq(ENGINE_SUCCESS,
974                    dcp->buffer_acknowledgement(
975                            cookie, ++opaque, Vbid(0), bytes_read),
976                    "Failed to get dcp buffer ack");
977            bytes_read = 0;
978        }
979        ENGINE_ERROR_CODE err = dcp->step(cookie, &producers);
980        if (err == ENGINE_DISCONNECT) {
981            done = true;
982        } else {
983            switch (producers.last_op) {
984            case cb::mcbp::ClientOpcode::DcpMutation:
985                bytes_read += producers.last_packet_size;
986                if (pending_marker_ack &&
987                    producers.last_byseqno == marker_end) {
988                    sendDcpAck(h,
989                               cookie,
990                               cb::mcbp::ClientOpcode::DcpSnapshotMarker,
991                               cb::mcbp::Status::Success,
992                               producers.last_opaque);
993                }
994                num_mutations++;
995                break;
996            case cb::mcbp::ClientOpcode::DcpStreamEnd:
997                done = true;
998                bytes_read += producers.last_packet_size;
999                break;
1000            case cb::mcbp::ClientOpcode::DcpSnapshotMarker:
1001                if (producers.last_flags & 8) {
1002                    pending_marker_ack = true;
1003                    marker_end = producers.last_snap_end_seqno;
1004                }
1005                bytes_read += producers.last_packet_size;
1006                last_snap_start_seqno = producers.last_snap_start_seqno;
1007                break;
1008            case cb::mcbp::ClientOpcode::Invalid:
1009                break;
1010            default:
1011                // Aborting ...
1012                std::string err_string(
1013                        "Unexpected DCP operation: " +
1014                        to_string(producers.last_op) + " last_byseqno: " +
1015                        std::to_string(producers.last_byseqno.load()) +
1016                        " last_key: " + producers.last_key + " last_value: " +
1017                        producers.last_value + " last_flags: " +
1018                        std::to_string(producers.last_flags));
1019                check(false, err_string.c_str());
1020            }
1021            if (producers.last_byseqno >= end) {
1022                done = true;
1023            }
1024            producers.last_op = cb::mcbp::ClientOpcode::Invalid;
1025        }
1026    } while (!done);
1027
1028    /* Do buffer ack of the outstanding bytes */
1029    dcp->buffer_acknowledgement(cookie, ++opaque, Vbid(0), bytes_read);
1030    checkeq((end - start + 1), num_mutations, "Invalid number of mutations");
1031    if (expSnapStart) {
1032        checkge(last_snap_start_seqno,
1033                expSnapStart,
1034                "Incorrect snap start seqno");
1035    }
1036}
1037
1038static void dcp_stream_expiries_to_replica(EngineIface* h,
1039                                           const void* cookie,
1040                                           uint32_t opaque,
1041                                           Vbid vbucket,
1042                                           uint32_t flags,
1043                                           uint64_t start,
1044                                           uint64_t end,
1045                                           uint64_t snap_start_seqno,
1046                                           uint64_t snap_end_seqno,
1047                                           uint32_t delTime,
1048                                           uint64_t revSeqno = 0,
1049                                           uint8_t cas = 0x1) {
1050    auto dcp = requireDcpIface(h);
1051    checkeq(ENGINE_SUCCESS,
1052            dcp->snapshot_marker(cookie,
1053                                 opaque,
1054                                 vbucket,
1055                                 snap_start_seqno,
1056                                 snap_end_seqno,
1057                                 flags,
1058                                 0 /*HCS*/,
1059                                 {} /*maxVisibleSeqno*/),
1060            "Failed to send marker!");
1061    const std::string data("data");
1062    /* Stream Expiries */
1063    for (uint64_t i = start; i <= end; i++) {
1064        const std::string key{"key" + std::to_string(i)};
1065        const DocKey docKey{key, DocKeyEncodesCollectionId::No};
1066        checkeq(ENGINE_SUCCESS,
1067                dcp->expiration(cookie,
1068                                opaque,
1069                                docKey,
1070                                {},
1071                                0, // priv bytes
1072                                PROTOCOL_BINARY_RAW_BYTES,
1073                                cas,
1074                                vbucket,
1075                                i,
1076                                revSeqno,
1077                                delTime),
1078                "Failed dcp expiry");
1079    }
1080}
1081
1082struct mb16357_ctx {
1083    mb16357_ctx(EngineIface* _h, int _items)
1084        : h(_h), dcp(requireDcpIface(h)), items(_items) {
1085    }
1086
1087    EngineIface* h;
1088    gsl::not_null<DcpIface*> dcp;
1089    int items;
1090    std::mutex mutex;
1091    std::condition_variable cv;
1092    bool compactor_waiting{false};
1093    bool compaction_start{false};
1094};
1095
/// Arguments for writer_thread().
struct writer_thread_ctx {
    /// Engine to store into
    EngineIface* h;
    /// Number of keys ("key_0" .. "key_<items-1>") to store
    int items;
    /// vbucket the keys are stored to
    Vbid vbid;
};
1101
/// Arguments for continuous_dcp_thread().
struct continuous_dcp_ctx {
    /// Engine the stream is opened against
    EngineIface* h;
    /// Not read by continuous_dcp_thread itself - presumably the cookie
    /// of the connection owned by dcpConsumer (TODO confirm with callers)
    const void *cookie;
    /// vbucket to stream
    Vbid vbid;
    /// Held by reference, so the referenced string must outlive this
    /// context. Not read by continuous_dcp_thread itself.
    const std::string &name;
    /// Start seqno of the stream; also used as snapshot start and end
    uint64_t start_seqno;
    /// Consumer which the stream context is added to and which is then run
    std::unique_ptr<TestDcpConsumer> dcpConsumer;
};
1110
// Forward declaration: dcp_thread_func (below) calls this function before
// its definition appears in this file.
static uint32_t add_stream_for_consumer(EngineIface* h,
                                        const void* cookie,
                                        uint32_t opaque,
                                        Vbid vbucket,
                                        uint32_t flags,
                                        cb::mcbp::Status response,
                                        uint64_t exp_snap_start = 0,
                                        uint64_t exp_snap_end = 0);
1120
1121extern "C" {
1122    static void dcp_thread_func(void *args) {
1123        struct mb16357_ctx *ctx = static_cast<mb16357_ctx *>(args);
1124
1125        const void* cookie = testHarness->create_cookie();
1126        uint32_t opaque = 0xFFFF0000;
1127        uint32_t flags = 0;
1128        std::string name = "unittest";
1129
1130        // Wait for compaction thread to to ready (and waiting on cv) - as
1131        // we don't want the nofify_one() to be lost.
1132        for (;;) {
1133            std::lock_guard<std::mutex> lh(ctx->mutex);
1134            if (ctx->compactor_waiting) {
1135                break;
1136            }
1137        };
1138        // Now compactor is waiting to run (and we are just about to start DCP
1139        // stream, activate compaction.
1140        {
1141            std::lock_guard<std::mutex> lh(ctx->mutex);
1142            ctx->compaction_start = true;
1143        }
1144        ctx->cv.notify_one();
1145
1146        // Switch to replica
1147        check(set_vbucket_state(ctx->h, Vbid(0), vbucket_state_replica),
1148              "Failed to set vbucket state.");
1149
1150        // Open consumer connection
1151        checkeq(ctx->dcp->open(cookie,
1152                               opaque,
1153                               0,
1154                               flags,
1155                               name,
1156                               R"({"consumer_name":"replica1"})"),
1157                ENGINE_SUCCESS,
1158                "Failed dcp Consumer open connection.");
1159
1160        add_stream_for_consumer(ctx->h,
1161                                cookie,
1162                                opaque++,
1163                                Vbid(0),
1164                                0,
1165                                cb::mcbp::Status::Success);
1166
1167        uint32_t stream_opaque =
1168                get_int_stat(ctx->h, "eq_dcpq:unittest:stream_0_opaque", "dcp");
1169
1170        for (int i = 1; i <= ctx->items; i++) {
1171            std::stringstream ss;
1172            ss << "kamakeey-" << i;
1173
1174            // send mutations in single mutation snapshots to race more with compaction
1175            ctx->dcp->snapshot_marker(cookie,
1176                                      stream_opaque,
1177                                      Vbid(0),
1178                                      ctx->items,
1179                                      ctx->items + i,
1180                                      2,
1181                                      0 /*HCS*/,
1182                                      {} /*maxVisibleSeqno*/);
1183
1184            const std::string key = ss.str();
1185            const DocKey docKey{key, DocKeyEncodesCollectionId::No};
1186            ctx->dcp->mutation(cookie,
1187                               stream_opaque,
1188                               docKey,
1189                               {(const uint8_t*)"value", 5},
1190                               0, // priv bytes
1191                               PROTOCOL_BINARY_RAW_BYTES,
1192                               i * 3, // cas
1193                               Vbid(0),
1194                               0, // flags
1195                               i + ctx->items, // by_seqno
1196                               i + ctx->items, // rev_seqno
1197                               0, // exptime
1198                               0, // locktime
1199                               {}, // meta
1200                               INITIAL_NRU_VALUE);
1201        }
1202
1203        testHarness->destroy_cookie(cookie);
1204    }
1205
1206    static void compact_thread_func(void *args) {
1207        struct mb16357_ctx *ctx = static_cast<mb16357_ctx *>(args);
1208        std::unique_lock<std::mutex> lk(ctx->mutex);
1209        ctx->compactor_waiting = true;
1210        ctx->cv.wait(lk, [ctx]{return ctx->compaction_start;});
1211        compact_db(ctx->h, Vbid(0), Vbid(0), 99, ctx->items, 1);
1212    }
1213
1214    static void writer_thread(void *args) {
1215        struct writer_thread_ctx *wtc = static_cast<writer_thread_ctx *>(args);
1216
1217        for (int i = 0; i < wtc->items; ++i) {
1218            std::string key("key_" + std::to_string(i));
1219            checkeq(ENGINE_SUCCESS,
1220                    store(wtc->h,
1221                          nullptr,
1222                          OPERATION_SET,
1223                          key.c_str(),
1224                          "somevalue",
1225                          nullptr,
1226                          0,
1227                          wtc->vbid),
1228                    "Failed to store value");
1229        }
1230    }
1231
1232    static void continuous_dcp_thread(void *args) {
1233        struct continuous_dcp_ctx *cdc = static_cast<continuous_dcp_ctx *>(args);
1234
1235        DcpStreamCtx ctx;
1236        ctx.vbucket = cdc->vbid;
1237        std::string vbuuid_entry("vb_" + std::to_string(cdc->vbid.get()) +
1238                                 ":0:id");
1239        ctx.vb_uuid = get_ull_stat(cdc->h, vbuuid_entry.c_str(), "failovers");
1240        ctx.seqno = {cdc->start_seqno, std::numeric_limits<uint64_t>::max()};
1241        ctx.snapshot = {cdc->start_seqno, cdc->start_seqno};
1242        ctx.skip_verification = true;
1243
1244        cdc->dcpConsumer->addStreamCtx(ctx);
1245        cdc->dcpConsumer->run();
1246    }
1247}
1248
1249/* DCP step thread that keeps running till it reads upto 'exp_mutations'.
1250   Note: the exp_mutations is cumulative across all streams in the DCP
1251         connection */
1252static void dcp_waiting_step(EngineIface* h,
1253                             const void* cookie,
1254                             uint32_t opaque,
1255                             uint64_t exp_mutations,
1256                             MockDcpMessageProducers& producers) {
1257    bool done = false;
1258    size_t bytes_read = 0;
1259    bool pending_marker_ack = false;
1260    uint64_t marker_end = 0;
1261    uint64_t num_mutations = 0;
1262
1263    auto dcp = requireDcpIface(h);
1264
1265    do {
1266        if (bytes_read > 512) {
1267            checkeq(ENGINE_SUCCESS,
1268                    dcp->buffer_acknowledgement(
1269                            cookie, ++opaque, Vbid(0), bytes_read),
1270                    "Failed to get dcp buffer ack");
1271            bytes_read = 0;
1272        }
1273        ENGINE_ERROR_CODE err = dcp->step(cookie, &producers);
1274        if (err == ENGINE_DISCONNECT) {
1275            done = true;
1276        } else {
1277            switch (producers.last_op) {
1278            case cb::mcbp::ClientOpcode::DcpMutation:
1279                bytes_read += producers.last_packet_size;
1280                if (pending_marker_ack &&
1281                    producers.last_byseqno == marker_end) {
1282                    sendDcpAck(h,
1283                               cookie,
1284                               cb::mcbp::ClientOpcode::DcpSnapshotMarker,
1285                               cb::mcbp::Status::Success,
1286                               producers.last_opaque);
1287                }
1288                ++num_mutations;
1289                break;
1290            case cb::mcbp::ClientOpcode::DcpStreamEnd:
1291                done = true;
1292                bytes_read += producers.last_packet_size;
1293                break;
1294            case cb::mcbp::ClientOpcode::DcpSnapshotMarker:
1295                if (producers.last_flags & 8) {
1296                    pending_marker_ack = true;
1297                    marker_end = producers.last_snap_end_seqno;
1298                }
1299                bytes_read += producers.last_packet_size;
1300                break;
1301            case cb::mcbp::ClientOpcode::Invalid:
1302                /* No messages were ready on the last step call, so we
1303                 * wait till the conn is notified of new item.
1304                 * Note that we check for 0 because we clear the
1305                 * producers.last_op value below.
1306                 */
1307                testHarness->lock_cookie(cookie);
1308                /* waitfor_cookie() waits on a condition variable. But
1309                   the api expects the cookie to be locked before
1310                   calling it */
1311                wait_started = true;
1312                testHarness->waitfor_cookie(cookie);
1313                testHarness->unlock_cookie(cookie);
1314                break;
1315            default:
1316                // Aborting ...
1317                std::string err_string("Unexpected DCP operation: " +
1318                                       to_string(producers.last_op));
1319                check(false, err_string.c_str());
1320            }
1321            if (num_mutations >= exp_mutations) {
1322                done = true;
1323            }
1324            producers.last_op = cb::mcbp::ClientOpcode::Invalid;
1325        }
1326    } while (!done);
1327
1328    /* Do buffer ack of the outstanding bytes */
1329    dcp->buffer_acknowledgement(cookie, ++opaque, Vbid(0), bytes_read);
1330}
1331
1332// Testcases //////////////////////////////////////////////////////////////////
1333
1334static enum test_result test_dcp_vbtakeover_no_stream(EngineIface* h) {
1335    write_items(h, 10);
1336    if (isPersistentBucket(h) && is_full_eviction(h)) {
1337        // MB-21646: FE mode - curr_items (which is part of "estimate") is
1338        // updated as part of flush, and thus if the writes are flushed in
1339        // blocks < 10 we may see an estimate < 10
1340        wait_for_flusher_to_settle(h);
1341    }
1342
1343    const auto est = get_int_stat(h, "estimate", "dcp-vbtakeover 0");
1344    checkeq(10, est, "Invalid estimate for non-existent stream");
1345    checkeq(ENGINE_NOT_MY_VBUCKET,
1346            get_stats(h, "dcp-vbtakeover 1"_ccb, {}, add_stats),
1347            "Expected not my vbucket");
1348
1349    return SUCCESS;
1350}
1351
1352/*
1353 * The following test is similar to the test_dcp_consumer_open test and
1354 * test_dcp_producer_open test, in that it opens a connections, then
1355 * immediately closes it.
1356 * It then moves time forward and repeats the creation of a connection and
1357 * checks that the new connection was created after the previous connection.
1358 */
1359
1360static enum test_result test_dcp_notifier_open(EngineIface* h) {
1361    const auto* cookie1 = testHarness->create_cookie();
1362    const std::string name("unittest");
1363    const uint32_t seqno = 0;
1364    uint32_t opaque = 0;
1365    auto dcp = requireDcpIface(h);
1366
1367    checkeq(ENGINE_SUCCESS,
1368            dcp->open(cookie1,
1369                      opaque,
1370                      seqno,
1371                      cb::mcbp::request::DcpOpenPayload::Notifier,
1372                      name,
1373                      R"({"consumer_name":"replica1"})"),
1374            "Failed dcp consumer open connection.");
1375
1376    const std::string stat_type("eq_dcpq:" + name + ":type");
1377    auto type = get_str_stat(h, stat_type.c_str(), "dcp");
1378    const std::string stat_created("eq_dcpq:" + name + ":created");
1379    const auto created = get_int_stat(h, stat_created.c_str(), "dcp");
1380    checkeq(0, type.compare("notifier"), "Notifier not found");
1381    testHarness->destroy_cookie(cookie1);
1382
1383    testHarness->time_travel(600);
1384
1385    const auto* cookie2 = testHarness->create_cookie();
1386    checkeq(ENGINE_SUCCESS,
1387            dcp->open(cookie2,
1388                      opaque,
1389                      seqno,
1390                      cb::mcbp::request::DcpOpenPayload::Notifier,
1391                      name,
1392                      R"({"consumer_name":"replica1"})"),
1393            "Failed dcp consumer open connection.");
1394
1395    type = get_str_stat(h, stat_type.c_str(), "dcp");
1396    checkeq(0, type.compare("notifier"), "Notifier not found");
1397    checkle((created + 600), get_int_stat(h, stat_created.c_str(), "dcp"),
1398            "New dcp stream is not newer");
1399    testHarness->destroy_cookie(cookie2);
1400
1401    return SUCCESS;
1402}
1403
1404static enum test_result test_dcp_notifier(EngineIface* h) {
1405    write_items(h, 10);
1406    const auto* cookie = testHarness->create_cookie();
1407    const std::string name("unittest");
1408    const uint32_t seqno = 0;
1409    const Vbid vbucket = Vbid(0);
1410    uint32_t opaque = 0;
1411    uint64_t start = 0;
1412    auto dcp = requireDcpIface(h);
1413    MockDcpMessageProducers producers(h);
1414
1415    checkeq(ENGINE_SUCCESS,
1416            dcp->open(cookie,
1417                      opaque,
1418                      seqno,
1419                      cb::mcbp::request::DcpOpenPayload::Notifier,
1420                      name,
1421                      R"({"consumer_name":"replica1"})"),
1422            "Failed dcp notifier open connection.");
1423    // Get notification for an old item
1424    notifier_request(h, cookie, ++opaque, vbucket, start, true);
1425    dcp_step(h, cookie, producers);
1426    checkeq(cb::mcbp::ClientOpcode::DcpStreamEnd,
1427            producers.last_op,
1428            "Expected stream end");
1429    // Get notification when we're slightly behind
1430    start += 9;
1431    notifier_request(h, cookie, ++opaque, vbucket, start, true);
1432    dcp_step(h, cookie, producers);
1433    checkeq(cb::mcbp::ClientOpcode::DcpStreamEnd,
1434            producers.last_op,
1435            "Expected stream end");
1436    // Wait for notification of a future item
1437    start += 11;
1438    notifier_request(h, cookie, ++opaque, vbucket, start, true);
1439    dcp_step(h, cookie, producers);
1440    for (auto j = 0; j < 5; ++j) {
1441        const auto key = "key" + std::to_string(j);
1442        checkeq(ENGINE_SUCCESS,
1443                store(h,
1444                      nullptr,
1445                      OPERATION_SET,
1446                      key.c_str(),
1447                      "data",
1448                      nullptr),
1449                "Failed to store a value");
1450    }
1451    // Shouldn't get a stream end yet
1452    dcp_step(h, cookie, producers);
1453    checkne(cb::mcbp::ClientOpcode::DcpStreamEnd,
1454            producers.last_op,
1455            "Wasn't expecting a stream end");
1456    for (auto j = 0; j < 6; ++j) {
1457        const auto key = "key" + std::to_string(j);
1458        checkeq(ENGINE_SUCCESS,
1459                store(h,
1460                      nullptr,
1461                      OPERATION_SET,
1462                      key.c_str(),
1463                      "data",
1464                      nullptr),
1465                "Failed to store a value");
1466    }
1467    // Should get a stream end
1468    dcp_step(h, cookie, producers);
1469    checkeq(cb::mcbp::ClientOpcode::DcpStreamEnd,
1470            producers.last_op,
1471            "Expected stream end");
1472    testHarness->destroy_cookie(cookie);
1473
1474    return SUCCESS;
1475}
1476
1477/*
1478 * The following test is similar to the previous test
1479 * (test_dcp_notifier) , whereby a notifier connection is opened and
1480 * notifier_requests are made checking for the occurance of stream
1481 * end commands.
1482 *
1483 * In the following test we make a notifier_request equal to
1484 * the number of operations performed (in this case one).  The test is
1485 * to ensure that a stream end is not received.  A second operation is
1486 * then performed and this time we check for the occurance of a
1487 * stream end command.
1488 */
1489
1490static enum test_result test_dcp_notifier_equal_to_number_of_items(
1491        EngineIface* h) {
1492    const std::string key("key0");
1493    checkeq(ENGINE_SUCCESS,
1494            store(h, nullptr, OPERATION_SET, key.c_str(), "data", nullptr),
1495            "Failed to store a value");
1496
1497    const auto* cookie = testHarness->create_cookie();
1498    const std::string name("unittest");
1499    const uint32_t seqno = 0;
1500    const Vbid vbucket = Vbid(0);
1501    const uint64_t start = 1;
1502    uint32_t opaque = 0;
1503    auto dcp = requireDcpIface(h);
1504    MockDcpMessageProducers producers(h);
1505
1506    checkeq(ENGINE_SUCCESS,
1507            dcp->open(cookie,
1508                      opaque,
1509                      seqno,
1510                      cb::mcbp::request::DcpOpenPayload::Notifier,
1511                      name,
1512                      R"({"consumer_name":"replica1"})"),
1513            "Failed dcp notifier open connection.");
1514    // Should not get a stream end
1515    notifier_request(h, cookie, ++opaque, vbucket, start, true);
1516    dcp_step(h, cookie, producers);
1517    checkne(cb::mcbp::ClientOpcode::DcpStreamEnd,
1518            producers.last_op,
1519            "Wasn't expecting a stream end");
1520    checkeq(ENGINE_SUCCESS,
1521            store(h, nullptr, OPERATION_SET, "key0", "data"),
1522            "Failed to store a value");
1523    dcp_step(h, cookie, producers);
1524    checkeq(cb::mcbp::ClientOpcode::DcpStreamEnd,
1525            producers.last_op,
1526            "Expected stream end");
1527    testHarness->destroy_cookie(cookie);
1528
1529    return SUCCESS;
1530}
1531
1532static enum test_result test_dcp_consumer_open(EngineIface* h) {
1533    const auto* cookie1 = testHarness->create_cookie();
1534    const std::string name("unittest");
1535    const uint32_t opaque = 0;
1536    const uint32_t seqno = 0;
1537    const uint32_t flags = 0;
1538    auto dcp = requireDcpIface(h);
1539
1540    checkeq(ENGINE_SUCCESS,
1541            dcp->open(cookie1,
1542                      opaque,
1543                      seqno,
1544                      flags,
1545                      name,
1546                      R"({"consumer_name":"replica1"})"),
1547            "Failed dcp consumer open connection.");
1548
1549    const auto stat_type("eq_dcpq:" + name + ":type");
1550    auto type = get_str_stat(h, stat_type.c_str(), "dcp");
1551    const auto stat_created("eq_dcpq:" + name + ":created");
1552    const auto created = get_int_stat(h, stat_created.c_str(), "dcp");
1553    checkeq(0, type.compare("consumer"), "Consumer not found");
1554    testHarness->destroy_cookie(cookie1);
1555
1556    testHarness->time_travel(600);
1557
1558    const auto* cookie2 = testHarness->create_cookie();
1559    checkeq(ENGINE_SUCCESS,
1560            dcp->open(cookie2,
1561                      opaque,
1562                      seqno,
1563                      flags,
1564                      name,
1565                      R"({"consumer_name":"replica1"})"),
1566            "Failed dcp consumer open connection.");
1567
1568    type = get_str_stat(h, stat_type.c_str(), "dcp");
1569    checkeq(0, type.compare("consumer"), "Consumer not found");
1570    checklt(created, get_int_stat(h, stat_created.c_str(), "dcp"),
1571            "New dcp stream is not newer");
1572    testHarness->destroy_cookie(cookie2);
1573
1574    return SUCCESS;
1575}
1576
1577static enum test_result test_dcp_consumer_flow_control_none(EngineIface* h) {
1578    const auto* cookie1 = testHarness->create_cookie();
1579    const std::string name("unittest");
1580    const uint32_t opaque = 0;
1581    const uint32_t seqno = 0;
1582    const uint32_t flags = 0;
1583    auto dcp = requireDcpIface(h);
1584
1585    checkeq(ENGINE_SUCCESS,
1586            dcp->open(cookie1,
1587                      opaque,
1588                      seqno,
1589                      flags,
1590                      name,
1591                      R"({"consumer_name":"replica1"})"),
1592            "Failed dcp consumer open connection.");
1593
1594    const auto stat_name("eq_dcpq:" + name + ":max_buffer_bytes");
1595    checkeq(0,
1596            get_int_stat(h, stat_name.c_str(), "dcp"),
1597            "Flow Control Buffer Size not zero");
1598    testHarness->destroy_cookie(cookie1);
1599
1600    return SUCCESS;
1601}
1602
1603static enum test_result test_dcp_consumer_flow_control_static(EngineIface* h) {
1604    const auto* cookie1 = testHarness->create_cookie();
1605    const std::string name("unittest");
1606    const uint32_t opaque = 0;
1607    const uint32_t seqno = 0;
1608    const uint32_t flags = 0;
1609    const auto flow_ctl_buf_def_size = 10485760;
1610    auto dcp = requireDcpIface(h);
1611
1612    checkeq(ENGINE_SUCCESS,
1613            dcp->open(cookie1,
1614                      opaque,
1615                      seqno,
1616                      flags,
1617                      name,
1618                      R"({"consumer_name":"replica1"})"),
1619            "Failed dcp consumer open connection.");
1620
1621    const auto stat_name("eq_dcpq:" + name + ":max_buffer_bytes");
1622    checkeq(flow_ctl_buf_def_size,
1623            get_int_stat(h, stat_name.c_str(), "dcp"),
1624            "Flow Control Buffer Size not equal to default value");
1625    testHarness->destroy_cookie(cookie1);
1626
1627    return SUCCESS;
1628}
1629
1630static enum test_result test_dcp_consumer_flow_control_dynamic(EngineIface* h) {
1631    const auto* cookie1 = testHarness->create_cookie();
1632    const std::string name("unittest");
1633    const uint32_t opaque = 0;
1634    const uint32_t seqno = 0;
1635    const uint32_t flags = 0;
1636    /* Check the min limit */
1637    set_param(h,
1638              cb::mcbp::request::SetParamPayload::Type::Flush,
1639              "max_size",
1640              "500000000");
1641    checkeq(500000000, get_int_stat(h, "ep_max_size"), "Incorrect new size.");
1642
1643    auto dcp = requireDcpIface(h);
1644    checkeq(ENGINE_SUCCESS,
1645            dcp->open(cookie1,
1646                      opaque,
1647                      seqno,
1648                      flags,
1649                      name,
1650                      R"({"consumer_name":"replica1"})"),
1651            "Failed dcp consumer open connection.");
1652
1653    const auto stat_name("eq_dcpq:" + name + ":max_buffer_bytes");
1654    checkeq(10485760,
1655            get_int_stat(h, stat_name.c_str(), "dcp"),
1656            "Flow Control Buffer Size not equal to min");
1657    testHarness->destroy_cookie(cookie1);
1658
1659    /* Check the size as percentage of the bucket memory */
1660    const auto* cookie2 = testHarness->create_cookie();
1661    set_param(h,
1662              cb::mcbp::request::SetParamPayload::Type::Flush,
1663              "max_size",
1664              "2000000000");
1665    checkeq(2000000000, get_int_stat(h, "ep_max_size"), "Incorrect new size.");
1666
1667    checkeq(ENGINE_SUCCESS,
1668            dcp->open(cookie2,
1669                      opaque,
1670                      seqno,
1671                      flags,
1672                      name,
1673                      R"({"consumer_name":"replica1"})"),
1674            "Failed dcp consumer open connection.");
1675
1676    checkeq(20000000,
1677            get_int_stat(h, stat_name.c_str(), "dcp"),
1678            "Flow Control Buffer Size not equal to 1% of mem size");
1679    testHarness->destroy_cookie(cookie2);
1680
1681    /* Check the case when mem used by flow control bufs hit the threshold */
1682    /* Create around 10 more connections to use more than 10% of the total
1683       memory */
1684    for (auto count = 0; count < 10; count++) {
1685        const auto* cookie = testHarness->create_cookie();
1686        checkeq(ENGINE_SUCCESS,
1687                dcp->open(cookie,
1688                          opaque,
1689                          seqno,
1690                          flags,
1691                          name,
1692                          R"({"consumer_name":"replica1"})"),
1693                "Failed dcp consumer open connection.");
1694        testHarness->destroy_cookie(cookie);
1695    }
1696    /* By now mem used by flow control bufs would have crossed the threshold */
1697    const auto* cookie3 = testHarness->create_cookie();
1698    checkeq(ENGINE_SUCCESS,
1699            dcp->open(cookie3,
1700                      opaque,
1701                      seqno,
1702                      flags,
1703                      name,
1704                      R"({"consumer_name":"replica1"})"),
1705            "Failed dcp consumer open connection.");
1706
1707    checkeq(10485760,
1708            get_int_stat(h, stat_name.c_str(), "dcp"),
1709            "Flow Control Buffer Size not equal to min after threshold is hit");
1710    testHarness->destroy_cookie(cookie3);
1711
1712    /* Check the max limit */
1713    const auto* cookie4 = testHarness->create_cookie();
1714    set_param(h,
1715              cb::mcbp::request::SetParamPayload::Type::Flush,
1716              "max_size",
1717              "7000000000");
1718    checkeq(static_cast<uint64_t>(7000000000),
1719            get_ull_stat(h, "ep_max_size"),
1720            "Incorrect new size.");
1721
1722    checkeq(ENGINE_SUCCESS,
1723            dcp->open(cookie4,
1724                      opaque,
1725                      seqno,
1726                      flags,
1727                      name,
1728                      R"({"consumer_name":"replica1"})"),
1729            "Failed dcp consumer open connection.");
1730
1731    checkeq(52428800,
1732            get_int_stat(h, stat_name.c_str(), "dcp"),
1733            "Flow Control Buffer Size beyond max");
1734    testHarness->destroy_cookie(cookie4);
1735
1736    return SUCCESS;
1737}
1738
1739static enum test_result test_dcp_consumer_flow_control_aggressive(
1740        EngineIface* h) {
1741    const auto max_conns = 6;
1742    const void *cookie[max_conns];
1743    const auto flow_ctl_buf_max = 52428800;
1744    const auto flow_ctl_buf_min = 10485760;
1745    const auto ep_max_size = 1200000000;
1746    const auto bucketMemQuotaFraction = 0.05;
1747    set_param(h,
1748              cb::mcbp::request::SetParamPayload::Type::Flush,
1749              "max_size",
1750              std::to_string(ep_max_size).c_str());
1751    checkeq(ep_max_size, get_int_stat(h, "ep_max_size"), "Incorrect new size.");
1752
1753    std::vector<Vbid> vbuckets = {
1754            {Vbid{1}, Vbid{2}, Vbid{3}, Vbid{4}, Vbid{5}, Vbid{6}}};
1755    checkeq(std::size_t(max_conns),
1756            vbuckets.size(),
1757            "I need one vbucket per cookie");
1758    for (const auto& vb : vbuckets) {
1759        check(set_vbucket_state(h, vb, vbucket_state_replica),
1760              "Failed to set VBucket state.");
1761    }
1762
1763    /* Create first connection */
1764    const std::string name("unittest_");
1765    const auto name1(name + "0");
1766    const uint32_t opaque = 0;
1767    const uint32_t seqno = 0;
1768    const uint32_t flags = 0;
1769    cookie[0] = testHarness->create_cookie();
1770    auto dcp = requireDcpIface(h);
1771
1772    checkeq(ENGINE_SUCCESS,
1773            dcp->open(cookie[0],
1774                      opaque,
1775                      seqno,
1776                      flags,
1777                      name1,
1778                      R"({"consumer_name":"replica1"})"),
1779            "Failed dcp consumer open connection.");
1780
1781    checkeq(ENGINE_SUCCESS,
1782            dcp->add_stream(cookie[0], 0, vbuckets[0], 0),
1783            "Failed to set up stream");
1784
1785    /* Check the max limit */
1786    auto stat_name = "eq_dcpq:" + name1 + ":max_buffer_bytes";
1787    checkeq(flow_ctl_buf_max,
1788            get_int_stat(h, stat_name.c_str(), "dcp"),
1789            "Flow Control Buffer Size not equal to max");
1790
1791    /* Create at least 4 more connections */
1792    for (auto count = 1; count < max_conns - 1; count++) {
1793        cookie[count] = testHarness->create_cookie();
1794        const auto name2(name + std::to_string(count));
1795        checkeq(ENGINE_SUCCESS,
1796                dcp->open(cookie[count],
1797                          opaque,
1798                          seqno,
1799                          flags,
1800                          name2,
1801                          R"({"consumer_name":"replica1"})"),
1802                "Failed dcp consumer open connection.");
1803
1804        checkeq(ENGINE_SUCCESS,
1805                dcp->add_stream(cookie[count], 0, vbuckets[count], 0),
1806                "Failed to set up stream");
1807
1808        for (auto i = 0; i <= count; i++) {
1809            /* Check if the buffer size of all connections has changed */
1810            const auto stat_name("eq_dcpq:" + name + std::to_string(i) +
1811                               ":max_buffer_bytes");
1812            checkeq((int)((ep_max_size * bucketMemQuotaFraction) / (count + 1)),
1813                    get_int_stat(h, stat_name.c_str(), "dcp"),
1814                    "Flow Control Buffer Size not correct");
1815        }
1816    }
1817
1818    /* Opening another connection should set the buffer size to min value */
1819    cookie[max_conns - 1] = testHarness->create_cookie();
1820    const auto name3(name + std::to_string(max_conns - 1));
1821    const auto stat_name2("eq_dcpq:" + name3 + ":max_buffer_bytes");
1822    checkeq(ENGINE_SUCCESS,
1823            dcp->open(cookie[max_conns - 1],
1824                      opaque,
1825                      seqno,
1826                      flags,
1827                      name3,
1828                      R"({"consumer_name":"replica1"})"),
1829            "Failed dcp consumer open connection.");
1830
1831    checkeq(ENGINE_SUCCESS,
1832            dcp->add_stream(
1833                    cookie[max_conns - 1], 0, vbuckets[max_conns - 1], 0),
1834            "Failed to set up stream");
1835
1836    checkeq(flow_ctl_buf_min,
1837            get_int_stat(h, stat_name2.c_str(), "dcp"),
1838            "Flow Control Buffer Size not equal to min");
1839
1840    /* Disconnect connections and see if flow control
1841     * buffer size of existing conns increase
1842     */
1843    for (auto count = 0; count < max_conns / 2; count++) {
1844        testHarness->destroy_cookie(cookie[count]);
1845    }
1846    /* Wait for disconnected connections to be deleted */
1847    wait_for_stat_to_be(h, "ep_dcp_dead_conn_count", 0, "dcp");
1848
1849    /* Check if the buffer size of all connections has increased */
1850    const auto exp_buf_size = (int)((ep_max_size * bucketMemQuotaFraction) /
1851                              (max_conns - (max_conns / 2)));
1852
1853    /* Also check if we get control message indicating the flow control buffer
1854       size change from the consumer connections */
1855    MockDcpMessageProducers producers(h);
1856
1857    for (auto i = max_conns / 2; i < max_conns; i++) {
1858        /* Check if the buffer size of all connections has changed */
1859        const auto name4(name + std::to_string(i));
1860        const auto stat_name3("eq_dcpq:" + name4 + ":max_buffer_bytes");
1861        checkeq(exp_buf_size,
1862                get_int_stat(h, stat_name3.c_str(), "dcp"),
1863                "Flow Control Buffer Size not correct");
1864        checkeq(ENGINE_SUCCESS,
1865                dcp->step(cookie[i], &producers),
1866                "Pending flow control buffer change not processed");
1867        checkeq(cb::mcbp::ClientOpcode::DcpControl,
1868                producers.last_op,
1869                "Flow ctl buf size change control message not received");
1870        checkeq(0,
1871                producers.last_key.compare("connection_buffer_size"),
1872                "Flow ctl buf size change control message key error");
1873        checkeq(exp_buf_size,
1874                atoi(producers.last_value.c_str()),
1875                "Flow ctl buf size in control message not correct");
1876    }
1877    /* Disconnect remaining connections */
1878    for (auto count = max_conns / 2; count < max_conns; count++) {
1879        testHarness->destroy_cookie(cookie[count]);
1880    }
1881
1882    return SUCCESS;
1883}
1884
1885static enum test_result test_dcp_producer_open(EngineIface* h) {
1886    const auto* cookie1 = testHarness->create_cookie();
1887    const std::string name("unittest");
1888    const uint32_t opaque = 0;
1889    const uint32_t seqno = 0;
1890    auto dcp = requireDcpIface(h);
1891
1892    checkeq(ENGINE_SUCCESS,
1893            dcp->open(cookie1,
1894                      opaque,
1895                      seqno,
1896                      cb::mcbp::request::DcpOpenPayload::Producer,
1897                      name,
1898                      R"({"consumer_name":"replica1"})"),
1899            "Failed dcp producer open connection.");
1900    const auto stat_type("eq_dcpq:" + name + ":type");
1901    auto type = get_str_stat(h, stat_type.c_str(), "dcp");
1902    const auto stat_created("eq_dcpq:" + name + ":created");
1903    const auto created = get_int_stat(h, stat_created.c_str(), "dcp");
1904    checkeq(0, type.compare("producer"), "Producer not found");
1905    testHarness->destroy_cookie(cookie1);
1906
1907    testHarness->time_travel(600);
1908
1909    const auto* cookie2 = testHarness->create_cookie();
1910    checkeq(ENGINE_SUCCESS,
1911            dcp->open(cookie2,
1912                      opaque,
1913                      seqno,
1914                      cb::mcbp::request::DcpOpenPayload::Producer,
1915                      name,
1916                      R"({"consumer_name":"replica1"})"),
1917            "Failed dcp producer open connection.");
1918    type = get_str_stat(h, stat_type.c_str(), "dcp");
1919    checkeq(0, type.compare("producer"), "Producer not found");
1920    checklt(created, get_int_stat(h, stat_created.c_str(), "dcp"),
1921            "New dcp stream is not newer");
1922    testHarness->destroy_cookie(cookie2);
1923
1924    return SUCCESS;
1925}
1926
1927static enum test_result test_dcp_producer_open_same_cookie(EngineIface* h) {
1928    const auto* cookie = testHarness->create_cookie();
1929    const std::string name("unittest");
1930    uint32_t opaque = 0;
1931    const uint32_t seqno = 0;
1932    auto dcp = requireDcpIface(h);
1933
1934    checkeq(ENGINE_SUCCESS,
1935            dcp->open(cookie,
1936                      opaque,
1937                      seqno,
1938                      cb::mcbp::request::DcpOpenPayload::Producer,
1939                      name,
1940                      R"({"consumer_name":"replica1"})"),
1941            "Failed dcp producer open connection.");
1942
1943    const auto stat_type("eq_dcpq:" + name + ":type");
1944    auto type = get_str_stat(h, stat_type.c_str(), "dcp");
1945    checkeq(0, type.compare("producer"), "Producer not found");
1946    /*
1947     * Number of references is 2 (as opposed to 1) because a
1948     * mock_connstuct is initialised to having 1 reference
1949     * to represent a client being connected to it.
1950     */
1951    checkeq(2,
1952            testHarness->get_number_of_mock_cookie_references(cookie),
1953            "Number of cookie references is not two");
1954    /*
1955     * engine_data needs to be reset so that it passes the check that
1956     * a connection does not already exist on the same socket.
1957     */
1958    testHarness->store_engine_specific(cookie, nullptr);
1959
1960    checkeq(ENGINE_DISCONNECT,
1961            dcp->open(cookie,
1962                      opaque++,
1963                      seqno,
1964                      cb::mcbp::request::DcpOpenPayload::Producer,
1965                      name,
1966                      R"({"consumer_name":"replica1"})"),
1967            "Failed to return ENGINE_DISCONNECT");
1968
1969    checkeq(2,
1970            testHarness->get_number_of_mock_cookie_references(cookie),
1971            "Number of cookie references is not two");
1972
1973    testHarness->destroy_cookie(cookie);
1974
1975    checkeq(1,
1976            testHarness->get_number_of_mock_cookie_references(cookie),
1977            "Number of cookie references is not one");
1978
1979    return SUCCESS;
1980}
1981
1982static enum test_result test_dcp_noop(EngineIface* h) {
1983    const auto* cookie = testHarness->create_cookie();
1984    const std::string name("unittest");
1985    const uint32_t seqno = 0;
1986    uint32_t opaque = 0;
1987    auto dcp = requireDcpIface(h);
1988    MockDcpMessageProducers producers(h);
1989
1990    checkeq(ENGINE_SUCCESS,
1991            dcp->open(cookie,
1992                      opaque,
1993                      seqno,
1994                      cb::mcbp::request::DcpOpenPayload::Producer,
1995                      name,
1996                      R"({"consumer_name":"replica1"})"),
1997            "Failed dcp producer open connection.");
1998    const std::string param1_name("connection_buffer_size");
1999    const std::string param1_value("1024");
2000    checkeq(ENGINE_SUCCESS,
2001            dcp->control(cookie, ++opaque, param1_name, param1_value),
2002            "Failed to establish connection buffer");
2003    const std::string param2_name("enable_noop");
2004    const std::string param2_value("true");
2005    checkeq(ENGINE_SUCCESS,
2006            dcp->control(cookie, ++opaque, param2_name, param2_value),
2007            "Failed to enable no-ops");
2008
2009    testHarness->time_travel(201);
2010
2011    auto done = false;
2012    while (!done) {
2013        if (dcp->step(cookie, &producers) == ENGINE_DISCONNECT) {
2014            done = true;
2015        } else if (producers.last_op == cb::mcbp::ClientOpcode::DcpNoop) {
2016            done = true;
2017            // Producer opaques are hard coded to start from 10M
2018            checkeq(10000001,
2019                    (int)producers.last_opaque,
2020                    "last_opaque != 10,000,001");
2021            const auto stat_name("eq_dcpq:" + name + ":noop_wait");
2022            checkeq(1,
2023                    get_int_stat(h, stat_name.c_str(), "dcp"),
2024                    "Didn't send noop");
2025            sendDcpAck(h,
2026                       cookie,
2027                       cb::mcbp::ClientOpcode::DcpNoop,
2028                       cb::mcbp::Status::Success,
2029                       producers.last_opaque);
2030            checkeq(0,
2031                    get_int_stat(h, stat_name.c_str(), "dcp"),
2032                    "Didn't ack noop");
2033        } else if (producers.last_op != cb::mcbp::ClientOpcode::Invalid) {
2034            abort();
2035        }
2036        producers.last_op = cb::mcbp::ClientOpcode::Invalid;
2037    }
2038    testHarness->destroy_cookie(cookie);
2039
2040    return SUCCESS;
2041}
2042
2043static enum test_result test_dcp_noop_fail(EngineIface* h) {
2044    const auto* cookie = testHarness->create_cookie();
2045    const std::string name("unittest");
2046    const uint32_t seqno = 0;
2047    uint32_t opaque = 0;
2048    auto dcp = requireDcpIface(h);
2049
2050    checkeq(ENGINE_SUCCESS,
2051            dcp->open(cookie,
2052                      opaque,
2053                      seqno,
2054                      cb::mcbp::request::DcpOpenPayload::Producer,
2055                      name,
2056                      R"({"consumer_name":"replica1"})"),
2057            "Failed dcp producer open connection.");
2058    const std::string param1_name("connection_buffer_size");
2059    const std::string param1_value("1024");
2060    checkeq(ENGINE_SUCCESS,
2061            dcp->control(cookie, ++opaque, param1_name, param1_value),
2062            "Failed to establish connection buffer");
2063    const std::string param2_name("enable_noop");
2064    const std::string param2_value("true");
2065    checkeq(ENGINE_SUCCESS,
2066            dcp->control(cookie, ++opaque, param2_name, param2_value),
2067            "Failed to enable no-ops");
2068
2069    testHarness->time_travel(201);
2070
2071    MockDcpMessageProducers producers(h);
2072    while (dcp->step(cookie, &producers) != ENGINE_DISCONNECT) {
2073        if (producers.last_op == cb::mcbp::ClientOpcode::DcpNoop) {
2074            // Producer opaques are hard coded to start from 10M
2075            checkeq(10000001,
2076                    (int)producers.last_opaque,
2077                    "last_opaque != 10,000,001");
2078            const auto stat_name("eq_dcpq:" + name + ":noop_wait");
2079            checkeq(1,
2080                    get_int_stat(h, stat_name.c_str(), "dcp"),
2081                    "Didn't send noop");
2082            testHarness->time_travel(201);
2083        } else if (producers.last_op != cb::mcbp::ClientOpcode::Invalid) {
2084            abort();
2085        }
2086    }
2087    testHarness->destroy_cookie(cookie);
2088
2089    return SUCCESS;
2090}
2091
2092static enum test_result test_dcp_consumer_noop(EngineIface* h) {
2093    check(set_vbucket_state(h, Vbid(0), vbucket_state_replica),
2094          "Failed to set vbucket state.");
2095    const auto* cookie = testHarness->create_cookie();
2096    const std::string name("unittest");
2097    const uint32_t seqno = 0;
2098    const uint32_t flags = 0;
2099    const Vbid vbucket = Vbid(0);
2100    uint32_t opaque = 0;
2101    auto dcp = requireDcpIface(h);
2102
2103    // Open consumer connection
2104    checkeq(ENGINE_SUCCESS,
2105            dcp->open(cookie,
2106                      opaque,
2107                      seqno,
2108                      flags,
2109                      name,
2110                      R"({"consumer_name":"replica1"})"),
2111            "Failed dcp Consumer open connection.");
2112    add_stream_for_consumer(
2113            h, cookie, opaque, vbucket, flags, cb::mcbp::Status::Success);
2114    testHarness->time_travel(201);
2115    // No-op not recieved for 201 seconds. Should be ok.
2116    MockDcpMessageProducers producers(h);
2117    checkeq(ENGINE_EWOULDBLOCK,
2118            dcp->step(cookie, &producers),
2119            "Expected engine would block");
2120
2121    testHarness->time_travel(200);
2122
2123    // Message not recieved for over 400 seconds. Should disconnect.
2124    checkeq(ENGINE_DISCONNECT,
2125            dcp->step(cookie, &producers),
2126            "Expected engine disconnect");
2127    testHarness->destroy_cookie(cookie);
2128
2129    return SUCCESS;
2130}
2131
2132/**
2133 * Creates a DCP producer stream with the specified values for noop_manditory,
2134 * noop_enabled, and XATTRs enabled, and then attempts to open a stream,
2135 * checking for the expectedResult code.
2136 */
2137static void test_dcp_noop_mandatory_combo(EngineIface* h,
2138                                          bool noopManditory,
2139                                          bool enableNoop,
2140                                          bool enableXAttrs,
2141                                          ENGINE_ERROR_CODE expectedResult) {
2142    const void* cookie = testHarness->create_cookie();
2143
2144    // Configure manditory noop as requested.
2145    set_param(h,
2146              cb::mcbp::request::SetParamPayload::Type::Flush,
2147              "dcp_noop_mandatory_for_v5_features",
2148              noopManditory ? "true" : "false");
2149    checkeq(noopManditory,
2150            get_bool_stat(h, "ep_dcp_noop_mandatory_for_v5_features"),
2151            "Incorrect value for dcp_noop_mandatory_for_v5_features");
2152
2153    // Create DCP consumer with requested flags.
2154    TestDcpConsumer tdc("dcp_noop_manditory_test", cookie, h);
2155    uint32_t flags = cb::mcbp::request::DcpOpenPayload::Producer;
2156    if (enableXAttrs) {
2157        flags |= cb::mcbp::request::DcpOpenPayload::IncludeXattrs;
2158    }
2159    tdc.openConnection(flags);
2160
2161    // Setup noop on the DCP connection.
2162    checkeq(ENGINE_SUCCESS,
2163            tdc.sendControlMessage("enable_noop",
2164                                   enableNoop ? "true" : "false"),
2165            "Failed to configure noop");
2166
2167    // Finally, attempt to create the stream and verify we get the expeced
2168    // response.
2169    DcpStreamCtx ctx;
2170    ctx.vb_uuid = get_ull_stat(h, "vb_0:0:id", "failovers");
2171    ctx.seqno = {0, static_cast<uint64_t>(-1)};
2172    ctx.exp_err = expectedResult;
2173    tdc.addStreamCtx(ctx);
2174
2175    tdc.openStreams();
2176
2177    testHarness->destroy_cookie(cookie);
2178}
2179
2180static enum test_result test_dcp_noop_mandatory(EngineIface* h) {
2181    // Test all combinations of {manditoryNoop, enable_noop, includeXAttr}
2182    test_dcp_noop_mandatory_combo(h, false, false, false, ENGINE_SUCCESS);
2183    test_dcp_noop_mandatory_combo(h, false, false, true, ENGINE_SUCCESS);
2184    test_dcp_noop_mandatory_combo(h, false, true, false, ENGINE_SUCCESS);
2185    test_dcp_noop_mandatory_combo(h, false, true, true, ENGINE_SUCCESS);
2186    test_dcp_noop_mandatory_combo(h, true, false, false, ENGINE_SUCCESS);
2187    test_dcp_noop_mandatory_combo(h, true, false, true, ENGINE_ENOTSUP);
2188    test_dcp_noop_mandatory_combo(h, true, true, false, ENGINE_SUCCESS);
2189    test_dcp_noop_mandatory_combo(h, true, true, true, ENGINE_SUCCESS);
2190
2191    return SUCCESS;
2192}
2193
2194static enum test_result test_dcp_producer_stream_req_open(EngineIface* h) {
2195    const void* cookie = testHarness->create_cookie();
2196    const int num_items = 3;
2197
2198    DcpStreamCtx ctx;
2199    ctx.vb_uuid = get_ull_stat(h, "vb_0:0:id", "failovers");
2200    ctx.seqno = {0, static_cast<uint64_t>(-1)};
2201
2202    std::string name("unittest");
2203    TestDcpConsumer tdc(name.c_str(), cookie, h);
2204    tdc.addStreamCtx(ctx);
2205
2206    tdc.openConnection();
2207
2208    /* Create a separate thread that does tries to get any DCP items */
2209    std::thread dcp_step_thread(
2210            dcp_waiting_step, h, cookie, 0, num_items, std::ref(tdc.producers));
2211
2212    /* We need to wait till the 'dcp_waiting_step' thread begins its wait */
2213    while (1) {
2214        /* Busy wait is ok here. To do a non busy wait we must use
2215         another condition variable which is an overkill here */
2216        testHarness->lock_cookie(cookie);
2217        if (wait_started) {
2218            testHarness->unlock_cookie(cookie);
2219            break;
2220        }
2221        testHarness->unlock_cookie(cookie);
2222    }
2223
2224    /* Now create a stream */
2225    tdc.openStreams();
2226
2227    /* Write items */
2228    write_items(h, num_items, 0);
2229    wait_for_flusher_to_settle(h);
2230    verify_curr_items(h, num_items, "Wrong amount of items");
2231
2232    /* If the notification (to 'dcp_waiting_step' thread upon writing an item)
2233     mechanism is efficient, we must see the 'dcp_waiting_step' finish before
2234     test time out */
2235    dcp_step_thread.join();
2236
2237    testHarness->destroy_cookie(cookie);
2238
2239    return SUCCESS;
2240}
2241
2242static enum test_result test_dcp_producer_stream_req_partial(EngineIface* h) {
2243    // Should start at checkpoint_id 2
2244    const auto initial_ckpt_id =
2245            get_int_stat(h, "vb_0:open_checkpoint_id", "checkpoint");
2246    checkeq(2, initial_ckpt_id, "Expected to start at checkpoint ID 2");
2247
2248    // Create two 'full' checkpoints by storing exactly 2 x 'chk_max_items'
2249    // into the VBucket.
2250    const auto max_ckpt_items = get_int_stat(h, "ep_chk_max_items");
2251
2252    write_items(h, max_ckpt_items);
2253    wait_for_flusher_to_settle(h);
2254    wait_for_stat_to_be(h, "ep_items_rm_from_checkpoints", max_ckpt_items);
2255    checkeq(initial_ckpt_id + 1,
2256            get_int_stat(h, "vb_0:open_checkpoint_id", "checkpoint"),
2257            "Expected #checkpoints to increase by 1 after storing items");
2258
2259    write_items(h, max_ckpt_items, max_ckpt_items);
2260    wait_for_flusher_to_settle(h);
2261    wait_for_stat_to_be(h, "ep_items_rm_from_checkpoints", max_ckpt_items * 2);
2262    checkeq(initial_ckpt_id + 2,
2263            get_int_stat(h, "vb_0:open_checkpoint_id", "checkpoint"),
2264            "Expected #checkpoints to increase by 2 after storing 2x "
2265            "max_ckpt_items");
2266
2267    // Stop persistece (so the persistence cursor cannot advance into the
2268    // deletions below, and hence de-dupe them with respect to the
2269    // additions we just did).
2270    stop_persistence(h);
2271
2272    // Now delete half of the keys. Given that we have reached the
2273    // maximum checkpoint size above, all the deletes should be in a
2274    // subsequent checkpoint.
2275    for (int j = 0; j < max_ckpt_items; ++j) {
2276        std::stringstream ss;
2277        ss << "key" << j;
2278        checkeq(ENGINE_SUCCESS,
2279                del(h, ss.str().c_str(), 0, Vbid(0)),
2280                "Expected delete to succeed");
2281    }
2282
2283    const void* cookie = testHarness->create_cookie();
2284
2285    // Verify that we recieve full checkpoints when we only ask for
2286    // sequence numbers which lie within partial checkpoints.  We
2287    // should have the following Checkpoints in existence:
2288    //
2289    //   {  1,100} - MUTATE(key0..key99), from disk.
2290    //   {101,200} - MUTATE(key100.key199), from disk.
2291    //   {201,300} - DELETE(key0..key99), in memory (as persistence has been stopped).
2292    //
2293    // We request a start and end which lie in the middle of checkpoints -
2294    // start at 105 and end at 209. We should recieve to the end of
2295    // complete checkpoints, i.e. from 105 all the way to 300.
2296    DcpStreamCtx ctx;
2297    ctx.vb_uuid = get_ull_stat(h, "vb_0:0:id", "failovers");
2298    ctx.seqno = {105, 209};
2299    ctx.snapshot = {105, 105};
2300    ctx.exp_mutations = 95; // 105 to 200
2301    ctx.exp_deletions = 100; // 201 to 300
2302
2303    if (isPersistentBucket(h)) {
2304        ctx.exp_markers = 2;
2305    } else {
2306        // the ephemeral stream request won't be broken into two snapshots of
2307        // backfill from disk vs the checkpoint in memory
2308        ctx.exp_markers = 1;
2309    }
2310
2311    TestDcpConsumer tdc("unittest", cookie, h);
2312    tdc.addStreamCtx(ctx);
2313    tdc.run();
2314
2315    testHarness->destroy_cookie(cookie);
2316
2317    return SUCCESS;
2318}
2319
2320static enum test_result test_dcp_producer_stream_req_full_merged_snapshots(
2321        EngineIface* h) {
2322    const int num_items = 300, batch_items = 100;
2323    for (int start_seqno = 0; start_seqno < num_items;
2324         start_seqno += batch_items) {
2325        wait_for_flusher_to_settle(h);
2326        write_items(h, batch_items, start_seqno);
2327    }
2328
2329    wait_for_flusher_to_settle(h);
2330    verify_curr_items(h, num_items, "Wrong amount of items");
2331    wait_for_stat_to_be(h, "vb_0:num_checkpoints", 1, "checkpoint");
2332
2333    const void* cookie = testHarness->create_cookie();
2334
2335    DcpStreamCtx ctx;
2336    ctx.vb_uuid = get_ull_stat(h, "vb_0:0:id", "failovers");
2337    ctx.seqno = {0, get_ull_stat(h, "vb_0:high_seqno", "vbucket-seqno")};
2338    ctx.exp_mutations = num_items;
2339    /* Disk backfill sends all items in disk as one snapshot */
2340    ctx.exp_markers = 1;
2341
2342    TestDcpConsumer tdc("unittest", cookie, h);
2343    tdc.addStreamCtx(ctx);
2344    tdc.run();
2345
2346    testHarness->destroy_cookie(cookie);
2347
2348    return SUCCESS;
2349}
2350
2351static enum test_result test_dcp_producer_stream_req_full(EngineIface* h) {
2352    const int num_items = 300, batch_items = 100;
2353    for (int start_seqno = 0; start_seqno < num_items;
2354         start_seqno += batch_items) {
2355        wait_for_flusher_to_settle(h);
2356        write_items(h, batch_items, start_seqno);
2357    }
2358
2359    wait_for_flusher_to_settle(h);
2360    verify_curr_items(h, num_items, "Wrong amount of items");
2361    wait_for_stat_to_be(h, "vb_0:num_checkpoints", 1, "checkpoint");
2362
2363    checkne(num_items - get_stat<uint64_t>(h, "ep_items_rm_from_checkpoints"),
2364            uint64_t{0},
2365            "Require a non-zero number of items to still be present in "
2366            "CheckpointManager to be able to get 2x snapshot markers "
2367            "(1x disk, 1x memory)");
2368
2369    const void* cookie = testHarness->create_cookie();
2370
2371    DcpStreamCtx ctx;
2372    ctx.vb_uuid = get_ull_stat(h, "vb_0:0:id", "failovers");
2373    ctx.seqno = {0, get_ull_stat(h, "vb_0:high_seqno", "vbucket-seqno")};
2374    ctx.exp_mutations = num_items;
2375    /* Memory backfill sends items from checkpoint snapshots as much as possible
2376       Relies on backfill only when checkpoint snapshot is cleaned up */
2377    ctx.exp_markers = 2;
2378
2379    TestDcpConsumer tdc("unittest", cookie, h);
2380    tdc.addStreamCtx(ctx);
2381    tdc.run();
2382
2383    testHarness->destroy_cookie(cookie);
2384
2385    return SUCCESS;
2386}
2387
2388/*
2389 * Test that deleted items (with values) backfill correctly
2390 */
2391static enum test_result test_dcp_producer_deleted_item_backfill(
2392        EngineIface* h) {
2393    const int deletions = 10;
2394    write_items(h,
2395                deletions,
2396                0,
2397                "del",
2398                "value",
2399                0 /*exp*/,
2400                Vbid(0),
2401                DocumentState::Deleted);
2402    wait_for_flusher_to_settle(h);
2403
2404    const void* cookie = testHarness->create_cookie();
2405
2406    DcpStreamCtx ctx;
2407    ctx.vb_uuid = get_ull_stat(h, "vb_0:0:id", "failovers");
2408    ctx.seqno = {0, deletions};
2409    ctx.exp_deletions = deletions;
2410    ctx.expected_values = deletions;
2411    ctx.flags |= DCP_ADD_STREAM_FLAG_DISKONLY;
2412    ctx.exp_markers = 1;
2413
2414    TestDcpConsumer tdc("unittest", cookie, h);
2415    tdc.addStreamCtx(ctx);
2416    tdc.run();
2417
2418    testHarness->destroy_cookie(cookie);
2419
2420    return SUCCESS;
2421}
2422
2423// Function to parameterize whether expiries should be outputted or not.
2424static test_result testDcpProducerExpiredItemBackfill(
2425        EngineIface* h, EnableExpiryOutput enableExpiryOutput) {
2426    const int expiries = 5;
2427    const int start_seqno = 0;
2428    write_items(h,
2429                expiries,
2430                start_seqno,
2431                "exp",
2432                "value",
2433                5 /*exp*/,
2434                Vbid(0),
2435                DocumentState::Alive);
2436
2437    wait_for_flusher_to_settle(h);
2438    verify_curr_items(h, expiries, "Wrong number of items");
2439
2440    testHarness->time_travel(256);
2441    const void* cookie1 = testHarness->create_cookie();
2442    for (int i = 0; i < expiries; ++i) {
2443        std::string key("exp" + std::to_string(i + start_seqno));
2444        cb::EngineErrorItemPair ret =
2445                get(h, cookie1, key.c_str(), Vbid(0), DocStateFilter::Alive);
2446        checkeq(cb::engine_errc::no_such_key,
2447                ret.first,
2448                "Expected get to return 'no_such_key'");
2449    }
2450    testHarness->destroy_cookie(cookie1);
2451
2452    wait_for_flusher_to_settle(h);
2453    checkeq(get_stat<int>(h, "vb_active_expired"),
2454            expiries,
2455            "Expected vb_active_expired to contain correct number of expiries");
2456
2457    DcpStreamCtx ctx;
2458    ctx.vb_uuid = get_ull_stat(h, "vb_0:0:id", "failovers");
2459    ctx.seqno = {0, expiries * 2}; // doubled as each will cause a mutation,
2460    // as well as an expiry
2461
2462    if (enableExpiryOutput == EnableExpiryOutput::Yes) {
2463        ctx.exp_expirations = expiries;
2464    } else {
2465        ctx.exp_deletions = expiries;
2466    }
2467
2468    ctx.flags |= DCP_ADD_STREAM_FLAG_DISKONLY;
2469    ctx.exp_markers = 1;
2470
2471    const void* cookie = testHarness->create_cookie();
2472    TestDcpConsumer tdc("unittest", cookie, h);
2473    uint32_t flags = cb::mcbp::request::DcpOpenPayload::Producer;
2474
2475    if (enableExpiryOutput == EnableExpiryOutput::Yes) {
2476        // dcp expiry requires the connection to opt in to delete times
2477        flags |= cb::mcbp::request::DcpOpenPayload::IncludeDeleteTimes;
2478    }
2479    tdc.openConnection(flags);
2480
2481    if (enableExpiryOutput == EnableExpiryOutput::Yes) {
2482        checkeq(ENGINE_SUCCESS,
2483                tdc.sendControlMessage("enable_expiry_opcode", "true"),
2484                "Failed to enable_expiry_opcode");
2485    }
2486
2487    tdc.addStreamCtx(ctx);
2488
2489    tdc.run(false);
2490
2491    testHarness->destroy_cookie(cookie);
2492
2493    return SUCCESS;
2494}
2495
2496static enum test_result test_dcp_producer_stream_req_backfill(EngineIface* h) {
2497    const int num_items = 400, batch_items = 200;
2498    for (int start_seqno = 0; start_seqno < num_items;
2499         start_seqno += batch_items) {
2500        if (200 == start_seqno) {
2501            wait_for_flusher_to_settle(h);
2502            wait_for_stat_to_be(h, "ep_items_rm_from_checkpoints", 200);
2503            stop_persistence(h);
2504        }
2505        write_items(h, batch_items, start_seqno);
2506    }
2507
2508    wait_for_stat_to_be_gte(h, "vb_0:num_checkpoints", 2, "checkpoint");
2509
2510    const void* cookie = testHarness->create_cookie();
2511
2512    DcpStreamCtx ctx;
2513    ctx.vb_uuid = get_ull_stat(h, "vb_0:0:id", "failovers");
2514    ctx.seqno = {0, 200};
2515    ctx.exp_mutations = 200;
2516    ctx.exp_markers = 1;
2517
2518    TestDcpConsumer tdc("unittest", cookie, h);
2519    tdc.addStreamCtx(ctx);
2520    tdc.run();
2521
2522    testHarness->destroy_cookie(cookie);
2523
2524    return SUCCESS;
2525}
2526
2527/*
2528 * Test that expired items (with values) backfill correctly with Expiry Output
2529 * disabled
2530 */
2531static enum test_result test_dcp_producer_expired_item_backfill_delete(
2532        EngineIface* h) {
2533    return testDcpProducerExpiredItemBackfill(h, EnableExpiryOutput::No);
2534}
2535
2536/*
2537 * Test that expired items (with values) backfill correctly with Expiry Output
2538 * enabled
2539 */
2540static enum test_result test_dcp_producer_expired_item_backfill_expire(
2541        EngineIface* h) {
2542    return testDcpProducerExpiredItemBackfill(h, EnableExpiryOutput::Yes);
2543}
2544
2545static enum test_result test_dcp_producer_stream_req_diskonly(EngineIface* h) {
2546    const int num_items = 300, batch_items = 100;
2547    for (int start_seqno = 0; start_seqno < num_items;
2548         start_seqno += batch_items) {
2549        wait_for_flusher_to_settle(h);
2550        write_items(h, batch_items, start_seqno);
2551    }
2552
2553    wait_for_flusher_to_settle(h);
2554    verify_curr_items(h, num_items, "Wrong amount of items");
2555    wait_for_stat_to_be(h, "vb_0:num_checkpoints", 1, "checkpoint");
2556
2557    const void* cookie = testHarness->create_cookie();
2558
2559    DcpStreamCtx ctx;
2560    ctx.flags = DCP_ADD_STREAM_FLAG_DISKONLY;
2561    ctx.vb_uuid = get_ull_stat(h, "vb_0:0:id", "failovers");
2562    ctx.seqno = {0, static_cast<uint64_t>(-1)};
2563    ctx.exp_mutations = 300;
2564    ctx.exp_markers = 1;
2565
2566    TestDcpConsumer tdc("unittest", cookie, h);
2567    tdc.addStreamCtx(ctx);
2568    tdc.run();
2569
2570    testHarness->destroy_cookie(cookie);
2571
2572    return SUCCESS;
2573}
2574
2575static enum test_result test_dcp_producer_disk_backfill_limits(EngineIface* h) {
2576    const int num_items = 3;
2577    write_items(h, num_items);
2578
2579    wait_for_flusher_to_settle(h);
2580    verify_curr_items(h, num_items, "Wrong amount of items");
2581    wait_for_stat_to_be(h, "vb_0:num_checkpoints", 1, "checkpoint");
2582
2583    const void* cookie = testHarness->create_cookie();
2584
2585    DcpStreamCtx ctx;
2586    ctx.flags = DCP_ADD_STREAM_FLAG_DISKONLY;
2587    ctx.vb_uuid = get_ull_stat(h, "vb_0:0:id", "failovers");
2588    ctx.seqno = {0, static_cast<uint64_t>(-1)};
2589    ctx.exp_mutations = 3;
2590    ctx.exp_markers = 1;
2591
2592    TestDcpConsumer tdc("unittest", cookie, h);
2593    tdc.addStreamCtx(ctx);
2594    tdc.run();
2595
2596    uint64_t exp_backfill_task_runs;
2597    if (isPersistentBucket(h)) {
2598        /* Backfill task runs are expected as below:
2599           once for backfill_state_init + once for backfill_state_completing +
2600           once post all backfills are run finished. Here since we have
2601           dcp_scan_byte_limit = 100, we expect the backfill task to run
2602           additional 'num_items' during backfill_state_scanning state. */
2603        exp_backfill_task_runs = 3 + num_items;
2604    } else {
2605        /* Backfill task runs are expected as below:
2606           once for backfill_state_init + once for backfill_state_completing.
2607           Here since we have dcp_scan_byte_limit = 100, we expect the backfill
2608           task to run additional 'num_items' during BackfillState::scanning
2609           state. */
2610        exp_backfill_task_runs = 2 + num_items;
2611    }
2612    checkeq(exp_backfill_task_runs,
2613            get_histo_stat(h,
2614                           "BackfillManagerTask[auxIO]",
2615                           "runtimes",
2616                           Histo_stat_info::TOTAL_COUNT),
2617            "backfill_tasks did not run expected number of times");
2618
2619    testHarness->destroy_cookie(cookie);
2620
2621    return SUCCESS;
2622}
2623
2624static enum test_result test_dcp_producer_disk_backfill_buffer_limits(
2625        EngineIface* h) {
2626    const int num_items = 3;
2627    write_items(h, num_items);
2628
2629    wait_for_flusher_to_settle(h);
2630    verify_curr_items(h, num_items, "Wrong amount of items");
2631
2632    /* Wait for the checkpoint to be removed so that upon DCP connection
2633       backfill is scheduled */
2634    wait_for_stat_to_be(h, "ep_items_rm_from_checkpoints", num_items);
2635
2636    const void* cookie = testHarness->create_cookie();
2637
2638    DcpStreamCtx ctx;
2639    ctx.vb_uuid = get_ull_stat(h, "vb_0:0:id", "failovers");
2640    ctx.seqno = {0, num_items};
2641    ctx.exp_mutations = 3;
2642    ctx.exp_markers = 1;
2643
2644    TestDcpConsumer tdc("unittest", cookie, h);
2645    tdc.addStreamCtx(ctx);
2646    tdc.run();
2647
2648    testHarness->destroy_cookie(cookie);
2649
2650    return SUCCESS;
2651}
2652
2653static enum test_result test_dcp_producer_stream_req_mem(EngineIface* h) {
2654    const int num_items = 300, batch_items = 100;
2655    for (int start_seqno = 0; start_seqno < num_items;
2656         start_seqno += batch_items) {
2657        wait_for_flusher_to_settle(h);
2658        write_items(h, batch_items, start_seqno);
2659    }
2660
2661    wait_for_flusher_to_settle(h);
2662    verify_curr_items(h, num_items, "Wrong amount of items");
2663
2664    const void* cookie = testHarness->create_cookie();
2665
2666    DcpStreamCtx ctx;
2667    ctx.vb_uuid = get_ull_stat(h, "vb_0:0:id", "failovers");
2668    ctx.seqno = {200, 300};
2669    ctx.snapshot = {200, 200};
2670    ctx.exp_mutations = 100;
2671    ctx.exp_markers = 1;
2672
2673    TestDcpConsumer tdc("unittest", cookie, h);
2674    tdc.addStreamCtx(ctx);
2675    tdc.run();
2676
2677    testHarness->destroy_cookie(cookie);
2678
2679    return SUCCESS;
2680}
2681
2682/**
2683 * Test that a DCP stream request in DGM scenarios correctly receives items
2684 * from both memory and disk.
2685 */
2686static enum test_result test_dcp_producer_stream_req_dgm(EngineIface* h) {
2687    // Test only works for the now removed 2-bit LRU eviction algorithm as it
2688    // relies on looking at the LRU state.
2689    // @todo Investigate converting the test to work with the new hifi_mfu
2690    // eviction algorithm.
2691    return SUCCESS;
2692
2693    const void* cookie = testHarness->create_cookie();
2694
2695    int i = 0;  // Item count
2696    while (true) {
2697        // Gathering stats on every store is expensive, just check every 100 iterations
2698        if ((i % 100) == 0) {
2699            if (get_int_stat(h, "vb_active_perc_mem_resident") < 50) {
2700                break;
2701            }
2702        }
2703
2704        std::stringstream ss;
2705        ss << "key" << i;
2706        ENGINE_ERROR_CODE ret =
2707                store(h, cookie, OPERATION_SET, ss.str().c_str(), "somevalue");
2708        if (ret == ENGINE_SUCCESS) {
2709            i++;
2710        }
2711    }
2712
2713    // Sanity check - ensure we have enough vBucket quota (max_size)
2714    // such that we have 1000 items - enough to give us 0.1%
2715    // granuarity in any residency calculations. */
2716    checkge(i, 1000,
2717            "Does not have expected min items; Check max_size setting");
2718
2719    wait_for_flusher_to_settle(h);
2720
2721    verify_curr_items(h, i, "Wrong number of items");
2722    double num_non_resident = get_int_stat(h, "vb_active_num_non_resident");
2723    checkge(num_non_resident,
2724            i * 0.5,
2725            "Expected at least 50% of items to be non-resident");
2726
2727    // Reduce max_size from 6291456 to 6000000
2728    set_param(h,
2729              cb::mcbp::request::SetParamPayload::Type::Flush,
2730              "max_size",
2731              "6000000");
2732    checkgt(50,
2733            get_int_stat(h, "vb_active_perc_mem_resident"),
2734            "Too high percentage of memory resident");
2735
2736    DcpStreamCtx ctx;
2737    ctx.vb_uuid = get_ull_stat(h, "vb_0:0:id", "failovers");
2738    ctx.seqno = {0, get_ull_stat(h, "vb_0:high_seqno", "vbucket-seqno")};
2739    ctx.exp_mutations = i;
2740    ctx.exp_markers = 1;
2741
2742    TestDcpConsumer tdc("unittest", cookie, h);
2743    tdc.addStreamCtx(ctx);
2744    tdc.run();
2745
2746    testHarness->destroy_cookie(cookie);
2747
2748    return SUCCESS;
2749}
2750
2751/**
2752 * Test that eviction hotness data is passed in DCP stream.
2753 */
2754static enum test_result test_dcp_producer_stream_req_coldness(EngineIface* h) {
2755    const void* cookie = testHarness->create_cookie();
2756
2757    for (int ii = 0; ii < 10; ii++) {
2758        std::stringstream ss;
2759        ss << "key" << ii;
2760        store(h, cookie, OPERATION_SET, ss.str().c_str(), "somevalue");
2761    }
2762
2763    wait_for_flusher_to_settle(h);
2764    verify_curr_items(h, 10, "Wrong number of items");
2765
2766    for (int ii = 0; ii < 5; ii++) {
2767        std::stringstream ss;
2768        ss << "key" << ii;
2769        evict_key(h, ss.str().c_str(), Vbid(0), "Ejected.");
2770    }
2771    wait_for_flusher_to_settle(h);
2772    wait_for_stat_to_be(h, "ep_num_value_ejects", 5);
2773
2774    TestDcpConsumer tdc("unittest", cookie, h);
2775    uint32_t flags = cb::mcbp::request::DcpOpenPayload::Producer;
2776
2777    tdc.openConnection(flags);
2778
2779    checkeq(ENGINE_SUCCESS,
2780            tdc.sendControlMessage("supports_hifi_MFU", "true"),
2781            "Failed to configure MFU");
2782
2783    DcpStreamCtx ctx;
2784    ctx.vb_uuid = get_ull_stat(h, "vb_0:0:id", "failovers");
2785    ctx.seqno = {0, get_ull_stat(h, "vb_0:high_seqno", "vbucket-seqno")};
2786    ctx.exp_mutations = 10;
2787    ctx.exp_markers = 1;
2788
2789    // Only stream from disk to ensure that we only ever get a single snapshot.
2790    // If we got unlucky we could see 2 snapshots due to creation of a second
2791    // checkpoint if we were streaming from the checkpoint manager.
2792    ctx.flags |= DCP_ADD_STREAM_FLAG_DISKONLY;
2793
2794    tdc.addStreamCtx(ctx);
2795    tdc.run(false);
2796
2797    checkeq(tdc.getNruCounters()[1],
2798            5, "unexpected number of hot items");
2799    checkeq(tdc.getNruCounters()[0],
2800            5, "unexpected number of cold items");
2801    testHarness->destroy_cookie(cookie);
2802
2803    return SUCCESS;
2804}
2805
2806/**
2807 * Test that eviction hotness data is picked up by the DCP consumer
2808 */
2809static enum test_result test_dcp_consumer_hotness_data(EngineIface* h) {
2810    const void* cookie = testHarness->create_cookie();
2811    uint32_t opaque = 0xFFFF0000;
2812    uint32_t flags = 0;
2813    Vbid vbid = Vbid(0);
2814    const char* name = "unittest";
2815
2816    // Set vbucket 0 to a replica so we can consume a mutation over DCP
2817    check(set_vbucket_state(h, vbid, vbucket_state_replica),
2818          "Failed to set vbucket state.");
2819
2820    // Open consumer connection
2821    auto dcp = requireDcpIface(h);
2822    checkeq(ENGINE_SUCCESS,
2823            dcp->open(cookie,
2824                      opaque,
2825                      0,
2826                      flags,
2827                      name,
2828                      R"({"consumer_name":"replica1"})"),
2829            "Failed dcp Consumer open connection.");
2830
2831    add_stream_for_consumer(h,
2832                            cookie,
2833                            opaque++,
2834                            vbid,
2835                            DCP_ADD_STREAM_FLAG_TAKEOVER,
2836                            cb::mcbp::Status::Success);
2837
2838    uint32_t stream_opaque =
2839            get_int_stat(h, "eq_dcpq:unittest:stream_0_opaque", "dcp");
2840
2841    // Snapshot marker indicating a mutation will follow
2842    checkeq(ENGINE_SUCCESS,
2843            dcp->snapshot_marker(cookie,
2844                                 stream_opaque,
2845                                 vbid,
2846                                 0,
2847                                 1,
2848                                 0,
2849                                 {} /*HCS*/,
2850                                 {} /*maxVisibleSeqno*/),
2851            "Failed to send marker!");
2852
2853    const DocKey docKey("key", DocKeyEncodesCollectionId::No);
2854    checkeq(ENGINE_SUCCESS,
2855            dcp->mutation(cookie,
2856                          stream_opaque,
2857                          docKey,
2858                          {(const uint8_t*)"value", 5},
2859                          0, // privileged bytes
2860                          PROTOCOL_BINARY_RAW_BYTES,
2861                          0, // cas
2862                          vbid,
2863                          0, // flags
2864                          1, // by_seqno
2865                          0, // rev_seqno
2866                          0, // expiration
2867                          0, // lock_time
2868                          {}, // meta
2869                          128 // frequency value
2870                          ),
2871            "Failed to send dcp mutation");
2872
2873    // Set vbucket 0 to active so we can perform a get
2874    check(set_vbucket_state(h, vbid, vbucket_state_active),
2875          "Failed to set vbucket state.");
2876
2877    // Perform a get to retrieve the frequency counter value
2878    auto rv = get(h, cookie, "key", vbid, DocStateFilter::AliveOrDeleted);
2879    checkeq(cb::engine_errc::success, rv.first, "Failed to fetch");
2880    const Item* it = reinterpret_cast<const Item*>(rv.second.get());
2881
2882    // Confirm that the item that was consumed over DCP has picked up
2883    // the correct hotness data value.
2884    // Performing the get may increase the hotness value by 1 and therefore
2885    // it is valid for the value to be 128 or 129.
2886    checkle(128,
2887            int(it->getFreqCounterValue()),
2888            "Failed to set the hotness data to the correct value");
2889    checkge(129,
2890            int(it->getFreqCounterValue()),
2891            "Failed to set the hotness data to the correct value");
2892
2893    testHarness->destroy_cookie(cookie);
2894
2895    return SUCCESS;
2896}
2897
2898static enum test_result test_dcp_producer_stream_latest(EngineIface* h) {
2899    const int num_items = 300, batch_items = 100;
2900    for (int start_seqno = 0; start_seqno < num_items;
2901         start_seqno += batch_items) {
2902        wait_for_flusher_to_settle(h);
2903        write_items(h, batch_items, start_seqno);
2904    }
2905
2906    wait_for_flusher_to_settle(h);
2907    verify_curr_items(h, num_items, "Wrong amount of items");
2908
2909    const void* cookie = testHarness->create_cookie();
2910
2911    DcpStreamCtx ctx;
2912    ctx.flags = DCP_ADD_STREAM_FLAG_LATEST;
2913    ctx.vb_uuid = get_ull_stat(h, "vb_0:0:id", "failovers");
2914    ctx.seqno = {200, 205};
2915    ctx.snapshot = {200, 200};
2916    ctx.exp_mutations = 100;
2917    ctx.exp_markers = 1;
2918
2919    TestDcpConsumer tdc("unittest", cookie, h);
2920    tdc.addStreamCtx(ctx);
2921    tdc.run();
2922
2923    testHarness->destroy_cookie(cookie);
2924
2925    return SUCCESS;
2926}
2927
2928static enum test_result test_dcp_producer_keep_stream_open(EngineIface* h) {
2929    const std::string conn_name("unittest");
2930    const int num_items = 2, vb = 0;
2931
2932    write_items(h, num_items);
2933
2934    wait_for_flusher_to_settle(h);
2935    verify_curr_items(h, num_items, "Wrong amount of items");
2936
2937    const void* cookie = testHarness->create_cookie();
2938
2939    /* We want to stream items till end and keep the stream open. Then we want
2940       to verify the stream is still open */
2941    struct continuous_dcp_ctx cdc = {
2942            h,
2943            cookie,
2944            Vbid(0),
2945            conn_name,
2946            0,
2947            std::make_unique<TestDcpConsumer>(conn_name, cookie, h)};
2948    cb_thread_t dcp_thread;
2949    cb_assert(cb_create_thread(&dcp_thread, continuous_dcp_thread, &cdc, 0)
2950              == 0);
2951
2952    /* Wait for producer to be created */
2953    wait_for_stat_to_be(h, "ep_dcp_producer_count", 1, "dcp");
2954
2955    /* Wait for an active stream to be created */
2956    const std::string stat_stream_count("eq_dcpq:" + conn_name +
2957                                        ":num_streams");
2958    wait_for_stat_to_be(h, stat_stream_count.c_str(), 1, "dcp");
2959
2960    /* Wait for the dcp test client to receive upto highest seqno we have */
2961    cb::RelaxedAtomic<uint64_t> exp_items(num_items);
2962    wait_for_val_to_be("last_sent_seqno",
2963                       cdc.dcpConsumer->producers.last_byseqno,
2964                       exp_items);
2965
2966    /* Check if the stream is still open after sending out latest items */
2967    std::string stat_stream_state("eq_dcpq:" + conn_name + ":stream_" +
2968                             std::to_string(vb) + "_state");
2969    std::string state = get_str_stat(h, stat_stream_state.c_str(), "dcp");
2970    checkeq(state.compare("in-memory"), 0, "Stream is not open");
2971
2972    /* Before closing the connection stop the thread that continuously polls
2973       for dcp data */
2974    cdc.dcpConsumer->stop();
2975    testHarness->notify_io_complete(cookie, ENGINE_SUCCESS);
2976    cb_assert(cb_join_thread(dcp_thread) == 0);
2977    testHarness->destroy_cookie(cookie);
2978
2979    return SUCCESS;
2980}
2981
2982static enum test_result test_dcp_producer_keep_stream_open_replica(
2983        EngineIface* h) {
2984    /* This test case validates if a replica vbucket correctly sends items
2985       and snapshot end seqno when a stream requests for items till end of time
2986       (end_seqno in req is 2^64 - 1).
2987       The test has 2 parts.
2988       (i) Set up replica vbucket such that it has items to be streamed from
2989           backfill and memory.
2990       (ii) Open a stream (in a DCP conn) and see if all the items are received
2991            correctly */
2992
2993    /* Part (i):  Set up replica vbucket such that it has items to be streamed
2994                  from backfill and memory. */
2995    check(set_vbucket_state(h, Vbid(0), vbucket_state_replica),
2996          "Failed to set vbucket state.");
2997
2998    const void* cookie = testHarness->create_cookie();
2999    uint32_t opaque = 0xFFFF0000;
3000    uint32_t seqno = 0;
3001    uint32_t flags = 0;
3002    const int num_items = 10;
3003    const std::string conn_name("unittest");
3004    int vb = 0;
3005    auto dcp = requireDcpIface(h);
3006
3007    /* Open an DCP consumer connection */
3008    checkeq(ENGINE_SUCCESS,
3009            dcp->open(cookie,
3010                      opaque,
3011                      seqno,
3012                      flags,
3013                      conn_name,
3014                      R"({"consumer_name":"replica1"})"),
3015            "Failed dcp producer open connection.");
3016
3017    std::string type = get_str_stat(h, "eq_dcpq:unittest:type", "dcp");
3018    checkeq(0, type.compare("consumer"), "Consumer not found");
3019
3020    opaque = add_stream_for_consumer(
3021            h, cookie, opaque, Vbid(0), 0, cb::mcbp::Status::Success);
3022
3023    /* Send DCP mutations with in memory flag (0x01) */
3024    dcp_stream_to_replica(
3025            h, cookie, opaque, Vbid(0), 0x01, 1, num_items, 0, num_items);
3026
3027    /* Send 10 more DCP mutations with checkpoint creation flag (0x04) */
3028    uint64_t start = num_items;
3029    dcp_stream_to_replica(h,
3030                          cookie,
3031                          opaque,
3032                          Vbid(0),
3033                          0x04,
3034                          start + 1,
3035                          start + 10,
3036                          start,
3037                          start + 10);
3038
3039    wait_for_flusher_to_settle(h);
3040    stop_persistence(h);
3041    checkeq(2 * num_items,
3042            get_int_stat(h, "vb_replica_curr_items"),
3043            "wrong number of items in replica vbucket");
3044
3045    /* Add 10 more items to the replica node on a new checkpoint.
3046       Send with flag (0x04) indicating checkpoint creation */
3047    start = 2 * num_items;
3048    dcp_stream_to_replica(h,
3049                          cookie,
3050                          opaque,
3051                          Vbid(0),
3052                          0x04,
3053                          start + 1,
3054                          start + 10,
3055                          start,
3056                          start + 10);
3057
3058    /* Expecting for Disk backfill + in memory snapshot merge.
3059       Wait for a checkpoint to be removed */
3060    wait_for_stat_to_be_lte(h, "vb_0:num_checkpoints", 2, "checkpoint");
3061
3062    /* Part (ii): Open a stream (in a DCP conn) and see if all the items are
3063                  received correctly */
3064    /* We want to stream items till end and keep the stream open. Then we want
3065       to verify the stream is still open */
3066    const void* cookie1 = testHarness->create_cookie();
3067    const std::string conn_name1("unittest1");
3068    struct continuous_dcp_ctx cdc = {
3069            h,
3070            cookie1,
3071            Vbid(0),
3072            conn_name1,
3073            0,
3074            std::make_unique<TestDcpConsumer>(conn_name1, cookie1, h)};
3075    cb_thread_t dcp_thread;
3076    cb_assert(cb_create_thread(&dcp_thread, continuous_dcp_thread, &cdc, 0)
3077              == 0);
3078
3079    /* Wait for producer to be created */
3080    wait_for_stat_to_be(h, "ep_dcp_producer_count", 1, "dcp");
3081
3082    /* Wait for an active stream to be created */
3083    const std::string stat_stream_count("eq_dcpq:" + conn_name1 +
3084                                        ":num_streams");
3085    wait_for_stat_to_be(h, stat_stream_count.c_str(), 1, "dcp");
3086
3087    /* Wait for the dcp test client to receive upto highest seqno we have */
3088    cb::RelaxedAtomic<uint64_t> exp_items(3 * num_items);
3089    wait_for_val_to_be("last_sent_seqno",
3090                       cdc.dcpConsumer->producers.last_byseqno,
3091                       exp_items);
3092
3093    /* Check if correct snap end seqno is sent */
3094    std::string stat_stream_last_sent_snap_end_seqno("eq_dcpq:" + conn_name1 +
3095                                                     ":stream_" +
3096                                                     std::to_string(vb) +
3097                                                     "_last_sent_snap_end_seqno");
3098    wait_for_stat_to_be(h,
3099                        stat_stream_last_sent_snap_end_seqno.c_str(),
3100                        3 * num_items,
3101                        "dcp");
3102
3103    /* Check if the stream is still open after sending out latest items */
3104    std::string stat_stream_state("eq_dcpq:" + conn_name1 + ":stream_" +
3105                                  std::to_string(vb) + "_state");
3106    std::string state = get_str_stat(h, stat_stream_state.c_str(), "dcp");
3107    checkeq(state.compare("in-memory"), 0, "Stream is not open");
3108
3109    /* Before closing the connection stop the thread that continuously polls
3110       for dcp data */
3111    cdc.dcpConsumer->stop();
3112    testHarness->notify_io_complete(cookie1, ENGINE_SUCCESS);
3113    cb_assert(cb_join_thread(dcp_thread) == 0);
3114
3115    testHarness->destroy_cookie(cookie);
3116    testHarness->destroy_cookie(cookie1);
3117
3118    return SUCCESS;
3119}
3120
3121static enum test_result test_dcp_producer_stream_cursor_movement(
3122        EngineIface* h) {
3123    const std::string conn_name("unittest");
3124    const int num_items = 30;
3125    uint64_t curr_chkpt_id = 0;
3126    for (int j = 0; j < num_items; ++j) {
3127        if (j % 10 == 0) {
3128            wait_for_flusher_to_settle(h);
3129        }
3130        if (j == (num_items - 1)) {
3131            /* Since checkpoint max items is set to 10 and we are going to
3132               write 30th item, a new checkpoint could be added after
3133               writing and persisting the 30th item. I mean, if the checkpoint
3134               id is got outside the while loop, there could be an error due to
3135               race (flusher and checkpoint remover could run before getting
3136               this stat) */
3137            curr_chkpt_id =
3138                    get_ull_stat(h, "vb_0:open_checkpoint_id", "checkpoint");
3139        }
3140        std::string key("key" + std::to_string(j));
3141        checkeq(ENGINE_SUCCESS,
3142                store(h, NULL, OPERATION_SET, key.c_str(), "data"),
3143                "Failed to store a value");
3144    }
3145
3146    wait_for_flusher_to_settle(h);
3147    verify_curr_items(h, num_items, "Wrong amount of items");
3148
3149    const void* cookie = testHarness->create_cookie();
3150
3151    /* We want to stream items till end and keep the stream open. We want to
3152       verify if the DCP cursor has moved to new open checkpoint */
3153    struct continuous_dcp_ctx cdc = {
3154            h,
3155            cookie,
3156            Vbid(0),
3157            conn_name,
3158            20,
3159            std::make_unique<TestDcpConsumer>(conn_name, cookie, h)};
3160    cb_thread_t dcp_thread;
3161    cb_assert(cb_create_thread(&dcp_thread, continuous_dcp_thread, &cdc, 0)
3162              == 0);
3163
3164    /* Wait for producer to be created */
3165    wait_for_stat_to_be(h, "ep_dcp_producer_count", 1, "dcp");
3166
3167    /* Wait for an active stream to be created */
3168    const std::string stat_stream_count("eq_dcpq:" + conn_name +
3169                                        ":num_streams");
3170    wait_for_stat_to_be(h, stat_stream_count.c_str(), 1, "dcp");
3171
3172    /* Wait for the dcp test client to receive upto highest seqno we have */
3173    cb::RelaxedAtomic<uint64_t> exp_items(num_items);
3174    wait_for_val_to_be("last_sent_seqno",
3175                       cdc.dcpConsumer->producers.last_byseqno,
3176                       exp_items);
3177
3178    /* Wait for new open (empty) checkpoint to be added */
3179    wait_for_stat_to_be(
3180            h, "vb_0:open_checkpoint_id", curr_chkpt_id + 1, "checkpoint");
3181
3182    /* We want to make sure that no cursors are lingering on any of the previous
3183       checkpoints. For that we wait for checkpoint remover to remove all but
3184       the latest open checkpoint cursor */
3185    wait_for_stat_to_be(h, "vb_0:num_checkpoints", 1, "checkpoint");
3186
3187    /* Before closing the connection stop the thread that continuously polls
3188       for dcp data */
3189    cdc.dcpConsumer->stop();
3190    testHarness->notify_io_complete(cookie, ENGINE_SUCCESS);
3191    cb_assert(cb_join_thread(dcp_thread) == 0);
3192    testHarness->destroy_cookie(cookie);
3193
3194    return SUCCESS;
3195}
3196
3197static test_result test_dcp_producer_stream_req_nmvb(EngineIface* h) {
3198    const void* cookie1 = testHarness->create_cookie();
3199    uint32_t opaque = 0;
3200    uint32_t seqno = 0;
3201    uint32_t flags = cb::mcbp::request::DcpOpenPayload::Producer;
3202    const char *name = "unittest";
3203    auto dcp = requireDcpIface(h);
3204
3205    checkeq(ENGINE_SUCCESS,
3206            dcp->open(cookie1,
3207                      opaque,
3208                      seqno,
3209                      flags,
3210                      name,
3211                      R"({"consumer_name":"replica1"})"),
3212            "Failed dcp producer open connection.");
3213
3214    Vbid req_vbucket = Vbid(1);
3215    uint64_t rollback = 0;
3216
3217    checkeq(ENGINE_NOT_MY_VBUCKET,
3218            dcp->stream_req(cookie1,
3219                            0,
3220                            0,
3221                            req_vbucket,
3222                            0,
3223                            0,
3224                            0,
3225                            0,
3226                            0,
3227                            &rollback,
3228                            mock_dcp_add_failover_log,
3229                            {}),
3230            "Expected not my vbucket");
3231    testHarness->destroy_cookie(cookie1);
3232
3233    return SUCCESS;
3234}
3235
3236static test_result test_dcp_agg_stats(EngineIface* h) {
3237    const int num_items = 300, batch_items = 100;
3238    for (int start_seqno = 0; start_seqno < num_items;
3239         start_seqno += batch_items) {
3240        wait_for_flusher_to_settle(h);
3241        write_items(h, batch_items, start_seqno);
3242    }
3243
3244    wait_for_flusher_to_settle(h);
3245    verify_curr_items(h, num_items, "Wrong amount of items");
3246
3247    const void *cookie[5];
3248
3249    uint64_t total_bytes = 0;
3250    for (int j = 0; j < 5; ++j) {
3251        std::string name("unittest_" + std::to_string(j));
3252        cookie[j] = testHarness->create_cookie();
3253
3254        DcpStreamCtx ctx;
3255        ctx.vb_uuid = get_ull_stat(h, "vb_0:0:id", "failovers");
3256        ctx.seqno = {200, 300};
3257        ctx.snapshot = {200, 200};
3258        ctx.exp_mutations = 100;
3259        ctx.exp_markers = 1;
3260
3261        TestDcpConsumer tdc(name, cookie[j], h);
3262        tdc.addStreamCtx(ctx);
3263        tdc.run();
3264        total_bytes += tdc.getTotalBytes();
3265    }
3266
3267    checkeq(5,
3268            get_int_stat(h, "unittest:producer_count", "dcpagg _"),
3269            "producer count mismatch");
3270    checkeq((int)total_bytes,
3271            get_int_stat(h, "unittest:total_bytes", "dcpagg _"),
3272            "aggregate total bytes sent mismatch");
3273    checkeq(500,
3274            get_int_stat(h, "unittest:items_sent", "dcpagg _"),
3275            "aggregate total items sent mismatch");
3276    checkeq(0,
3277            get_int_stat(h, "unittest:items_remaining", "dcpagg _"),
3278            "aggregate total items remaining mismatch");
3279
3280    for (int j = 0; j < 5; ++j) {
3281        testHarness->destroy_cookie(cookie[j]);
3282    }
3283
3284    return SUCCESS;
3285}
3286
3287static test_result test_dcp_cursor_dropping(EngineIface* h,
3288                                            bool replicationStream) {
3289    /* Initially write a few items */
3290    int num_items = 25;
3291    const int initialSnapshotSize = num_items;
3292
3293    // 75% is so that we don't hit the HWM (triggering the pager) yet is above
3294    // the thresholds required for cursor dropping
3295    const int cursor_dropping_mem_thres_perc = 75;
3296
3297    write_items(h, num_items, 1);
3298
3299    wait_for_flusher_to_settle(h);
3300    verify_curr_items(h, num_items, "Wrong amount of items");
3301
3302    /* Set up a dcp producer conn and stream a few items. This will cause the
3303       stream to transition from pending -> backfill -> in-memory state */
3304    MockDcpMessageProducers producers(h);
3305
3306    const void* cookie = testHarness->create_cookie();
3307    std::string conn_name = replicationStream ? "replication" : "unittest";
3308    uint32_t opaque = 1;
3309    uint64_t last_seqno_streamed = 0;
3310
3311    DcpStreamCtx ctx;
3312    ctx.vb_uuid = get_ull_stat(h, "vb_0:0:id", "failovers");
3313    ctx.seqno = {0, static_cast<uint64_t>(-1)};
3314    ctx.snapshot = {0, 0};
3315
3316    TestDcpConsumer tdc(conn_name, cookie, h);
3317
3318    tdc.addStreamCtx(ctx);
3319
3320    tdc.openConnection();
3321
3322    if (replicationStream) {
3323        tdc.sendControlMessage("supports_cursor_dropping_vulcan", "true");
3324    }
3325
3326    tdc.openStreams();
3327
3328    /* Stream (from in-memory state) less than the number of items written.
3329       We want to do this because we want to test if the stream drops the items
3330       in the readyQ when we later switch from in-memory -> backfill state */
3331    dcp_stream_from_producer_conn(h,
3332                                  cookie,
3333                                  opaque,
3334                                  last_seqno_streamed + 1,
3335                                  num_items - 5,
3336                                  0,
3337                                  tdc.producers);
3338    last_seqno_streamed = num_items - 5;
3339
3340    /* Check if the stream is still in in-memory state after sending out
3341       items */
3342    std::string stat_stream_state("eq_dcpq:" + conn_name + ":stream_" +
3343                                  std::to_string(0) + "_state");
3344    std::string state = get_str_stat(h, stat_stream_state.c_str(), "dcp");
3345    checkeq(state.compare("in-memory"), 0, "Stream is in memory state");
3346
3347    /* Write items such that cursor is dropped due to heavy memory usage and
3348       stream state changes from memory->backfill */
3349    stop_persistence(h);
3350    num_items += write_items_upto_mem_perc(
3351            h, cursor_dropping_mem_thres_perc, num_items + 1);
3352
3353    // Sanity check - ensure we have enough vBucket quota (max_size)
3354    // such that we have 1000 items - enough to give us 0.1%
3355    // granularity in any residency calculations. */
3356    checkge(num_items, 1000,
3357            "Does not have expected min items; Check max_size setting");
3358
3359    /* Persist all items */
3360    start_persistence(h);
3361    wait_for_flusher_to_settle(h);
3362
3363    /* wait for cursor to be dropped. You need to make sure that there are
3364       2 checkpoints so that the cursors of one of the checkpoint is dropped.
3365       For this we need to have correct combination of max_size and
3366       chk_max_items (max_size=6291456;chk_max_items=8000 in this case) */
3367    wait_for_stat_to_be_gte(h, "ep_cursors_dropped", 1);
3368    dcp_stream_from_producer_conn(h,
3369                                  cookie,
3370                                  opaque,
3371                                  last_seqno_streamed + 1,
3372                                  num_items,
3373                                  initialSnapshotSize + 1,
3374                                  tdc.producers);
3375    last_seqno_streamed = num_items;
3376
3377    /* Write 10 more items to test if stream transitions correctly from
3378       backfill -> in-memory and sends out items */
3379    write_items(h, 10, num_items + 1);
3380    num_items += 10;
3381    dcp_stream_from_producer_conn(h,
3382                                  cookie,
3383                                  opaque,
3384                                  last_seqno_streamed + 1,
3385                                  num_items,
3386                                  last_seqno_streamed + 1,
3387                                  tdc.producers);
3388
3389    testHarness->destroy_cookie(cookie);
3390    return SUCCESS;
3391}
3392
3393static test_result test_dcp_cursor_dropping(EngineIface* h) {
3394    return test_dcp_cursor_dropping(h, false);
3395}
3396
3397static test_result test_dcp_cursor_dropping_replication(EngineIface* h) {
3398    return test_dcp_cursor_dropping(h, true);
3399}
3400static test_result test_dcp_cursor_dropping_backfill(EngineIface* h) {
3401    /* Initially write a few items */
3402    int num_items = 50;
3403    const int initialSnapshotSize = num_items;
3404    // 75% is so that we don't hit the HWM (triggering the pager) yet is above
3405    // the thresholds required for cursor dropping
3406    const int cursor_dropping_mem_thres_perc = 75;
3407
3408    write_items(h, num_items, 1);
3409
3410    wait_for_flusher_to_settle(h);
3411    verify_curr_items(h, num_items, "Wrong amount of items");
3412    wait_for_stat_to_be(h, "vb_0:open_checkpoint_id", 3, "checkpoint");
3413
3414    /* Set up a connection */
3415    const void* cookie = testHarness->create_cookie();
3416    std::string conn_name("unittest");
3417    uint32_t opaque = 1;
3418
3419    DcpStreamCtx ctx;
3420    ctx.vb_uuid = get_ull_stat(h, "vb_0:0:id", "failovers");
3421    ctx.seqno = {0, static_cast<uint64_t>(-1)};
3422    ctx.snapshot = {0, 0};
3423
3424    TestDcpConsumer tdc(conn_name, cookie, h);
3425
3426    tdc.addStreamCtx(ctx);
3427
3428    tdc.openConnection();
3429    tdc.openStreams();
3430
3431    /* Write items such that we cross threshold for cursor dropping */
3432    stop_persistence(h);
3433    num_items += write_items_upto_mem_perc(
3434            h, cursor_dropping_mem_thres_perc, num_items + 1);
3435
3436    /* Sanity check - ensure we have enough vBucket quota (max_size)
3437       such that we have 1000 items - enough to give us 0.1%
3438       granuarity in any residency calculations. */
3439    checkge(num_items, 1000,
3440            "Does not have expected min items; Check max_size setting");
3441
3442    /* Persist all items so that we can drop the replication cursor and
3443       schedule another backfill */
3444    start_persistence(h);
3445    wait_for_flusher_to_settle(h);
3446
3447    wait_for_stat_to_be_gte(h, "ep_cursors_dropped", 1);
3448
3449    /* Read all the items from the producer. This ensures that the items are
3450       backfilled correctly after scheduling 2 successive backfills. */
3451    dcp_stream_from_producer_conn(h,
3452                                  cookie,
3453                                  opaque,
3454                                  1,
3455                                  num_items,
3456                                  initialSnapshotSize + 1,
3457                                  tdc.producers);
3458
3459    testHarness->destroy_cookie(cookie);
3460    return SUCCESS;
3461}
3462
3463static test_result test_dcp_takeover(EngineIface* h) {
3464    const int num_items = 10;
3465    write_items(h, num_items);
3466
3467    const void* cookie = testHarness->create_cookie();
3468
3469    DcpStreamCtx ctx;
3470    ctx.flags = DCP_ADD_STREAM_FLAG_TAKEOVER;
3471    ctx.vb_uuid = get_ull_stat(h, "vb_0:0:id", "failovers");
3472    ctx.seqno = {0, 1000};
3473    ctx.exp_mutations = 20;
3474    ctx.exp_markers = 2;
3475    ctx.extra_takeover_ops = 10;
3476
3477    TestDcpConsumer tdc("unittest", cookie, h);
3478    tdc.addStreamCtx(ctx);
3479    tdc.run();
3480
3481    check(verify_vbucket_state(h, Vbid(0), vbucket_state_dead),
3482          "Wrong vb state");
3483
3484    testHarness->destroy_cookie(cookie);
3485
3486    return SUCCESS;
3487}
3488
3489static test_result test_dcp_takeover_no_items(EngineIface* h) {
3490    const int num_items = 10;
3491    write_items(h, num_items);
3492
3493    const void* cookie = testHarness->create_cookie();
3494    const char *name = "unittest";
3495    uint32_t opaque = 1;
3496    auto dcp = requireDcpIface(h);
3497
3498    checkeq(ENGINE_SUCCESS,
3499            dcp->open(cookie,
3500                      ++opaque,
3501                      0,
3502                      cb::mcbp::request::DcpOpenPayload::Producer,
3503                      name,
3504                      R"({"consumer_name":"replica1"})"),
3505            "Failed dcp producer open connection.");
3506
3507    Vbid vbucket = Vbid(0);
3508    uint32_t flags = DCP_ADD_STREAM_FLAG_TAKEOVER;
3509    uint64_t start_seqno = 10;
3510    uint64_t end_seqno = std::numeric_limits<uint64_t>::max();
3511    uint64_t vb_uuid = get_ull_stat(h, "vb_0:0:id", "failovers");
3512    uint64_t snap_start_seqno = 10;
3513    uint64_t snap_end_seqno = 10;
3514
3515    uint64_t rollback = 0;
3516    checkeq(ENGINE_SUCCESS,
3517            dcp->stream_req(cookie,
3518                            flags,
3519                            ++opaque,
3520                            vbucket,
3521                            start_seqno,
3522                            end_seqno,
3523                            vb_uuid,
3524                            snap_start_seqno,
3525                            snap_end_seqno,
3526                            &rollback,
3527                            mock_dcp_add_failover_log,
3528                            {}),
3529            "Failed to initiate stream request");
3530
3531    MockDcpMessageProducers producers(h);
3532
3533    bool done = false;
3534    int num_snapshot_markers = 0;
3535    int num_set_vbucket_pending = 0;
3536    int num_set_vbucket_active = 0;
3537
3538    do {
3539        ENGINE_ERROR_CODE err = dcp->step(cookie, &producers);
3540        if (err == ENGINE_DISCONNECT) {
3541            done = true;
3542        } else {
3543            switch (producers.last_op) {
3544            case cb::mcbp::ClientOpcode::DcpStreamEnd:
3545                done = true;
3546                break;
3547            case cb::mcbp::ClientOpcode::DcpSnapshotMarker:
3548                num_snapshot_markers++;
3549                break;
3550            case cb::mcbp::ClientOpcode::DcpSetVbucketState:
3551                if (producers.last_vbucket_state == vbucket_state_pending) {
3552                    num_set_vbucket_pending++;
3553                } else if (producers.last_vbucket_state ==
3554                           vbucket_state_active) {
3555                    num_set_vbucket_active++;
3556                }
3557                sendDcpAck(h,
3558                           cookie,
3559                           cb::mcbp::ClientOpcode::DcpSetVbucketState,
3560                           cb::mcbp::Status::Success,
3561                           producers.last_opaque);
3562                break;
3563            case cb::mcbp::ClientOpcode::Invalid:
3564                break;
3565            default:
3566                break;
3567                abort();
3568            }
3569            producers.last_op = cb::mcbp::ClientOpcode::Invalid;
3570        }
3571    } while (!done);
3572
3573    checkeq(0, num_snapshot_markers, "Invalid number of snapshot marker");
3574    checkeq(1, num_set_vbucket_pending, "Didn't receive pending set state");
3575    checkeq(1, num_set_vbucket_active, "Didn't receive active set state");
3576
3577    check(verify_vbucket_state(h, Vbid(0), vbucket_state_dead),
3578          "Wrong vb state");
3579    testHarness->destroy_cookie(cookie);
3580
3581    return SUCCESS;
3582}
3583
3584/**
3585 * Part of the Consumer-Producer negotiation happens over DCP_CONTROL and
3586 * introduces a blocking step, so we have to simulate the Producer response for
3587 * letting dcp_step() proceed.
3588 * Note that the blocking DCP_CONTROL request is signed at Consumer by
3589 * tracking the opaque value sent to the Producer, so we need to set the
3590 * proper opaque.
3591 *
3592 * At the time of writing, the SyncReplication and the IncludeDeletedUserXattrs
3593 * negotiations follow the described pattern.
3594 *
3595 * @param engine The engine interface
3596 * @param cookie The cookie representing the DCP Consumer into the engine
3597 * @param producers The MockDcpMessageProducers used by the Consumer
3598 */
3599static void simulateProdRespToDcpControlBlockingNegotiation(
3600        EngineIface* engine,
3601        const void* cookie,
3602        MockDcpMessageProducers& producers) {
3603    protocol_binary_response_header resp{};
3604    resp.response.setMagic(cb::mcbp::Magic::ClientResponse);
3605    resp.response.setOpcode(cb::mcbp::ClientOpcode::DcpControl);
3606    resp.response.setStatus(cb::mcbp::Status::Success);
3607    resp.response.setOpaque(producers.last_opaque);
3608    dcpHandleResponse(engine, cookie, &resp, producers);
3609}
3610
3611static uint32_t add_stream_for_consumer(EngineIface* h,
3612                                        const void* cookie,
3613                                        uint32_t opaque,
3614                                        Vbid vbucket,
3615                                        uint32_t flags,
3616                                        cb::mcbp::Status response,
3617                                        uint64_t exp_snap_start,
3618                                        uint64_t exp_snap_end) {
3619    using cb::mcbp::ClientOpcode;
3620
3621    auto dcp = requireDcpIface(h);
3622    checkeq(ENGINE_SUCCESS,
3623            dcp->add_stream(cookie, opaque, vbucket, flags),
3624            "Add stream request failed");
3625
3626    MockDcpMessageProducers producers(h);
3627
3628    auto dcpStepAndExpectControlMsg =
3629            [&h, cookie, opaque, &producers](std::string controlKey) {
3630                dcp_step(h, cookie, producers);
3631                checkeq(cb::mcbp::ClientOpcode::DcpControl,
3632                        producers.last_op,
3633                        "Unexpected last_op");
3634                checkeq(controlKey, producers.last_key, "Unexpected key");
3635                checkne(opaque, producers.last_opaque, "Unexpected opaque");
3636            };
3637
3638    dcpStepAndExpectControlMsg("connection_buffer_size"s);
3639
3640    if (get_bool_stat(h, "ep_dcp_enable_noop")) {
3641        // MB-29441: Check that the GetErrorMap message is sent
3642        dcp_step(h, cookie, producers);
3643        checkeq(ClientOpcode::GetErrorMap,
3644                producers.last_op,
3645                "Unexpected last_op");
3646        checkeq(""s, producers.last_key, "Unexpected non-empty key");
3647
3648        // Simulate that the GetErrorMap response has been received.
3649        // This step is necessary, as a pending GetErrorMap response would
3650        // not let the next dcp_step() to execute the
3651        // DcpControl/set_noop_interval call.
3652        protocol_binary_response_header resp{};
3653        resp.response.setMagic(cb::mcbp::Magic::ClientResponse);
3654        resp.response.setOpcode(cb::mcbp::ClientOpcode::GetErrorMap);
3655        resp.response.setStatus(cb::mcbp::Status::Success);
3656        dcpHandleResponse(h, cookie, &resp, producers);
3657
3658        // Check that the enable noop message is sent
3659        dcpStepAndExpectControlMsg("enable_noop"s);
3660
3661        // Check that the set noop interval message is sent
3662        dcpStepAndExpectControlMsg("set_noop_interval"s);
3663    }
3664
3665    dcpStepAndExpectControlMsg("set_priority"s);
3666    dcpStepAndExpectControlMsg("enable_ext_metadata"s);
3667    dcpStepAndExpectControlMsg("supports_cursor_dropping_vulcan"s);
3668    dcpStepAndExpectControlMsg("supports_hifi_MFU"s);
3669    dcpStepAndExpectControlMsg("send_stream_end_on_client_close_stream"s);
3670    dcpStepAndExpectControlMsg("enable_expiry_opcode"s);
3671    dcpStepAndExpectControlMsg("enable_sync_writes"s);
3672    simulateProdRespToDcpControlBlockingNegotiation(h, cookie, producers);
3673    dcpStepAndExpectControlMsg("consumer_name"s);
3674    dcpStepAndExpectControlMsg("include_deleted_user_xattrs"s);
3675    simulateProdRespToDcpControlBlockingNegotiation(h, cookie, producers);
3676
3677    dcp_step(h, cookie, producers);
3678    uint32_t stream_opaque = producers.last_opaque;
3679    checkeq(ClientOpcode::DcpStreamReq,
3680            producers.last_op,
3681            "Unexpected last_op");
3682    checkne(opaque, producers.last_opaque, "Unexpected opaque");
3683
3684    if (exp_snap_start != 0) {
3685        checkeq(exp_snap_start,
3686                producers.last_snap_start_seqno,
3687                "Unexpected snap start");
3688    }
3689
3690    if (exp_snap_end != 0) {
3691        checkeq(exp_snap_end,
3692                producers.last_snap_end_seqno,
3693                "Unexpected snap end");
3694    }
3695
3696    size_t bodylen = 0;
3697    if (response == cb::mcbp::Status::Success) {
3698        bodylen = 16;
3699    } else if (response == cb::mcbp::Status::Rollback) {
3700        bodylen = 8;
3701    }
3702
3703    size_t headerlen = sizeof(protocol_binary_response_header);
3704    size_t pkt_len = headerlen + bodylen;
3705
3706    protocol_binary_response_header* pkt =
3707        (protocol_binary_response_header*)cb_malloc(pkt_len);
3708    memset(pkt->bytes, '\0', pkt_len);
3709    pkt->response.setMagic(cb::mcbp::Magic::ClientResponse);
3710    pkt->response.setOpcode(cb::mcbp::ClientOpcode::DcpStreamReq);
3711    pkt->response.setStatus(response);
3712    pkt->response.setOpaque(producers.last_opaque);
3713
3714    if (response == cb::mcbp::Status::Rollback) {
3715        bodylen = sizeof(uint64_t);
3716        uint64_t rollbackSeqno = 0;
3717        memcpy(pkt->bytes + headerlen, &rollbackSeqno, bodylen);
3718    }
3719
3720    pkt->response.setBodylen(bodylen);
3721
3722    if (response == cb::mcbp::Status::Success) {
3723        uint64_t vb_uuid = htonll(123456789);
3724        uint64_t by_seqno = 0;
3725        memcpy(pkt->bytes + headerlen, &vb_uuid, sizeof(uint64_t));
3726        memcpy(pkt->bytes + headerlen + 8, &by_seqno, sizeof(uint64_t));
3727    }
3728
3729    checkeq(ENGINE_SUCCESS,
3730            dcp->response_handler(cookie, pkt),
3731            "Expected success");
3732    dcp_step(h, cookie, producers);
3733    cb_free(pkt);
3734
3735    if (response == cb::mcbp::Status::Rollback) {
3736        return stream_opaque;
3737    }
3738
3739    if (producers.last_op == cb::mcbp::ClientOpcode::DcpStreamReq) {
3740        checkne(opaque, producers.last_opaque, "Unexpected opaque");
3741        verify_curr_items(h, 0, "Wrong amount of items");
3742
3743        protocol_binary_response_header* pkt =
3744            (protocol_binary_response_header*)cb_malloc(pkt_len);
3745        memset(pkt->bytes, '\0', 40);
3746        pkt->response.setMagic(cb::mcbp::Magic::ClientResponse);
3747        pkt->response.setOpcode(cb::mcbp::ClientOpcode::DcpStreamReq);
3748        pkt->response.setStatus(cb::mcbp::Status::Success);
3749        pkt->response.setOpaque(producers.last_opaque);
3750        pkt->response.setBodylen(16);
3751
3752        uint64_t vb_uuid = htonll(123456789);
3753        uint64_t by_seqno = 0;
3754        memcpy(pkt->bytes + headerlen, &vb_uuid, sizeof(uint64_t));
3755        memcpy(pkt->bytes + headerlen + 8, &by_seqno, sizeof(uint64_t));
3756
3757        checkeq(ENGINE_SUCCESS,
3758                dcp->response_handler(cookie, pkt),
3759                "Expected success");
3760        dcp_step(h, cookie,