1b84d09deSDave Rigby/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2b84d09deSDave Rigby/*
3b84d09deSDave Rigby *     Copyright 2016 Couchbase, Inc
4b84d09deSDave Rigby *
5b84d09deSDave Rigby *   Licensed under the Apache License, Version 2.0 (the "License");
6b84d09deSDave Rigby *   you may not use this file except in compliance with the License.
7b84d09deSDave Rigby *   You may obtain a copy of the License at
8b84d09deSDave Rigby *
9b84d09deSDave Rigby *       http://www.apache.org/licenses/LICENSE-2.0
10b84d09deSDave Rigby *
11b84d09deSDave Rigby *   Unless required by applicable law or agreed to in writing, software
12b84d09deSDave Rigby *   distributed under the License is distributed on an "AS IS" BASIS,
13b84d09deSDave Rigby *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14b84d09deSDave Rigby *   See the License for the specific language governing permissions and
15b84d09deSDave Rigby *   limitations under the License.
16b84d09deSDave Rigby */
17b84d09deSDave Rigby
/*
 * Unit tests for DCP-related classes.
 *
 * Due to the way our classes are structured, most of the different DCP classes
 * need an instance of EPBucket and other related objects.
 */
24ee0084c6SDave Rigby
25c76a444bSDave Rigby#include "../mock/mock_dcp.h"
26344a5eb7SJames Harrison#include "../mock/mock_dcp_conn_map.h"
27c76a444bSDave Rigby#include "../mock/mock_dcp_consumer.h"
28c76a444bSDave Rigby#include "../mock/mock_dcp_producer.h"
29c76a444bSDave Rigby#include "../mock/mock_stream.h"
30d1f4517aSManu Dhundi#include "../mock/mock_synchronous_ep_engine.h"
31da4e3493SDave Rigby#include "checkpoint.h"
32f47eb0edSDaniel Owen#include "connmap.h"
339009cf78SDaniel Owen#include "dcp/backfill_disk.h"
349009cf78SDaniel Owen#include "dcp/dcp-types.h"
3558f80b18SDaniel Owen#include "dcp/dcpconnmap.h"
361cd7a852SDave Rigby#include "dcp/producer.h"
371cd7a852SDave Rigby#include "dcp/stream.h"
38ae32b5caSPaolo Cocchi#include "dcp_utils.h"
39c76a444bSDave Rigby#include "ep_time.h"
40ee0084c6SDave Rigby#include "evp_engine_test.h"
41d355bfe2SManu Dhundi#include "evp_store_single_threaded_test.h"
42e1c3550fSManu Dhundi#include "failover-table.h"
432288acbbSJim Walker#include "memory_tracker.h"
442288acbbSJim Walker#include "objectregistry.h"
45c3784c74SDave Rigby#include "test_helpers.h"
46b84d09deSDave Rigby
47526fb7a9SJames Harrison#include <dcp/backfill_memory.h>
48344a5eb7SJames Harrison#include <gtest/gtest.h>
493726d8a7SSriram Ganesan#include <platform/compress.h>
5054ce79e0SDaniel Owen#include <xattr/utils.h>
5103c34577SDave Rigby
520739f2fdSJim Walker#include <thread>
530739f2fdSJim Walker
54706b5173SManu Dhundiextern uint8_t dcp_last_op;
55706b5173SManu Dhundiextern uint32_t dcp_last_flags;
56706b5173SManu Dhundi
57ee0084c6SDave Rigbyclass DCPTest : public EventuallyPersistentEngineTest {
5803c34577SDave Rigbyprotected:
59ee0084c6SDave Rigby    void SetUp() override {
60ee0084c6SDave Rigby        EventuallyPersistentEngineTest::SetUp();
6103c34577SDave Rigby
6203c34577SDave Rigby        // Set AuxIO threads to zero, so that the producer's
6303c34577SDave Rigby        // ActiveStreamCheckpointProcesserTask doesn't run.
64b20b9600SJames Harrison        ExecutorPool::get()->setNumAuxIO(0);
65f47eb0edSDaniel Owen        // Set NonIO threads to zero, so the connManager
66f47eb0edSDaniel Owen        // task does not run.
67b20b9600SJames Harrison        ExecutorPool::get()->setNumNonIO(0);
68706b5173SManu Dhundi        callbackCount = 0;
692288acbbSJim Walker
702288acbbSJim Walker#if defined(HAVE_JEMALLOC)
712288acbbSJim Walker        // MB-28370: Run with memory tracking for all alloc/deallocs when built
722288acbbSJim Walker        // with jemalloc.
732288acbbSJim Walker        MemoryTracker::getInstance(*get_mock_server_api()->alloc_hooks);
742288acbbSJim Walker        engine->getEpStats().memoryTrackerEnabled.store(true);
752288acbbSJim Walker#endif
7603c34577SDave Rigby    }
77b84d09deSDave Rigby
78c212fccbSJames Harrison    void TearDown() override {
79c212fccbSJames Harrison        /* MB-22041 changes to dynamically stopping threads rather than having
80c212fccbSJames Harrison         * the excess looping but not getting work. We now need to set the
81c212fccbSJames Harrison         * AuxIO and NonIO back to 1 to allow dead tasks to be cleaned up
82c212fccbSJames Harrison         */
83b20b9600SJames Harrison        ExecutorPool::get()->setNumAuxIO(1);
84b20b9600SJames Harrison        ExecutorPool::get()->setNumNonIO(1);
85c212fccbSJames Harrison
86c212fccbSJames Harrison        EventuallyPersistentEngineTest::TearDown();
872288acbbSJim Walker
882288acbbSJim Walker        MemoryTracker::destroyInstance();
89c212fccbSJames Harrison    }
903735c796SManu Dhundi
9103c34577SDave Rigby    // Setup a DCP producer and attach a stream and cursor to it.
92ba957b1bSDave Rigby    void setup_dcp_stream(
93ba957b1bSDave Rigby            int flags = 0,
94ba957b1bSDave Rigby            IncludeValue includeVal = IncludeValue::Yes,
95ba957b1bSDave Rigby            IncludeXattrs includeXattrs = IncludeXattrs::Yes,
96ba957b1bSDave Rigby            std::vector<std::pair<std::string, std::string>> controls = {}) {
9754ce79e0SDaniel Owen        if (includeVal == IncludeValue::No) {
9814fcb066SJim Walker            flags |= DCP_OPEN_NO_VALUE;
9914fcb066SJim Walker        }
10054ce79e0SDaniel Owen        if (includeXattrs == IncludeXattrs::Yes) {
10154ce79e0SDaniel Owen            flags |= DCP_OPEN_INCLUDE_XATTRS;
10254ce79e0SDaniel Owen        }
1031456e3a9SManu Dhundi        producer = std::make_shared<MockDcpProducer>(
1041456e3a9SManu Dhundi                *engine,
105929e6fbdSDave Rigby                cookie,
1061456e3a9SManu Dhundi                "test_producer",
1071456e3a9SManu Dhundi                flags,
1081456e3a9SManu Dhundi                cb::const_byte_buffer() /*no json*/,
1091456e3a9SManu Dhundi                /*startTask*/ true);
11014fcb066SJim Walker
1119baca6f3SDave Rigby        if (includeXattrs == IncludeXattrs::Yes) {
1129baca6f3SDave Rigby            producer->setNoopEnabled(true);
1139baca6f3SDave Rigby        }
1149baca6f3SDave Rigby
115afa3e748SManu Dhundi        // Since we are creating a mock active stream outside of
116afa3e748SManu Dhundi        // DcpProducer::streamRequest(), and we want the checkpt processor task,
117afa3e748SManu Dhundi        // create it explicitly here
118afa3e748SManu Dhundi        producer->createCheckpointProcessorTask();
119afa3e748SManu Dhundi        producer->scheduleCheckpointProcessorTask();
120afa3e748SManu Dhundi
121ba957b1bSDave Rigby        // Now set any controls before creating any streams
122ba957b1bSDave Rigby        for (const auto& control : controls) {
123ba957b1bSDave Rigby            EXPECT_EQ(ENGINE_SUCCESS,
124ba957b1bSDave Rigby                      producer->control(0,
125ba957b1bSDave Rigby                                        control.first.c_str(),
126ba957b1bSDave Rigby                                        control.first.size(),
127ba957b1bSDave Rigby                                        control.second.c_str(),
128ba957b1bSDave Rigby                                        control.second.size()));
129ba957b1bSDave Rigby        }
130ba957b1bSDave Rigby
13114fcb066SJim Walker        vb0 = engine->getVBucket(vbid);
13214fcb066SJim Walker        ASSERT_NE(nullptr, vb0.get());
13314fcb066SJim Walker        EXPECT_TRUE(vb0) << "Failed to get valid VBucket object for id 0";
134a7d604deSManu Dhundi        stream = std::make_shared<MockActiveStream>(engine,
135a7d604deSManu Dhundi                                                    producer,
136a7d604deSManu Dhundi                                                    flags,
137a7d604deSManu Dhundi                                                    /*opaque*/ 0,
138a7d604deSManu Dhundi                                                    *vb0,
139a7d604deSManu Dhundi                                                    /*st_seqno*/ 0,
140a7d604deSManu Dhundi                                                    /*en_seqno*/ ~0,
141a7d604deSManu Dhundi                                                    /*vb_uuid*/ 0xabcd,
142a7d604deSManu Dhundi                                                    /*snap_start_seqno*/ 0,
143a7d604deSManu Dhundi                                                    /*snap_end_seqno*/ ~0,
144a7d604deSManu Dhundi                                                    includeVal,
145a7d604deSManu Dhundi                                                    includeXattrs);
14606c2fe8aSManu Dhundi
147841fce6aSDave Rigby        EXPECT_FALSE(vb0->checkpointManager->registerCursor(
14849d69f53SJim Walker                stream->getCursorName(), 1, false, MustSendCheckpointEnd::NO))
149841fce6aSDave Rigby                << "Found an existing TAP cursor when attempting to register "
150841fce6aSDave Rigby                   "ours";
15138ba8360SJim Walker        stream->setActive();
152b84d09deSDave Rigby    }
153b84d09deSDave Rigby
154e63181b0SManu Dhundi    void destroy_dcp_stream() {
1551456e3a9SManu Dhundi        producer->closeStream(/*opaque*/ 0, vb0->getId());
156e63181b0SManu Dhundi    }
157e63181b0SManu Dhundi
15854ce79e0SDaniel Owen    /*
15954ce79e0SDaniel Owen     * Creates an item with the key \"key\", containing json data and xattrs.
16054ce79e0SDaniel Owen     * @return a unique_ptr to a newly created item.
16154ce79e0SDaniel Owen     */
16254ce79e0SDaniel Owen    std::unique_ptr<Item> makeItemWithXattrs() {
16354ce79e0SDaniel Owen        std::string valueData = R"({"json":"yes"})";
16454ce79e0SDaniel Owen        std::string data = createXattrValue(valueData);
16570c0f1d2SDaniel Owen        protocol_binary_datatype_t datatype = (PROTOCOL_BINARY_DATATYPE_JSON |
16670c0f1d2SDaniel Owen                                               PROTOCOL_BINARY_DATATYPE_XATTR);
16754ce79e0SDaniel Owen        return std::make_unique<Item>(makeStoredDocKey("key"),
16854ce79e0SDaniel Owen                                      /*flags*/0,
16954ce79e0SDaniel Owen                                      /*exp*/0,
17054ce79e0SDaniel Owen                                      data.c_str(),
17154ce79e0SDaniel Owen                                      data.size(),
17270c0f1d2SDaniel Owen                                      datatype);
17354ce79e0SDaniel Owen    }
17454ce79e0SDaniel Owen
17554ce79e0SDaniel Owen    /*
17654ce79e0SDaniel Owen     * Creates an item with the key \"key\", containing json data and no xattrs.
17754ce79e0SDaniel Owen     * @return a unique_ptr to a newly created item.
17854ce79e0SDaniel Owen     */
17954ce79e0SDaniel Owen    std::unique_ptr<Item> makeItemWithoutXattrs() {
18054ce79e0SDaniel Owen            std::string valueData = R"({"json":"yes"})";
18170c0f1d2SDaniel Owen            protocol_binary_datatype_t datatype = PROTOCOL_BINARY_DATATYPE_JSON;
18254ce79e0SDaniel Owen            return std::make_unique<Item>(makeStoredDocKey("key"),
18354ce79e0SDaniel Owen                                          /*flags*/0,
18454ce79e0SDaniel Owen                                          /*exp*/0,
18554ce79e0SDaniel Owen                                          valueData.c_str(),
18654ce79e0SDaniel Owen                                          valueData.size(),
18770c0f1d2SDaniel Owen                                          datatype);
18854ce79e0SDaniel Owen    }
18954ce79e0SDaniel Owen
190a3cf07d7SManu Dhundi    /* Add items onto the vbucket and wait for the checkpoint to be removed */
191959f95dbSManu Dhundi    void addItemsAndRemoveCheckpoint(int numItems) {
192959f95dbSManu Dhundi        for (int i = 0; i < numItems; ++i) {
193959f95dbSManu Dhundi            std::string key("key" + std::to_string(i));
194959f95dbSManu Dhundi            store_item(vbid, key, "value");
195959f95dbSManu Dhundi        }
196269b255bSManu Dhundi        removeCheckpoint(numItems);
197269b255bSManu Dhundi    }
198959f95dbSManu Dhundi
199269b255bSManu Dhundi    void removeCheckpoint(int numItems) {
200959f95dbSManu Dhundi        /* Create new checkpoint so that we can remove the current checkpoint
201269b255bSManu Dhundi           and force a backfill in the DCP stream */
202841fce6aSDave Rigby        auto& ckpt_mgr = *vb0->checkpointManager;
203959f95dbSManu Dhundi        ckpt_mgr.createNewCheckpoint();
204959f95dbSManu Dhundi
205959f95dbSManu Dhundi        /* Wait for removal of the old checkpoint, this also would imply that
206269b255bSManu Dhundi           the items are persisted (in case of persistent buckets) */
207959f95dbSManu Dhundi        {
208959f95dbSManu Dhundi            bool new_ckpt_created;
209959f95dbSManu Dhundi            std::chrono::microseconds uSleepTime(128);
210959f95dbSManu Dhundi            while (static_cast<size_t>(numItems) !=
211959f95dbSManu Dhundi                   ckpt_mgr.removeClosedUnrefCheckpoints(*vb0,
212959f95dbSManu Dhundi                                                         new_ckpt_created)) {
213959f95dbSManu Dhundi                uSleepTime = decayingSleep(uSleepTime);
214959f95dbSManu Dhundi            }
215959f95dbSManu Dhundi        }
216959f95dbSManu Dhundi    }
217959f95dbSManu Dhundi
2181456e3a9SManu Dhundi    std::shared_ptr<MockDcpProducer> producer;
219a7d604deSManu Dhundi    std::shared_ptr<MockActiveStream> stream;
220115b9765SJim Walker    VBucketPtr vb0;
221e5e0876cSJim Walker
222e5e0876cSJim Walker    /*
223e5e0876cSJim Walker     * Fake callback emulating dcp_add_failover_log
224e5e0876cSJim Walker     */
225e5e0876cSJim Walker    static ENGINE_ERROR_CODE fakeDcpAddFailoverLog(
226e5e0876cSJim Walker            vbucket_failover_t* entry,
227e5e0876cSJim Walker            size_t nentries,
228e5e0876cSJim Walker            gsl::not_null<const void*> cookie) {
229e5e0876cSJim Walker        callbackCount++;
230e5e0876cSJim Walker        return ENGINE_SUCCESS;
231e5e0876cSJim Walker    }
232e5e0876cSJim Walker
233e5e0876cSJim Walker    // callbackCount needs to be static as its used inside of the static
234e5e0876cSJim Walker    // function fakeDcpAddFailoverLog.
235e5e0876cSJim Walker    static int callbackCount;
236e5e0876cSJim Walker};
237e5e0876cSJim Walkerint DCPTest::callbackCount = 0;
238e5e0876cSJim Walker
239e5e0876cSJim Walkerclass StreamTest : public DCPTest,
240e5e0876cSJim Walker                   public ::testing::WithParamInterface<std::string> {
241e5e0876cSJim Walkerprotected:
242e5e0876cSJim Walker    void SetUp() override {
243e5e0876cSJim Walker        bucketType = GetParam();
244e5e0876cSJim Walker        DCPTest::SetUp();
245e5e0876cSJim Walker        vb0 = engine->getVBucket(0);
246e5e0876cSJim Walker        EXPECT_TRUE(vb0) << "Failed to get valid VBucket object for id 0";
247e5e0876cSJim Walker    }
248e5e0876cSJim Walker
249e5e0876cSJim Walker    void TearDown() override {
250e5e0876cSJim Walker        if (producer) {
251e5e0876cSJim Walker            producer->cancelCheckpointCreatorTask();
252e5e0876cSJim Walker        }
253e5e0876cSJim Walker        // Destroy various engine objects
254e5e0876cSJim Walker        vb0.reset();
255e5e0876cSJim Walker        stream.reset();
256e5e0876cSJim Walker        producer.reset();
257e5e0876cSJim Walker        DCPTest::TearDown();
258e5e0876cSJim Walker    }
25903c34577SDave Rigby};
260b84d09deSDave Rigby
/*
 * Test that when we have a producer with IncludeValue and IncludeXattrs both
 * set to No, an active stream created via a streamRequest returns true for
 * isKeyOnly.
 */
26654ce79e0SDaniel OwenTEST_P(StreamTest, test_streamIsKeyOnlyTrue) {
267959f95dbSManu Dhundi    setup_dcp_stream(0, IncludeValue::No, IncludeXattrs::No);
2684023228dSDaniel Owen    uint64_t rollbackSeqno;
2693735c796SManu Dhundi    auto err = producer->streamRequest(/*flags*/ 0,
2703735c796SManu Dhundi                                       /*opaque*/ 0,
2713735c796SManu Dhundi                                       /*vbucket*/ 0,
2723735c796SManu Dhundi                                       /*start_seqno*/ 0,
2733735c796SManu Dhundi                                       /*end_seqno*/ 0,
2743735c796SManu Dhundi                                       /*vb_uuid*/ 0,
2753735c796SManu Dhundi                                       /*snap_start*/ 0,
2763735c796SManu Dhundi                                       /*snap_end*/ 0,
2774023228dSDaniel Owen                                       &rollbackSeqno,
2783735c796SManu Dhundi                                       DCPTest::fakeDcpAddFailoverLog);
2794023228dSDaniel Owen    ASSERT_EQ(ENGINE_SUCCESS, err)
2804023228dSDaniel Owen        << "stream request did not return ENGINE_SUCCESS";
2814023228dSDaniel Owen
282a7d604deSManu Dhundi    auto activeStream =
283a7d604deSManu Dhundi            std::dynamic_pointer_cast<ActiveStream>(producer->findStream(0));
284a7d604deSManu Dhundi    ASSERT_NE(nullptr, activeStream);
285a7d604deSManu Dhundi    EXPECT_TRUE(activeStream->isKeyOnly());
286e63181b0SManu Dhundi    destroy_dcp_stream();
2874023228dSDaniel Owen}
2885bf7e4e7SSriram Ganesan
289aacf24fdSTrond NorbyeENGINE_ERROR_CODE mock_mutation_return_engine_e2big(
290aacf24fdSTrond Norbye        gsl::not_null<const void*> cookie,
291aacf24fdSTrond Norbye        uint32_t opaque,
292aacf24fdSTrond Norbye        item* itm,
293aacf24fdSTrond Norbye        uint16_t vbucket,
294aacf24fdSTrond Norbye        uint64_t by_seqno,
295aacf24fdSTrond Norbye        uint64_t rev_seqno,
296aacf24fdSTrond Norbye        uint32_t lock_time,
297aacf24fdSTrond Norbye        const void* meta,
298aacf24fdSTrond Norbye        uint16_t nmeta,
299aacf24fdSTrond Norbye        uint8_t nru,
300aacf24fdSTrond Norbye        uint8_t collection_len) {
3015bf7e4e7SSriram Ganesan    Item* item = reinterpret_cast<Item*>(itm);
3025bf7e4e7SSriram Ganesan    delete item;
3035bf7e4e7SSriram Ganesan    return ENGINE_E2BIG;
3045bf7e4e7SSriram Ganesan}
3055bf7e4e7SSriram Ganesan
3063726d8a7SSriram Ganesanstd::string decompressValue(std::string compressedValue) {
3073726d8a7SSriram Ganesan    cb::compression::Buffer buffer;
3083726d8a7SSriram Ganesan    if (!cb::compression::inflate(cb::compression::Algorithm::Snappy,
309c1fa3523SSriram Ganesan                                  compressedValue, buffer)) {
3103726d8a7SSriram Ganesan        return {};
3113726d8a7SSriram Ganesan    }
3123726d8a7SSriram Ganesan
313663d97bdSTrond Norbye    return std::string(buffer.data(), buffer.size());
3143726d8a7SSriram Ganesan}
3153726d8a7SSriram Ganesan
3163726d8a7SSriram Ganesanextern std::string dcp_last_value;
3173726d8a7SSriram Ganesanextern uint32_t dcp_last_packet_size;
3183726d8a7SSriram Ganesanextern protocol_binary_datatype_t dcp_last_datatype;
3193726d8a7SSriram Ganesan
320e5e0876cSJim Walkerclass CompressionStreamTest : public DCPTest,
321e5e0876cSJim Walker                              public ::testing::WithParamInterface<
322e5e0876cSJim Walker                                      ::testing::tuple<std::string, bool>> {
323e5e0876cSJim Walkerpublic:
324e5e0876cSJim Walker    void SetUp() override {
325e5e0876cSJim Walker        bucketType = ::testing::get<0>(GetParam());
326e5e0876cSJim Walker        DCPTest::SetUp();
327e5e0876cSJim Walker        vb0 = engine->getVBucket(0);
328e5e0876cSJim Walker        EXPECT_TRUE(vb0) << "Failed to get valid VBucket object for id 0";
329e5e0876cSJim Walker    }
330e5e0876cSJim Walker
331e5e0876cSJim Walker    void TearDown() override {
332e5e0876cSJim Walker        if (producer) {
333e5e0876cSJim Walker            producer->cancelCheckpointCreatorTask();
334e5e0876cSJim Walker        }
335e5e0876cSJim Walker        // Destroy various engine objects
336e5e0876cSJim Walker        vb0.reset();
337e5e0876cSJim Walker        stream.reset();
338e5e0876cSJim Walker        producer.reset();
339e5e0876cSJim Walker        DCPTest::TearDown();
340e5e0876cSJim Walker    }
341e5e0876cSJim Walker
342e5e0876cSJim Walker    bool isXattr() const {
343e5e0876cSJim Walker        return ::testing::get<1>(GetParam());
344e5e0876cSJim Walker    }
345e5e0876cSJim Walker
346e5e0876cSJim Walker    size_t getItemSize(Item& item) {
347e5e0876cSJim Walker        if (isXattr()) {
348e5e0876cSJim Walker            return MutationResponse::mutationBaseMsgBytes +
349e5e0876cSJim Walker                   item.getKey().size() +
350e5e0876cSJim Walker                   // DCP won't recompress the pruned document
351e5e0876cSJim Walker                   getXattrSize(false);
352e5e0876cSJim Walker        }
353e5e0876cSJim Walker        return MutationResponse::mutationBaseMsgBytes + item.getKey().size() +
354e5e0876cSJim Walker               item.getNBytes();
355e5e0876cSJim Walker    }
356e5e0876cSJim Walker
357e5e0876cSJim Walker    size_t getXattrSize(bool compressed) const {
358e5e0876cSJim Walker        return createXattrValue({}, true, compressed).size();
359e5e0876cSJim Walker    }
360e5e0876cSJim Walker};
361e5e0876cSJim Walker
362ba957b1bSDave Rigby// Test the compression control error case
363ba957b1bSDave RigbyTEST_P(StreamTest, validate_compression_control_message_denied) {
364ba957b1bSDave Rigby    setup_dcp_stream();
365ba957b1bSDave Rigby    std::string compressCtrlMsg("force_value_compression");
366ba957b1bSDave Rigby    std::string compressCtrlValue("true");
367ba957b1bSDave Rigby    EXPECT_FALSE(producer->isCompressionEnabled());
368ba957b1bSDave Rigby
369ba957b1bSDave Rigby    // Sending a control message without actually enabling SNAPPY must fail
370ba957b1bSDave Rigby    EXPECT_EQ(ENGINE_EINVAL,
371ba957b1bSDave Rigby              producer->control(0,
372ba957b1bSDave Rigby                                compressCtrlMsg.c_str(),
373ba957b1bSDave Rigby                                compressCtrlMsg.size(),
374ba957b1bSDave Rigby                                compressCtrlValue.c_str(),
375ba957b1bSDave Rigby                                compressCtrlValue.size()));
376ba957b1bSDave Rigby    destroy_dcp_stream();
377ba957b1bSDave Rigby}
378ba957b1bSDave Rigby
379ba957b1bSDave Rigby// Test the compression control success case
380ba957b1bSDave RigbyTEST_P(StreamTest, validate_compression_control_message_allowed) {
381ba957b1bSDave Rigby    // For success enable the snappy datatype on the connection
382ba957b1bSDave Rigby    mock_set_datatype_support(cookie, PROTOCOL_BINARY_DATATYPE_SNAPPY);
383ba957b1bSDave Rigby    setup_dcp_stream();
384ba957b1bSDave Rigby    std::string compressCtrlMsg("force_value_compression");
385ba957b1bSDave Rigby    std::string compressCtrlValue("true");
386ba957b1bSDave Rigby    EXPECT_TRUE(producer->isCompressionEnabled());
387ba957b1bSDave Rigby
388ba957b1bSDave Rigby    // Sending a control message after enabling SNAPPY should succeed
389ba957b1bSDave Rigby    EXPECT_EQ(ENGINE_SUCCESS,
390ba957b1bSDave Rigby              producer->control(0,
391ba957b1bSDave Rigby                                compressCtrlMsg.c_str(),
392ba957b1bSDave Rigby                                compressCtrlMsg.size(),
393ba957b1bSDave Rigby                                compressCtrlValue.c_str(),
394ba957b1bSDave Rigby                                compressCtrlValue.size()));
395ba957b1bSDave Rigby    destroy_dcp_stream();
396ba957b1bSDave Rigby}
397ba957b1bSDave Rigby
3983726d8a7SSriram Ganesan/**
3993726d8a7SSriram Ganesan * Test to verify DCP compression/decompression. There are 4 cases that are being
4003726d8a7SSriram Ganesan * tested
4013726d8a7SSriram Ganesan *
4023726d8a7SSriram Ganesan * 1. Add a compressed item and stream a compressed item
4033726d8a7SSriram Ganesan * 2. Add an uncompressed item and stream a compressed item
4043726d8a7SSriram Ganesan * 3. Add a compressed item and stream an uncompressed item
4053726d8a7SSriram Ganesan * 4. Add an uncompressed item and stream an uncompressed item
4063726d8a7SSriram Ganesan */
407ba957b1bSDave Rigby
/**
 * This test uses a producer/connection without compression enabled and
 * covers 2 cases:
 *
 * 1. Add a compressed item and expect to stream an uncompressed item
 * 2. Add an uncompressed item and expect to stream an uncompressed item
 */
417ba957b1bSDave RigbyTEST_P(CompressionStreamTest, compression_not_enabled) {
4183726d8a7SSriram Ganesan    VBucketPtr vb = engine->getKVBucket()->getVBucket(vbid);
419687ed3e5SSriram Ganesan    std::string valueData("{\"product\": \"car\",\"price\": \"100\"},"
420687ed3e5SSriram Ganesan                          "{\"product\": \"bus\",\"price\": \"1000\"},"
421687ed3e5SSriram Ganesan                          "{\"product\": \"Train\",\"price\": \"100000\"}");
422e5e0876cSJim Walker    auto item1 = makeCompressibleItem(vbid,
423e5e0876cSJim Walker                                      makeStoredDocKey("key1"),
424e5e0876cSJim Walker                                      valueData,
425e5e0876cSJim Walker                                      PROTOCOL_BINARY_DATATYPE_JSON,
426ba957b1bSDave Rigby                                      true, // compressed
427e5e0876cSJim Walker                                      isXattr());
428e5e0876cSJim Walker    auto item2 = makeCompressibleItem(vbid,
429e5e0876cSJim Walker                                      makeStoredDocKey("key2"),
430e5e0876cSJim Walker                                      valueData,
431e5e0876cSJim Walker                                      PROTOCOL_BINARY_DATATYPE_JSON,
432ba957b1bSDave Rigby                                      false, // uncompressed
433e5e0876cSJim Walker                                      isXattr());
434e5e0876cSJim Walker
435e5e0876cSJim Walker    auto includeValue = isXattr() ? IncludeValue::No : IncludeValue::Yes;
436e5e0876cSJim Walker    setup_dcp_stream(0, includeValue, IncludeXattrs::Yes);
4373726d8a7SSriram Ganesan
4386beefd65SSriram Ganesan    /**
4396beefd65SSriram Ganesan     * Ensure that compression is disabled
4406beefd65SSriram Ganesan     */
441