1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2016 Couchbase, Inc
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
/*
 * Unit tests for DCP-related classes.
 *
 * Due to the way our classes are structured, most of the different DCP classes
 * need an instance of EPBucket and other related objects.
 */
24
25#include "dcp_test.h"
26#include "../mock/mock_checkpoint_manager.h"
27#include "../mock/mock_dcp.h"
28#include "../mock/mock_dcp_conn_map.h"
29#include "../mock/mock_dcp_consumer.h"
30#include "../mock/mock_dcp_producer.h"
31#include "../mock/mock_stream.h"
32#include "../mock/mock_synchronous_ep_engine.h"
33#include "checkpoint.h"
34#include "checkpoint_manager.h"
35#include "checkpoint_utils.h"
36#include "dcp/active_stream_checkpoint_processor_task.h"
37#include "dcp/dcp-types.h"
38#include "dcp/dcpconnmap.h"
39#include "dcp/producer.h"
40#include "dcp/response.h"
41#include "dcp/stream.h"
42#include "dcp_utils.h"
43#include "ep_time.h"
44#include "evp_engine_test.h"
45#include "kv_bucket.h"
46#include "memory_tracker.h"
47#include "objectregistry.h"
48#include "test_helpers.h"
49#include "vbucket.h"
50#include "warmup.h"
51
52#include <folly/portability/GTest.h>
53#include <memcached/server_cookie_iface.h>
54#include <platform/cbassert.h>
55#include <platform/compress.h>
56#include <programs/engine_testapp/mock_server.h>
57
58#include <thread>
59
60/**
61 * The DCP tests wants to mock around with the notify_io_complete
62 * method. Previously we copied in a new notify_io_complete method, but
63 * we can't do that as the cookie interface contains virtual pointers.
64 * An easier approach is to create a class which just wraps the server
65 * API and we may subclass this class to override whatever method we want
66 *
67 * The constructor installs itself as the mock server cookie interface,
68 * and the destructor reinstalls the original server cookie interfa
69 */
70class WrappedServerCookieIface : public ServerCookieIface {
71public:
72    WrappedServerCookieIface() : wrapped(get_mock_server_api()->cookie) {
73        get_mock_server_api()->cookie = this;
74    }
75
76    ~WrappedServerCookieIface() override {
77        get_mock_server_api()->cookie = wrapped;
78    }
79
80    void store_engine_specific(gsl::not_null<const void*> cookie,
81                               void* engine_data) override {
82        wrapped->store_engine_specific(cookie, engine_data);
83    }
84    void* get_engine_specific(gsl::not_null<const void*> cookie) override {
85        return wrapped->get_engine_specific(cookie);
86    }
87    bool is_datatype_supported(gsl::not_null<const void*> cookie,
88                               protocol_binary_datatype_t datatype) override {
89        return wrapped->is_datatype_supported(cookie, datatype);
90    }
91    bool is_mutation_extras_supported(
92            gsl::not_null<const void*> cookie) override {
93        return wrapped->is_mutation_extras_supported(cookie);
94    }
95    bool is_collections_supported(gsl::not_null<const void*> cookie) override {
96        return wrapped->is_collections_supported(cookie);
97    }
98    cb::mcbp::ClientOpcode get_opcode_if_ewouldblock_set(
99            gsl::not_null<const void*> cookie) override {
100        return wrapped->get_opcode_if_ewouldblock_set(cookie);
101    }
102    bool validate_session_cas(uint64_t cas) override {
103        return wrapped->validate_session_cas(cas);
104    }
105    void decrement_session_ctr() override {
106        return wrapped->decrement_session_ctr();
107    }
108    void notify_io_complete(gsl::not_null<const void*> cookie,
109                            ENGINE_ERROR_CODE status) override {
110        return wrapped->notify_io_complete(cookie, status);
111    }
112    ENGINE_ERROR_CODE reserve(gsl::not_null<const void*> cookie) override {
113        return wrapped->reserve(cookie);
114    }
115    ENGINE_ERROR_CODE release(gsl::not_null<const void*> cookie) override {
116        return wrapped->release(cookie);
117    }
118    void set_priority(gsl::not_null<const void*> cookie,
119                      CONN_PRIORITY priority) override {
120        return wrapped->set_priority(cookie, priority);
121    }
122    CONN_PRIORITY get_priority(gsl::not_null<const void*> cookie) override {
123        return wrapped->get_priority(cookie);
124    }
125    bucket_id_t get_bucket_id(gsl::not_null<const void*> cookie) override {
126        return wrapped->get_bucket_id(cookie);
127    }
128    uint64_t get_connection_id(gsl::not_null<const void*> cookie) override {
129        return wrapped->get_connection_id(cookie);
130    }
131    cb::rbac::PrivilegeAccess check_privilege(
132            gsl::not_null<const void*> cookie,
133            cb::rbac::Privilege privilege) override {
134        return wrapped->check_privilege(cookie, privilege);
135    }
136    cb::mcbp::Status engine_error2mcbp(gsl::not_null<const void*> cookie,
137                                       ENGINE_ERROR_CODE code) override {
138        return wrapped->engine_error2mcbp(cookie, code);
139    }
140    std::pair<uint32_t, std::string> get_log_info(
141            gsl::not_null<const void*> cookie) override {
142        return wrapped->get_log_info(cookie);
143    }
144    std::string get_authenticated_user(
145            gsl::not_null<const void*> cookie) override {
146        return wrapped->get_authenticated_user(cookie);
147    }
148    in_port_t get_connected_port(gsl::not_null<const void*> cookie) override {
149        return wrapped->get_connected_port(cookie);
150    }
151    void set_error_context(gsl::not_null<void*> cookie,
152                           cb::const_char_buffer message) override {
153        wrapped->set_error_context(cookie, message);
154    }
155    void set_error_json_extras(gsl::not_null<void*> cookie,
156                               const nlohmann::json& json) override {
157        wrapped->set_error_json_extras(cookie, json);
158    }
159
160protected:
161    ServerCookieIface* wrapped;
162};
163
164void DCPTest::SetUp() {
165    EventuallyPersistentEngineTest::SetUp();
166
167    // Set AuxIO threads to zero, so that the producer's
168    // ActiveStreamCheckpointProcesserTask doesn't run.
169    ExecutorPool::get()->setNumAuxIO(0);
170    // Set NonIO threads to zero, so the connManager
171    // task does not run.
172    ExecutorPool::get()->setNumNonIO(0);
173    callbackCount = 0;
174
175#if defined(HAVE_JEMALLOC)
176    // MB-28370: Run with memory tracking for all alloc/deallocs when built
177    // with jemalloc.
178    MemoryTracker::getInstance(*get_mock_server_api()->alloc_hooks);
179    engine->getEpStats().memoryTrackerEnabled.store(true);
180#endif
181}
182
183void DCPTest::TearDown() {
184    /* MB-22041 changes to dynamically stopping threads rather than having
185     * the excess looping but not getting work. We now need to set the
186     * AuxIO and NonIO back to 1 to allow dead tasks to be cleaned up
187     */
188    ExecutorPool::get()->setNumAuxIO(1);
189    ExecutorPool::get()->setNumNonIO(1);
190
191    EventuallyPersistentEngineTest::TearDown();
192
193    MemoryTracker::destroyInstance();
194}
195
196void DCPTest::create_dcp_producer(
197        int flags,
198        IncludeValue includeVal,
199        IncludeXattrs includeXattrs,
200        std::vector<std::pair<std::string, std::string>> controls) {
201    if (includeVal == IncludeValue::No) {
202        flags |= cb::mcbp::request::DcpOpenPayload::NoValue;
203    }
204    if (includeVal == IncludeValue::NoWithUnderlyingDatatype) {
205        flags |= cb::mcbp::request::DcpOpenPayload::
206                NoValueWithUnderlyingDatatype;
207    }
208    if (includeXattrs == IncludeXattrs::Yes) {
209        flags |= cb::mcbp::request::DcpOpenPayload::IncludeXattrs;
210    }
211    producer = std::make_shared<MockDcpProducer>(*engine,
212                                                 cookie,
213                                                 "test_producer",
214                                                 flags,
215                                                 /*startTask*/ true);
216
217    if (includeXattrs == IncludeXattrs::Yes) {
218        producer->setNoopEnabled(true);
219    }
220
221    // Since we are creating a mock active stream outside of
222    // DcpProducer::streamRequest(), and we want the checkpt processor task,
223    // create it explicitly here
224    producer->createCheckpointProcessorTask();
225    producer->scheduleCheckpointProcessorTask();
226
227    // Now set any controls before creating any streams
228    for (const auto& control : controls) {
229        EXPECT_EQ(ENGINE_SUCCESS,
230                  producer->control(0, control.first, control.second));
231    }
232}
233
234void DCPTest::setup_dcp_stream(
235        int flags,
236        IncludeValue includeVal,
237        IncludeXattrs includeXattrs,
238        std::vector<std::pair<std::string, std::string>> controls) {
239    create_dcp_producer(flags, includeVal, includeXattrs, controls);
240
241    vb0 = engine->getVBucket(vbid);
242    ASSERT_NE(nullptr, vb0.get());
243    EXPECT_TRUE(vb0) << "Failed to get valid VBucket object for id 0";
244    stream = std::make_shared<MockActiveStream>(engine,
245                                                producer,
246                                                flags,
247                                                /*opaque*/ 0,
248                                                *vb0,
249                                                /*st_seqno*/ 0,
250                                                /*en_seqno*/ ~0,
251                                                /*vb_uuid*/ 0xabcd,
252                                                /*snap_start_seqno*/ 0,
253                                                /*snap_end_seqno*/ ~0,
254                                                includeVal,
255                                                includeXattrs);
256
257    stream->public_registerCursor(
258            *vb0->checkpointManager, producer->getName(), 0);
259    stream->setActive();
260}
261
262void DCPTest::destroy_dcp_stream() {
263    producer->closeStream(/*opaque*/ 0, vb0->getId());
264}
265
266DCPTest::StreamRequestResult DCPTest::doStreamRequest(DcpProducer& producer,
267                                                      uint64_t startSeqno,
268                                                      uint64_t endSeqno,
269                                                      uint64_t snapStart,
270                                                      uint64_t snapEnd,
271                                                      uint64_t vbUUID) {
272    DCPTest::StreamRequestResult result;
273    result.status = producer.streamRequest(/*flags*/ 0,
274                                           /*opaque*/ 0,
275                                           Vbid(0),
276                                           startSeqno,
277                                           endSeqno,
278                                           vbUUID,
279                                           snapStart,
280                                           snapEnd,
281                                           &result.rollbackSeqno,
282                                           DCPTest::fakeDcpAddFailoverLog,
283                                           {});
284    return result;
285}
286
287void DCPTest::prepareCheckpointItemsForStep(dcp_message_producers& msgProducers,
288                                            MockDcpProducer& producer,
289                                            VBucket& vb) {
290    producer.notifySeqnoAvailable(
291            vb.getId(), vb.getHighSeqno(), SyncWriteOperation::Yes);
292    ASSERT_EQ(ENGINE_EWOULDBLOCK, producer.step(&msgProducers));
293    ASSERT_EQ(1, producer.getCheckpointSnapshotTask()->queueSize());
294    producer.getCheckpointSnapshotTask()->run();
295}
296
297std::unique_ptr<Item> DCPTest::makeItemWithXattrs() {
298    std::string valueData = R"({"json":"yes"})";
299    std::string data = createXattrValue(valueData);
300    protocol_binary_datatype_t datatype =
301            (PROTOCOL_BINARY_DATATYPE_JSON | PROTOCOL_BINARY_DATATYPE_XATTR);
302    return std::make_unique<Item>(makeStoredDocKey("key"),
303                                  /*flags*/ 0,
304                                  /*exp*/ 0,
305                                  data.c_str(),
306                                  data.size(),
307                                  datatype);
308}
309
310std::unique_ptr<Item> DCPTest::makeItemWithoutXattrs() {
311    std::string valueData = R"({"json":"yes"})";
312    protocol_binary_datatype_t datatype = PROTOCOL_BINARY_DATATYPE_JSON;
313    return std::make_unique<Item>(makeStoredDocKey("key"),
314                                  /*flags*/ 0,
315                                  /*exp*/ 0,
316                                  valueData.c_str(),
317                                  valueData.size(),
318                                  datatype);
319}
320
321void DCPTest::addItemsAndRemoveCheckpoint(int numItems) {
322    for (int i = 0; i < numItems; ++i) {
323        std::string key("key" + std::to_string(i));
324        store_item(vbid, key, "value");
325    }
326    removeCheckpoint(numItems);
327}
328
329void DCPTest::removeCheckpoint(int numItems) {
330    /* Create new checkpoint so that we can remove the current checkpoint
331       and force a backfill in the DCP stream */
332    auto& ckpt_mgr = *vb0->checkpointManager;
333    ckpt_mgr.createNewCheckpoint();
334
335    /* Wait for removal of the old checkpoint, this also would imply that
336       the items are persisted (in case of persistent buckets) */
337    bool new_ckpt_created;
338    std::chrono::microseconds uSleepTime(128);
339    int itemsRemoved = 0;
340    while (true) {
341        itemsRemoved +=
342                ckpt_mgr.removeClosedUnrefCheckpoints(*vb0, new_ckpt_created);
343        if (itemsRemoved >= numItems) {
344            break;
345        }
346        uSleepTime = decayingSleep(uSleepTime);
347    };
348    EXPECT_EQ(numItems, itemsRemoved);
349}
350int DCPTest::callbackCount = 0;
351
352void DCPTest::runCheckpointProcessor(dcp_message_producers& producers) {
353    // Step which will notify the snapshot task
354    EXPECT_EQ(ENGINE_EWOULDBLOCK, producer->step(&producers));
355
356    EXPECT_EQ(1, producer->getCheckpointSnapshotTask()->queueSize());
357
358    // Now call run on the snapshot task to move checkpoint into DCP
359    // stream
360    producer->getCheckpointSnapshotTask()->run();
361}
362
363/*
364 * MB-30189: Test that addStats() on the DcpProducer object doesn't
365 * attempt to dereference the cookie passed in (as it's not it's
366 * object).  Check that no invalid memory accesses occur; requires
367 * ASan for maximum accuracy in testing.
368 */
369TEST_F(DCPTest, MB30189_addStats) {
370    create_dcp_producer();
371    class MockStats {
372    } mockStats;
373    producer->addStats(
374            [](const char* key,
375               const uint16_t klen,
376               const char* val,
377               const uint32_t vlen,
378               gsl::not_null<const void*> cookie) {
379                // do nothing
380            },
381            &mockStats);
382}
383
384std::string decompressValue(std::string compressedValue) {
385    cb::compression::Buffer buffer;
386    if (!cb::compression::inflate(cb::compression::Algorithm::Snappy,
387                                  compressedValue, buffer)) {
388        return {};
389    }
390
391    return std::string(buffer.data(), buffer.size());
392}
393
394class CompressionStreamTest : public DCPTest,
395                              public ::testing::WithParamInterface<
396                                      ::testing::tuple<std::string, bool>> {
397public:
398    void SetUp() override {
399        bucketType = ::testing::get<0>(GetParam());
400        DCPTest::SetUp();
401        vb0 = engine->getVBucket(Vbid(0));
402        EXPECT_TRUE(vb0) << "Failed to get valid VBucket object for id 0";
403    }
404
405    void TearDown() override {
406        if (producer) {
407            producer->cancelCheckpointCreatorTask();
408        }
409        // Destroy various engine objects
410        vb0.reset();
411        stream.reset();
412        producer.reset();
413        DCPTest::TearDown();
414    }
415
416    bool isXattr() const {
417        return ::testing::get<1>(GetParam());
418    }
419
420    size_t getItemSize(const Item& item) {
421        size_t base = MutationResponse::mutationBaseMsgBytes +
422                      item.getKey().makeDocKeyWithoutCollectionID().size();
423        if (isXattr()) {
424            // DCP won't recompress the pruned document
425            return base + getXattrSize(false);
426        }
427        return base + item.getNBytes();
428    }
429
430    size_t getXattrSize(bool compressed) const {
431        return createXattrValue({}, true, compressed).size();
432    }
433};
434
435/**
436 * Test to verify DCP compression/decompression. There are 4 cases that are being
437 * tested
438 *
439 * 1. Add a compressed item and stream a compressed item
440 * 2. Add an uncompressed item and stream a compressed item
441 * 3. Add a compressed item and stream an uncompressed item
442 * 4. Add an uncompressed item and stream an uncompressed item
443 */
444
445/**
446 * There are 2 cases that are
447 * being tested in this test. This test uses a producer/connection without
448 * compression enabled
449 *
450 * 1. Add a compressed item and expect to stream an uncompressed item
451 * 2. Add an uncompressed item and expect to stream an uncompressed item
452 *
453 */
454TEST_P(CompressionStreamTest, compression_not_enabled) {
455    VBucketPtr vb = engine->getKVBucket()->getVBucket(vbid);
456    std::string valueData("{\"product\": \"car\",\"price\": \"100\"},"
457                          "{\"product\": \"bus\",\"price\": \"1000\"},"
458                          "{\"product\": \"Train\",\"price\": \"100000\"}");
459    auto item1 = makeCompressibleItem(vbid,
460                                      makeStoredDocKey("key1"),
461                                      valueData,
462                                      PROTOCOL_BINARY_DATATYPE_JSON,
463                                      true, // compressed
464                                      isXattr());
465    auto item2 = makeCompressibleItem(vbid,
466                                      makeStoredDocKey("key2"),
467                                      valueData,
468                                      PROTOCOL_BINARY_DATATYPE_JSON,
469                                      false, // uncompressed
470                                      isXattr());
471
472    auto includeValue = isXattr() ? IncludeValue::No : IncludeValue::Yes;
473    setup_dcp_stream(0, includeValue, IncludeXattrs::Yes);
474
475    /**
476     * Ensure that compression is disabled
477     */
478    ASSERT_FALSE(producer->isCompressionEnabled());
479
480    MockDcpMessageProducers producers(engine);
481
482    // Now, add 2 items
483    EXPECT_EQ(ENGINE_SUCCESS, engine->getKVBucket()->set(*item1, cookie));
484    EXPECT_EQ(ENGINE_SUCCESS, engine->getKVBucket()->set(*item2, cookie));
485
486    auto keyAndSnappyValueMessageSize = getItemSize(*item1);
487
488    /**
489     * Create a DCP response and check that a new item isn't created and that
490     * the size of the response message is greater than the size of the original
491     * message (or equal for xattr stream)
492     */
493    queued_item qi(std::move(item1));
494    std::unique_ptr<DcpResponse> dcpResponse =
495            stream->public_makeResponseFromItem(qi,
496                                                SendCommitSyncWriteAs::Commit);
497    auto mutProdResponse = dynamic_cast<MutationResponse*>(dcpResponse.get());
498    ASSERT_NE(qi.get(), mutProdResponse->getItem().get());
499    if (isXattr()) {
500        // The same sizes. makeResponseFromItem will have inflated and not
501        // compressed as part of the value pruning
502        EXPECT_EQ(keyAndSnappyValueMessageSize, dcpResponse->getMessageSize());
503    } else {
504        EXPECT_LT(keyAndSnappyValueMessageSize, dcpResponse->getMessageSize());
505    }
506
507    EXPECT_EQ(ENGINE_SUCCESS, doStreamRequest(*producer).status);
508
509    prepareCheckpointItemsForStep(producers, *producer, *vb);
510
511    /* Stream the snapshot marker first */
512    EXPECT_EQ(ENGINE_SUCCESS, producer->step(&producers));
513    EXPECT_EQ(0, producer->getItemsSent());
514
515    /* Stream the first mutation */
516    protocol_binary_datatype_t expectedDataType =
517            isXattr() ? PROTOCOL_BINARY_DATATYPE_XATTR
518                      : PROTOCOL_BINARY_DATATYPE_JSON;
519    EXPECT_EQ(ENGINE_SUCCESS, producer->step(&producers));
520    std::string value(qi->getValue()->getData(), qi->getValue()->valueSize());
521    EXPECT_STREQ(producers.last_value.c_str(), decompressValue(value).c_str());
522
523    if (isXattr()) {
524        // The pruned packet won't be recompressed
525        EXPECT_EQ(producers.last_packet_size, keyAndSnappyValueMessageSize);
526    } else {
527        EXPECT_GT(producers.last_packet_size, keyAndSnappyValueMessageSize);
528    }
529
530    EXPECT_FALSE(mcbp::datatype::is_snappy(producers.last_datatype));
531    EXPECT_EQ(expectedDataType, producers.last_datatype);
532
533    /**
534     * Create a DCP response and check that a new item is created and
535     * the message size is less than the size of original item
536     */
537    uint32_t keyAndValueMessageSize = getItemSize(*item2);
538    qi.reset(std::move(item2));
539    dcpResponse = stream->public_makeResponseFromItem(
540            qi, SendCommitSyncWriteAs::Commit);
541    mutProdResponse = dynamic_cast<MutationResponse*>(dcpResponse.get());
542
543    // A new pruned item will always be generated
544    if (!isXattr()) {
545        ASSERT_EQ(qi.get(), mutProdResponse->getItem().get());
546    }
547    EXPECT_EQ(dcpResponse->getMessageSize(), keyAndValueMessageSize);
548
549    /* Stream the second mutation */
550    EXPECT_EQ(ENGINE_SUCCESS, producer->step(&producers));
551
552    value.assign(qi->getValue()->getData(), qi->getValue()->valueSize());
553    EXPECT_STREQ(value.c_str(), producers.last_value.c_str());
554    EXPECT_EQ(producers.last_packet_size, keyAndValueMessageSize);
555
556    EXPECT_FALSE(mcbp::datatype::is_snappy(producers.last_datatype));
557    EXPECT_EQ(expectedDataType, producers.last_datatype);
558}
559
560/**
561 * Test to verify DCP compression, this test has client snappy enabled
562 *
563 *  - Add a compressed item and expect we stream a compressed item
564 *
565 * Note when the test is running xattr-only DCP, expect we stream an
566 * uncompressed item
567 */
568TEST_P(CompressionStreamTest, connection_snappy_enabled) {
569    VBucketPtr vb = engine->getKVBucket()->getVBucket(vbid);
570    std::string valueData(
571            "{\"product\": \"car\",\"price\": \"100\"},"
572            "{\"product\": \"bus\",\"price\": \"1000\"},"
573            "{\"product\": \"Train\",\"price\": \"100000\"}");
574
575    auto item = makeCompressibleItem(vbid,
576                                     makeStoredDocKey("key"),
577                                     valueData,
578                                     PROTOCOL_BINARY_DATATYPE_JSON,
579                                     true, // compressed
580                                     isXattr());
581
582    // Enable the snappy datatype on the connection
583    mock_set_datatype_support(cookie, PROTOCOL_BINARY_DATATYPE_SNAPPY);
584
585    auto includeValue = isXattr() ? IncludeValue::No : IncludeValue::Yes;
586    setup_dcp_stream(0, includeValue, IncludeXattrs::Yes);
587
588    EXPECT_EQ(ENGINE_SUCCESS, doStreamRequest(*producer).status);
589    MockDcpMessageProducers producers(engine);
590    ASSERT_TRUE(producer->isCompressionEnabled());
591
592    // Now, add the 3rd item. This item should be compressed
593    EXPECT_EQ(ENGINE_SUCCESS, engine->getKVBucket()->set(*item, cookie));
594
595    prepareCheckpointItemsForStep(producers, *producer, *vb);
596
597    /* Stream the snapshot marker */
598    ASSERT_EQ(ENGINE_SUCCESS, producer->step(&producers));
599
600    /* Stream the 3rd mutation */
601    ASSERT_EQ(ENGINE_SUCCESS, producer->step(&producers));
602
603    /**
604     * Create a DCP response and check that a new item is created and
605     * the message size is greater than the size of original item
606     */
607    auto keyAndSnappyValueMessageSize = getItemSize(*item);
608    queued_item qi = std::move(item);
609    auto dcpResponse = stream->public_makeResponseFromItem(
610            qi, SendCommitSyncWriteAs::Commit);
611    auto* mutProdResponse = dynamic_cast<MutationResponse*>(dcpResponse.get());
612    std::string value;
613    if (!isXattr()) {
614        ASSERT_EQ(qi.get(), mutProdResponse->getItem().get());
615        value.assign(qi->getValue()->getData(), qi->getValue()->valueSize());
616    }
617
618    EXPECT_STREQ(producers.last_value.c_str(), value.c_str());
619    EXPECT_EQ(dcpResponse->getMessageSize(), keyAndSnappyValueMessageSize);
620
621    EXPECT_EQ(producers.last_packet_size, keyAndSnappyValueMessageSize);
622
623    // If xattr-only enabled on DCP, we won't re-compress (after we've
624    // decompressed the document and split out the xattrs)
625    protocol_binary_datatype_t snappy =
626            isXattr() ? 0 : PROTOCOL_BINARY_DATATYPE_SNAPPY;
627    protocol_binary_datatype_t expectedDataType =
628            isXattr() ? PROTOCOL_BINARY_DATATYPE_XATTR
629                      : PROTOCOL_BINARY_DATATYPE_JSON;
630    EXPECT_EQ((expectedDataType | snappy), producers.last_datatype);
631}
632
633/**
634 * Test to verify DCP compression, this test has client snappy enabled
635 *
636 *  - Add an uncompressed item and expect we stream a compressed item
637 */
638TEST_P(CompressionStreamTest, force_value_compression_enabled) {
639    VBucketPtr vb = engine->getKVBucket()->getVBucket(vbid);
640    std::string valueData(
641            "{\"product\": \"car\",\"price\": \"100\"},"
642            "{\"product\": \"bus\",\"price\": \"1000\"},"
643            "{\"product\": \"Train\",\"price\": \"100000\"}");
644
645    auto item = makeCompressibleItem(vbid,
646                                     makeStoredDocKey("key"),
647                                     valueData,
648                                     PROTOCOL_BINARY_DATATYPE_JSON,
649                                     false, // not compressed
650                                     isXattr());
651
652    // Enable the snappy datatype on the connection
653    mock_set_datatype_support(cookie, PROTOCOL_BINARY_DATATYPE_SNAPPY);
654    auto includeValue = isXattr() ? IncludeValue::No : IncludeValue::Yes;
655
656    // Setup the producer/stream and request force_value_compression
657    setup_dcp_stream(0,
658                     includeValue,
659                     IncludeXattrs::Yes,
660                     {{"force_value_compression", "true"}});
661
662    EXPECT_EQ(ENGINE_SUCCESS, doStreamRequest(*producer).status);
663    MockDcpMessageProducers producers(engine);
664
665    ASSERT_TRUE(producer->isForceValueCompressionEnabled());
666
667    // Now, add the 4th item, which is not compressed
668    EXPECT_EQ(ENGINE_SUCCESS, engine->getKVBucket()->set(*item, cookie));
669    /**
670     * Create a DCP response and check that a new item is created and
671     * the message size is less than the size of the original item
672     */
673    auto keyAndValueMessageSize = getItemSize(*item);
674    queued_item qi = std::move(item);
675    auto dcpResponse = stream->public_makeResponseFromItem(
676            qi, SendCommitSyncWriteAs::Commit);
677    auto* mutProdResponse = dynamic_cast<MutationResponse*>(dcpResponse.get());
678    ASSERT_NE(qi.get(), mutProdResponse->getItem().get());
679    EXPECT_LT(dcpResponse->getMessageSize(), keyAndValueMessageSize);
680
681    prepareCheckpointItemsForStep(producers, *producer, *vb);
682
683    /* Stream the snapshot marker */
684    ASSERT_EQ(ENGINE_SUCCESS, producer->step(&producers));
685
686    /* Stream the mutation */
687    ASSERT_EQ(ENGINE_SUCCESS, producer->step(&producers));
688    std::string value(qi->getValue()->getData(), qi->getValue()->valueSize());
689    EXPECT_STREQ(decompressValue(producers.last_value).c_str(), value.c_str());
690    EXPECT_LT(producers.last_packet_size, keyAndValueMessageSize);
691
692    protocol_binary_datatype_t expectedDataType =
693            isXattr() ? PROTOCOL_BINARY_DATATYPE_XATTR
694                      : PROTOCOL_BINARY_DATATYPE_JSON;
695    EXPECT_EQ((expectedDataType | PROTOCOL_BINARY_DATATYPE_SNAPPY),
696              producers.last_datatype);
697
698    destroy_dcp_stream();
699}
700
701class ConnectionTest : public DCPTest,
702                       public ::testing::WithParamInterface<
703                               std::tuple<std::string, std::string>> {
704protected:
705    void SetUp() override {
706        bucketType = std::get<0>(GetParam());
707        DCPTest::SetUp();
708        vbid = Vbid(0);
709        if (bucketType == "ephemeral") {
710            engine->getConfiguration().setEphemeralFullPolicy(
711                    std::get<1>(GetParam()));
712        }
713    }
714
715    ENGINE_ERROR_CODE set_vb_state(Vbid vbid, vbucket_state_t state) {
716        return engine->getKVBucket()->setVBucketState(
717                vbid, state, {}, TransferVB::Yes);
718    }
719
720    /**
721     * Creates a consumer conn and sends items on the conn with memory usage
722     * near to replication threshold
723     *
724     * @param beyondThreshold indicates if the memory usage should above the
725     *                        threshold or just below it
726     */
727    void sendConsumerMutationsNearThreshold(bool beyondThreshold);
728
729    /**
730     * Creates a consumer conn and makes the consumer processor task run with
731     * memory usage near to replication threshold
732     *
733     * @param beyondThreshold indicates if the memory usage should above the
734     *                        threshold or just below it
735     */
736    void processConsumerMutationsNearThreshold(bool beyondThreshold);
737
738    /**
739     * @param producerState Are we simulating a negotiation against a Producer
740     *  that enables IncludeDeletedUserXattrs?
741     */
742    void testConsumerNegotiatesIncludeDeletedUserXattrs(
743            IncludeDeletedUserXattrs producerState);
744
745    /**
746     * @param producerState Are we simulating a negotiation against a Producer
747     *  that enables IncludeDeletedUserXattrs?
748     */
749    void testProducerNegotiatesIncludeDeletedUserXattrs(
750            IncludeDeletedUserXattrs producerState);
751
752    /* vbucket associated with this connection */
753    Vbid vbid;
754};
755
756/*
757 * Test that the connection manager interval is a multiple of the value we
758 * are setting the noop interval to.  This ensures we do not set the the noop
759 * interval to a value that cannot be adhered to.  The reason is that if there
760 * is no DCP traffic we snooze for the connection manager interval before
761 * sending the noop.
762 */
763TEST_P(ConnectionTest, test_mb19955) {
764    const void* cookie = create_mock_cookie();
765    engine->getConfiguration().setConnectionManagerInterval(2);
766
767    // Create a Mock Dcp producer
768    auto producer = std::make_shared<MockDcpProducer>(*engine,
769                                                      cookie,
770                                                      "test_producer",
771                                                      /*flags*/ 0);
772    // "1" is not a multiple of "2" and so we should return ENGINE_EINVAL
773    EXPECT_EQ(ENGINE_EINVAL, producer->control(0, "set_noop_interval", "1"))
774            << "Expected producer.control to return ENGINE_EINVAL";
775    destroy_mock_cookie(cookie);
776}
777
778TEST_P(ConnectionTest, test_maybesendnoop_buffer_full) {
779    const void* cookie = create_mock_cookie();
780    // Create a Mock Dcp producer
781    auto producer = std::make_shared<MockDcpProducer>(*engine,
782                                                      cookie,
783                                                      "test_producer",
784                                                      /*flags*/ 0);
785
786    class MockE2BigMessageProducers : public MockDcpMessageProducers {
787    public:
788        ENGINE_ERROR_CODE noop(uint32_t) override {
789            return ENGINE_E2BIG;
790        }
791
792    } producers;
793
794    producer->setNoopEnabled(true);
795    const auto send_time = ep_current_time() + 21;
796    producer->setNoopSendTime(send_time);
797    ENGINE_ERROR_CODE ret = producer->maybeSendNoop(&producers);
798    EXPECT_EQ(ENGINE_E2BIG, ret)
799    << "maybeSendNoop not returning ENGINE_E2BIG";
800    EXPECT_FALSE(producer->getNoopPendingRecv())
801            << "Waiting for noop acknowledgement";
802    EXPECT_EQ(send_time, producer->getNoopSendTime())
803            << "SendTime has been updated";
804    producer->cancelCheckpointCreatorTask();
805    destroy_mock_cookie(cookie);
806}
807
808TEST_P(ConnectionTest, test_maybesendnoop_send_noop) {
809    const void* cookie = create_mock_cookie();
810    // Create a Mock Dcp producer
811    auto producer = std::make_shared<MockDcpProducer>(*engine,
812                                                      cookie,
813                                                      "test_producer",
814                                                      /*flags*/ 0);
815
816    MockDcpMessageProducers producers(handle);
817    producer->setNoopEnabled(true);
818    const auto send_time = ep_current_time() + 21;
819    producer->setNoopSendTime(send_time);
820    ENGINE_ERROR_CODE ret = producer->maybeSendNoop(&producers);
821    EXPECT_EQ(ENGINE_SUCCESS, ret)
822            << "maybeSendNoop not returning ENGINE_SUCCESS";
823    EXPECT_TRUE(producer->getNoopPendingRecv())
824            << "Not waiting for noop acknowledgement";
825    EXPECT_NE(send_time, producer->getNoopSendTime())
826            << "SendTime has not been updated";
827    producer->cancelCheckpointCreatorTask();
828    destroy_mock_cookie(cookie);
829}
830
831TEST_P(ConnectionTest, test_maybesendnoop_noop_already_pending) {
832    const void* cookie = create_mock_cookie();
833    // Create a Mock Dcp producer
834    auto producer = std::make_shared<MockDcpProducer>(*engine,
835                                                      cookie,
836                                                      "test_producer",
837                                                      /*flags*/ 0);
838
839    MockDcpMessageProducers producers(engine);
840    const auto send_time = ep_current_time();
841    TimeTraveller marty(engine->getConfiguration().getDcpIdleTimeout() + 1);
842    producer->setNoopEnabled(true);
843    producer->setNoopSendTime(send_time);
844    ENGINE_ERROR_CODE ret = producer->maybeSendNoop(&producers);
845    // Check to see if a noop was sent i.e. returned ENGINE_SUCCESS
846    EXPECT_EQ(ENGINE_SUCCESS, ret)
847            << "maybeSendNoop not returning ENGINE_SUCCESS";
848    EXPECT_TRUE(producer->getNoopPendingRecv())
849            << "Not awaiting noop acknowledgement";
850    EXPECT_NE(send_time, producer->getNoopSendTime())
851            << "SendTime has not been updated";
852    ret = producer->maybeSendNoop(&producers);
853    // Check to see if a noop was not sent i.e. returned ENGINE_FAILED
854    EXPECT_EQ(ENGINE_FAILED, ret)
855        << "maybeSendNoop not returning ENGINE_FAILED";
856    producer->setLastReceiveTime(send_time);
857    ret = producer->maybeDisconnect();
858    // Check to see if we want to disconnect i.e. returned ENGINE_DISCONNECT
859    EXPECT_EQ(ENGINE_DISCONNECT, ret)
860        << "maybeDisconnect not returning ENGINE_DISCONNECT";
861    producer->setLastReceiveTime(
862            send_time + engine->getConfiguration().getDcpIdleTimeout() + 1);
863    ret = producer->maybeDisconnect();
864    // Check to see if we don't want to disconnect i.e. returned ENGINE_FAILED
865    EXPECT_EQ(ENGINE_FAILED, ret)
866        << "maybeDisconnect not returning ENGINE_FAILED";
867    EXPECT_TRUE(producer->getNoopPendingRecv())
868            << "Not waiting for noop acknowledgement";
869    producer->cancelCheckpointCreatorTask();
870    destroy_mock_cookie(cookie);
871}
872
873TEST_P(ConnectionTest, test_maybesendnoop_not_enabled) {
874    const void* cookie = create_mock_cookie();
875    // Create a Mock Dcp producer
876    auto producer = std::make_shared<MockDcpProducer>(*engine,
877                                                      cookie,
878                                                      "test_producer",
879                                                      /*flags*/ 0);
880
881    MockDcpMessageProducers producers(handle);
882    producer->setNoopEnabled(false);
883    const auto send_time = ep_current_time() + 21;
884    producer->setNoopSendTime(send_time);
885    ENGINE_ERROR_CODE ret = producer->maybeSendNoop(&producers);
886    EXPECT_EQ(ENGINE_FAILED, ret)
887    << "maybeSendNoop not returning ENGINE_FAILED";
888    EXPECT_FALSE(producer->getNoopPendingRecv())
889            << "Waiting for noop acknowledgement";
890    EXPECT_EQ(send_time, producer->getNoopSendTime())
891            << "SendTime has been updated";
892    producer->cancelCheckpointCreatorTask();
893    destroy_mock_cookie(cookie);
894}
895
896TEST_P(ConnectionTest, test_maybesendnoop_not_sufficient_time_passed) {
897    const void* cookie = create_mock_cookie();
898    // Create a Mock Dcp producer
899    auto producer = std::make_shared<MockDcpProducer>(*engine,
900                                                      cookie,
901                                                      "test_producer",
902                                                      /*flags*/ 0);
903
904    MockDcpMessageProducers producers(handle);
905    producer->setNoopEnabled(true);
906    rel_time_t current_time = ep_current_time();
907    producer->setNoopSendTime(current_time);
908    ENGINE_ERROR_CODE ret = producer->maybeSendNoop(&producers);
909    EXPECT_EQ(ENGINE_FAILED, ret)
910    << "maybeSendNoop not returning ENGINE_FAILED";
911    EXPECT_FALSE(producer->getNoopPendingRecv())
912            << "Waiting for noop acknowledgement";
913    EXPECT_EQ(current_time, producer->getNoopSendTime())
914            << "SendTime has been incremented";
915    producer->cancelCheckpointCreatorTask();
916    destroy_mock_cookie(cookie);
917}
918
919TEST_P(ConnectionTest, test_deadConnections) {
920    MockDcpConnMap connMap(*engine);
921    connMap.initialize();
922    const void *cookie = create_mock_cookie();
923    // Create a new Dcp producer
924    connMap.newProducer(cookie,
925                        "test_producer",
926                        /*flags*/ 0);
927
928    // Disconnect the producer connection
929    connMap.disconnect(cookie);
930    EXPECT_EQ(1, connMap.getNumberOfDeadConnections())
931        << "Unexpected number of dead connections";
932    connMap.manageConnections();
933    // Should be zero deadConnections
934    EXPECT_EQ(0, connMap.getNumberOfDeadConnections())
935        << "Dead connections still remain";
936}
937
938TEST_P(ConnectionTest, test_mb23637_findByNameWithConnectionDoDisconnect) {
939    MockDcpConnMap connMap(*engine);
940    connMap.initialize();
941    const void *cookie = create_mock_cookie();
942    // Create a new Dcp producer
943    connMap.newProducer(cookie,
944                        "test_producer",
945                        /*flags*/ 0);
946    // should be able to find the connection
947    ASSERT_NE(nullptr, connMap.findByName("eq_dcpq:test_producer"));
948    // Disconnect the producer connection
949    connMap.disconnect(cookie);
950    ASSERT_EQ(1, connMap.getNumberOfDeadConnections())
951        << "Unexpected number of dead connections";
952    // should not be able to find because the connection has been marked as
953    // wanting to disconnect
954    EXPECT_EQ(nullptr, connMap.findByName("eq_dcpq:test_producer"));
955    connMap.manageConnections();
956    // Should be zero deadConnections
957    EXPECT_EQ(0, connMap.getNumberOfDeadConnections())
958        << "Dead connections still remain";
959}
960
961TEST_P(ConnectionTest, test_mb23637_findByNameWithDuplicateConnections) {
962    MockDcpConnMap connMap(*engine);
963    connMap.initialize();
964    const void* cookie1 = create_mock_cookie();
965    const void* cookie2 = create_mock_cookie();
966    // Create a new Dcp producer
967    DcpProducer* producer = connMap.newProducer(cookie1,
968                                                "test_producer",
969                                                /*flags*/ 0);
970    ASSERT_NE(nullptr, producer) << "producer is null";
971    // should be able to find the connection
972    ASSERT_NE(nullptr, connMap.findByName("eq_dcpq:test_producer"));
973
974    // Create a duplicate Dcp producer
975    DcpProducer* duplicateproducer =
976            connMap.newProducer(cookie2, "test_producer", /*flags*/ 0);
977    ASSERT_TRUE(producer->doDisconnect()) << "producer doDisconnect == false";
978    ASSERT_NE(nullptr, duplicateproducer) << "duplicateproducer is null";
979
980    // should find the duplicateproducer as the first producer has been marked
981    // as wanting to disconnect
982    EXPECT_EQ(duplicateproducer,
983              connMap.findByName("eq_dcpq:test_producer").get());
984
985    // Disconnect the producer connection
986    connMap.disconnect(cookie1);
987    // Disconnect the duplicateproducer connection
988    connMap.disconnect(cookie2);
989    EXPECT_EQ(2, connMap.getNumberOfDeadConnections())
990        << "Unexpected number of dead connections";
991
992    connMap.manageConnections();
993    // Should be zero deadConnections
994    EXPECT_EQ(0, connMap.getNumberOfDeadConnections())
995        << "Dead connections still remain";
996}
997
998
999TEST_P(ConnectionTest, test_mb17042_duplicate_name_producer_connections) {
1000    MockDcpConnMap connMap(*engine);
1001    connMap.initialize();
1002    const void* cookie1 = create_mock_cookie();
1003    const void* cookie2 = create_mock_cookie();
1004    // Create a new Dcp producer
1005    DcpProducer* producer = connMap.newProducer(cookie1,
1006                                                "test_producer",
1007                                                /*flags*/ 0);
1008    EXPECT_NE(nullptr, producer) << "producer is null";
1009
1010    // Create a duplicate Dcp producer
1011    DcpProducer* duplicateproducer = connMap.newProducer(cookie2,
1012                                                         "test_producer",
1013                                                         /*flags*/ 0);
1014    EXPECT_TRUE(producer->doDisconnect()) << "producer doDisconnect == false";
1015    EXPECT_NE(nullptr, duplicateproducer) << "duplicateproducer is null";
1016
1017    // Disconnect the producer connection
1018    connMap.disconnect(cookie1);
1019    // Disconnect the duplicateproducer connection
1020    connMap.disconnect(cookie2);
1021    // Cleanup the deadConnections
1022    connMap.manageConnections();
1023    // Should be zero deadConnections
1024    EXPECT_EQ(0, connMap.getNumberOfDeadConnections())
1025        << "Dead connections still remain";
1026}
1027
1028TEST_P(ConnectionTest, test_mb17042_duplicate_name_consumer_connections) {
1029    MockDcpConnMap connMap(*engine);
1030    connMap.initialize();
1031    auto* cookie1 = (struct MockCookie*)create_mock_cookie();
1032    auto* cookie2 = (struct MockCookie*)create_mock_cookie();
1033    // Create a new Dcp consumer
1034    DcpConsumer* consumer = connMap.newConsumer(cookie1, "test_consumer");
1035    EXPECT_NE(nullptr, consumer) << "consumer is null";
1036
1037    // Create a duplicate Dcp consumer
1038    DcpConsumer* duplicateconsumer =
1039            connMap.newConsumer(cookie2, "test_consumer");
1040    EXPECT_TRUE(consumer->doDisconnect()) << "consumer doDisconnect == false";
1041    EXPECT_NE(nullptr, duplicateconsumer) << "duplicateconsumer is null";
1042
1043    // Disconnect the consumer connection
1044    connMap.disconnect(cookie1);
1045    // Disconnect the duplicateconsumer connection
1046    connMap.disconnect(cookie2);
1047    // Cleanup the deadConnections
1048    connMap.manageConnections();
1049    // Should be zero deadConnections
1050    EXPECT_EQ(0, connMap.getNumberOfDeadConnections())
1051        << "Dead connections still remain";
1052}
1053
1054TEST_P(ConnectionTest, test_mb17042_duplicate_cookie_producer_connections) {
1055    MockDcpConnMap connMap(*engine);
1056    connMap.initialize();
1057    const void* cookie = create_mock_cookie();
1058    // Create a new Dcp producer
1059    DcpProducer* producer = connMap.newProducer(cookie,
1060                                                "test_producer1",
1061                                                /*flags*/ 0);
1062
1063    // Create a duplicate Dcp producer
1064    DcpProducer* duplicateproducer = connMap.newProducer(cookie,
1065                                                         "test_producer2",
1066                                                         /*flags*/ 0);
1067
1068    EXPECT_TRUE(producer->doDisconnect()) << "producer doDisconnect == false";
1069    EXPECT_EQ(nullptr, duplicateproducer) << "duplicateproducer is not null";
1070
1071    // Disconnect the producer connection
1072    connMap.disconnect(cookie);
1073    // Cleanup the deadConnections
1074    connMap.manageConnections();
1075    // Should be zero deadConnections
1076    EXPECT_EQ(0, connMap.getNumberOfDeadConnections())
1077        << "Dead connections still remain";
1078}
1079
1080/* Checks that the DCP producer does an async stream close when the DCP client
1081   expects "DCP_STREAM_END" msg. */
1082TEST_P(ConnectionTest, test_producer_stream_end_on_client_close_stream) {
1083#ifdef UNDEFINED_SANITIZER
1084    // See below MB-28739 comment for why this is skipped.
1085    std::cerr << "MB-28739[UBsan] skipping test\n";
1086    return;
1087#endif
1088    const void* cookie = create_mock_cookie();
1089    /* Create a new Dcp producer */
1090    producer = std::make_shared<MockDcpProducer>(*engine,
1091                                                 cookie,
1092                                                 "test_producer",
1093                                                 /*flags*/ 0);
1094
1095    /* Send a control message to the producer indicating that the DCP client
1096       expects a "DCP_STREAM_END" upon stream close */
1097    const std::string sendStreamEndOnClientStreamCloseCtrlMsg(
1098            "send_stream_end_on_client_close_stream");
1099    const std::string sendStreamEndOnClientStreamCloseCtrlValue("true");
1100    EXPECT_EQ(ENGINE_SUCCESS,
1101              producer->control(0,
1102                                sendStreamEndOnClientStreamCloseCtrlMsg,
1103                                sendStreamEndOnClientStreamCloseCtrlValue));
1104
1105    /* Open stream */
1106    EXPECT_EQ(ENGINE_SUCCESS, doStreamRequest(*producer).status);
1107
1108    // MB-28739[UBSan]: The following cast is undefined behaviour - the DCP
1109    // connection map object is of type DcpConnMap; so it's undefined to cast
1110    // to MockDcpConnMap.
1111    // However, in this instance MockDcpConnMap has identical member variables
1112    // to DcpConnMap - the mock just exposes normally private data - and so
1113    // this /seems/ ok.
1114    // As such allow it in general, but skip this test under UBSan.
1115    MockDcpConnMap& mockConnMap =
1116            static_cast<MockDcpConnMap&>(engine->getDcpConnMap());
1117    mockConnMap.addConn(cookie, producer);
1118    EXPECT_TRUE(mockConnMap.doesConnHandlerExist(vbid, "test_producer"));
1119
1120    /* Close stream */
1121    EXPECT_EQ(ENGINE_SUCCESS, producer->closeStream(0, vbid));
1122
1123    /* Expect a stream end message */
1124    MockDcpMessageProducers producers(handle);
1125    EXPECT_EQ(ENGINE_SUCCESS, producer->step(&producers));
1126    EXPECT_EQ(cb::mcbp::ClientOpcode::DcpStreamEnd, producers.last_op);
1127    EXPECT_EQ(END_STREAM_CLOSED, producers.last_flags);
1128
1129    /* Re-open stream for the same vbucket on the conn */
1130    EXPECT_EQ(ENGINE_SUCCESS, doStreamRequest(*producer).status);
1131
1132    /* Check that the new stream is opened properly */
1133    auto stream = producer->findStream(vbid);
1134    auto* as = static_cast<ActiveStream*>(stream.get());
1135    EXPECT_TRUE(as->isInMemory());
1136
1137    // MB-27769: Prior to the fix, this would fail here because we would skip
1138    // adding the connhandler into the connmap vbConns vector, causing the
1139    // stream to never get notified.
1140    EXPECT_TRUE(mockConnMap.doesConnHandlerExist(vbid, "test_producer"));
1141
1142    mockConnMap.disconnect(cookie);
1143    EXPECT_FALSE(mockConnMap.doesConnHandlerExist(vbid, "test_producer"));
1144    mockConnMap.manageConnections();
1145}
1146
1147/* Checks that the DCP producer does a synchronous stream close when the DCP
1148   client does not expect "DCP_STREAM_END" msg. */
1149TEST_P(ConnectionTest, test_producer_no_stream_end_on_client_close_stream) {
1150    MockDcpConnMap connMap(*engine);
1151    connMap.initialize();
1152    const void* cookie = create_mock_cookie();
1153
1154    /* Create a new Dcp producer */
1155    DcpProducer* producer = connMap.newProducer(cookie,
1156                                                "test_producer",
1157                                                /*flags*/ 0);
1158
1159    /* Open stream */
1160    EXPECT_EQ(ENGINE_SUCCESS, doStreamRequest(*producer).status);
1161
1162    /* Close stream */
1163    EXPECT_EQ(ENGINE_SUCCESS, producer->closeStream(0, vbid));
1164
1165    /* Don't expect a stream end message (or any other message as the stream is
1166       closed) */
1167    MockDcpMessageProducers producers(handle);
1168    EXPECT_EQ(ENGINE_EWOULDBLOCK, producer->step(&producers));
1169
1170    /* Check that the stream is not found in the producer's stream map */
1171    EXPECT_TRUE(producer->findStreams(vbid)->wlock().empty());
1172
1173    /* Disconnect the producer connection */
1174    connMap.disconnect(cookie);
1175    /* Cleanup the deadConnections */
1176    connMap.manageConnections();
1177}
1178
1179TEST_P(ConnectionTest, test_producer_unknown_ctrl_msg) {
1180    const void* cookie = create_mock_cookie();
1181    /* Create a new Dcp producer */
1182    auto producer = std::make_shared<MockDcpProducer>(*engine,
1183                                                      cookie,
1184                                                      "test_producer",
1185                                                      /*flags*/ 0);
1186
1187    /* Send an unkown control message to the producer and expect an error code
1188       of "ENGINE_EINVAL" */
1189    const std::string unkownCtrlMsg("unknown");
1190    const std::string unkownCtrlValue("blah");
1191    EXPECT_EQ(ENGINE_EINVAL,
1192              producer->control(0, unkownCtrlMsg, unkownCtrlValue));
1193    destroy_mock_cookie(cookie);
1194}
1195
1196TEST_P(ConnectionTest, test_mb17042_duplicate_cookie_consumer_connections) {
1197    MockDcpConnMap connMap(*engine);
1198    connMap.initialize();
1199    const void* cookie = create_mock_cookie();
1200    // Create a new Dcp consumer
1201    DcpConsumer* consumer = connMap.newConsumer(cookie, "test_consumer1");
1202
1203    // Create a duplicate Dcp consumer
1204    DcpConsumer* duplicateconsumer =
1205            connMap.newConsumer(cookie, "test_consumer2");
1206    EXPECT_TRUE(consumer->doDisconnect()) << "consumer doDisconnect == false";
1207    EXPECT_EQ(nullptr, duplicateconsumer) << "duplicateconsumer is not null";
1208
1209    // Disconnect the consumer connection
1210    connMap.disconnect(cookie);
1211    // Cleanup the deadConnections
1212    connMap.manageConnections();
1213    // Should be zero deadConnections
1214    EXPECT_EQ(0, connMap.getNumberOfDeadConnections())
1215        << "Dead connections still remain";
1216}
1217
1218TEST_P(ConnectionTest, test_update_of_last_message_time_in_consumer) {
1219    const void* cookie = create_mock_cookie();
1220    Vbid vbid(0);
1221    // Create a Mock Dcp consumer
1222    auto consumer =
1223            std::make_shared<MockDcpConsumer>(*engine, cookie, "test_consumer");
1224    consumer->setLastMessageTime(1234);
1225    consumer->addStream(/*opaque*/ 0, vbid, /*flags*/ 0);
1226    EXPECT_NE(1234, consumer->getLastMessageTime())
1227        << "lastMessagerTime not updated for addStream";
1228    consumer->setLastMessageTime(1234);
1229    consumer->closeStream(/*opaque*/ 0, vbid);
1230    EXPECT_NE(1234, consumer->getLastMessageTime())
1231        << "lastMessagerTime not updated for closeStream";
1232    consumer->setLastMessageTime(1234);
1233    consumer->streamEnd(/*opaque*/ 0, vbid, /*flags*/ 0);
1234    EXPECT_NE(1234, consumer->getLastMessageTime())
1235        << "lastMessagerTime not updated for streamEnd";
1236    const DocKey docKey{nullptr, 0, DocKeyEncodesCollectionId::No};
1237    consumer->mutation(0, // opaque
1238                       docKey,
1239                       {}, // value
1240                       0, // priv bytes
1241                       PROTOCOL_BINARY_RAW_BYTES,
1242                       0, // cas
1243                       vbid, // vbucket
1244                       0, // flags
1245                       0, // locktime
1246                       0, // by seqno
1247                       0, // rev seqno
1248                       0, // exptime
1249                       {}, // meta
1250                       0); // nru
1251    EXPECT_NE(1234, consumer->getLastMessageTime())
1252        << "lastMessagerTime not updated for mutation";
1253    consumer->setLastMessageTime(1234);
1254    consumer->deletion(0, // opaque
1255                       docKey,
1256                       {}, // value
1257                       0, // priv bytes
1258                       PROTOCOL_BINARY_RAW_BYTES,
1259                       0, // cas
1260                       vbid, // vbucket
1261                       0, // by seqno
1262                       0, // rev seqno
1263                       {}); // meta
1264    EXPECT_NE(1234, consumer->getLastMessageTime())
1265        << "lastMessagerTime not updated for deletion";
1266    consumer->setLastMessageTime(1234);
1267    consumer->expiration(0, // opaque
1268                         docKey,
1269                         {}, // value
1270                         0, // priv bytes
1271                         PROTOCOL_BINARY_RAW_BYTES,
1272                         0, // cas
1273                         vbid, // vbucket
1274                         0, // by seqno
1275                         0, // rev seqno
1276                         {}); // meta
1277    EXPECT_NE(1234, consumer->getLastMessageTime())
1278        << "lastMessagerTime not updated for expiration";
1279    consumer->setLastMessageTime(1234);
1280    consumer->snapshotMarker(/*opaque*/ 0,
1281                             vbid,
1282                             /*start_seqno*/ 0,
1283                             /*end_seqno*/ 0,
1284                             /*flags*/ 0,
1285                             /*HCS*/ {},
1286                             /*maxVisibleSeqno*/ {});
1287    EXPECT_NE(1234, consumer->getLastMessageTime())
1288        << "lastMessagerTime not updated for snapshotMarker";
1289    consumer->setLastMessageTime(1234);
1290    consumer->noop(/*opaque*/0);
1291    EXPECT_NE(1234, consumer->getLastMessageTime())
1292        << "lastMessagerTime not updated for noop";
1293    consumer->setLastMessageTime(1234);
1294    consumer->setVBucketState(/*opaque*/ 0,
1295                              vbid,
1296                              /*state*/ vbucket_state_active);
1297    EXPECT_NE(1234, consumer->getLastMessageTime())
1298        << "lastMessagerTime not updated for setVBucketState";
1299    destroy_mock_cookie(cookie);
1300}
1301
1302TEST_P(ConnectionTest, test_consumer_add_stream) {
1303    const void* cookie = create_mock_cookie();
1304    Vbid vbid = Vbid(0);
1305
1306    /* Create a Mock Dcp consumer */
1307    auto consumer =
1308            std::make_shared<MockDcpConsumer>(*engine, cookie, "test_consumer");
1309
1310    ASSERT_EQ(ENGINE_SUCCESS, set_vb_state(vbid, vbucket_state_replica));
1311    ASSERT_EQ(ENGINE_SUCCESS, consumer->addStream(/*opaque*/0, vbid,
1312                                                  /*flags*/0));
1313
1314    /* Set the passive to dead state. Note that we want to set the stream to
1315       dead state but not erase it from the streams map in the consumer
1316       connection*/
1317    MockPassiveStream *stream = static_cast<MockPassiveStream*>
1318                                    ((consumer->getVbucketStream(vbid)).get());
1319
1320    stream->transitionStateToDead();
1321
1322    /* Add a passive stream on the same vb */
1323    ASSERT_EQ(ENGINE_SUCCESS, consumer->addStream(/*opaque*/0, vbid,
1324                                                  /*flags*/0));
1325
1326    /* Expected the newly added stream to be in active state */
1327    stream = static_cast<MockPassiveStream*>
1328                                    ((consumer->getVbucketStream(vbid)).get());
1329    ASSERT_TRUE(stream->isActive());
1330
1331    /* Close stream before deleting the connection */
1332    ASSERT_EQ(ENGINE_SUCCESS, consumer->closeStream(/*opaque*/0, vbid));
1333
1334    destroy_mock_cookie(cookie);
1335}
1336
1337TEST_P(ConnectionTest, consumer_waits_for_add_stream) {
1338    const void* cookie = create_mock_cookie();
1339    MockDcpMessageProducers producers(engine);
1340    MockDcpConsumer consumer(*engine, cookie, "test_consumer");
1341    ASSERT_EQ(ENGINE_EWOULDBLOCK, consumer.step(&producers));
1342    // fake that we received add stream
1343    consumer.setPendingAddStream(false);
1344    ASSERT_EQ(ENGINE_SUCCESS, consumer.step(&producers));
1345
1346    destroy_mock_cookie(cookie);
1347}
1348
1349TEST_P(ConnectionTest, consumer_get_error_map) {
1350    // We want to test that the Consumer processes the GetErrorMap negotiation
1351    // with the Producer correctly. I.e., the Consumer must check the
1352    // Producer's version and set internal flags accordingly.
1353    // Note: we test both the cases of pre-5.0.0 and post-5.0.0 Producer
1354    for (auto prodIsV5orHigher : {true, false}) {
1355        const void* cookie = create_mock_cookie();
1356        // GetErrorMap negotiation performed only if NOOP is enabled
1357        engine->getConfiguration().setDcpEnableNoop(true);
1358        MockDcpMessageProducers producers(engine);
1359
1360        // Create a mock DcpConsumer
1361        MockDcpConsumer consumer(*engine, cookie, "test_consumer");
1362        consumer.setPendingAddStream(false);
1363        ASSERT_EQ(1 /*PendingRequest*/,
1364                  static_cast<uint8_t>(consumer.getGetErrorMapState()));
1365        ASSERT_EQ(false, consumer.getProducerIsVersion5orHigher());
1366
1367        // If a Flow Control Policy is enabled, then the first call to step()
1368        // will handle the Flow Control negotiation. We do not want to test that
1369        // here, so this is just to let the test to work with all EP
1370        // configurations.
1371        if (engine->getConfiguration().getDcpFlowControlPolicy() != "none") {
1372            ASSERT_EQ(ENGINE_SUCCESS, consumer.step(&producers));
1373        }
1374
1375        // The next call to step() is expected to start the GetErrorMap
1376        // negotiation
1377        ASSERT_EQ(ENGINE_SUCCESS, consumer.step(&producers));
1378        ASSERT_EQ(2 /*PendingResponse*/,
1379                  static_cast<uint8_t>(consumer.getGetErrorMapState()));
1380
1381        // At this point the consumer is waiting for a response from the
1382        // producer. I simulate the producer's response with a call to
1383        // handleResponse()
1384        protocol_binary_response_header resp{};
1385        resp.response.setMagic(cb::mcbp::Magic::ClientResponse);
1386        resp.response.setOpcode(cb::mcbp::ClientOpcode::GetErrorMap);
1387        resp.response.setStatus(
1388                prodIsV5orHigher
1389                        ? cb::mcbp::Status::Success
1390                        : cb::mcbp::Status::UnknownCommand);
1391        ASSERT_TRUE(consumer.handleResponse(&resp));
1392        ASSERT_EQ(0 /*Skip*/,
1393                  static_cast<uint8_t>(consumer.getGetErrorMapState()));
1394        ASSERT_EQ(prodIsV5orHigher ? true : false,
1395                  consumer.getProducerIsVersion5orHigher());
1396
1397        destroy_mock_cookie(cookie);
1398    }
1399}
1400
1401// Regression test for MB 20645 - ensure that a call to addStats after a
1402// connection has been disconnected (and closeAllStreams called) doesn't crash.
1403TEST_P(ConnectionTest, test_mb20645_stats_after_closeAllStreams) {
1404    MockDcpConnMap connMap(*engine);
1405    connMap.initialize();
1406    const void *cookie = create_mock_cookie();
1407    // Create a new Dcp producer
1408    DcpProducer* producer = connMap.newProducer(cookie,
1409                                                "test_producer",
1410                                                /*flags*/ 0);
1411
1412    // Disconnect the producer connection
1413    connMap.disconnect(cookie);
1414
1415    // Try to read stats. Shouldn't crash.
1416    producer->addStats([](const char* key,
1417                          const uint16_t klen,
1418                          const char* val,
1419                          const uint32_t vlen,
1420                          gsl::not_null<const void*> cookie) {},
1421                       // Cookie is not being used in the callback, but the
1422                       // API requires it. Pass in the producer as cookie
1423                       static_cast<const void*>(producer));
1424
1425    destroy_mock_cookie(cookie);
1426}
1427
1428// Verify that when a DELETE_BUCKET event occurs, we correctly notify any
1429// DCP connections which are currently in ewouldblock state, so the frontend
1430// can correctly close the connection.
// (If we don't notify then front-end connections can hang for a long period
// of time.)
TEST_P(ConnectionTest, test_mb20716_connmap_notify_on_delete) {
    MockDcpConnMap connMap(*engine);
    connMap.initialize();
    const void *cookie = create_mock_cookie();
    // Create a new Dcp producer.
    DcpProducer* producer = connMap.newProducer(cookie,
                                                "mb_20716r",
                                                /*flags*/ 0);

    // Check preconditions.
    EXPECT_TRUE(producer->isPaused());

    // Hook into notify_io_complete.
    // We (ab)use the engine_specific API to pass a pointer to a count of
    // how many times notify_io_complete has been called.
    size_t notify_count = 0;
    class MockServerCookieApi : public WrappedServerCookieIface {
    public:
        // Bump the counter whose address is stored as the cookie's
        // engine-specific data; the status argument is ignored.
        void notify_io_complete(gsl::not_null<const void*> cookie,
                                ENGINE_ERROR_CODE status) override {
            auto* notify_ptr = reinterpret_cast<size_t*>(
                    wrapped->get_engine_specific(cookie));
            (*notify_ptr)++;
        }
    } scapi;

    // Store the counter's address so that the hook above can find it.
    scapi.store_engine_specific(cookie, &notify_count);

    // 0. Should start with no notifications.
    ASSERT_EQ(0, notify_count);

    // 1. Check that the periodic connNotifier (processPendingNotifications)
    // isn't sufficient to notify (it shouldn't be, as our connection has
    // no notification pending).
    connMap.processPendingNotifications();
    ASSERT_EQ(0, notify_count);

    // 2. Simulate a bucket deletion.
    connMap.shutdownAllConnections();

    // Can also get a second notify as part of manageConnections being called
    // in shutdownAllConnections().
    EXPECT_GE(notify_count, 1)
        << "expected at least one notify after shutting down all connections";

    // Destroy the mock cookie created above.
    // NOTE(review): restoring the original notify_io_complete callback is
    // presumably done when `scapi` goes out of scope - confirm in
    // WrappedServerCookieIface.
    destroy_mock_cookie(cookie);
}
1481
1482// Consumer variant of above test.
1483TEST_P(ConnectionTest, test_mb20716_connmap_notify_on_delete_consumer) {
1484    MockDcpConnMap connMap(*engine);
1485    connMap.initialize();
1486    const void *cookie = create_mock_cookie();
1487    // Create a new Dcp consumer
1488    auto& consumer = dynamic_cast<MockDcpConsumer&>(
1489            *connMap.newConsumer(cookie, "mb_20716_consumer"));
1490    consumer.setPendingAddStream(false);
1491
1492    // Move consumer into paused state (aka EWOULDBLOCK).
1493    MockDcpMessageProducers producers(handle);
1494    ENGINE_ERROR_CODE result;
1495    do {
1496        result = consumer.step(&producers);
1497        handleProducerResponseIfStepBlocked(consumer, producers);
1498    } while (result == ENGINE_SUCCESS);
1499    EXPECT_EQ(ENGINE_EWOULDBLOCK, result);
1500
1501    // Check preconditions.
1502    EXPECT_TRUE(consumer.isPaused());
1503
1504    // Hook into notify_io_complete.
1505    // We (ab)use the engine_specific API to pass a pointer to a count of
1506    // how many times notify_io_complete has been called.
1507    size_t notify_count = 0;
1508
1509    class MockServerCookieApi : public WrappedServerCookieIface {
1510    public:
1511        void notify_io_complete(gsl::not_null<const void*> cookie,
1512                                ENGINE_ERROR_CODE status) override {
1513            auto* notify_ptr = reinterpret_cast<size_t*>(
1514                    get_mock_server_api()->cookie->get_engine_specific(cookie));
1515            (*notify_ptr)++;
1516        }
1517    } scapi;
1518
1519    scapi.store_engine_specific(cookie, &notify_count);
1520
1521    // 0. Should start with no notifications.
1522    ASSERT_EQ(0, notify_count);
1523
1524    // 1. Check that the periodic connNotifier (processPendingNotifications)
1525    // isn't sufficient to notify (it shouldn't be, as our connection has
1526    // no notification pending).
1527    connMap.processPendingNotifications();
1528    ASSERT_EQ(0, notify_count);
1529
1530    // 2. Simulate a bucket deletion.
1531    connMap.shutdownAllConnections();
1532
1533    // Can also get a second notify as part of manageConnections being called
1534    // in shutdownAllConnections().
1535    EXPECT_GE(notify_count, 1)
1536        << "expected at least one notify after shutting down all connections";
1537
1538    // Restore notify_io_complete callback.
1539    destroy_mock_cookie(cookie);
1540}
1541
1542TEST_P(ConnectionTest, ConsumerWithoutConsumerNameDoesNotEnableSyncRepl) {
1543    MockDcpConnMap connMap(*engine);
1544    connMap.initialize();
1545    const void* cookie = create_mock_cookie();
1546    // Create a new Dcp consumer
1547    auto& consumer = dynamic_cast<MockDcpConsumer&>(
1548            *connMap.newConsumer(cookie, "consumer"));
1549    consumer.setPendingAddStream(false);
1550    EXPECT_FALSE(consumer.isSyncReplicationEnabled());
1551    EXPECT_EQ(DcpConsumer::BlockingDcpControlNegotiation::State::Completed,
1552              consumer.public_getSyncReplNegotiation().state);
1553
1554    destroy_mock_cookie(cookie);
1555}
1556
1557TEST_P(ConnectionTest, ConsumerWithConsumerNameEnablesSyncRepl) {
1558    MockDcpConnMap connMap(*engine);
1559    connMap.initialize();
1560    const void* cookie = create_mock_cookie();
1561    // Create a new Dcp consumer
1562    auto& consumer = dynamic_cast<MockDcpConsumer&>(
1563            *connMap.newConsumer(cookie, "consumer", "replica1"));
1564    consumer.setPendingAddStream(false);
1565    EXPECT_FALSE(consumer.isSyncReplicationEnabled());
1566    using State = DcpConsumer::BlockingDcpControlNegotiation::State;
1567    auto syncReplNeg = consumer.public_getSyncReplNegotiation();
1568    EXPECT_EQ(State::PendingRequest, syncReplNeg.state);
1569
1570    // Move consumer into paused state (aka EWOULDBLOCK) by stepping through the
1571    // DCP_CONTROL logic.
1572    MockDcpMessageProducers producers(handle);
1573    ENGINE_ERROR_CODE result;
1574    do {
1575        result = consumer.step(&producers);
1576        handleProducerResponseIfStepBlocked(consumer, producers);
1577        syncReplNeg = consumer.public_getSyncReplNegotiation();
1578    } while (syncReplNeg.state != State::Completed);
1579    EXPECT_EQ(ENGINE_SUCCESS, result);
1580
1581    // Last step - send the consumer name
1582    ASSERT_TRUE(consumer.public_getPendingSendConsumerName());
1583    EXPECT_EQ(ENGINE_SUCCESS, consumer.step(&producers));
1584
1585    // SyncReplication negotiation is now completed, SyncReplication is enabled
1586    // on this consumer, and we have sent the consumer name to the producer.
1587    EXPECT_TRUE(consumer.isSyncReplicationEnabled());
1588    EXPECT_FALSE(consumer.public_getPendingSendConsumerName());
1589    EXPECT_EQ("replica1", producers.last_value);
1590
1591    destroy_mock_cookie(cookie);
1592}
1593
1594void ConnectionTest::testConsumerNegotiatesIncludeDeletedUserXattrs(
1595        IncludeDeletedUserXattrs producerState) {
1596    MockDcpConnMap connMap(*engine);
1597    connMap.initialize();
1598    const void* cookie = create_mock_cookie();
1599
1600    // Create a new Consumer, flag that we want it to support DeleteXattr
1601    auto& consumer = dynamic_cast<MockDcpConsumer&>(
1602            *connMap.newConsumer(cookie, "conn_name", "the_consumer_name"));
1603
1604    using State = DcpConsumer::BlockingDcpControlNegotiation::State;
1605    auto syncReplNeg = consumer.public_getSyncReplNegotiation();
1606    ASSERT_EQ(State::PendingRequest, syncReplNeg.state);
1607
1608    // Consumer::step performs multiple negotiation steps. Some of them are
1609    // "blocking" steps. So, move the Consumer to the point where all the
1610    // negotiation steps (but IncludeDeletedUserXattrs) have been completed and
1611    // verifying that the negotiation of DeletedUserXattrs flows as exptected.
1612
1613    consumer.setPendingAddStream(false);
1614    MockDcpMessageProducers producers(handle);
1615    ENGINE_ERROR_CODE result;
1616    // Step and unblock (ie, simulate Producer response) up to completing the
1617    // SyncRepl negotiation, which is the last blocking step before the
1618    // DeletedUserXattrs negotiation
1619    do {
1620        result = consumer.step(&producers);
1621        handleProducerResponseIfStepBlocked(consumer, producers);
1622        syncReplNeg = consumer.public_getSyncReplNegotiation();
1623    } while (syncReplNeg.state != State::Completed);
1624    EXPECT_EQ(ENGINE_SUCCESS, result);
1625
1626    // Skip over "send consumer name"
1627    EXPECT_EQ(ENGINE_SUCCESS, consumer.step(&producers));
1628    ASSERT_FALSE(consumer.public_getPendingSendConsumerName());
1629
1630    // Check pre-negotiation state
1631    auto xattrNeg = consumer.public_getDeletedUserXattrsNegotiation();
1632    ASSERT_EQ(State::PendingRequest, xattrNeg.state);
1633    ASSERT_EQ(0, xattrNeg.opaque);
1634
1635    // Start negotiation - consumer sends DcpControl
1636    EXPECT_EQ(ENGINE_SUCCESS, consumer.step(&producers));
1637    xattrNeg = consumer.public_getDeletedUserXattrsNegotiation();
1638    EXPECT_EQ(State::PendingResponse, xattrNeg.state);
1639    EXPECT_EQ("include_deleted_user_xattrs", producers.last_key);
1640    EXPECT_EQ("true", producers.last_value);
1641    EXPECT_GT(xattrNeg.opaque, 0);
1642    EXPECT_EQ(xattrNeg.opaque, producers.last_opaque);
1643
1644    // Verify blocked - Consumer cannot proceed until negotiation completes
1645    EXPECT_EQ(ENGINE_EWOULDBLOCK, consumer.step(&producers));
1646    xattrNeg = consumer.public_getDeletedUserXattrsNegotiation();
1647    EXPECT_EQ(State::PendingResponse, xattrNeg.state);
1648
1649    // Simulate Producer response
1650    cb::mcbp::Status respStatus =
1651            (producerState == IncludeDeletedUserXattrs::Yes
1652                     ? cb::mcbp::Status::Success
1653                     : cb::mcbp::Status::UnknownCommand);
1654    protocol_binary_response_header resp{};
1655    resp.response.setMagic(cb::mcbp::Magic::ClientResponse);
1656    resp.response.setOpcode(cb::mcbp::ClientOpcode::DcpControl);
1657    resp.response.setStatus(respStatus);
1658    resp.response.setOpaque(xattrNeg.opaque);
1659    consumer.handleResponse(&resp);
1660
1661    // Verify final consumer state
1662    xattrNeg = consumer.public_getDeletedUserXattrsNegotiation();
1663    EXPECT_EQ(State::Completed, xattrNeg.state);
1664    EXPECT_EQ(producerState, consumer.public_getIncludeDeletedUserXattrs());
1665
1666    destroy_mock_cookie(cookie);
1667}
1668
1669TEST_P(ConnectionTest, ConsumerNegotiatesDeletedUserXattrs_DisabledAtProducer) {
1670    testConsumerNegotiatesIncludeDeletedUserXattrs(
1671            IncludeDeletedUserXattrs::No);
1672}
1673
1674TEST_P(ConnectionTest, ConsumerNegotiatesDeletedUserXattrs_EnabledAtProducer) {
1675    testConsumerNegotiatesIncludeDeletedUserXattrs(
1676            IncludeDeletedUserXattrs::Yes);
1677}
1678
1679void ConnectionTest::testProducerNegotiatesIncludeDeletedUserXattrs(
1680        IncludeDeletedUserXattrs producerState) {
1681    const void* cookie = create_mock_cookie();
1682
1683    uint32_t dcpOpenFlags;
1684    ENGINE_ERROR_CODE expectedControlResp;
1685    switch (producerState) {
1686    case IncludeDeletedUserXattrs::Yes: {
1687        dcpOpenFlags =
1688                cb::mcbp::request::DcpOpenPayload::IncludeDeletedUserXattrs;
1689        expectedControlResp = ENGINE_SUCCESS;
1690        break;
1691    }
1692    case IncludeDeletedUserXattrs::No: {
1693        dcpOpenFlags = 0;
1694        expectedControlResp = ENGINE_EINVAL;
1695        break;
1696    }
1697    }
1698
1699    const auto producer = std::make_shared<MockDcpProducer>(
1700            *engine, cookie, "test_producer", dcpOpenFlags);
1701    EXPECT_EQ(producerState, producer->public_getIncludeDeletedUserXattrs());
1702    EXPECT_EQ(expectedControlResp,
1703              producer->control(0, "include_deleted_user_xattrs", "true"));
1704
1705    destroy_mock_cookie(cookie);
1706}
1707
1708TEST_P(ConnectionTest,
1709       ProducerNegotiatesIncludeDeletedUserXattrs_DisabledAtProducer) {
1710    testProducerNegotiatesIncludeDeletedUserXattrs(
1711            IncludeDeletedUserXattrs::No);
1712}
1713
1714TEST_P(ConnectionTest,
1715       ProducerNegotiatesIncludeDeletedUserXattrs_EnabledAtProducer) {
1716    testProducerNegotiatesIncludeDeletedUserXattrs(
1717            IncludeDeletedUserXattrs::Yes);
1718}
1719
1720TEST_P(ConnectionTest, AckCorrectPassiveStream) {
1721    ASSERT_EQ(ENGINE_SUCCESS, set_vb_state(vbid, vbucket_state_replica));
1722
1723    const void* cookie1 = create_mock_cookie();
1724    Vbid vbid = Vbid(0);
1725
1726    // Create our first consumer and enable SyncReplication so that we can ack
1727    auto consumer1 =
1728            std::make_shared<MockDcpConsumer>(*engine, cookie1, "consumer1");
1729    consumer1->enableSyncReplication();
1730
1731    // Add our stream and accept to mimic real scenario and ensure it is alive
1732    ASSERT_EQ(ENGINE_SUCCESS,
1733              consumer1->addStream(0 /*opaque*/, vbid, 0 /*flags*/));
1734    MockPassiveStream* stream1 = static_cast<MockPassiveStream*>(
1735            (consumer1->getVbucketStream(vbid)).get());
1736    stream1->acceptStream(cb::mcbp::Status::Success, 0 /*opaque*/);
1737    ASSERT_TRUE(stream1->isActive());
1738
1739    ASSERT_EQ(2, stream1->public_readyQ().size());
1740    auto resp = stream1->public_popFromReadyQ();
1741    EXPECT_EQ(DcpResponse::Event::StreamReq, resp->getEvent());
1742    resp = stream1->public_popFromReadyQ();
1743    EXPECT_EQ(DcpResponse::Event::AddStream, resp->getEvent());
1744    EXPECT_EQ(0, stream1->public_readyQ().size());
1745
1746    // Now close the stream to transition to dead
1747    consumer1->closeStream(0 /*opaque*/, vbid);
1748    ASSERT_FALSE(stream1->isActive());
1749
1750    // Add a new consumer and new PassiveStream
1751    const void* cookie2 = create_mock_cookie();
1752    auto consumer2 =
1753            std::make_shared<MockDcpConsumer>(*engine, cookie2, "consumer2");
1754    consumer2->enableSyncReplication();
1755    ASSERT_EQ(ENGINE_SUCCESS,
1756              consumer2->addStream(1 /*opaque*/, vbid, 0 /*flags*/));
1757    MockPassiveStream* stream2 = static_cast<MockPassiveStream*>(
1758            (consumer2->getVbucketStream(vbid)).get());
1759    stream2->acceptStream(cb::mcbp::Status::Success, 1 /*opaque*/);
1760    ASSERT_TRUE(stream2->isActive());
1761
1762    EXPECT_EQ(2, stream2->public_readyQ().size());
1763    resp = stream2->public_popFromReadyQ();
1764    EXPECT_EQ(DcpResponse::Event::StreamReq, resp->getEvent());
1765    resp = stream2->public_popFromReadyQ();
1766    EXPECT_EQ(DcpResponse::Event::AddStream, resp->getEvent());
1767    EXPECT_EQ(0, stream2->public_readyQ().size());
1768
1769    // When we ack we should hit the active stream (stream2)
1770    engine->getDcpConnMap().seqnoAckVBPassiveStream(vbid, 1);
1771    EXPECT_EQ(0, stream1->public_readyQ().size());
1772    EXPECT_EQ(1, stream2->public_readyQ().size());
1773
1774    ASSERT_EQ(ENGINE_SUCCESS, consumer2->closeStream(1 /*opaque*/, vbid));
1775    destroy_mock_cookie(cookie1);
1776    destroy_mock_cookie(cookie2);
1777}
1778
1779/*
1780 * The following tests that when the disk_backfill_queue configuration is
1781 * set to false on receiving a snapshot marker it does not move into the
1782 * backfill phase and the open checkpoint id does not get set to zero.  Also
1783 * checks that on receiving a subsequent snapshot marker we do not create a
1784 * second checkpoint.
1785 */
1786TEST_P(ConnectionTest, test_not_using_backfill_queue) {
1787    // Make vbucket replica so can add passive stream
1788    ASSERT_EQ(ENGINE_SUCCESS, set_vb_state(vbid, vbucket_state_replica));
1789
1790    const void* cookie = create_mock_cookie();
1791    /*
1792     * Create a Mock Dcp consumer. Since child class subobj of MockDcpConsumer
1793     *  obj are accounted for by SingleThreadedRCPtr, use the same here
1794     */
1795    auto consumer =
1796            std::make_shared<MockDcpConsumer>(*engine, cookie, "test_consumer");
1797
1798    // Add passive stream
1799    ASSERT_EQ(ENGINE_SUCCESS,
1800              consumer->addStream(/*opaque*/ 0,
1801                                  vbid,
1802                                  /*flags*/ 0));
1803    // Get the checkpointManager
1804    auto& manager =
1805            *(engine->getKVBucket()->getVBucket(vbid)->checkpointManager);
1806
1807    // Because the vbucket was previously active it will have an
1808    // openCheckpointId of 2
1809    EXPECT_EQ(2, manager.getOpenCheckpointId());
1810
1811    // Send a snapshotMarker
1812    consumer->snapshotMarker(/*opaque*/ 1,
1813                             Vbid(0),
1814                             /*start_seqno*/ 0,
1815                             /*end_seqno*/ 1,
1816                             /*flags set to MARKER_FLAG_DISK*/ 0x2,
1817                             /*HCS*/ 0,
1818                             /*maxVisibleSeqno*/ {});
1819
1820    EXPECT_TRUE(engine->getKVBucket()
1821                        ->getVBucket(vbid)
1822                        ->isReceivingInitialDiskSnapshot());
1823
1824    auto producer = std::make_shared<MockDcpProducer>(*engine,
1825                                                      cookie,
1826                                                      "test_producer",
1827                                                      /*flags*/ 0);
1828
1829    /*
1830     * StreamRequest should tmp fail due to the associated vbucket receiving
1831     * a disk snapshot.
1832     */
1833    uint64_t rollbackSeqno = 0;
1834    auto err = producer->streamRequest(/*flags*/ 0,
1835                                       /*opaque*/ 0,
1836                                       vbid,
1837                                       /*start_seqno*/ 0,
1838                                       /*end_seqno*/ 0,
1839                                       /*vb_uuid*/ 0,
1840                                       /*snap_start*/ 0,
1841                                       /*snap_end*/ 0,
1842                                       &rollbackSeqno,
1843                                       fakeDcpAddFailoverLog,
1844                                       {});
1845
1846    EXPECT_EQ(ENGINE_TMPFAIL, err);
1847
1848    // Open checkpoint Id should not be effected.
1849    EXPECT_EQ(2, manager.getOpenCheckpointId());
1850
1851    /* Send a mutation */
1852    const DocKey docKey{nullptr, 0, DocKeyEncodesCollectionId::No};
1853    EXPECT_EQ(ENGINE_SUCCESS,
1854              consumer->mutation(/*opaque*/ 1,
1855                                 docKey,
1856                                 {}, // value
1857                                 0, // priv bytes
1858                                 PROTOCOL_BINARY_RAW_BYTES,
1859                                 0, // cas
1860                                 vbid,
1861                                 0, // flags
1862                                 1, // bySeqno
1863                                 0, // rev seqno
1864                                 0, // exptime
1865                                 0, // locktime
1866                                 {}, // meta
1867                                 0)); // nru
1868
1869    // Have received the mutation and so have snapshot end.
1870    EXPECT_FALSE(engine->getKVBucket()
1871                         ->getVBucket(vbid)
1872                         ->isReceivingInitialDiskSnapshot());
1873
1874    consumer->snapshotMarker(/*opaque*/ 1,
1875                             Vbid(0),
1876                             /*start_seqno*/ 0,
1877                             /*end_seqno*/ 0,
1878                             /*flags*/ 0,
1879                             /*HCS*/ {},
1880                             /*maxVisibleSeqno*/ {});
1881
1882    // A new opencheckpoint should no be opened
1883    EXPECT_EQ(2, manager.getOpenCheckpointId());
1884
1885    // Close stream
1886    ASSERT_EQ(ENGINE_SUCCESS, consumer->closeStream(/*opaque*/ 0, vbid));
1887    destroy_mock_cookie(cookie);
1888}
1889
1890/*
1891 * The following test has been adapted following the removal of vbucket DCP
1892 * backfill queue. It now demonstrates some of the 'new' behaviour of
1893 * DCP snapshot markers  on a replica checkpoint.
1894 * When snapshots arrive and no items exist, no new CP is created, only the
1895 * existing snapshot is updated
1896 */
1897TEST_P(ConnectionTest, SnapshotsAndNoData) {
1898    // Make vbucket replica so can add passive stream
1899    ASSERT_EQ(ENGINE_SUCCESS, set_vb_state(vbid, vbucket_state_replica));
1900
1901    const void *cookie = create_mock_cookie();
1902    /*
1903     * Create a Mock Dcp consumer. Since child class subobj of MockDcpConsumer
1904     *  obj are accounted for by SingleThreadedRCPtr, use the same here
1905     */
1906    auto consumer =
1907            std::make_shared<MockDcpConsumer>(*engine, cookie, "test_consumer");
1908
1909    // Add passive stream
1910    ASSERT_EQ(ENGINE_SUCCESS, consumer->addStream(/*opaque*/0, vbid,
1911                                                  /*flags*/0));
1912    // Get the checkpointManager
1913    auto& manager =
1914            *(engine->getKVBucket()->getVBucket(vbid)->checkpointManager);
1915
1916    // Because the vbucket was previously active it will have an
1917    // openCheckpointId of 2
1918    EXPECT_EQ(2, manager.getOpenCheckpointId());
1919
1920    // Send a snapshotMarker to move the vbucket into a backfilling state
1921    consumer->snapshotMarker(/*opaque*/ 1,
1922                             Vbid(0),
1923                             /*start_seqno*/ 0,
1924                             /*end_seqno*/ 0,
1925                             /*flags set to MARKER_FLAG_DISK*/ 0x2,
1926                             /*HCS*/ 0,
1927                             /*maxVisibleSeqno*/ {});
1928
1929    EXPECT_EQ(2, manager.getOpenCheckpointId());
1930
1931    consumer->snapshotMarker(/*opaque*/ 1,
1932                             Vbid(0),
1933                             /*start_seqno*/ 1,
1934                             /*end_seqno*/ 2,
1935                             /*flags*/ 0,
1936                             /*HCS*/ {},
1937                             /*maxVisibleSeqno*/ {});
1938
1939    EXPECT_EQ(2, manager.getOpenCheckpointId());
1940    // Still cp:2 but the snap-end changes
1941    auto snapInfo = manager.getSnapshotInfo();
1942    EXPECT_EQ(0, snapInfo.range.getStart()); // no data so start is still 0
1943    EXPECT_EQ(2, snapInfo.range.getEnd());
1944
1945    // Close stream
1946    ASSERT_EQ(ENGINE_SUCCESS, consumer->closeStream(/*opaque*/0, vbid));
1947    destroy_mock_cookie(cookie);
1948}
1949
1950class DcpConnMapTest : public ::testing::Test {
1951protected:
1952    void SetUp() override {
1953        engine = SynchronousEPEngine::build({});
1954
1955        initialize_time_functions(get_mock_server_api()->core);
1956
1957        /* Set up one vbucket in the bucket */
1958        engine->getKVBucket()->setVBucketState(vbid, vbucket_state_active);
1959    }
1960
1961    void TearDown() override {
1962        destroy_mock_event_callbacks();
1963        ObjectRegistry::onSwitchThread(nullptr);
1964    }
1965
1966    /**
1967     * Fake callback emulating dcp_add_failover_log
1968     */
1969    static ENGINE_ERROR_CODE fakeDcpAddFailoverLog(
1970            vbucket_failover_t* entry,
1971            size_t nentries,
1972            gsl::not_null<const void*> cookie) {
1973        return ENGINE_SUCCESS;
1974    }
1975
1976    enum class ConnExistsBy : uint8_t { Cookie, Name };
1977
1978    /**
1979     * MB-36915: With a recent change, we unconditionally acquire an exclusive
1980     * lock to vbstate in KVBucket::setVBucketState. But, the new lock
1981     * introduces a potential deadlock by lock-inversion on connLock and
1982     * vbstateLock in EPE::dcpOpen if a connection with the same cookie or name
1983     * already exists in conn-map. TSAN easily spots the issue as soon as we
1984     * have an execution where two threads run in parallel and execute the code
1985     * responsible for the potential deadlock, which is what this test achieves.
1986     */
1987    void testLockInversionInSetVBucketStateAndNewProducer(ConnExistsBy by);
1988
1989    SynchronousEPEngineUniquePtr engine;
1990    const Vbid vbid = Vbid(0);
1991};
1992
1993/* Tests that there is no memory loss due to cyclic reference between connection
1994 * and other objects (like dcp streams). It is possible that connections are
1995 * deleted from the dcp connmap when dcp connmap is deleted due to abrupt
1996 * deletion of 'EventuallyPersistentEngine' obj.
1997 * This test simulates the abrupt deletion of dcp connmap object
1998 */
1999TEST_F(DcpConnMapTest, DeleteProducerOnUncleanDCPConnMapDelete) {
2000    /* Create a new Dcp producer */
2001    const void* dummyMockCookie = create_mock_cookie();
2002    DcpProducer* producer = engine->getDcpConnMap().newProducer(dummyMockCookie,
2003                                                                "test_producer",
2004                                                                /*flags*/ 0);
2005    /* Open stream */
2006    uint64_t rollbackSeqno = 0;
2007    uint32_t opaque = 0;
2008    EXPECT_EQ(ENGINE_SUCCESS,
2009              producer->streamRequest(/*flags*/ 0,
2010                                      opaque,
2011                                      vbid,
2012                                      /*start_seqno*/ 0,
2013                                      /*end_seqno*/ ~0,
2014                                      /*vb_uuid*/ 0,
2015                                      /*snap_start*/ 0,
2016                                      /*snap_end*/ 0,
2017                                      &rollbackSeqno,
2018                                      fakeDcpAddFailoverLog,
2019                                      {}));
2020
2021    destroy_mock_cookie(dummyMockCookie);
2022
2023    /* Delete the connmap, connection should be deleted as the owner of
2024       the connection (connmap) is deleted. Checks that there is no cyclic
2025       reference between conn (producer) and stream or any other object */
2026    engine->setDcpConnMap(nullptr);
2027}
2028
2029/* Tests that there is no memory loss due to cyclic reference between a
2030 * notifier connection and a notifier stream.
2031 */
2032TEST_F(DcpConnMapTest, DeleteNotifierConnOnUncleanDCPConnMapDelete) {
2033    /* Create a new Dcp producer */
2034    const void* dummyMockCookie = create_mock_cookie();
2035    DcpProducer* producer = engine->getDcpConnMap().newProducer(
2036            dummyMockCookie,
2037            "test_producer",
2038            cb::mcbp::request::DcpOpenPayload::Notifier);
2039    /* Open notifier stream */
2040    uint64_t rollbackSeqno = 0;
2041    uint32_t opaque = 0;
2042    EXPECT_EQ(ENGINE_SUCCESS,
2043              producer->streamRequest(/*flags*/ 0,
2044                                      opaque,
2045                                      vbid,
2046                                      /*start_seqno*/ 0,
2047                                      /*end_seqno*/ ~0,
2048                                      /*vb_uuid*/ 0,
2049                                      /*snap_start*/ 0,
2050                                      /*snap_end*/ 0,
2051                                      &rollbackSeqno,
2052                                      fakeDcpAddFailoverLog,
2053                                      {}));
2054
2055    destroy_mock_cookie(dummyMockCookie);
2056
2057    /* Delete the connmap, connection should be deleted as the owner of
2058       the connection (connmap) is deleted. Checks that there is no cyclic
2059       reference between conn (producer) and stream or any other object */
2060    engine->setDcpConnMap(nullptr);
2061}
2062
2063/* Tests that there is no memory loss due to cyclic reference between a
2064 * consumer connection and a passive stream.
2065 */
2066TEST_F(DcpConnMapTest, DeleteConsumerConnOnUncleanDCPConnMapDelete) {
2067    /* Consumer stream needs a replica vbucket */
2068    engine->getKVBucket()->setVBucketState(vbid, vbucket_state_replica);
2069
2070    /* Create a new Dcp consumer */
2071    const void* dummyMockCookie = create_mock_cookie();
2072    DcpConsumer* consumer = engine->getDcpConnMap().newConsumer(
2073            dummyMockCookie, "test_consumer");
2074
2075    /* Add passive stream */
2076    ASSERT_EQ(ENGINE_SUCCESS,
2077              consumer->addStream(/*opaque*/ 0,
2078                                  vbid,
2079                                  /*flags*/ 0));
2080
2081    destroy_mock_cookie(dummyMockCookie);
2082
2083    /* Delete the connmap, connection should be deleted as the owner of
2084       the connection (connmap) is deleted. Checks that there is no cyclic
2085       reference between conn (consumer) and stream or any other object */
2086    engine->setDcpConnMap(nullptr);
2087}
2088
2089TEST_F(DcpConnMapTest, TestCorrectConnHandlerRemoved) {
2090    auto connMapPtr = std::make_unique<MockDcpConnMap>(*engine);
2091    MockDcpConnMap& connMap = *connMapPtr;
2092    engine->setDcpConnMap(std::move(connMapPtr));
2093
2094    const void* cookieA = create_mock_cookie();
2095    const void* cookieB = create_mock_cookie();
2096
2097    ASSERT_EQ(ENGINE_SUCCESS,
2098              engine->getKVBucket()->setVBucketState(
2099                      vbid, vbucket_state_replica, {}, TransferVB::Yes));
2100
2101    DcpConsumer* consumerA =
2102            connMap.newConsumer(cookieA, "test_consumerA", "test_consumerA");
2103    EXPECT_FALSE(connMap.doesConnHandlerExist(vbid, "eq_dcpq:test_consumerA"));
2104    consumerA->addStream(0xdead, vbid, 0);
2105    EXPECT_TRUE(connMap.doesConnHandlerExist(vbid, "eq_dcpq:test_consumerA"));
2106    // destroys the first consumer, leaving a weakptr in the vbConn map
2107    EXPECT_TRUE(connMap.removeConn(cookieA));
2108
2109    // Create a new consumer, with a stream for the same VB
2110    DcpConsumer* consumerB =
2111            connMap.newConsumer(cookieB, "test_consumerB", "test_consumerB");
2112    EXPECT_FALSE(connMap.doesConnHandlerExist(vbid, "eq_dcpq:test_consumerB"));
2113    consumerB->addStream(0xbeef, vbid, 0);
2114    EXPECT_TRUE(connMap.doesConnHandlerExist(vbid, "eq_dcpq:test_consumerB"));
2115
2116    // Here the ConnHandler added to connMap.vbConns in addStream should be
2117    // removed
2118    connMap.disconnect(cookieB);
2119
2120    // Consumer B should not be in the vbConns any more
2121    EXPECT_FALSE(connMap.doesConnHandlerExist(vbid, "eq_dcpq:test_consumerB"));
2122
2123    /* Cleanup the deadConnections */
2124    connMap.manageConnections();
2125    destroy_mock_cookie(cookieA);
2126}
2127
2128// MB-35061 - Test to ensure the Producer ConnHandler is removed
2129// when a producer stream ends, and does not linger and become confused
2130// with a later Producer stream.
2131TEST_F(DcpConnMapTest, TestCorrectRemovedOnStreamEnd) {
2132    auto connMapPtr = std::make_unique<MockDcpConnMap>(*engine);
2133    MockDcpConnMap& connMap = *connMapPtr;
2134    engine->setDcpConnMap(std::move(connMapPtr));
2135
2136    // create cookies
2137    const void* producerCookie = create_mock_cookie();
2138    const void* consumerCookie = create_mock_cookie();
2139
2140    MockDcpMessageProducers producers(engine.get());
2141
2142    // create a producer (We are currently active)
2143    auto producer = engine->getDcpConnMap().newProducer(producerCookie,
2144                                                        "producerA",
2145                                                        /*flags*/ 0);
2146
2147    producer->control(0xdead, "send_stream_end_on_client_close_stream", "true");
2148
2149    uint64_t rollbackSeqno = 0;
2150
2151    ASSERT_EQ(ENGINE_SUCCESS,
2152              producer->streamRequest(0, // flags
2153                                      0xdead,
2154                                      vbid,
2155                                      0, // start_seqno
2156                                      ~0ull, // end_seqno
2157                                      0, // vbucket_uuid,
2158                                      0, // snap_start_seqno,
2159                                      0, // snap_end_seqno,
2160                                      &rollbackSeqno,
2161                                      fakeDcpAddFailoverLog,
2162                                      {}));
2163
2164    EXPECT_TRUE(connMap.doesConnHandlerExist(vbid, "eq_dcpq:producerA"));
2165
2166    // Close the stream. This will not remove the ConnHandler from
2167    // ConnMap.vbConns because we are waiting to send streamEnd.
2168    ASSERT_EQ(ENGINE_SUCCESS, producer->closeStream(0xdead, vbid));
2169    // Step to send the streamEnd, and remove the ConnHandler
2170    ASSERT_EQ(ENGINE_SUCCESS, producer->step(&producers));
2171
2172    // Move to replica
2173    ASSERT_EQ(ENGINE_SUCCESS,
2174              engine->getKVBucket()->setVBucketState(
2175                      vbid, vbucket_state_replica, {}, TransferVB::Yes));
2176
2177    // confirm the ConnHandler was removed
2178    EXPECT_FALSE(connMap.doesConnHandlerExist(vbid, "eq_dcpq:producerA"));
2179
2180    // Create a consumer (we are now a replica)
2181    DcpConsumer* consumer = connMap.newConsumer(
2182            consumerCookie, "test_consumerA", "test_consumerA");
2183
2184    EXPECT_FALSE(connMap.doesConnHandlerExist(vbid, "eq_dcpq:test_consumerA"));
2185
2186    // add a stream for the same VB as before
2187    ASSERT_EQ(ENGINE_SUCCESS, consumer->addStream(0xbeef, vbid, 0));
2188    EXPECT_TRUE(connMap.doesConnHandlerExist(vbid, "eq_dcpq:test_consumerA"));
2189
2190    // End the stream. This should remove the Consumer ConnHandler from vbConns
2191    auto streamOpaque =
2192            static_cast<MockDcpConsumer*>(consumer)->getStreamOpaque(0xbeef);
2193    ASSERT_TRUE(streamOpaque);
2194    ASSERT_EQ(ENGINE_SUCCESS,
2195              consumer->streamEnd(*streamOpaque, vbid, /* flags */ 0));
2196
2197    // expect neither ConnHandler remains in vbConns
2198    EXPECT_FALSE(connMap.doesConnHandlerExist(vbid, "eq_dcpq:producerA"));
2199    EXPECT_FALSE(connMap.doesConnHandlerExist(vbid, "eq_dcpq:test_consumerA"));
2200
2201    /* Cleanup the deadConnections */
2202    connMap.manageConnections();
2203    destroy_mock_cookie(producerCookie);
2204    destroy_mock_cookie(consumerCookie);
2205}
2206
2207/**
2208 * MB-36637: With a recent change, we unconditionally acquire an exclusive lock
2209 * to vbstate in KVBucket::setVBucketState. But, deep down in the call hierarchy
2210 * (ActiveStream::setDead) we may lock again on the same mutex. That happens
2211 * if we are closing streams that support SyncReplication. So in this test we:
2212 * 1) create a Producer and enable SyncReplication
2213 * 2) create an ActiveStream (which implicitly supports SyncReplication)
2214 * 3) issue a KVBucket::setVBucketState, with newState != oldState
2215 * Step (3) deadlocks before this fix.
2216 */
2217TEST_F(DcpConnMapTest, AvoidDoubleLockToVBStateAtSetVBucketState) {
2218    const void* cookie = create_mock_cookie();
2219    const uint32_t flags = 0;
2220    auto& connMap = dynamic_cast<MockDcpConnMap&>(engine->getDcpConnMap());
2221    auto* producer = connMap.newProducer(cookie, "producer", flags);
2222
2223    const uint32_t opaque = 0xdead;
2224    // Vbstate lock acquired in ActiveStream::setDead (executed by
2225    // DcpConnMap::disconnect) only if SyncRepl is enabled
2226    producer->control(opaque, "enable_sync_writes", "true");
2227    producer->control(opaque, "consumer_name", "consumer");
2228
2229    uint64_t rollbackSeqno = 0;
2230    ASSERT_EQ(ENGINE_SUCCESS,
2231              producer->streamRequest(flags,
2232                                      opaque,
2233                                      vbid,
2234                                      0, // start_seqno
2235                                      ~0ull, // end_seqno
2236                                      0, // vbucket_uuid,
2237                                      0, // snap_start_seqno,
2238                                      0, // snap_end_seqno,
2239                                      &rollbackSeqno,
2240                                      fakeDcpAddFailoverLog,
2241                                      {} /*collection_filter*/));
2242
2243    EXPECT_TRUE(connMap.doesConnHandlerExist(vbid, "eq_dcpq:producer"));
2244
2245    engine->getKVBucket()->setVBucketState(
2246            vbid,
2247            vbucket_state_t::vbucket_state_replica,
2248            {} /*meta*/,
2249            TransferVB::No);
2250
2251    // Cleanup
2252    connMap.manageConnections();
2253    destroy_mock_cookie(cookie);
2254}
2255
2256/**
2257 * MB-36557: With a recent change, we unconditionally acquire an exclusive lock
2258 * to vbstate in KVBucket::setVBucketState. But, the new lock introduces a
2259 * potential deadlock by lock inversion with EPE::handleDisconnect on connLock
2260 * and vbstateLock.
2261 * TSAN easily spots the issue as soon as we have an execution where two threads
2262 * run in parallel and execute the code responsible for the potential deadlock,
2263 * which is what this test achieves.
2264 */
2265TEST_F(DcpConnMapTest,
2266       AvoidLockInversionInSetVBucketStateAndConnMapDisconnect) {
2267    const void* cookie = create_mock_cookie();
2268    const uint32_t flags = 0;
2269    auto& connMap = dynamic_cast<MockDcpConnMap&>(engine->getDcpConnMap());
2270    auto* producer = connMap.newProducer(cookie, "producer", flags);
2271
2272    const uint32_t opaque = 0xdead;
2273    // Vbstate lock acquired in ActiveStream::setDead (executed by
2274    // DcpConnMap::disconnect) only if SyncRepl is enabled
2275    producer->control(opaque, "enable_sync_writes", "true");
2276    producer->control(opaque, "consumer_name", "consumer");
2277
2278    uint64_t rollbackSeqno = 0;
2279    ASSERT_EQ(ENGINE_SUCCESS,
2280              producer->streamRequest(flags,
2281                                      opaque,
2282                                      vbid,
2283                                      0, // start_seqno
2284                                      ~0ull, // end_seqno
2285                                      0, // vbucket_uuid,
2286                                      0, // snap_start_seqno,
2287                                      0, // snap_end_seqno,
2288                                      &rollbackSeqno,
2289                                      fakeDcpAddFailoverLog,
2290                                      {} /*collection_filter*/));
2291
2292    EXPECT_TRUE(connMap.doesConnHandlerExist(vbid, "eq_dcpq:producer"));
2293
2294    std::thread t1 = std::thread([this]() -> void {
2295        engine->getKVBucket()->setVBucketState(
2296                vbid,
2297                vbucket_state_t::vbucket_state_replica,
2298                {} /*meta*/,
2299                TransferVB::No);
2300    });
2301
2302    // Disconnect in this thread
2303    connMap.disconnect(cookie);
2304
2305    t1.join();
2306
2307    // Check that streams have been shutdown at disconnect
2308    EXPECT_FALSE(connMap.doesConnHandlerExist(vbid, "eq_dcpq:producer"));
2309
2310    // Cleanup
2311    connMap.manageConnections();
2312}
2313
2314void DcpConnMapTest::testLockInversionInSetVBucketStateAndNewProducer(
2315        ConnExistsBy by) {
2316    auto& connMap = dynamic_cast<MockDcpConnMap&>(engine->getDcpConnMap());
2317    const auto* cookie = create_mock_cookie();
2318    const std::string connName = "producer";
2319    const uint32_t flags = 0;
2320    auto* producer = connMap.newProducer(cookie, connName, flags);
2321
2322    const uint32_t opaque = 0;
2323    // Vbstate lock acquired in ActiveStream::setDead (executed by
2324    // DcpConnMap::newProducer) only if SyncRepl is enabled
2325    producer->control(opaque, "enable_sync_writes", "true");
2326    producer->control(opaque, "consumer_name", "consumer");
2327
2328    const auto streamRequest =
2329            [this, flags, opaque](DcpProducer& producer) -> void {
2330        uint64_t rollbackSeqno = 0;
2331        ASSERT_EQ(ENGINE_SUCCESS,
2332                  producer.streamRequest(flags,
2333                                         opaque,
2334                                         vbid,
2335                                         0, // start_seqno
2336                                         ~0ull, // end_seqno
2337                                         0, // vbucket_uuid,
2338                                         0, // snap_start_seqno,
2339                                         0, // snap_end_seqno,
2340                                         &rollbackSeqno,
2341                                         fakeDcpAddFailoverLog,
2342                                         {} /*collection_filter*/));
2343    };
2344
2345    // Check that the conne has been created and exists in vbConns at stream-req
2346    {
2347        SCOPED_TRACE("");
2348        streamRequest(*producer);
2349    }
2350    EXPECT_TRUE(connMap.doesConnHandlerExist(vbid, "eq_dcpq:" + connName));
2351
2352    std::thread t1 = std::thread([this]() -> void {
2353        engine->getKVBucket()->setVBucketState(
2354                vbid,
2355                vbucket_state_t::vbucket_state_replica,
2356                {} /*meta*/,
2357                TransferVB::No);
2358    });
2359
2360    // New producer in this thread.
2361    // Note: ActiveStream::setDead executed only if re-creating the same
2362    // connection (ie, same cookie or connection name).
2363    const auto* cookie2 = create_mock_cookie();
2364    switch (by) {
2365    case ConnExistsBy::Cookie: {
2366        const std::string newConnName = "newName";
2367        EXPECT_FALSE(connMap.newProducer(cookie, newConnName, flags));
2368        // Note: Connection flagged as disconnected but not yet removed
2369        // from conn-map. See comments in ConnMap::newProducer.
2370        EXPECT_TRUE(connMap.doesConnHandlerExist(vbid, "eq_dcpq:" + connName));
2371        break;
2372    }
2373    case ConnExistsBy ::Name: {
2374        producer = connMap.newProducer(cookie2, connName, flags);
2375        ASSERT_TRUE(producer);
2376        // Check that the connection has been re-created with the same name
2377        // and exists in vbConns at stream-req
2378        {
2379            SCOPED_TRACE("");
2380            streamRequest(*producer);
2381        }
2382        EXPECT_TRUE(connMap.doesConnHandlerExist(vbid, "eq_dcpq:" + connName));
2383        break;
2384    }
2385    }
2386
2387    t1.join();
2388
2389    // Cleanup
2390    connMap.manageConnections();
2391    destroy_mock_cookie(cookie);
2392    destroy_mock_cookie(cookie2);
2393}
2394
2395TEST_F(DcpConnMapTest,
2396       AvoidLockInversionInSetVBucketStateAndNewProducerExistingCookie) {
2397    testLockInversionInSetVBucketStateAndNewProducer(ConnExistsBy::Cookie);
2398}
2399
2400TEST_F(DcpConnMapTest,
2401       AvoidLockInversionInSetVBucketStateAndNewProducerExistingName) {
2402    testLockInversionInSetVBucketStateAndNewProducer(ConnExistsBy::Name);
2403}
2404
2405class NotifyTest : public DCPTest {
2406protected:
2407    std::unique_ptr<MockDcpConnMap> connMap;
2408    DcpProducer* producer = nullptr;
2409    int callbacks = 0;
2410};
2411
2412class ConnMapNotifyTest {
2413public:
2414    ConnMapNotifyTest(EventuallyPersistentEngine& engine)
2415        : connMap(new MockDcpConnMap(engine)),
2416          callbacks(0),
2417          cookie(create_mock_cookie()) {
2418        connMap->initialize();
2419
2420        // Save `this` in server-specific so we can retrieve it from
2421        // dcp_test_notify_io_complete below:
2422        get_mock_server_api()->cookie->store_engine_specific(cookie, this);
2423
2424        producer = connMap->newProducer(cookie,
2425                                        "test_producer",
2426                                        /*flags*/ 0);
2427    }
2428
2429    ~ConnMapNotifyTest() {
2430        destroy_mock_cookie(cookie);
2431    }
2432
2433    void notify() {
2434        callbacks++;
2435        connMap->addConnectionToPending(producer->shared_from_this());
2436    }
2437
2438    int getCallbacks() {
2439        return callbacks;
2440    }
2441
2442    static void dcp_test_notify_io_complete(gsl::not_null<const void*> cookie,
2443                                            ENGINE_ERROR_CODE status) {
2444        const auto* notifyTest = reinterpret_cast<const ConnMapNotifyTest*>(
2445                get_mock_server_api()->cookie->get_engine_specific(
2446                        cookie.get()));
2447        cb_assert(notifyTest != nullptr);
2448        const_cast<ConnMapNotifyTest*>(notifyTest)->notify();
2449    }
2450
2451    std::unique_ptr<MockDcpConnMap> connMap;
2452    DcpProducer* producer;
2453
2454private:
2455    int callbacks;
2456    const void* cookie = nullptr;
2457};
2458
2459
2460TEST_F(NotifyTest, test_mb19503_connmap_notify) {
2461    ConnMapNotifyTest notifyTest(*engine);
2462
2463    // Hook into notify_io_complete
2464    class MockServerCookieApi : public WrappedServerCookieIface {
2465    public:
2466        void notify_io_complete(gsl::not_null<const void*> cookie,
2467                                ENGINE_ERROR_CODE status) override {
2468            ConnMapNotifyTest::dcp_test_notify_io_complete(cookie, status);
2469        }
2470    } scapi;
2471
2472    // Should be 0 when we begin
2473    ASSERT_EQ(0, notifyTest.getCallbacks());
2474    ASSERT_TRUE(notifyTest.producer->isPaused());
2475    ASSERT_EQ(0, notifyTest.connMap->getPendingNotifications().size());
2476
2477    // 1. Call addConnectionToPending - this will queue the producer
2478    notifyTest.connMap->addConnectionToPending(
2479            notifyTest.producer->shared_from_this());
2480    EXPECT_EQ(1, notifyTest.connMap->getPendingNotifications().size());
2481
2482    // 2. Call processPendingNotifications this will invoke notifyIOComplete
2483    //    which we've hooked into. For step 3 go to dcp_test_notify_io_complete
2484    notifyTest.connMap->processPendingNotifications();
2485
2486    // 2.1 One callback should of occurred, and we should still have one
2487    //     notification pending (see dcp_test_notify_io_complete).
2488    EXPECT_EQ(1, notifyTest.getCallbacks());
2489    EXPECT_EQ(1, notifyTest.connMap->getPendingNotifications().size());
2490
2491    // 4. Call processPendingNotifications again, is there a new connection?
2492    notifyTest.connMap->processPendingNotifications();
2493
2494    // 5. There should of been 2 callbacks
2495    EXPECT_EQ(2, notifyTest.getCallbacks());
2496}
2497
2498// Variation on test_mb19503_connmap_notify - check that notification is correct
2499// when notifiable is not paused.
2500TEST_F(NotifyTest, test_mb19503_connmap_notify_paused) {
2501    ConnMapNotifyTest notifyTest(*engine);
2502
2503    // Hook into notify_io_complete
2504    class MockServerCookieApi : public WrappedServerCookieIface {
2505    public:
2506        void notify_io_complete(gsl::not_null<const void*> cookie,
2507                                ENGINE_ERROR_CODE status) override {
2508            ConnMapNotifyTest::dcp_test_notify_io_complete(cookie, status);
2509        }
2510    } scapi;
2511
2512    // Should be 0 when we begin
2513    ASSERT_EQ(notifyTest.getCallbacks(), 0);
2514    ASSERT_TRUE(notifyTest.producer->isPaused());
2515    ASSERT_EQ(0, notifyTest.connMap->getPendingNotifications().size());
2516
2517    // 1. Call addConnectionToPending - this will queue the producer
2518    notifyTest.connMap->addConnectionToPending(
2519            notifyTest.producer->shared_from_this());
2520    EXPECT_EQ(1, notifyTest.connMap->getPendingNotifications().size());
2521
2522    // 2. Mark connection as not paused.
2523    notifyTest.producer->unPause();
2524
2525    // 3. Call processPendingNotifications - as the connection is not paused
2526    // this should *not* invoke notifyIOComplete.
2527    notifyTest.connMap->processPendingNotifications();
2528
2529    // 3.1 Should have not had any callbacks.
2530    EXPECT_EQ(0, notifyTest.getCallbacks());
2531    // 3.2 Should have no pending notifications.
2532    EXPECT_EQ(0, notifyTest.connMap->getPendingNotifications().size());
2533
2534    // 4. Now mark the connection as paused.
2535    ASSERT_FALSE(notifyTest.producer->isPaused());
2536    notifyTest.producer->pause();
2537
2538    // 4. Add another notification - should queue the producer again.
2539    notifyTest.connMap->addConnectionToPending(
2540            notifyTest.producer->shared_from_this());
2541    EXPECT_EQ(1, notifyTest.connMap->getPendingNotifications().size());
2542
2543    // 5. Call processPendingNotifications a second time - as connection is
2544    //    paused this time we *should* get a callback.
2545    notifyTest.connMap->processPendingNotifications();
2546    EXPECT_EQ(1, notifyTest.getCallbacks());
2547}
2548
2549// Tests that the MutationResponse created for the deletion response is of the
2550// correct size.
2551TEST_P(ConnectionTest, test_mb24424_deleteResponse) {
2552    const void* cookie = create_mock_cookie();
2553    Vbid vbid = Vbid(0);
2554
2555    auto consumer =
2556            std::make_shared<MockDcpConsumer>(*engine, cookie, "test_consumer");
2557
2558    ASSERT_EQ(ENGINE_SUCCESS, set_vb_state(vbid, vbucket_state_replica));
2559    ASSERT_EQ(ENGINE_SUCCESS, consumer->addStream(/*opaque*/0, vbid,
2560                                                  /*flags*/0));
2561
2562    MockPassiveStream *stream = static_cast<MockPassiveStream*>
2563                                       ((consumer->
2564                                               getVbucketStream(vbid)).get());
2565    ASSERT_TRUE(stream->isActive());
2566
2567    std::string key = "key";
2568    const DocKey docKey{reinterpret_cast<const uint8_t*>(key.data()),
2569                        key.size(),
2570                        DocKeyEncodesCollectionId::No};
2571    uint8_t extMeta[1] = {uint8_t(PROTOCOL_BINARY_DATATYPE_JSON)};
2572    cb::const_byte_buffer meta{extMeta, sizeof(uint8_t)};
2573
2574    consumer->deletion(/*opaque*/ 1,
2575                       /*key*/ docKey,
2576                       /*value*/ {},
2577                       /*priv_bytes*/ 0,
2578                       /*datatype*/ PROTOCOL_BINARY_RAW_BYTES,
2579                       /*cas*/ 0,
2580                       /*vbucket*/ vbid,
2581                       /*bySeqno*/ 1,
2582                       /*revSeqno*/ 0,
2583                       /*meta*/ meta);
2584
2585    auto messageSize = MutationResponse::deletionBaseMsgBytes + key.size() +
2586                       sizeof(extMeta);
2587
2588    EXPECT_EQ(messageSize, stream->responseMessageSize);
2589
2590    /* Close stream before deleting the connection */
2591    ASSERT_EQ(ENGINE_SUCCESS, consumer->closeStream(/*opaque*/0, vbid));
2592
2593    destroy_mock_cookie(cookie);
2594}
2595
2596// Tests that the MutationResponse created for the mutation response is of the
2597// correct size.
2598TEST_P(ConnectionTest, test_mb24424_mutationResponse) {
2599    const void* cookie = create_mock_cookie();
2600    Vbid vbid = Vbid(0);
2601
2602    auto consumer =
2603            std::make_shared<MockDcpConsumer>(*engine, cookie, "test_consumer");
2604
2605    ASSERT_EQ(ENGINE_SUCCESS, set_vb_state(vbid, vbucket_state_replica));
2606    ASSERT_EQ(ENGINE_SUCCESS, consumer->addStream(/*opaque*/0, vbid,
2607                                                  /*flags*/0));
2608
2609    MockPassiveStream *stream = static_cast<MockPassiveStream*>
2610                                       ((consumer->
2611                                               getVbucketStream(vbid)).get());
2612    ASSERT_TRUE(stream->isActive());
2613
2614    std::string key = "key";
2615    std::string data = R"({"json":"yes"})";
2616    const DocKey docKey{reinterpret_cast<const uint8_t*>(key.data()),
2617                        key.size(),
2618                        DocKeyEncodesCollectionId::No};
2619    cb::const_byte_buffer value{reinterpret_cast<const uint8_t*>(data.data()),
2620        data.size()};
2621    uint8_t extMeta[1] = {uint8_t(PROTOCOL_BINARY_DATATYPE_JSON)};
2622    cb::const_byte_buffer meta{extMeta, sizeof(uint8_t)};
2623
2624    consumer->mutation(/*opaque*/1,
2625                       /*key*/docKey,
2626                       /*values*/value,
2627                       /*priv_bytes*/0,
2628                       /*datatype*/PROTOCOL_BINARY_DATATYPE_JSON,
2629                       /*cas*/0,
2630                       /*vbucket*/vbid,
2631                       /*flags*/0,
2632                       /*bySeqno*/1,
2633                       /*revSeqno*/0,
2634                       /*exptime*/0,
2635                       /*lock_time*/0,
2636                       /*meta*/meta,
2637                       /*nru*/0);
2638
2639    auto messageSize = MutationResponse::mutationBaseMsgBytes +
2640            key.size() + data.size() + sizeof(extMeta);
2641
2642    EXPECT_EQ(messageSize, stream->responseMessageSize);
2643
2644    /* Close stream before deleting the connection */
2645    ASSERT_EQ(ENGINE_SUCCESS, consumer->closeStream(/*opaque*/0, vbid));
2646
2647    destroy_mock_cookie(cookie);
2648}
2649
2650void ConnectionTest::sendConsumerMutationsNearThreshold(bool beyondThreshold) {
2651    const void* cookie = create_mock_cookie();
2652    const uint32_t opaque = 1;
2653    const uint64_t snapStart = 1;
2654    const uint64_t snapEnd = std::numeric_limits<uint64_t>::max();
2655    uint64_t bySeqno = snapStart;
2656
2657    /* Set up a consumer connection */
2658    auto consumer =
2659            std::make_shared<MockDcpConsumer>(*engine, cookie, "test_consumer");
2660
2661    /* Replica vbucket */
2662    ASSERT_EQ(ENGINE_SUCCESS, set_vb_state(vbid, vbucket_state_replica));
2663
2664    /* Passive stream */
2665    ASSERT_EQ(ENGINE_SUCCESS,
2666              consumer->addStream(/*opaque*/ 0,
2667                                  vbid,
2668                                  /*flags*/ 0));
2669    MockPassiveStream* stream = static_cast<MockPassiveStream*>(
2670            (consumer->getVbucketStream(vbid)).get());
2671    ASSERT_TRUE(stream->isActive());
2672
2673    /* Send a snapshotMarker before sending items for replication */
2674    EXPECT_EQ(ENGINE_SUCCESS,
2675              consumer->snapshotMarker(opaque,
2676                                       vbid,
2677                                       snapStart,
2678                                       snapEnd,
2679                                       /* in-memory snapshot */ 0x1,
2680                                       {} /*HCS*/,
2681                                       {} /*maxVisibleSeq*/));
2682
2683    /* Send an item for replication */
2684    const DocKey docKey{nullptr, 0, DocKeyEncodesCollectionId::No};
2685    EXPECT_EQ(ENGINE_SUCCESS,
2686              consumer->mutation(opaque,
2687                                 docKey,
2688                                 {}, // value
2689                                 0, // priv bytes
2690                                 PROTOCOL_BINARY_RAW_BYTES,
2691                                 0, // cas
2692                                 vbid,
2693                                 0, // flags
2694                                 bySeqno,
2695                                 0, // rev seqno
2696                                 0, // exptime
2697                                 0, // locktime
2698                                 {}, // meta
2699                                 0)); // nru
2700
2701    /* Set 'mem_used' beyond the 'replication threshold' */
2702    EPStats& stats = engine->getEpStats();
2703    if (beyondThreshold) {
2704        stats.setMaxDataSize(stats.getPreciseTotalMemoryUsed());
2705    } else {
2706        /* Set 'mem_used' just 1 byte less than the 'replication threshold'.
2707           That is we are below 'replication threshold', but not enough space
2708           for  the new item */
2709        stats.setMaxDataSize(stats.getPreciseTotalMemoryUsed() + 1);
2710        /* Simpler to set the replication threshold to 1 and test, rather than
2711           testing with maxData = (memUsed / replicationThrottleThreshold);
2712           that is, we are avoiding a division */
2713        engine->getConfiguration().setReplicationThrottleThreshold(100);
2714    }
2715
2716    if ((engine->getConfiguration().getBucketType() == "ephemeral") &&
2717        (engine->getConfiguration().getEphemeralFullPolicy()) ==
2718                "fail_new_data") {
2719        /* Expect disconnect signal in Ephemeral with "fail_new_data" policy */
2720        while (1) {
2721            /* Keep sending items till the memory usage goes above the
2722               threshold and the connection is disconnected */
2723            if (ENGINE_DISCONNECT ==
2724                consumer->mutation(opaque,
2725                                   docKey,
2726                                   {}, // value
2727                                   0, // priv bytes
2728                                   PROTOCOL_BINARY_RAW_BYTES,
2729                                   0, // cas
2730                                   vbid,
2731                                   0, // flags
2732                                   ++bySeqno,
2733                                   0, // rev seqno
2734                                   0, // exptime
2735                                   0, // locktime
2736                                   {}, // meta
2737                                   0)) {
2738                break;
2739            }
2740        }
2741    } else {
2742        /* In 'couchbase' buckets we buffer the replica items and indirectly
2743           throttle replication by not sending flow control acks to the
2744           producer. Hence we do not drop the connection here */
2745        EXPECT_EQ(ENGINE_SUCCESS,
2746                  consumer->mutation(opaque,
2747                                     docKey,
2748                                     {}, // value
2749                                     0, // priv bytes
2750                                     PROTOCOL_BINARY_RAW_BYTES,
2751                                     0, // cas
2752                                     vbid,
2753                                     0, // flags
2754                                     bySeqno + 1,
2755                                     0, // rev seqno
2756                                     0, // exptime
2757                                     0, // locktime
2758                                     {}, // meta
2759                                     0)); // nru
2760    }
2761
2762    /* Close stream before deleting the connection */
2763    EXPECT_EQ(ENGINE_SUCCESS, consumer->closeStream(opaque, vbid));
2764
2765    destroy_mock_cookie(cookie);
2766}
2767
2768/* Here we test how the DCP consumer handles the scenario where the memory
2769   usage is beyond the replication throttle threshold.
2770   In case of Ephemeral buckets with 'fail_new_data' policy it is expected to
2771   indicate close of the consumer conn and in other cases it is expected to
2772   just defer processing. */
2773TEST_P(ConnectionTest, ReplicateAfterThrottleThreshold) {
2774    sendConsumerMutationsNearThreshold(true);
2775}
2776
2777/* Here we test how the DCP consumer handles the scenario where the memory
2778   usage is just below the replication throttle threshold, but will go over the
2779   threshold when it adds the new mutation from the processor buffer to the
2780   hashtable.
2781   In case of Ephemeral buckets with 'fail_new_data' policy it is expected to
2782   indicate close of the consumer conn and in other cases it is expected to
2783   just defer processing. */
2784TEST_P(ConnectionTest, ReplicateJustBeforeThrottleThreshold) {
2785    sendConsumerMutationsNearThreshold(false);
2786}
2787
2788void ConnectionTest::processConsumerMutationsNearThreshold(
2789        bool beyondThreshold) {
2790    const void* cookie = create_mock_cookie();
2791    const uint32_t opaque = 1;
2792    const uint64_t snapStart = 1, snapEnd = 10;
2793    const uint64_t bySeqno = snapStart;
2794
2795    /* Set up a consumer connection */
2796    auto consumer =
2797            std::make_shared<MockDcpConsumer>(*engine, cookie, "test_consumer");
2798
2799    /* Replica vbucket */
2800    ASSERT_EQ(ENGINE_SUCCESS, set_vb_state(vbid, vbucket_state_replica));
2801
2802    /* Passive stream */
2803    ASSERT_EQ(ENGINE_SUCCESS,
2804              consumer->addStream(/*opaque*/ 0,
2805                                  vbid,
2806                                  /*flags*/ 0));
2807    MockPassiveStream* stream = static_cast<MockPassiveStream*>(
2808            (consumer->getVbucketStream(vbid)).get());
2809    ASSERT_TRUE(stream->isActive());
2810
2811    /* Send a snapshotMarker before sending items for replication */
2812    EXPECT_EQ(ENGINE_SUCCESS,
2813              consumer->snapshotMarker(opaque,
2814                                       vbid,
2815                                       snapStart,
2816                                       snapEnd,
2817                                       /* in-memory snapshot */ 0x1,
2818                                       /*HCS*/ {},
2819                                       /*maxVisibleSeqno*/ {}));
2820
2821    /* Simulate a situation where adding a mutation temporarily fails
2822       and hence adds the mutation to a replication buffer. For that, we
2823       set vbucket::takeover_backed_up to true */
2824    engine->getKVBucket()->getVBucket(vbid)->setTakeoverBackedUpState(true);
2825
2826    /* Send an item for replication and expect it to be buffered */
2827    const DocKey docKey{"mykey", DocKeyEncodesCollectionId::No};
2828    EXPECT_EQ(ENGINE_SUCCESS,
2829              consumer->mutation(opaque,
2830                                 docKey,
2831                                 {}, // value
2832                                 0, // priv bytes
2833                                 PROTOCOL_BINARY_RAW_BYTES,
2834                                 0, // cas
2835                                 vbid,
2836                                 0, // flags
2837                                 bySeqno,
2838                                 0, // rev seqno
2839                                 0, // exptime
2840                                 0, // locktime
2841                                 {}, // meta
2842                                 0)); // nru
2843    EXPECT_EQ(1, stream->getNumBufferItems());
2844
2845    /* Set back the vbucket::takeover_backed_up to false */
2846    engine->getKVBucket()->getVBucket(vbid)->setTakeoverBackedUpState(false);
2847
2848    /* Set 'mem_used' beyond the 'replication threshold' */
2849    EPStats& stats = engine->getEpStats();
2850    if (beyondThreshold) {
2851        /* Actually setting it well above also, as there can be a drop in memory
2852           usage during testing */
2853        stats.setMaxDataSize(stats.getEstimatedTotalMemoryUsed() / 4);
2854    } else {
2855        /* set max size to a value just over */
2856        stats.setMaxDataSize(stats.getEstimatedTotalMemoryUsed() + 1);
2857        /* Simpler to set the replication threshold to 1 and test, rather than
2858           testing with maxData = (memUsed / replicationThrottleThreshold); that
2859           is, we are avoiding a division */
2860        engine->getConfiguration().setReplicationThrottleThreshold(100);
2861    }
2862
2863    MockDcpMessageProducers producers(handle);
2864    if ((engine->getConfiguration().getBucketType() == "ephemeral") &&
2865        (engine->getConfiguration().getEphemeralFullPolicy()) ==
2866                "fail_new_data") {
2867        /* Make a call to the function that would be called by the processor
2868           task here */
2869        EXPECT_EQ(stop_processing, consumer->processBufferedItems());
2870
2871        /* Expect the connection to be notified */
2872        EXPECT_FALSE(consumer->isPaused());
2873
2874        /* Expect disconnect signal in Ephemeral with "fail_new_data" policy */
2875        EXPECT_EQ(ENGINE_DISCONNECT, consumer->step(&producers));
2876    } else {
2877        uint32_t backfoffs = consumer->getNumBackoffs();
2878
2879        /* Make a call to the function that would be called by the processor
2880           task here */
2881        if (beyondThreshold) {
2882            EXPECT_EQ(more_to_process, consumer->processBufferedItems());
2883        } else {
2884            EXPECT_EQ(cannot_process, consumer->processBufferedItems());
2885        }
2886
2887        EXPECT_EQ(backfoffs + 1, consumer->getNumBackoffs());
2888
2889        /* In 'couchbase' buckets we buffer the replica items and indirectly
2890           throttle replication by not sending flow control acks to the
2891           producer. Hence we do not drop the connection here */
2892        EXPECT_EQ(ENGINE_SUCCESS, consumer->step(&producers));
2893
2894        /* Close stream before deleting the connection */
2895        EXPECT_EQ(ENGINE_SUCCESS, consumer->closeStream(opaque, vbid));
2896    }
2897    destroy_mock_cookie(cookie);
2898}
2899
2900/* Here we test how the Processor task in DCP consumer handles the scenario
2901   where the memory usage is beyond the replication throttle threshold.
2902   In case of Ephemeral buckets with 'fail_new_data' policy it is expected to
2903   indicate close of the consumer conn and in other cases it is expected to
2904   just defer processing. */
2905TEST_P(ConnectionTest, ProcessReplicationBufferAfterThrottleThreshold) {
2906    processConsumerMutationsNearThreshold(true);
2907}
2908
2909/* Here we test how the Processor task in DCP consumer handles the scenario
2910   where the memory usage is just below the replication throttle threshold,
2911   but will go over the threshold when it adds the new mutation from the
2912   processor buffer to the hashtable.
2913   In case of Ephemeral buckets with 'fail_new_data' policy it is expected to
2914   indicate close of the consumer conn and in other cases it is expected to
2915   just defer processing. */
2916TEST_P(ConnectionTest,
2917       DISABLED_ProcessReplicationBufferJustBeforeThrottleThreshold) {
2918    /* There are sporadic failures seen while testing this. The problem is
2919       we need to have a memory usage just below max_size, so we need to
2920       start at that point. But sometimes the memory usage goes further below
2921       resulting in the test failure (a hang). Hence commenting out the test.
2922       Can be run locally as and when needed. */
2923    processConsumerMutationsNearThreshold(false);
2924}
2925
2926TEST_P(ConnectionTest, ProducerEnablesDeleteXattr) {
2927    const void* cookie = create_mock_cookie();
2928
2929    uint32_t flags = 0;
2930    {
2931        const auto producer = std::make_shared<MockDcpProducer>(
2932                *engine, cookie, "test_producer", flags);
2933        EXPECT_EQ(IncludeDeletedUserXattrs::No,
2934                  producer->public_getIncludeDeletedUserXattrs());
2935    }
2936
2937    flags = cb::mcbp::request::DcpOpenPayload::IncludeDeletedUserXattrs;
2938    const auto producer = std::make_shared<MockDcpProducer>(
2939            *engine, cookie, "test_producer", flags);
2940    EXPECT_EQ(IncludeDeletedUserXattrs::Yes,
2941              producer->public_getIncludeDeletedUserXattrs());
2942
2943    destroy_mock_cookie(cookie);
2944}
2945
2946class ActiveStreamChkptProcessorTaskTest : public SingleThreadedKVBucketTest {
2947public:
2948    ActiveStreamChkptProcessorTaskTest() : cookie(create_mock_cookie()) {
2949    }
2950
2951    void SetUp() override {
2952        SingleThreadedKVBucketTest::SetUp();
2953
2954        /* Start an active vb and add 3 items */
2955        store->setVBucketState(vbid, vbucket_state_active);
2956        addItems(3);
2957
2958        producers = std::make_unique<MockDcpMessageProducers>(engine.get());
2959        producer = std::make_shared<MockDcpProducer>(
2960                *engine,
2961                cookie,
2962                "test_producer",
2963                0 /*flags*/,
2964                false /*startTask*/);
2965
2966        /* Create the checkpoint processor task object, but don't schedule */
2967        producer->createCheckpointProcessorTask();
2968    }
2969
2970    void TearDown() override {
2971        producer->cancelCheckpointCreatorTask();
2972        producer->closeAllStreams();
2973        destroy_mock_cookie(cookie);
2974        SingleThreadedKVBucketTest::TearDown();
2975    }
2976
2977    void addItems(int numItems) {
2978        for (int i = 0; i < numItems; ++i) {
2979            std::string key("key" + std::to_string(i));
2980            store_item(vbid, makeStoredDocKey(key), "value");
2981        }
2982    }
2983
2984    /*
2985     * Fake callback emulating dcp_add_failover_log
2986     */
2987    static ENGINE_ERROR_CODE fakeDcpAddFailoverLog(
2988            vbucket_failover_t* entry,
2989            size_t nentries,
2990            gsl::not_null<const void*> cookie) {
2991        return ENGINE_SUCCESS;
2992    }
2993
2994    void notifyAndStepToCheckpoint() {
2995        SingleThreadedKVBucketTest::notifyAndStepToCheckpoint(*producer,
2996                                                              *producers);
2997    }
2998
2999    const void* cookie;
3000    std::unique_ptr<MockDcpMessageProducers> producers;
3001    std::shared_ptr<MockDcpProducer> producer;
3002    const Vbid vbid = Vbid(0);
3003};
3004
3005TEST_F(ActiveStreamChkptProcessorTaskTest, DeleteDeadStreamEntry) {
3006    uint64_t rollbackSeqno;
3007    uint32_t opaque = 1;
3008    ASSERT_EQ(ENGINE_SUCCESS,
3009              producer->streamRequest(
3010                      0, // flags
3011                      opaque,
3012                      vbid,
3013                      0, // start_seqno
3014                      ~0ull, // end_seqno
3015                      0, // vbucket_uuid,
3016                      0, // snap_start_seqno,
3017                      0, // snap_end_seqno,
3018                      &rollbackSeqno,
3019                      ActiveStreamChkptProcessorTaskTest::fakeDcpAddFailoverLog,
3020                      {}));
3021    /* Checkpoint task processor Q will already have any entry for the stream */
3022    EXPECT_EQ(1, producer->getCheckpointSnapshotTask()->queueSize());
3023
3024    /* Close and open the stream without clearing the checkpoint task processor
3025     Q */
3026    producer->closeStream(opaque, vbid);
3027    ASSERT_EQ(ENGINE_SUCCESS,
3028              producer->streamRequest(
3029                      0, // flags
3030                      opaque,
3031                      vbid,
3032                      0, // start_seqno
3033                      ~0ull, // end_seqno
3034                      0, // vbucket_uuid,
3035                      0, // snap_start_seqno,
3036                      0, // snap_end_seqno,
3037                      &rollbackSeqno,
3038                      ActiveStreamChkptProcessorTaskTest::fakeDcpAddFailoverLog,
3039                      {}));
3040
3041    /* The checkpoint processor Q should be processed with the new stream
3042     getting the item(s) */
3043    notifyAndStepToCheckpoint();
3044}
3045
3046// Test handleResponse accepts opcodes that the producer can send
3047TEST_F(SingleThreadedKVBucketTest, ProducerHandleResponse) {
3048    setVBucketStateAndRunPersistTask(vbid, vbucket_state_active);
3049
3050    auto producer = std::make_shared<MockDcpProducer>(*engine,
3051                                                      cookie,
3052                                                      "ProducerHandleResponse",
3053                                                      /*flags*/ 0);
3054
3055    MockDcpMessageProducers producers(engine.get());
3056
3057    protocol_binary_response_header message{};
3058    message.response.setMagic(cb::mcbp::Magic::ClientResponse);
3059    for (auto status : {cb::mcbp::Status::NotMyVbucket,
3060                        cb::mcbp::Status::KeyEnoent,
3061                        cb::mcbp::Status::Success}) {
3062        message.response.setStatus(status);
3063        for (auto op : {cb::mcbp::ClientOpcode::DcpOpen,
3064                        cb::mcbp::ClientOpcode::DcpAddStream,
3065                        cb::mcbp::ClientOpcode::DcpCloseStream,
3066                        cb::mcbp::ClientOpcode::DcpStreamReq,
3067                        cb::mcbp::ClientOpcode::DcpGetFailoverLog,
3068                        cb::mcbp::ClientOpcode::DcpMutation,
3069                        cb::mcbp::ClientOpcode::DcpDeletion,
3070                        cb::mcbp::ClientOpcode::DcpExpiration,
3071                        cb::mcbp::ClientOpcode::DcpBufferAcknowledgement,
3072                        cb::mcbp::ClientOpcode::DcpControl,
3073                        cb::mcbp::ClientOpcode::DcpSystemEvent,
3074                        cb::mcbp::ClientOpcode::GetErrorMap}) {
3075            message.response.setOpcode(op);
3076            EXPECT_TRUE(producer->handleResponse(&message));
3077        }
3078    }
3079    // We should disconnect when we see cb::mcbp::Status::KeyEnoent for
3080    // a durability DCP op
3081    for (auto op : {cb::mcbp::ClientOpcode::DcpPrepare,
3082                    cb::mcbp::ClientOpcode::DcpCommit,
3083                    cb::mcbp::ClientOpcode::DcpAbort}) {
3084        message.response.setOpcode(op);
3085
3086        for (auto status :
3087             {cb::mcbp::Status::NotMyVbucket, cb::mcbp::Status::Success}) {
3088            message.response.setStatus(status);
3089
3090            EXPECT_TRUE(producer->handleResponse(&message));
3091        }
3092    }
3093}
3094
3095// Test that we disconnect whe we receive a non success status code for the
3096// majority of Dcp Opcodes.
3097TEST_F(SingleThreadedKVBucketTest, ProducerHandleResponseDisconnect) {
3098    setVBucketStateAndRunPersistTask(vbid, vbucket_state_active);
3099
3100    auto producer = std::make_shared<MockDcpProducer>(
3101            *engine, cookie, "ProducerHandleResponceDiscconnect", 0);
3102    MockDcpMessageProducers producers(engine.get());
3103
3104    protocol_binary_response_header message{};
3105    message.response.setMagic(cb::mcbp::Magic::ClientResponse);
3106    for (auto errorCode : {cb::mcbp::Status::E2big,
3107                           cb::mcbp::Status::Einval,
3108                           cb::mcbp::Status::Enomem,
3109                           cb::mcbp::Status::Erange,
3110                           cb::mcbp::Status::Etmpfail,
3111                           cb::mcbp::Status::KeyEexists,
3112                           cb::mcbp::Status::Locked,
3113                           cb::mcbp::Status::SyncWriteAmbiguous,
3114                           cb::mcbp::Status::SyncWriteInProgress,
3115                           cb::mcbp::Status::SyncWriteReCommitInProgress,
3116                           cb::mcbp::Status::UnknownCollection}) {
3117        message.response.setStatus(errorCode);
3118        for (auto op : {cb::mcbp::ClientOpcode::DcpOpen,
3119                        cb::mcbp::ClientOpcode::DcpAddStream,
3120                        cb::mcbp::ClientOpcode::DcpCloseStream,
3121                        cb::mcbp::ClientOpcode::DcpStreamReq,
3122                        cb::mcbp::ClientOpcode::DcpGetFailoverLog,
3123                        cb::mcbp::ClientOpcode::DcpMutation,
3124                        cb::mcbp::ClientOpcode::DcpDeletion,
3125                        cb::mcbp::ClientOpcode::DcpExpiration,
3126                        cb::mcbp::ClientOpcode::DcpBufferAcknowledgement,
3127                        cb::mcbp::ClientOpcode::DcpControl,
3128                        cb::mcbp::ClientOpcode::DcpSystemEvent,
3129                        cb::mcbp::ClientOpcode::GetErrorMap,
3130                        cb::mcbp::ClientOpcode::DcpPrepare,
3131                        cb::mcbp::ClientOpcode::DcpCommit,
3132                        cb::mcbp::ClientOpcode::DcpAbort}) {
3133            message.response.setOpcode(op);
3134            EXPECT_FALSE(producer->handleResponse(&message));
3135        }
3136    }
3137    message.response.setStatus(cb::mcbp::Status::KeyEnoent);
3138    for (auto op : {cb::mcbp::ClientOpcode::DcpPrepare,
3139                    cb::mcbp::ClientOpcode::DcpCommit,
3140                    cb::mcbp::ClientOpcode::DcpAbort}) {
3141        message.response.setOpcode(op);
3142        EXPECT_FALSE(producer->handleResponse(&message));
3143    }
3144}
3145
3146// Test how we handle DcpStreamEnd responses from a consumer
3147TEST_F(SingleThreadedKVBucketTest, ProducerHandleResponseStreamEnd) {
3148    auto producer = std::make_shared<MockDcpProducer>(
3149            *engine, cookie, "ProducerHandleResponceStreamEnd", 0);
3150    MockDcpMessageProducers producers(engine.get());
3151
3152    protocol_binary_response_header message{};
3153    message.response.setMagic(cb::mcbp::Magic::ClientResponse);
3154    message.response.setOpcode(cb::mcbp::ClientOpcode::DcpStreamEnd);
3155    for (auto errorCode :
3156         {cb::mcbp::Status::KeyEnoent, cb::mcbp::Status::NotMyVbucket, cb::mcbp::Status::Success}) {
3157        message.response.setStatus(errorCode);
3158        EXPECT_TRUE(producer->handleResponse(&message));
3159    }
3160    for (auto errorCode : {cb::mcbp::Status::E2big,
3161                           cb::mcbp::Status::Einval,
3162                           cb::mcbp::Status::Enomem,
3163                           cb::mcbp::Status::Erange,
3164                           cb::mcbp::Status::Etmpfail,
3165                           cb::mcbp::Status::KeyEexists,
3166                           cb::mcbp::Status::Locked,
3167                           cb::mcbp::Status::SyncWriteAmbiguous,
3168                           cb::mcbp::Status::SyncWriteInProgress,
3169                           cb::mcbp::Status::SyncWriteReCommitInProgress,
3170                           cb::mcbp::Status::UnknownCollection}) {
3171        message.response.setStatus(errorCode);
3172        EXPECT_FALSE(producer->handleResponse(&message));
3173    }
3174}
3175
3176// Test how we handle DcpNoop responses from a consumer
3177TEST_F(SingleThreadedKVBucketTest, ProducerHandleResponseNoop) {
3178    auto producer = std::make_shared<MockDcpProducer>(
3179            *engine, cookie, "ProducerHandleResponceNoop", 0);
3180    MockDcpMessageProducers producers(engine.get());
3181
3182    protocol_binary_response_header message{};
3183    message.response.setMagic(cb::mcbp::Magic::ClientResponse);
3184    message.response.setOpcode(cb::mcbp::ClientOpcode::DcpNoop);
3185
3186    for (auto errorCode : {cb::mcbp::Status::E2big,
3187                           cb::mcbp::Status::Einval,
3188                           cb::mcbp::Status::Enomem,
3189                           cb::mcbp::Status::Erange,
3190                           cb::mcbp::Status::Etmpfail,
3191                           cb::mcbp::Status::KeyEexists,
3192                           cb::mcbp::Status::Locked,
3193                           cb::mcbp::Status::Success,
3194                           cb::mcbp::Status::SyncWriteAmbiguous,
3195                           cb::mcbp::Status::SyncWriteInProgress,
3196                           cb::mcbp::Status::SyncWriteReCommitInProgress,
3197                           cb::mcbp::Status::UnknownCollection}) {
3198        message.response.setStatus(errorCode);
3199        // Test DcpNoop when the opaque is the default opaque value
3200        message.response.setOpaque(10000000);
3201        EXPECT_TRUE(producer->handleResponse(&message));
3202        for (uint32_t Opaque : {123, 0}) {
3203            message.response.setOpaque(Opaque);
3204            EXPECT_FALSE(producer->handleResponse(&message));
3205        }
3206    }
3207
3208    for (auto errorCode :
3209         {cb::mcbp::Status::NotMyVbucket, cb::mcbp::Status::KeyEnoent}) {
3210        message.response.setStatus(errorCode);
3211        // Test DcpNoop when the opaque is the default opaque value
3212        message.response.setOpaque(10000000);
3213        EXPECT_TRUE(producer->handleResponse(&message));
3214        for (uint32_t Opaque : {123, 0}) {
3215            message.response.setOpaque(Opaque);
3216            EXPECT_TRUE(producer->handleResponse(&message));
3217        }
3218    }
3219}
3220
3221TEST_F(SingleThreadedKVBucketTest, ConsumerIdleTimeoutUpdatedOnConfigChange) {
3222    engine->getConfiguration().setDcpIdleTimeout(100);
3223
3224    auto consumer =
3225            std::make_shared<MockDcpConsumer>(*engine, cookie, "test_consumer");
3226    ASSERT_EQ(std::chrono::seconds(100), consumer->getIdleTimeout());
3227
3228    // Need to put our consumer in the ConnMap or we won't know to change the
3229    // value when the config is updated.
3230    auto& connMap = static_cast<MockDcpConnMap&>(engine->getDcpConnMap());
3231    connMap.addConn(cookie, consumer);
3232    ASSERT_TRUE(connMap.findByName("test_consumer"));
3233
3234    engine->getConfiguration().setDcpIdleTimeout(200);
3235
3236    EXPECT_EQ(std::chrono::seconds(200), consumer->getIdleTimeout());
3237
3238    connMap.removeConn(cookie);
3239}
3240
3241TEST_F(SingleThreadedKVBucketTest, ProducerIdleTimeoutUpdatedOnConfigChange) {
3242    engine->getConfiguration().setDcpIdleTimeout(100);
3243
3244    auto producer = std::make_shared<MockDcpProducer>(*engine,
3245                                                      cookie,
3246                                                      "test_producer",
3247                                                      /*flags*/ 0);
3248    ASSERT_EQ(std::chrono::seconds(100), producer->getIdleTimeout());
3249
3250    // Need to put our producer in the ConnMap or we won't know to change the
3251    // value when the config is updated.
3252    auto& connMap = static_cast<MockDcpConnMap&>(engine->getDcpConnMap());
3253    connMap.addConn(cookie, producer);
3254    ASSERT_TRUE(connMap.findByName("test_producer"));
3255
3256    engine->getConfiguration().setDcpIdleTimeout(200);
3257
3258    EXPECT_EQ(std::chrono::seconds(200), producer->getIdleTimeout());
3259
3260    connMap.removeConn(cookie);
3261}
3262
3263struct PrintToStringCombinedNameXattrOnOff {
3264    std::string operator()(
3265            const ::testing::TestParamInfo<::testing::tuple<std::string, bool>>&
3266                    info) const {
3267        if (::testing::get<1>(info.param)) {
3268            return ::testing::get<0>(info.param) + "_xattr";
3269}
3270        return ::testing::get<0>(info.param);
3271    }
3272};
3273
// Instantiate CompressionStreamTest for every combination of the two bucket
// type strings ("persistent", "ephemeral") and a boolean parameter. Test names
// are generated by PrintToStringCombinedNameXattrOnOff, which appends
// "_xattr" to the bucket type when the boolean is true.
INSTANTIATE_TEST_CASE_P(CompressionStreamTest,
                        CompressionStreamTest,
                        ::testing::Combine(::testing::Values("persistent",
                                                             "ephemeral"),
                                           ::testing::Bool()),
                        PrintToStringCombinedNameXattrOnOff());
3281
// Instantiate ConnectionTest for every configuration returned by
// STParameterizedBucketTest::allConfigValues().
// NOTE(review): the trailing comma passes an empty final macro argument (no
// custom name generator) - presumably to silence a warning about empty
// variadic macro arguments; confirm before removing.
INSTANTIATE_TEST_CASE_P(PersistentAndEphemeral,
                        ConnectionTest,
                        STParameterizedBucketTest::allConfigValues(), );
3285