1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2016 Couchbase, Inc
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17 
18 /*
19  * Unit test for DCP-related classes.
20  *
21  * Due to the way our classes are structured, most of the different DCP classes
22  * need an instance of EPBucket& other related objects.
23  */
24 
25 #include "../mock/mock_dcp.h"
26 #include "../mock/mock_dcp_conn_map.h"
27 #include "../mock/mock_dcp_consumer.h"
28 #include "../mock/mock_dcp_producer.h"
29 #include "../mock/mock_stream.h"
30 #include "../mock/mock_synchronous_ep_engine.h"
31 #include "checkpoint.h"
32 #include "connmap.h"
33 #include "dcp/backfill_disk.h"
34 #include "dcp/dcp-types.h"
35 #include "dcp/dcpconnmap.h"
36 #include "dcp/producer.h"
37 #include "dcp/stream.h"
38 #include "dcp_utils.h"
39 #include "ep_time.h"
40 #include "evp_engine_test.h"
41 #include "evp_store_single_threaded_test.h"
42 #include "failover-table.h"
43 #include "memory_tracker.h"
44 #include "objectregistry.h"
45 #include "test_helpers.h"
46 
47 #include <dcp/backfill_memory.h>
48 #include <gtest/gtest.h>
49 #include <platform/compress.h>
50 #include <xattr/utils.h>
51 
52 #include <thread>
53 
54 extern uint8_t dcp_last_op;
55 extern uint32_t dcp_last_flags;
56 
57 class DCPTest : public EventuallyPersistentEngineTest {
58 protected:
59     void SetUp() override {
60         EventuallyPersistentEngineTest::SetUp();
61 
62         // Set AuxIO threads to zero, so that the producer's
63         // ActiveStreamCheckpointProcesserTask doesn't run.
64         ExecutorPool::get()->setNumAuxIO(0);
65         // Set NonIO threads to zero, so the connManager
66         // task does not run.
67         ExecutorPool::get()->setNumNonIO(0);
68         callbackCount = 0;
69 
70 #if defined(HAVE_JEMALLOC)
71         // MB-28370: Run with memory tracking for all alloc/deallocs when built
72         // with jemalloc.
73         MemoryTracker::getInstance(*get_mock_server_api()->alloc_hooks);
74         engine->getEpStats().memoryTrackerEnabled.store(true);
75 #endif
76     }
77 
78     void TearDown() override {
79         /* MB-22041 changes to dynamically stopping threads rather than having
80          * the excess looping but not getting work. We now need to set the
81          * AuxIO and NonIO back to 1 to allow dead tasks to be cleaned up
82          */
83         ExecutorPool::get()->setNumAuxIO(1);
84         ExecutorPool::get()->setNumNonIO(1);
85 
86         EventuallyPersistentEngineTest::TearDown();
87 
88         MemoryTracker::destroyInstance();
89     }
90 
91     // Setup a DCP producer and attach a stream and cursor to it.
setup_dcp_stream( int flags = 0, IncludeValue includeVal = IncludeValue::Yes, IncludeXattrs includeXattrs = IncludeXattrs::Yes, std::vector<std::pair<std::string, std::string>> controls = {})92     void setup_dcp_stream(
93             int flags = 0,
94             IncludeValue includeVal = IncludeValue::Yes,
95             IncludeXattrs includeXattrs = IncludeXattrs::Yes,
96             std::vector<std::pair<std::string, std::string>> controls = {}) {
97         if (includeVal == IncludeValue::No) {
98             flags |= DCP_OPEN_NO_VALUE;
99         }
100         if (includeXattrs == IncludeXattrs::Yes) {
101             flags |= DCP_OPEN_INCLUDE_XATTRS;
102         }
103         producer = std::make_shared<MockDcpProducer>(
104                 *engine,
105                 cookie,
106                 "test_producer",
107                 flags,
108                 cb::const_byte_buffer() /*no json*/,
109                 /*startTask*/ true);
110 
111         if (includeXattrs == IncludeXattrs::Yes) {
112             producer->setNoopEnabled(true);
113         }
114 
115         // Since we are creating a mock active stream outside of
116         // DcpProducer::streamRequest(), and we want the checkpt processor task,
117         // create it explicitly here
118         producer->createCheckpointProcessorTask();
119         producer->scheduleCheckpointProcessorTask();
120 
121         // Now set any controls before creating any streams
122         for (const auto& control : controls) {
123             EXPECT_EQ(ENGINE_SUCCESS,
124                       producer->control(0,
125                                         control.first.c_str(),
126                                         control.first.size(),
127                                         control.second.c_str(),
128                                         control.second.size()));
129         }
130 
131         vb0 = engine->getVBucket(vbid);
132         ASSERT_NE(nullptr, vb0.get());
133         EXPECT_TRUE(vb0) << "Failed to get valid VBucket object for id 0";
134         stream = std::make_shared<MockActiveStream>(engine,
135                                                     producer,
136                                                     flags,
137                                                     /*opaque*/ 0,
138                                                     *vb0,
139                                                     /*st_seqno*/ 0,
140                                                     /*en_seqno*/ ~0,
141                                                     /*vb_uuid*/ 0xabcd,
142                                                     /*snap_start_seqno*/ 0,
143                                                     /*snap_end_seqno*/ ~0,
144                                                     includeVal,
145                                                     includeXattrs);
146 
147         EXPECT_FALSE(vb0->checkpointManager->registerCursor(
148                 stream->getCursorName(), 1, false, MustSendCheckpointEnd::NO))
149                 << "Found an existing TAP cursor when attempting to register "
150                    "ours";
151         stream->setActive();
152     }
153 
destroy_dcp_stream()154     void destroy_dcp_stream() {
155         producer->closeStream(/*opaque*/ 0, vb0->getId());
156     }
157 
158     /*
159      * Creates an item with the key \"key\", containing json data and xattrs.
160      * @return a unique_ptr to a newly created item.
161      */
makeItemWithXattrs()162     std::unique_ptr<Item> makeItemWithXattrs() {
163         std::string valueData = R"({"json":"yes"})";
164         std::string data = createXattrValue(valueData);
165         protocol_binary_datatype_t datatype = (PROTOCOL_BINARY_DATATYPE_JSON |
166                                                PROTOCOL_BINARY_DATATYPE_XATTR);
167         return std::make_unique<Item>(makeStoredDocKey("key"),
168                                       /*flags*/0,
169                                       /*exp*/0,
170                                       data.c_str(),
171                                       data.size(),
172                                       datatype);
173     }
174 
175     /*
176      * Creates an item with the key \"key\", containing json data and no xattrs.
177      * @return a unique_ptr to a newly created item.
178      */
makeItemWithoutXattrs()179     std::unique_ptr<Item> makeItemWithoutXattrs() {
180             std::string valueData = R"({"json":"yes"})";
181             protocol_binary_datatype_t datatype = PROTOCOL_BINARY_DATATYPE_JSON;
182             return std::make_unique<Item>(makeStoredDocKey("key"),
183                                           /*flags*/0,
184                                           /*exp*/0,
185                                           valueData.c_str(),
186                                           valueData.size(),
187                                           datatype);
188     }
189 
190     /* Add items onto the vbucket and wait for the checkpoint to be removed */
addItemsAndRemoveCheckpoint(int numItems)191     void addItemsAndRemoveCheckpoint(int numItems) {
192         for (int i = 0; i < numItems; ++i) {
193             std::string key("key" + std::to_string(i));
194             store_item(vbid, key, "value");
195         }
196         removeCheckpoint(numItems);
197     }
198 
removeCheckpoint(int numItems)199     void removeCheckpoint(int numItems) {
200         /* Create new checkpoint so that we can remove the current checkpoint
201            and force a backfill in the DCP stream */
202         auto& ckpt_mgr = *vb0->checkpointManager;
203         ckpt_mgr.createNewCheckpoint();
204 
205         /* Wait for removal of the old checkpoint, this also would imply that
206            the items are persisted (in case of persistent buckets) */
207         {
208             bool new_ckpt_created;
209             std::chrono::microseconds uSleepTime(128);
210             while (static_cast<size_t>(numItems) !=
211                    ckpt_mgr.removeClosedUnrefCheckpoints(*vb0,
212                                                          new_ckpt_created)) {
213                 uSleepTime = decayingSleep(uSleepTime);
214             }
215         }
216     }
217 
218     std::shared_ptr<MockDcpProducer> producer;
219     std::shared_ptr<MockActiveStream> stream;
220     VBucketPtr vb0;
221 
222     /*
223      * Fake callback emulating dcp_add_failover_log
224      */
fakeDcpAddFailoverLog( vbucket_failover_t* entry, size_t nentries, gsl::not_null<const void*> cookie)225     static ENGINE_ERROR_CODE fakeDcpAddFailoverLog(
226             vbucket_failover_t* entry,
227             size_t nentries,
228             gsl::not_null<const void*> cookie) {
229         callbackCount++;
230         return ENGINE_SUCCESS;
231     }
232 
233     // callbackCount needs to be static as its used inside of the static
234     // function fakeDcpAddFailoverLog.
235     static int callbackCount;
236 };
237 int DCPTest::callbackCount = 0;
238 
239 class StreamTest : public DCPTest,
240                    public ::testing::WithParamInterface<std::string> {
241 protected:
242     void SetUp() override {
243         bucketType = GetParam();
244         DCPTest::SetUp();
245         vb0 = engine->getVBucket(0);
246         EXPECT_TRUE(vb0) << "Failed to get valid VBucket object for id 0";
247     }
248 
249     void TearDown() override {
250         if (producer) {
251             producer->cancelCheckpointCreatorTask();
252         }
253         // Destroy various engine objects
254         vb0.reset();
255         stream.reset();
256         producer.reset();
257         DCPTest::TearDown();
258     }
259 };
260 
261 /*
262  * Test that when have a producer with IncludeValue and IncludeXattrs both set
263  * to No an active stream created via a streamRequest returns true for
264  * isKeyOnly.
265  */
TEST_P(StreamTest, test_streamIsKeyOnlyTrue)266 TEST_P(StreamTest, test_streamIsKeyOnlyTrue) {
267     setup_dcp_stream(0, IncludeValue::No, IncludeXattrs::No);
268     uint64_t rollbackSeqno;
269     auto err = producer->streamRequest(/*flags*/ 0,
270                                        /*opaque*/ 0,
271                                        /*vbucket*/ 0,
272                                        /*start_seqno*/ 0,
273                                        /*end_seqno*/ 0,
274                                        /*vb_uuid*/ 0,
275                                        /*snap_start*/ 0,
276                                        /*snap_end*/ 0,
277                                        &rollbackSeqno,
278                                        DCPTest::fakeDcpAddFailoverLog);
279     ASSERT_EQ(ENGINE_SUCCESS, err)
280         << "stream request did not return ENGINE_SUCCESS";
281 
282     auto activeStream =
283             std::dynamic_pointer_cast<ActiveStream>(producer->findStream(0));
284     ASSERT_NE(nullptr, activeStream);
285     EXPECT_TRUE(activeStream->isKeyOnly());
286     destroy_dcp_stream();
287 }
288 
mock_mutation_return_engine_e2big( gsl::not_null<const void*> cookie, uint32_t opaque, item* itm, uint16_t vbucket, uint64_t by_seqno, uint64_t rev_seqno, uint32_t lock_time, const void* meta, uint16_t nmeta, uint8_t nru, uint8_t collection_len)289 ENGINE_ERROR_CODE mock_mutation_return_engine_e2big(
290         gsl::not_null<const void*> cookie,
291         uint32_t opaque,
292         item* itm,
293         uint16_t vbucket,
294         uint64_t by_seqno,
295         uint64_t rev_seqno,
296         uint32_t lock_time,
297         const void* meta,
298         uint16_t nmeta,
299         uint8_t nru,
300         uint8_t collection_len) {
301     Item* item = reinterpret_cast<Item*>(itm);
302     delete item;
303     return ENGINE_E2BIG;
304 }
305 
decompressValue(std::string compressedValue)306 std::string decompressValue(std::string compressedValue) {
307     cb::compression::Buffer buffer;
308     if (!cb::compression::inflate(cb::compression::Algorithm::Snappy,
309                                   compressedValue, buffer)) {
310         return {};
311     }
312 
313     return std::string(buffer.data(), buffer.size());
314 }
315 
316 extern std::string dcp_last_value;
317 extern uint32_t dcp_last_packet_size;
318 extern protocol_binary_datatype_t dcp_last_datatype;
319 
320 class CompressionStreamTest : public DCPTest,
321                               public ::testing::WithParamInterface<
322                                       ::testing::tuple<std::string, bool>> {
323 public:
324     void SetUp() override {
325         bucketType = ::testing::get<0>(GetParam());
326         DCPTest::SetUp();
327         vb0 = engine->getVBucket(0);
328         EXPECT_TRUE(vb0) << "Failed to get valid VBucket object for id 0";
329     }
330 
331     void TearDown() override {
332         if (producer) {
333             producer->cancelCheckpointCreatorTask();
334         }
335         // Destroy various engine objects
336         vb0.reset();
337         stream.reset();
338         producer.reset();
339         DCPTest::TearDown();
340     }
341 
isXattr() const342     bool isXattr() const {
343         return ::testing::get<1>(GetParam());
344     }
345 
getItemSize(Item& item)346     size_t getItemSize(Item& item) {
347         if (isXattr()) {
348             return MutationResponse::mutationBaseMsgBytes +
349                    item.getKey().size() +
350                    // DCP won't recompress the pruned document
351                    getXattrSize(false);
352         }
353         return MutationResponse::mutationBaseMsgBytes + item.getKey().size() +
354                item.getNBytes();
355     }
356 
getXattrSize(bool compressed) const357     size_t getXattrSize(bool compressed) const {
358         return createXattrValue({}, true, compressed).size();
359     }
360 };
361 
362 // Test the compression control error case
TEST_P(StreamTest, validate_compression_control_message_denied)363 TEST_P(StreamTest, validate_compression_control_message_denied) {
364     setup_dcp_stream();
365     std::string compressCtrlMsg("force_value_compression");
366     std::string compressCtrlValue("true");
367     EXPECT_FALSE(producer->isCompressionEnabled());
368 
369     // Sending a control message without actually enabling SNAPPY must fail
370     EXPECT_EQ(ENGINE_EINVAL,
371               producer->control(0,
372                                 compressCtrlMsg.c_str(),
373                                 compressCtrlMsg.size(),
374                                 compressCtrlValue.c_str(),
375                                 compressCtrlValue.size()));
376     destroy_dcp_stream();
377 }
378 
379 // Test the compression control success case
TEST_P(StreamTest, validate_compression_control_message_allowed)380 TEST_P(StreamTest, validate_compression_control_message_allowed) {
381     // For success enable the snappy datatype on the connection
382     mock_set_datatype_support(cookie, PROTOCOL_BINARY_DATATYPE_SNAPPY);
383     setup_dcp_stream();
384     std::string compressCtrlMsg("force_value_compression");
385     std::string compressCtrlValue("true");
386     EXPECT_TRUE(producer->isCompressionEnabled());
387 
388     // Sending a control message after enabling SNAPPY should succeed
389     EXPECT_EQ(ENGINE_SUCCESS,
390               producer->control(0,
391                                 compressCtrlMsg.c_str(),
392                                 compressCtrlMsg.size(),
393                                 compressCtrlValue.c_str(),
394                                 compressCtrlValue.size()));
395     destroy_dcp_stream();
396 }
397 
398 /**
399  * Test to verify DCP compression/decompression. There are 4 cases that are being
400  * tested
401  *
402  * 1. Add a compressed item and stream a compressed item
403  * 2. Add an uncompressed item and stream a compressed item
404  * 3. Add a compressed item and stream an uncompressed item
405  * 4. Add an uncompressed item and stream an uncompressed item
406  */
407 
408 /**
409  * There are 2 cases that are
410  * being tested in this test. This test uses a producer/connection without
411  * compression enabled
412  *
413  * 1. Add a compressed item and expect to stream an uncompressed item
414  * 2. Add an uncompressed item and expect to stream an uncompressed item
415  *
416  */
TEST_P(CompressionStreamTest, compression_not_enabled)417 TEST_P(CompressionStreamTest, compression_not_enabled) {
418     VBucketPtr vb = engine->getKVBucket()->getVBucket(vbid);
419     std::string valueData("{\"product\": \"car\",\"price\": \"100\"},"
420                           "{\"product\": \"bus\",\"price\": \"1000\"},"
421                           "{\"product\": \"Train\",\"price\": \"100000\"}");
422     auto item1 = makeCompressibleItem(vbid,
423                                       makeStoredDocKey("key1"),
424                                       valueData,
425                                       PROTOCOL_BINARY_DATATYPE_JSON,
426                                       true, // compressed
427                                       isXattr());
428     auto item2 = makeCompressibleItem(vbid,
429                                       makeStoredDocKey("key2"),
430                                       valueData,
431                                       PROTOCOL_BINARY_DATATYPE_JSON,
432                                       false, // uncompressed
433                                       isXattr());
434 
435     auto includeValue = isXattr() ? IncludeValue::No : IncludeValue::Yes;
436     setup_dcp_stream(0, includeValue, IncludeXattrs::Yes);
437 
438     /**
439      * Ensure that compression is disabled
440      */
441     ASSERT_FALSE(producer->isCompressionEnabled());
442 
443     auto producers = get_dcp_producers(reinterpret_cast<ENGINE_HANDLE*>(engine),
444                                        reinterpret_cast<ENGINE_HANDLE_V1*>(engine));
445 
446     // Now, add 2 items
447     EXPECT_EQ(ENGINE_SUCCESS, engine->getKVBucket()->set(*item1, cookie));
448     EXPECT_EQ(ENGINE_SUCCESS, engine->getKVBucket()->set(*item2, cookie));
449 
450     auto keyAndSnappyValueMessageSize = getItemSize(*item1);
451 
452     /**
453      * Create a DCP response and check that a new item isn't created and that
454      * the size of the response message is greater than the size of the original
455      * message (or equal for xattr stream)
456      */
457     queued_item qi(std::move(item1));
458     std::unique_ptr<DcpResponse> dcpResponse = stream->public_makeResponseFromItem(qi);
459     auto mutProdResponse = dynamic_cast<MutationProducerResponse*>(dcpResponse.get());
460     ASSERT_NE(qi.get(), mutProdResponse->getItem().get());
461     if (isXattr()) {
462         // The same sizes. makeResponseFromItem will have inflated and not
463         // compressed as part of the value pruning
464         EXPECT_EQ(keyAndSnappyValueMessageSize, dcpResponse->getMessageSize());
465     } else {
466         EXPECT_LT(keyAndSnappyValueMessageSize, dcpResponse->getMessageSize());
467     }
468 
469     uint64_t rollbackSeqno;
470     EXPECT_EQ(ENGINE_SUCCESS,
471               producer->streamRequest(/*flags*/ 0,
472                                       /*opaque*/ 0,
473                                       /*vbucket*/ 0,
474                                       /*start_seqno*/ 0,
475                                       /*end_seqno*/ ~0,
476                                       /*vb_uuid*/ 0,
477                                       /*snap_start*/ 0,
478                                       /*snap_end*/ ~0,
479                                       &rollbackSeqno,
480                                       DCPTest::fakeDcpAddFailoverLog));
481 
482     producer->notifySeqnoAvailable(vbid, vb->getHighSeqno());
483     ASSERT_EQ(ENGINE_SUCCESS, producer->step(producers.get()));
484     ASSERT_EQ(1, producer->getCheckpointSnapshotTask().queueSize());
485     producer->getCheckpointSnapshotTask().run();
486 
487     /* Stream the snapshot marker first */
488     EXPECT_EQ(ENGINE_WANT_MORE, producer->step(producers.get()));
489     EXPECT_EQ(0, producer->getItemsSent());
490 
491     /* Stream the first mutation */
492     protocol_binary_datatype_t expectedDataType =
493             isXattr() ? PROTOCOL_BINARY_DATATYPE_XATTR
494                       : PROTOCOL_BINARY_DATATYPE_JSON;
495     EXPECT_EQ(ENGINE_WANT_MORE, producer->step(producers.get()));
496     std::string value(qi->getValue()->getData(), qi->getValue()->valueSize());
497     EXPECT_STREQ(dcp_last_value.c_str(), decompressValue(value).c_str());
498 
499     if (isXattr()) {
500         // The pruned packet won't be recompressed
501         EXPECT_EQ(dcp_last_packet_size, keyAndSnappyValueMessageSize);
502     } else {
503         EXPECT_GT(dcp_last_packet_size, keyAndSnappyValueMessageSize);
504     }
505 
506     EXPECT_FALSE(mcbp::datatype::is_snappy(dcp_last_datatype));
507     EXPECT_EQ(expectedDataType, dcp_last_datatype);
508 
509     /**
510      * Create a DCP response and check that a new item is created and
511      * the message size is less than the size of original item
512      */
513     uint32_t keyAndValueMessageSize = getItemSize(*item2);
514     qi.reset(std::move(item2));
515     dcpResponse = stream->public_makeResponseFromItem(qi);
516     mutProdResponse = dynamic_cast<MutationProducerResponse*>(dcpResponse.get());
517 
518     // A new pruned item will always be generated
519     if (!isXattr()) {
520         ASSERT_EQ(qi.get(), mutProdResponse->getItem().get());
521     }
522     EXPECT_EQ(dcpResponse->getMessageSize(), keyAndValueMessageSize);
523 
524     /* Stream the second mutation */
525     EXPECT_EQ(ENGINE_WANT_MORE, producer->step(producers.get()));
526 
527     value.assign(qi->getValue()->getData(), qi->getValue()->valueSize());
528     EXPECT_STREQ(value.c_str(), dcp_last_value.c_str());
529     EXPECT_EQ(dcp_last_packet_size, keyAndValueMessageSize);
530 
531     EXPECT_FALSE(mcbp::datatype::is_snappy(dcp_last_datatype));
532     EXPECT_EQ(expectedDataType, dcp_last_datatype);
533 }
534 
535 /**
536  * Test to verify DCP compression, this test has client snappy enabled
537  *
538  *  - Add a compressed item and expect we stream a compressed item
539  *
540  * Note when the test is running xattr-only DCP, expect we stream an
541  * uncompressed item
542  */
TEST_P(CompressionStreamTest, connection_snappy_enabled)543 TEST_P(CompressionStreamTest, connection_snappy_enabled) {
544     VBucketPtr vb = engine->getKVBucket()->getVBucket(vbid);
545     std::string valueData(
546             "{\"product\": \"car\",\"price\": \"100\"},"
547             "{\"product\": \"bus\",\"price\": \"1000\"},"
548             "{\"product\": \"Train\",\"price\": \"100000\"}");
549 
550     auto item = makeCompressibleItem(vbid,
551                                      makeStoredDocKey("key"),
552                                      valueData,
553                                      PROTOCOL_BINARY_DATATYPE_JSON,
554                                      true, // compressed
555                                      isXattr());
556 
557     //Enable the snappy datatype on the connection
558     mock_set_datatype_support(cookie, PROTOCOL_BINARY_DATATYPE_SNAPPY);
559 
560     auto includeValue = isXattr() ? IncludeValue::No : IncludeValue::Yes;
561     setup_dcp_stream(0, includeValue, IncludeXattrs::Yes);
562 
563     uint64_t rollbackSeqno;
564     EXPECT_EQ(ENGINE_SUCCESS,
565               producer->streamRequest(/*flags*/ 0,
566                                       /*opaque*/ 0,
567                                       /*vbucket*/ 0,
568                                       /*start_seqno*/ 0,
569                                       /*end_seqno*/ ~0,
570                                       /*vb_uuid*/ 0,
571                                       /*snap_start*/ 0,
572                                       /*snap_end*/ ~0,
573                                       &rollbackSeqno,
574                                       DCPTest::fakeDcpAddFailoverLog));
575 
576     auto producers =
577             get_dcp_producers(reinterpret_cast<ENGINE_HANDLE*>(engine),
578                               reinterpret_cast<ENGINE_HANDLE_V1*>(engine));
579     ASSERT_TRUE(producer->isCompressionEnabled());
580 
581     // Now, add the 3rd item. This item should be compressed
582     EXPECT_EQ(ENGINE_SUCCESS, engine->getKVBucket()->set(*item, cookie));
583 
584     producer->notifySeqnoAvailable(vbid, vb->getHighSeqno());
585     ASSERT_EQ(ENGINE_SUCCESS, producer->step(producers.get()));
586     ASSERT_EQ(1, producer->getCheckpointSnapshotTask().queueSize());
587     producer->getCheckpointSnapshotTask().run();
588 
589     /* Stream the snapshot marker */
590     ASSERT_EQ(ENGINE_WANT_MORE, producer->step(producers.get()));
591 
592     /* Stream the 3rd mutation */
593     ASSERT_EQ(ENGINE_WANT_MORE, producer->step(producers.get()));
594 
595     /**
596      * Create a DCP response and check that a new item is created and
597      * the message size is greater than the size of original item
598      */
599     auto keyAndSnappyValueMessageSize = getItemSize(*item);
600     queued_item qi = std::move(item);
601     auto dcpResponse = stream->public_makeResponseFromItem(qi);
602     auto* mutProdResponse =
603             dynamic_cast<MutationProducerResponse*>(dcpResponse.get());
604     std::string value;
605     if (!isXattr()) {
606         ASSERT_EQ(qi.get(), mutProdResponse->getItem().get());
607         value.assign(qi->getValue()->getData(), qi->getValue()->valueSize());
608     }
609 
610     EXPECT_STREQ(dcp_last_value.c_str(), value.c_str());
611     EXPECT_EQ(dcpResponse->getMessageSize(), keyAndSnappyValueMessageSize);
612 
613     EXPECT_EQ(dcp_last_packet_size, keyAndSnappyValueMessageSize);
614 
615     // If xattr-only enabled on DCP, we won't re-compress (after we've
616     // decompressed the document and split out the xattrs)
617     protocol_binary_datatype_t snappy =
618             isXattr() ? 0 : PROTOCOL_BINARY_DATATYPE_SNAPPY;
619     protocol_binary_datatype_t expectedDataType =
620             isXattr() ? PROTOCOL_BINARY_DATATYPE_XATTR
621                       : PROTOCOL_BINARY_DATATYPE_JSON;
622     EXPECT_EQ((expectedDataType | snappy), dcp_last_datatype);
623 }
624 
625 /**
626  * Test to verify DCP compression, this test has client snappy enabled
627  *
628  *  - Add an uncompressed item and expect we stream a compressed item
629  */
TEST_P(CompressionStreamTest, force_value_compression_enabled)630 TEST_P(CompressionStreamTest, force_value_compression_enabled) {
631     VBucketPtr vb = engine->getKVBucket()->getVBucket(vbid);
632     std::string valueData(
633             "{\"product\": \"car\",\"price\": \"100\"},"
634             "{\"product\": \"bus\",\"price\": \"1000\"},"
635             "{\"product\": \"Train\",\"price\": \"100000\"}");
636 
637     auto item = makeCompressibleItem(vbid,
638                                      makeStoredDocKey("key"),
639                                      valueData,
640                                      PROTOCOL_BINARY_DATATYPE_JSON,
641                                      false, // not compressed
642                                      isXattr());
643 
644     // Enable the snappy datatype on the connection
645     mock_set_datatype_support(cookie, PROTOCOL_BINARY_DATATYPE_SNAPPY);
646     auto includeValue = isXattr() ? IncludeValue::No : IncludeValue::Yes;
647 
648     // Setup the producer/stream and request force_value_compression
649     setup_dcp_stream(0,
650                      includeValue,
651                      IncludeXattrs::Yes,
652                      {{"force_value_compression", "true"}});
653 
654     uint64_t rollbackSeqno;
655     EXPECT_EQ(ENGINE_SUCCESS,
656               producer->streamRequest(/*flags*/ 0,
657                                       /*opaque*/ 0,
658                                       /*vbucket*/ 0,
659                                       /*start_seqno*/ 0,
660                                       /*end_seqno*/ ~0,
661                                       /*vb_uuid*/ 0,
662                                       /*snap_start*/ 0,
663                                       /*snap_end*/ ~0,
664                                       &rollbackSeqno,
665                                       DCPTest::fakeDcpAddFailoverLog));
666     auto producers =
667             get_dcp_producers(reinterpret_cast<ENGINE_HANDLE*>(engine),
668                               reinterpret_cast<ENGINE_HANDLE_V1*>(engine));
669 
670     ASSERT_TRUE(producer->isForceValueCompressionEnabled());
671 
672     // Now, add the 4th item, which is not compressed
673     EXPECT_EQ(ENGINE_SUCCESS, engine->getKVBucket()->set(*item, cookie));
674     /**
675      * Create a DCP response and check that a new item is created and
676      * the message size is less than the size of the original item
677      */
678     auto keyAndValueMessageSize = getItemSize(*item);
679     queued_item qi = std::move(item);
680     auto dcpResponse = stream->public_makeResponseFromItem(qi);
681     auto* mutProdResponse =
682             dynamic_cast<MutationProducerResponse*>(dcpResponse.get());
683     ASSERT_NE(qi.get(), mutProdResponse->getItem().get());
684     EXPECT_LT(dcpResponse->getMessageSize(), keyAndValueMessageSize);
685 
686     producer->notifySeqnoAvailable(vbid, vb->getHighSeqno());
687     ASSERT_EQ(ENGINE_SUCCESS, producer->step(producers.get()));
688     ASSERT_EQ(1, producer->getCheckpointSnapshotTask().queueSize());
689     producer->getCheckpointSnapshotTask().run();
690 
691     /* Stream the snapshot marker */
692     ASSERT_EQ(ENGINE_WANT_MORE, producer->step(producers.get()));
693 
694     /* Stream the mutation */
695     ASSERT_EQ(ENGINE_WANT_MORE, producer->step(producers.get()));
696     std::string value(qi->getValue()->getData(), qi->getValue()->valueSize());
697     EXPECT_STREQ(decompressValue(dcp_last_value).c_str(), value.c_str());
698     EXPECT_LT(dcp_last_packet_size, keyAndValueMessageSize);
699 
700     protocol_binary_datatype_t expectedDataType =
701             isXattr() ? PROTOCOL_BINARY_DATATYPE_XATTR
702                       : PROTOCOL_BINARY_DATATYPE_JSON;
703     EXPECT_EQ((expectedDataType | PROTOCOL_BINARY_DATATYPE_SNAPPY),
704               dcp_last_datatype);
705 
706     destroy_dcp_stream();
707 }
708 
709 /*
710  * Test to verify the number of items, total bytes sent and total data size
711  * by the producer when DCP compression is enabled
712  */
TEST_P(StreamTest, test_verifyProducerCompressionStats)713 TEST_P(StreamTest, test_verifyProducerCompressionStats) {
714     VBucketPtr vb = engine->getKVBucket()->getVBucket(vbid);
715     setup_dcp_stream();
716     std::string compressibleValue("{\"product\": \"car\",\"price\": \"100\"},"
717                                   "{\"product\": \"bus\",\"price\": \"1000\"},"
718                                   "{\"product\": \"Train\",\"price\": \"100000\"}");
719     std::string regularValue("{\"product\": \"car\",\"price\": \"100\"}");
720 
721     std::string compressCtrlMsg("force_value_compression");
722     std::string compressCtrlValue("true");
723 
724     mock_set_datatype_support(producer->getCookie(), PROTOCOL_BINARY_DATATYPE_SNAPPY);
725 
726     ASSERT_EQ(ENGINE_SUCCESS, producer->control(0, compressCtrlMsg.c_str(),
727                                                 compressCtrlMsg.size(),
728                                                 compressCtrlValue.c_str(),
729                                                 compressCtrlValue.size()));
730     ASSERT_TRUE(producer->isForceValueCompressionEnabled());
731 
732     store_item(vbid, "key1", compressibleValue.c_str());
733     store_item(vbid, "key2", regularValue.c_str());
734     store_item(vbid, "key3", compressibleValue.c_str());
735 
736     auto producers = get_dcp_producers(reinterpret_cast<ENGINE_HANDLE*>(engine),
737                                        reinterpret_cast<ENGINE_HANDLE_V1*>(engine));
738 
739     uint64_t rollbackSeqno;
740     auto err = producer->streamRequest(/*flags*/ 0,
741                                        /*opaque*/ 0,
742                                        /*vbucket*/ 0,
743                                        /*start_seqno*/ 0,
744                                        /*end_seqno*/ ~0,
745                                        /*vb_uuid*/ 0,
746                                        /*snap_start*/ 0,
747                                        /*snap_end*/ ~0,
748                                        &rollbackSeqno,
749                                        DCPTest::fakeDcpAddFailoverLog);
750 
751     ASSERT_EQ(ENGINE_SUCCESS, err);
752     producer->notifySeqnoAvailable(vbid, vb->getHighSeqno());
753 
754     ASSERT_EQ(ENGINE_SUCCESS, producer->step(producers.get()));
755     ASSERT_EQ(1, producer->getCheckpointSnapshotTask().queueSize());
756     producer->getCheckpointSnapshotTask().run();
757 
758     /* Stream the snapshot marker first */
759     EXPECT_EQ(ENGINE_WANT_MORE, producer->step(producers.get()));
760     EXPECT_EQ(0, producer->getItemsSent());
761 
762     uint64_t totalBytesSent = producer->getTotalBytesSent();
763     uint64_t totalUncompressedDataSize = producer->getTotalUncompressedDataSize();
764     EXPECT_GT(totalBytesSent, 0);
765     EXPECT_GT(totalUncompressedDataSize, 0);
766 
767     /* Stream the first mutation. This should increment the
768      * number of items, total bytes sent and total data size.
769      * Since this is a compressible document, the total bytes
770      * sent should be incremented by a lesser value than the
771      * total data size.
772      */
773     EXPECT_EQ(ENGINE_WANT_MORE, producer->step(producers.get()));
774     EXPECT_EQ(1, producer->getItemsSent());
775     EXPECT_GT(producer->getTotalBytesSent(), totalBytesSent);
776     EXPECT_GT(producer->getTotalUncompressedDataSize(), totalUncompressedDataSize);
777     EXPECT_LT(producer->getTotalBytesSent() - totalBytesSent,
778               producer->getTotalUncompressedDataSize() - totalUncompressedDataSize);
779 
780     totalBytesSent = producer->getTotalBytesSent();
781     totalUncompressedDataSize = producer->getTotalUncompressedDataSize();
782 
783     /*
784      * Now stream the second mutation. This should increment the
785      * number of items and the total bytes sent. In this case,
786      * the total data size should be incremented by exactly the
787      * same amount as the total bytes sent
788      */
789     EXPECT_EQ(ENGINE_WANT_MORE, producer->step(producers.get()));
790     EXPECT_EQ(2, producer->getItemsSent());
791     EXPECT_GT(producer->getTotalBytesSent(), totalBytesSent);
792     EXPECT_GT(producer->getTotalUncompressedDataSize(), totalUncompressedDataSize);
793     EXPECT_EQ(producer->getTotalBytesSent() - totalBytesSent,
794               producer->getTotalUncompressedDataSize() - totalUncompressedDataSize);
795 
796     totalBytesSent = producer->getTotalBytesSent();
797     totalUncompressedDataSize = producer->getTotalUncompressedDataSize();
798 
799     /*
800      * Disable value compression on the producer side and stream a
801      * compressible document. This should result in an increase in
802      * total bytes. Even though the document is compressible, the
803      * total data size and the total bytes sent would be incremented
804      * by exactly the same amount
805      */
806     compressCtrlValue.assign("false");
807     ASSERT_EQ(ENGINE_SUCCESS, producer->control(0, compressCtrlMsg.c_str(),
808                                                 compressCtrlMsg.size(),
809                                                 compressCtrlValue.c_str(),
810                                                 compressCtrlValue.size()));
811     mock_set_datatype_support(producer->getCookie(),
812                               PROTOCOL_BINARY_RAW_BYTES);
813 
814     ASSERT_FALSE(producer->isCompressionEnabled());
815     EXPECT_EQ(ENGINE_WANT_MORE, producer->step(producers.get()));
816     EXPECT_EQ(3, producer->getItemsSent());
817     EXPECT_GT(producer->getTotalBytesSent(), totalBytesSent);
818     EXPECT_GT(producer->getTotalUncompressedDataSize(), totalUncompressedDataSize);
819     EXPECT_EQ(producer->getTotalBytesSent() - totalBytesSent,
820               producer->getTotalUncompressedDataSize() - totalUncompressedDataSize);
821 
822     destroy_dcp_stream();
823 }
824 
825 /*
826  * Test to verify the number of items and the total bytes sent
827  * by the producer under normal and error conditions
828  */
TEST_P(StreamTest, test_verifyProducerStats)829 TEST_P(StreamTest, test_verifyProducerStats) {
830     VBucketPtr vb = engine->getKVBucket()->getVBucket(vbid);
831     setup_dcp_stream(0, IncludeValue::No, IncludeXattrs::No);
832     store_item(vbid, "key1", "value1");
833     store_item(vbid, "key2", "value2");
834     auto producers = get_dcp_producers(reinterpret_cast<ENGINE_HANDLE*>(engine),
835                                        reinterpret_cast<ENGINE_HANDLE_V1*>(engine));
836     uint64_t rollbackSeqno;
837     auto err = producer->streamRequest(/*flags*/ 0,
838                                        /*opaque*/ 0,
839                                        /*vbucket*/ 0,
840                                        /*start_seqno*/ 0,
841                                        /*end_seqno*/ ~0,
842                                        /*vb_uuid*/ 0,
843                                        /*snap_start*/ 0,
844                                        /*snap_end*/ ~0,
845                                        &rollbackSeqno,
846                                        DCPTest::fakeDcpAddFailoverLog);
847 
848     EXPECT_EQ(ENGINE_SUCCESS, err);
849     producer->notifySeqnoAvailable(vbid, vb->getHighSeqno());
850     EXPECT_EQ(ENGINE_SUCCESS, producer->step(producers.get()));
851 
852     EXPECT_EQ(1, producer->getCheckpointSnapshotTask().queueSize());
853 
854     producer->getCheckpointSnapshotTask().run();
855 
856     /* Stream the snapshot marker first */
857     EXPECT_EQ(ENGINE_WANT_MORE, producer->step(producers.get()));
858     EXPECT_EQ(0, producer->getItemsSent());
859 
860     uint64_t totalBytes = producer->getTotalBytesSent();
861     EXPECT_GT(totalBytes, 0);
862 
863     /* Stream the first mutation. This should increment the
864      * number of items and the total bytes sent.
865      */
866     EXPECT_EQ(ENGINE_WANT_MORE, producer->step(producers.get()));
867     EXPECT_EQ(1, producer->getItemsSent());
868     EXPECT_GT(producer->getTotalBytesSent(), totalBytes);
869     totalBytes = producer->getTotalBytesSent();
870 
871     /* Now simulate a failure while trying to stream the next
872      * mutation.
873      */
874     auto mutation_callback = producers->mutation;
875     producers->mutation = mock_mutation_return_engine_e2big;
876 
877     EXPECT_EQ(ENGINE_E2BIG, producer->step(producers.get()));
878 
879     /* The number of items total bytes sent should remain the same */
880     EXPECT_EQ(1, producer->getItemsSent());
881     EXPECT_EQ(producer->getTotalBytesSent(), totalBytes);
882     totalBytes = producer->getTotalBytesSent();
883 
884     /* Now stream the mutation again and the stats should have incremented */
885     producers->mutation = mutation_callback;
886 
887     EXPECT_EQ(ENGINE_WANT_MORE, producer->step(producers.get()));
888     EXPECT_EQ(2, producer->getItemsSent());
889     EXPECT_GT(producer->getTotalBytesSent(), totalBytes);
890 
891     destroy_dcp_stream();
892 }
893 
894 /*
895  * Test that when have a producer with IncludeValue set to Yes and IncludeXattrs
896  * set to No an active stream created via a streamRequest returns false for
897  * isKeyOnly.
898  */
TEST_P(StreamTest, test_streamIsKeyOnlyFalseBecauseOfIncludeValue)899 TEST_P(StreamTest, test_streamIsKeyOnlyFalseBecauseOfIncludeValue) {
900     setup_dcp_stream(0, IncludeValue::Yes, IncludeXattrs::No);
901     uint64_t rollbackSeqno;
902     auto err = producer->streamRequest(/*flags*/ 0,
903                                        /*opaque*/ 0,
904                                        /*vbucket*/ 0,
905                                        /*start_seqno*/ 0,
906                                        /*end_seqno*/ 0,
907                                        /*vb_uuid*/ 0,
908                                        /*snap_start*/ 0,
909                                        /*snap_end*/ 0,
910                                        &rollbackSeqno,
911                                        DCPTest::fakeDcpAddFailoverLog);
912     ASSERT_EQ(ENGINE_SUCCESS, err)
913         << "stream request did not return ENGINE_SUCCESS";
914 
915     auto activeStream =
916             std::dynamic_pointer_cast<ActiveStream>(producer->findStream(0));
917     ASSERT_NE(nullptr, activeStream);
918     EXPECT_FALSE(activeStream->isKeyOnly());
919     destroy_dcp_stream();
920 }
921 
922 /*
923  * Test that when have a producer with IncludeValue set to No and IncludeXattrs
924  * set to Yes an active stream created via a streamRequest returns false for
925  * isKeyOnly.
926  */
TEST_P(StreamTest, test_streamIsKeyOnlyFalseBecauseOfIncludeXattrs)927 TEST_P(StreamTest, test_streamIsKeyOnlyFalseBecauseOfIncludeXattrs) {
928     setup_dcp_stream(0, IncludeValue::No, IncludeXattrs::Yes);
929     uint64_t rollbackSeqno;
930     auto err = producer->streamRequest(/*flags*/ 0,
931                                        /*opaque*/ 0,
932                                        /*vbucket*/ 0,
933                                        /*start_seqno*/ 0,
934                                        /*end_seqno*/ 0,
935                                        /*vb_uuid*/ 0,
936                                        /*snap_start*/ 0,
937                                        /*snap_end*/ 0,
938                                        &rollbackSeqno,
939                                        DCPTest::fakeDcpAddFailoverLog);
940     ASSERT_EQ(ENGINE_SUCCESS, err)
941         << "stream request did not return ENGINE_SUCCESS";
942 
943     auto activeStream =
944             std::dynamic_pointer_cast<ActiveStream>(producer->findStream(0));
945     ASSERT_NE(nullptr, activeStream);
946     EXPECT_FALSE(activeStream->isKeyOnly());
947     destroy_dcp_stream();
948 }
949 
950 /*
951  * Test for a dcpResponse retrieved from a stream where IncludeValue and
952  * IncludeXattrs are both No, that the message size does not include the size of
953  * the body.
954  */
TEST_P(StreamTest, test_keyOnlyMessageSize)955 TEST_P(StreamTest, test_keyOnlyMessageSize) {
956     auto item = makeItemWithXattrs();
957     auto keyOnlyMessageSize = MutationResponse::mutationBaseMsgBytes +
958             item->getKey().size();
959     queued_item qi(std::move(item));
960 
961     setup_dcp_stream(0, IncludeValue::No, IncludeXattrs::No);
962     std::unique_ptr<DcpResponse> dcpResponse =
963             stream->public_makeResponseFromItem(qi);
964 
965     /**
966      * Create a DCP response and check that a new item is created
967      */
968     auto mutProdResponse = dynamic_cast<MutationProducerResponse*>(dcpResponse.get());
969     ASSERT_NE(qi.get(), mutProdResponse->getItem().get());
970 
971     EXPECT_EQ(keyOnlyMessageSize, dcpResponse->getMessageSize());
972     destroy_dcp_stream();
973 }
974 
975 /*
976  * Test for a dcpResponse retrieved from a stream where IncludeValue and
977  * IncludeXattrs are both Yes, that the message size includes the size of the
978  * body.
979  */
TEST_P(StreamTest, test_keyValueAndXattrsMessageSize)980 TEST_P(StreamTest, test_keyValueAndXattrsMessageSize) {
981     auto item = makeItemWithXattrs();
982     auto keyAndValueMessageSize = MutationResponse::mutationBaseMsgBytes +
983             item->getKey().size() + item->getNBytes();
984     queued_item qi(std::move(item));
985 
986     setup_dcp_stream(0, IncludeValue::Yes, IncludeXattrs::Yes);
987     std::unique_ptr<DcpResponse> dcpResponse =
988             stream->public_makeResponseFromItem(qi);
989 
990     /**
991      * Create a DCP response and check that a new item is not created
992      */
993     auto mutProdResponse = dynamic_cast<MutationProducerResponse*>(dcpResponse.get());
994     ASSERT_EQ(qi.get(), mutProdResponse->getItem().get());
995     EXPECT_EQ(keyAndValueMessageSize, dcpResponse->getMessageSize());
996     destroy_dcp_stream();
997 }
998 
999 /*
1000  * Test for a dcpResponse retrieved from a stream where IncludeValue and
1001  * IncludeXattrs are both Yes, however the document does not have any xattrs
1002  * and so the message size should equal the size of the value.
1003  */
TEST_P(StreamTest, test_keyAndValueMessageSize)1004 TEST_P(StreamTest, test_keyAndValueMessageSize) {
1005     auto item = makeItemWithoutXattrs();
1006     auto keyAndValueMessageSize = MutationResponse::mutationBaseMsgBytes +
1007             item->getKey().size() + item->getNBytes();
1008     queued_item qi(std::move(item));
1009 
1010     setup_dcp_stream(0, IncludeValue::Yes, IncludeXattrs::Yes);
1011     std::unique_ptr<DcpResponse> dcpResponse =
1012             stream->public_makeResponseFromItem(qi);
1013 
1014     /**
1015      * Create a DCP response and check that a new item is not created
1016      */
1017     auto mutProdResponse = dynamic_cast<MutationProducerResponse*>(dcpResponse.get());
1018     ASSERT_EQ(qi.get(), mutProdResponse->getItem().get());
1019     EXPECT_EQ(keyAndValueMessageSize, dcpResponse->getMessageSize());
1020     destroy_dcp_stream();
1021 }
1022 
1023 /*
1024  * Test for a dcpResponse retrieved from a stream where IncludeValue is Yes and
1025  * IncludeXattrs is No, that the message size includes the size of only the
1026  * value (excluding the xattrs).
1027  */
TEST_P(StreamTest, test_keyAndValueExcludingXattrsMessageSize)1028 TEST_P(StreamTest, test_keyAndValueExcludingXattrsMessageSize) {
1029     auto item = makeItemWithXattrs();
1030     auto root = const_cast<char*>(item->getData());
1031     cb::byte_buffer buffer{(uint8_t*)root, item->getValue()->valueSize()};
1032     auto sz = cb::xattr::get_body_offset({
1033            reinterpret_cast<char*>(buffer.buf), buffer.len});
1034     auto keyAndValueMessageSize = MutationResponse::mutationBaseMsgBytes +
1035             item->getKey().size() + item->getNBytes() - sz;
1036     queued_item qi(std::move(item));
1037 
1038     setup_dcp_stream(0, IncludeValue::Yes, IncludeXattrs::No);
1039     std::unique_ptr<DcpResponse> dcpResponse =
1040             stream->public_makeResponseFromItem(qi);
1041 
1042     /**
1043      * Create a DCP response and check that a new item is created
1044      */
1045     auto mutProdResponse = dynamic_cast<MutationProducerResponse*>(dcpResponse.get());
1046     ASSERT_NE(qi.get(), mutProdResponse->getItem().get());
1047     EXPECT_EQ(keyAndValueMessageSize, dcpResponse->getMessageSize());
1048     destroy_dcp_stream();
1049 }
1050 
1051 /*
1052  * Test for a dcpResponse retrieved from a stream where IncludeValue is Yes and
1053  * IncludeXattrs are No, and the document does not have any xattrs.  So again
1054  * the message size should equal the size of the value.
1055  */
TEST_P(StreamTest, test_keyAndValueExcludingXattrsAndNotContainXattrMessageSize)1056 TEST_P(StreamTest,
1057        test_keyAndValueExcludingXattrsAndNotContainXattrMessageSize) {
1058     auto item = makeItemWithoutXattrs();
1059     auto keyAndValueMessageSize = MutationResponse::mutationBaseMsgBytes +
1060             item->getKey().size() + item->getNBytes();
1061     queued_item qi(std::move(item));
1062 
1063     setup_dcp_stream(0, IncludeValue::Yes, IncludeXattrs::No);
1064     std::unique_ptr<DcpResponse> dcpResponse =
1065             stream->public_makeResponseFromItem(qi);
1066     /**
1067      * Create a DCP response and check that a new item is not created
1068      */
1069     auto mutProdResponse = dynamic_cast<MutationProducerResponse*>(dcpResponse.get());
1070     ASSERT_EQ(qi.get(), mutProdResponse->getItem().get());
1071     EXPECT_EQ(keyAndValueMessageSize, dcpResponse->getMessageSize());
1072     destroy_dcp_stream();
1073 }
1074 
1075 /*
1076  * Test for a dcpResponse retrieved from a stream where IncludeValue is No and
1077  * IncludeXattrs is Yes, that the message size includes the size of only the
1078  * xattrs (excluding the value).
1079  */
TEST_P(StreamTest, test_keyAndValueExcludingValueDataMessageSize)1080 TEST_P(StreamTest, test_keyAndValueExcludingValueDataMessageSize) {
1081     auto item = makeItemWithXattrs();
1082     auto root = const_cast<char*>(item->getData());
1083     cb::byte_buffer buffer{(uint8_t*)root, item->getValue()->valueSize()};
1084     auto sz = cb::xattr::get_body_offset({
1085            reinterpret_cast<char*>(buffer.buf), buffer.len});
1086     auto keyAndValueMessageSize = MutationResponse::mutationBaseMsgBytes +
1087             item->getKey().size() + sz;
1088     queued_item qi(std::move(item));
1089 
1090     setup_dcp_stream(0, IncludeValue::No, IncludeXattrs::Yes);
1091     std::unique_ptr<DcpResponse> dcpResponse =
1092             stream->public_makeResponseFromItem(qi);
1093 
1094     /**
1095      * Create a DCP response and check that a new item is created
1096      */
1097     auto mutProdResponse = dynamic_cast<MutationProducerResponse*>(dcpResponse.get());
1098     ASSERT_NE(qi.get(), mutProdResponse->getItem().get());
1099     EXPECT_EQ(keyAndValueMessageSize, dcpResponse->getMessageSize());
1100     destroy_dcp_stream();
1101 }
1102 
1103 /* MB-24159 - Test to confirm a dcp stream backfill from an ephemeral bucket
1104  * over a range which includes /no/ items doesn't cause the producer to
1105  * segfault.
1106  */
1107 
TEST_P(StreamTest, backfillGetsNoItems)1108 TEST_P(StreamTest, backfillGetsNoItems) {
1109     if (engine->getConfiguration().getBucketType() == "ephemeral") {
1110         setup_dcp_stream(0, IncludeValue::No, IncludeXattrs::No);
1111         store_item(vbid, "key", "value1");
1112         store_item(vbid, "key", "value2");
1113 
1114         auto evb = std::shared_ptr<EphemeralVBucket>(
1115                 std::dynamic_pointer_cast<EphemeralVBucket>(vb0));
1116         auto dcpbfm = DCPBackfillMemory(evb, stream, 1, 1);
1117         dcpbfm.run();
1118         destroy_dcp_stream();
1119     }
1120 }
1121 
1122 /* Regression test for MB-17766 - ensure that when an ActiveStream is preparing
1123  * queued items to be sent out via a DCP consumer, that nextCheckpointItem()
1124  * doesn't incorrectly return false (meaning that there are no more checkpoint
1125  * items to send).
1126  */
TEST_P(StreamTest, test_mb17766)1127 TEST_P(StreamTest, test_mb17766) {
1128     // Add an item.
1129     store_item(vbid, "key", "value");
1130 
1131     setup_dcp_stream();
1132 
1133     // Should start with nextCheckpointItem() returning true.
1134     EXPECT_TRUE(stream->public_nextCheckpointItem())
1135             << "nextCheckpointItem() should initially be true.";
1136 
1137     // Get the set of outstanding items
1138     auto items = stream->public_getOutstandingItems(*vb0);
1139 
1140     // REGRESSION CHECK: nextCheckpointItem() should still return true
1141     EXPECT_TRUE(stream->public_nextCheckpointItem())
1142             << "nextCheckpointItem() after getting outstanding items should be "
1143                "true.";
1144 
1145     // Process the set of items
1146     stream->public_processItems(items);
1147 
1148     // Should finish with nextCheckpointItem() returning false.
1149     EXPECT_FALSE(stream->public_nextCheckpointItem())
1150             << "nextCheckpointItem() after processing items should be false.";
1151     destroy_dcp_stream();
1152 }
1153 
1154 // Check that the items remaining statistic is accurate and is unaffected
1155 // by de-duplication.
TEST_P(StreamTest, MB17653_ItemsRemaining)1156 TEST_P(StreamTest, MB17653_ItemsRemaining) {
1157     auto& manager =
1158             *(engine->getKVBucket()->getVBucket(vbid)->checkpointManager);
1159 
1160     ASSERT_EQ(0, manager.getNumOpenChkItems());
1161 
1162     // Create 10 mutations to the same key which, while increasing the high
1163     // seqno by 10 will result in de-duplication and hence only one actual
1164     // mutation being added to the checkpoint items.
1165     const int set_op_count = 10;
1166     for (unsigned int ii = 0; ii < set_op_count; ii++) {
1167         store_item(vbid, "key", "value");
1168     }
1169 
1170     ASSERT_EQ(1, manager.getNumOpenChkItems())
1171             << "Expected 1 items after population (set)";
1172 
1173     setup_dcp_stream();
1174 
1175     // Should start with one item remaining.
1176     EXPECT_EQ(1, stream->getItemsRemaining())
1177             << "Unexpected initial stream item count";
1178 
1179     // Populate the streams' ready queue with items from the checkpoint,
1180     // advancing the streams' cursor. Should result in no change in items
1181     // remaining (they still haven't been send out of the stream).
1182     stream->nextCheckpointItemTask();
1183     EXPECT_EQ(1, stream->getItemsRemaining())
1184             << "Mismatch after moving items to ready queue";
1185 
1186     // Add another mutation. As we have already iterated over all checkpoint
1187     // items and put into the streams' ready queue, de-duplication of this new
1188     // mutation (from the point of view of the stream) isn't possible, so items
1189     // remaining should increase by one.
1190     store_item(vbid, "key", "value");
1191     EXPECT_EQ(2, stream->getItemsRemaining())
1192             << "Mismatch after populating readyQ and storing 1 more item";
1193 
1194     // Now actually drain the items from the readyQ and see how many we received,
1195     // excluding meta items. This will result in all but one of the checkpoint
1196     // items (the one we added just above) being drained.
1197     std::unique_ptr<DcpResponse> response(stream->public_nextQueuedItem());
1198     ASSERT_NE(nullptr, response);
1199     EXPECT_TRUE(response->isMetaEvent()) << "Expected 1st item to be meta";
1200 
1201     response = stream->public_nextQueuedItem();
1202     ASSERT_NE(nullptr, response);
1203     EXPECT_FALSE(response->isMetaEvent()) << "Expected 2nd item to be non-meta";
1204 
1205     response = stream->public_nextQueuedItem();
1206     EXPECT_EQ(nullptr, response) << "Expected there to not be a 3rd item.";
1207 
1208     EXPECT_EQ(1, stream->getItemsRemaining()) << "Expected to have 1 item "
1209                                                  "remaining (in checkpoint) "
1210                                                  "after draining readyQ";
1211 
1212     // Add another 10 mutations on a different key. This should only result in
1213     // us having one more item (not 10) due to de-duplication in
1214     // checkpoints.
1215     for (unsigned int ii = 0; ii < set_op_count; ii++) {
1216         store_item(vbid, "key_2", "value");
1217     }
1218 
1219     EXPECT_EQ(2, stream->getItemsRemaining())
1220             << "Expected two items after adding 1 more to existing checkpoint";
1221 
1222     // Copy items into readyQ a second time, and drain readyQ so we should
1223     // have no items left.
1224     stream->nextCheckpointItemTask();
1225     do {
1226         response = stream->public_nextQueuedItem();
1227     } while (response);
1228     EXPECT_EQ(0, stream->getItemsRemaining()) << "Should have 0 items "
1229                                                  "remaining after advancing "
1230                                                  "cursor and draining readyQ";
1231     destroy_dcp_stream();
1232 }
1233 
1234 /* Stream items from a DCP backfill */
TEST_P(StreamTest, BackfillOnly)1235 TEST_P(StreamTest, BackfillOnly) {
1236     /* Add 3 items */
1237     int numItems = 3;
1238     addItemsAndRemoveCheckpoint(numItems);
1239 
1240     /* Set up a DCP stream for the backfill */
1241     setup_dcp_stream();
1242 
1243     /* We want the backfill task to run in a background thread */
1244     ExecutorPool::get()->setNumAuxIO(1);
1245     stream->transitionStateToBackfilling();
1246 
1247     // MB-27199: Just stir things up by doing some front-end ops whilst
1248     // backfilling. This would trigger a number of TSAN warnings
1249     std::thread thr([this]() {
1250         int i = 0;
1251         while (i < 100) {
1252             engine->get_and_touch(cookie, makeStoredDocKey("key1"), vbid, i);
1253             i++;
1254         }
1255     });
1256 
1257     // Ensure all GATs are done before evaluating the stream below
1258     thr.join();
1259 
1260     /* Wait for the backfill task to complete */
1261     {
1262         std::chrono::microseconds uSleepTime(128);
1263         while (stream->getLastReadSeqno() < numItems) {
1264             uSleepTime = decayingSleep(uSleepTime);
1265         }
1266     }
1267 
1268     /* Verify that all items are read in the backfill */
1269     EXPECT_EQ(numItems, stream->getNumBackfillItems());
1270 
1271     /* Since backfill items are sitting in the readyQ, check if the stat is
1272        updated correctly */
1273     EXPECT_EQ(numItems, stream->getNumBackfillItemsRemaining());
1274 
1275     destroy_dcp_stream();
1276     /* [TODO]: Expand the testcase to check if snapshot marker, all individual
1277                items are read correctly */
1278 }
1279 
1280 /* Negative test case that checks whether the stream gracefully goes to
1281    'dead' state upon disk backfill failure */
TEST_P(StreamTest, DiskBackfillFail)1282 TEST_P(StreamTest, DiskBackfillFail) {
1283     if (bucketType == "ephemeral") {
1284         /* Ephemeral buckets don't do disk backfill */
1285         return;
1286     }
1287 
1288     /* Add 3 items */
1289     int numItems = 3;
1290     addItemsAndRemoveCheckpoint(numItems);
1291 
1292     /* Delete the vb file so that the backfill would fail */
1293     engine->getKVBucket()->getRWUnderlying(vbid)->delVBucket(vbid,
1294                                                              /* file rev */ 1);
1295 
1296     /* Set up a DCP stream for the backfill */
1297     setup_dcp_stream();
1298 
1299     /* Run the backfill task in a background thread */
1300     ExecutorPool::get()->setNumAuxIO(1);
1301 
1302     /* Wait for the backfill task to fail and stream to transition to dead
1303        state */
1304     {
1305         std::chrono::microseconds uSleepTime(128);
1306         while (stream->isActive()) {
1307             uSleepTime = decayingSleep(uSleepTime);
1308         }
1309     }
1310 
1311     destroy_dcp_stream();
1312 }
1313 
1314 /* Stream items from a DCP backfill with very small backfill buffer.
1315    However small the backfill buffer is, backfill must not stop, it must
1316    proceed to completion eventually */
TEST_P(StreamTest, BackfillSmallBuffer)1317 TEST_P(StreamTest, BackfillSmallBuffer) {
1318     if (bucketType == "ephemeral") {
1319         /* Ephemeral buckets is not memory managed for now. Will be memory
1320            managed soon and then this test will be enabled */
1321         return;
1322     }
1323 
1324     /* Add 2 items */
1325     int numItems = 2;
1326     addItemsAndRemoveCheckpoint(numItems);
1327 
1328     /* Set up a DCP stream for the backfill */
1329     setup_dcp_stream();
1330 
1331     /* set the DCP backfill buffer size to a value that is smaller than the
1332        size of a mutation */
1333     producer->setBackfillBufferSize(1);
1334 
1335     /* We want the backfill task to run in a background thread */
1336     ExecutorPool::get()->setNumAuxIO(1);
1337     stream->transitionStateToBackfilling();
1338 
1339     /* Backfill can only read 1 as its buffer will become full after that */
1340     {
1341         std::chrono::microseconds uSleepTime(128);
1342         while ((numItems - 1) != stream->getLastReadSeqno()) {
1343             uSleepTime = decayingSleep(uSleepTime);
1344         }
1345     }
1346 
1347     /* Consume the backfill item(s) */
1348     stream->consumeBackfillItems(/*snapshot*/ 1 + /*mutation*/ 1);
1349 
1350     /* We should see that buffer full status must be false as we have read
1351        the item in the backfill buffer */
1352     EXPECT_FALSE(producer->getBackfillBufferFullStatus());
1353 
1354     /* Finish up with the backilling of the remaining item */
1355     {
1356         std::chrono::microseconds uSleepTime(128);
1357         while (numItems != stream->getLastReadSeqno()) {
1358             uSleepTime = decayingSleep(uSleepTime);
1359         }
1360     }
1361 
1362     /* Read the other item */
1363     stream->consumeBackfillItems(1);
1364     destroy_dcp_stream();
1365 }
1366 
1367 /* Checks that DCP backfill in Ephemeral buckets does not have duplicates in
1368  a snaphsot */
TEST_P(StreamTest, EphemeralBackfillSnapshotHasNoDuplicates)1369 TEST_P(StreamTest, EphemeralBackfillSnapshotHasNoDuplicates) {
1370     if (bucketType != "ephemeral") {
1371         return;
1372     }
1373     EphemeralVBucket* evb = dynamic_cast<EphemeralVBucket*>(vb0.get());
1374 
1375     /* Add 4 items */
1376     const int numItems = 4;
1377     for (int i = 0; i < numItems; ++i) {
1378         std::string key("key" + std::to_string(i));
1379         store_item(vbid, key, "value");
1380     }
1381 
1382     /* Update "key1" before range read cursors are on vb */
1383     store_item(vbid, "key1", "value1");
1384 
1385     /* Add fake range read cursor on vb and update items */
1386     {
1387         auto itr = evb->makeRangeIterator(/*isBackfill*/ true);
1388         /* update 'key2' and 'key3' */
1389         store_item(vbid, "key2", "value1");
1390         store_item(vbid, "key3", "value1");
1391     }
1392 
1393     /* update key2 once again with a range iterator again so that it has 2 stale
1394      values */
1395     {
1396         auto itr = evb->makeRangeIterator(/*isBackfill*/ true);
1397         /* update 'key2' */
1398         store_item(vbid, "key2", "value1");
1399     }
1400 
1401     removeCheckpoint(numItems);
1402 
1403     /* Set up a DCP stream for the backfill */
1404     setup_dcp_stream();
1405 
1406     /* We want the backfill task to run in a background thread */
1407     ExecutorPool::get()->setNumAuxIO(1);
1408     stream->transitionStateToBackfilling();
1409 
1410     /* Wait for the backfill task to complete */
1411     {
1412         std::chrono::microseconds uSleepTime(128);
1413         uint64_t expLastReadSeqno = 4 /*numItems*/ + 4 /*num updates*/;
1414         while (expLastReadSeqno !=
1415                static_cast<uint64_t>(stream->getLastReadSeqno())) {
1416             uSleepTime = decayingSleep(uSleepTime);
1417         }
1418     }
1419 
1420     /* Verify that only 4 items are read in the backfill (no duplicates) */
1421     EXPECT_EQ(numItems, stream->getNumBackfillItems());
1422 
1423     destroy_dcp_stream();
1424 }
1425 
TEST_P(StreamTest, CursorDroppingBasicBackfillState)1426 TEST_P(StreamTest, CursorDroppingBasicBackfillState) {
1427     /* Add 2 items; we need this to keep stream in backfill state */
1428     const int numItems = 2;
1429     addItemsAndRemoveCheckpoint(numItems);
1430 
1431     /* Set up a DCP stream */
1432     setup_dcp_stream();
1433 
1434     /* Transition stream to backfill state and expect cursor dropping call to
1435        succeed */
1436     stream->transitionStateToBackfilling();
1437     EXPECT_TRUE(stream->public_handleSlowStream());
1438 
1439     /* Run the backfill task in background thread to run so that it can
1440        complete/cancel itself */
1441     ExecutorPool::get()->setNumAuxIO(1);
1442     /* Finish up with the backilling of the remaining item */
1443     {
1444         std::chrono::microseconds uSleepTime(128);
1445         while (numItems != stream->getLastReadSeqno()) {
1446             uSleepTime = decayingSleep(uSleepTime);
1447         }
1448     }
1449     destroy_dcp_stream();
1450 }
1451 
TEST_P(StreamTest, CursorDroppingBasicInMemoryState)1452 TEST_P(StreamTest, CursorDroppingBasicInMemoryState) {
1453     /* Set up a DCP stream */
1454     setup_dcp_stream();
1455 
1456     /* Transition stream to in-memory state and expect cursor dropping call to
1457        succeed */
1458     stream->transitionStateToBackfilling();
1459     stream->transitionStateToInMemory();
1460     EXPECT_TRUE(stream->public_handleSlowStream());
1461     destroy_dcp_stream();
1462 }
1463 
TEST_P(StreamTest, CursorDroppingBasicNotAllowedStates)1464 TEST_P(StreamTest, CursorDroppingBasicNotAllowedStates) {
1465     /* Set up a DCP stream */
1466     setup_dcp_stream(DCP_ADD_STREAM_FLAG_TAKEOVER);
1467 
1468     /* Transition stream to takeoverSend state and expect cursor dropping call
1469        to fail */
1470     stream->transitionStateToTakeoverSend();
1471     EXPECT_FALSE(stream->public_handleSlowStream());
1472 
1473     /* Transition stream to takeoverWait state and expect cursor dropping call
1474        to fail */
1475     stream->transitionStateToTakeoverWait();
1476     EXPECT_FALSE(stream->public_handleSlowStream());
1477 
1478     /* Transition stream to takeoverSend state and expect cursor dropping call
1479        to fail */
1480     stream->transitionStateToTakeoverDead();
1481     EXPECT_FALSE(stream->public_handleSlowStream());
1482     destroy_dcp_stream();
1483 }
1484 
TEST_P(StreamTest, RollbackDueToPurge)1485 TEST_P(StreamTest, RollbackDueToPurge) {
1486     setup_dcp_stream(0, IncludeValue::No, IncludeXattrs::No);
1487 
1488     /* Store 4 items */
1489     const int numItems = 4;
1490     for (int i = 0; i <= numItems; ++i) {
1491         store_item(vbid, std::string("key" + std::to_string(i)), "value");
1492     }
1493     uint64_t vbUuid = vb0->failovers->getLatestUUID();
1494     uint64_t rollbackSeqno;
1495     EXPECT_EQ(ENGINE_SUCCESS,
1496               producer->streamRequest(/*flags*/ 0,
1497                                       /*opaque*/ 0,
1498                                       /*vbucket*/ 0,
1499                                       /*start_seqno*/ numItems - 2,
1500                                       /*end_seqno*/ numItems,
1501                                       vbUuid,
1502                                       /*snap_start*/ numItems - 2,
1503                                       /*snap_end*/ numItems - 2,
1504                                       &rollbackSeqno,
1505                                       DCPTest::fakeDcpAddFailoverLog));
1506     EXPECT_EQ(ENGINE_SUCCESS,
1507               producer->closeStream(/*opaque*/ 0, vb0->getId()));
1508 
1509     /* Set a start_seqno > purge_seqno > snap_start_seqno */
1510     engine->getKVBucket()->getLockedVBucket(vbid)->setPurgeSeqno(numItems - 3);
1511 
1512     /* We don't expect a rollback for this */
1513     EXPECT_EQ(ENGINE_SUCCESS,
1514               producer->streamRequest(/*flags*/ 0,
1515                                       /*opaque*/ 0,
1516                                       /*vbucket*/ 0,
1517                                       /*start_seqno*/ numItems - 2,
1518                                       /*end_seqno*/ numItems,
1519                                       vbUuid,
1520                                       /*snap_start*/ 0,
1521                                       /*snap_end*/ numItems - 2,
1522                                       &rollbackSeqno,
1523                                       DCPTest::fakeDcpAddFailoverLog));
1524     EXPECT_EQ(ENGINE_SUCCESS,
1525               producer->closeStream(/*opaque*/ 0, vb0->getId()));
1526 
1527     /* Set a purge_seqno > start_seqno */
1528     engine->getKVBucket()->getLockedVBucket(vbid)->setPurgeSeqno(numItems - 1);
1529 
1530     /* Now we expect a rollback to 0 */
1531     EXPECT_EQ(ENGINE_ROLLBACK,
1532               producer->streamRequest(/*flags*/ 0,
1533                                       /*opaque*/ 0,
1534                                       /*vbucket*/ 0,
1535                                       /*start_seqno*/ numItems - 2,
1536                                       /*end_seqno*/ numItems,
1537                                       vbUuid,
1538                                       /*snap_start*/ numItems - 2,
1539                                       /*snap_end*/ numItems - 2,
1540                                       &rollbackSeqno,
1541                                       DCPTest::fakeDcpAddFailoverLog));
1542     EXPECT_EQ(0, rollbackSeqno);
1543     destroy_dcp_stream();
1544 }
1545 
1546 /*
1547  * Test to ensure that when a streamRequest is made to a dead vbucket, we
1548  * (1) return not my vbucket.
1549  * (2) do not invoke the callback function (which is passed as parameter).
1550  * The reason we don't want to invoke the callback function is that it will
1551  * invoke mcbp_response_handler and so generate a response (ENGINE_SUCCESS) and
1552  * then when we continue the execution of the streamRequest function we generate
1553  * a second response (ENGINE_NOT_MY_VBUCKET).
1554  */
TEST_P(StreamTest, MB_25820_callback_not_invoked_on_dead_vb_stream_request)1555 TEST_P(StreamTest, MB_25820_callback_not_invoked_on_dead_vb_stream_request) {
1556     setup_dcp_stream(0, IncludeValue::No, IncludeXattrs::No);
1557     ASSERT_EQ(ENGINE_SUCCESS,
1558               engine->getKVBucket()->setVBucketState(vbid,
1559                                                      vbucket_state_dead,
1560                                                      true));
1561     uint64_t vbUuid = vb0->failovers->getLatestUUID();
1562     uint64_t rollbackSeqno;
1563     // Given the vbucket state is dead we should return not my vbucket.
1564     EXPECT_EQ(ENGINE_NOT_MY_VBUCKET,
1565               producer->streamRequest(/*flags*/ 0,
1566                                       /*opaque*/ 0,
1567                                       /*vbucket*/ 0,
1568                                       /*start_seqno*/ 0,
1569                                       /*end_seqno*/ 0,
1570                                       vbUuid,
1571                                       /*snap_start*/ 0,
1572                                       /*snap_end*/ 0,
1573                                       &rollbackSeqno,
1574                                       DCPTest::fakeDcpAddFailoverLog));
1575     // The callback function past to streamRequest should not be invoked.
1576     ASSERT_EQ(0, callbackCount);
1577 }
1578 
1579 class CacheCallbackTest : public StreamTest {
1580 protected:
1581     void SetUp() override {
1582         StreamTest::SetUp();
1583         store_item(vbid, key, "value");
1584 
1585         /* Create new checkpoint so that we can remove the current checkpoint
1586          * and force a backfill in the DCP stream */
1587         CheckpointManager& ckpt_mgr = *vb0->checkpointManager;
1588         ckpt_mgr.createNewCheckpoint();
1589 
1590         /* Wait for removal of the old checkpoint, this also would imply that
1591          * the items are persisted (in case of persistent buckets) */
1592         {
1593             bool new_ckpt_created;
1594             std::chrono::microseconds uSleepTime(128);
1595             while (numItems !=
1596                     ckpt_mgr.removeClosedUnrefCheckpoints(*vb0,
1597                                                           new_ckpt_created)) {
1598                 uSleepTime = decayingSleep(uSleepTime);
1599             }
1600         }
1601 
1602         /* Set up a DCP stream for the backfill */
1603         setup_dcp_stream();
1604     }
1605 
1606     void TearDown() override {
1607         producer->closeAllStreams();
1608         StreamTest::TearDown();
1609     }
1610 
1611     const size_t numItems = 1;
1612     const std::string key = "key";
1613     const DocKey docKey{key, DocNamespace::DefaultCollection};
1614 };
1615 
1616 /*
1617  * Tests the callback member function of the CacheCallback class.  This
1618  * particular test should result in the CacheCallback having a status of
1619  * ENGINE_KEY_EEXISTS.
1620  */
TEST_P(CacheCallbackTest, CacheCallback_key_eexists)1621 TEST_P(CacheCallbackTest, CacheCallback_key_eexists) {
1622     CacheCallback callback(*engine, stream);
1623 
1624     stream->transitionStateToBackfilling();
1625     CacheLookup lookup(docKey, /*BySeqno*/ 1, vbid, {});
1626     callback.callback(lookup);
1627 
1628     /* Invoking callback should result in backfillReceived being called on
1629      * activeStream, which should return true and hence set the callback status
1630      * to ENGINE_KEY_EEXISTS.
1631      */
1632     EXPECT_EQ(ENGINE_KEY_EEXISTS, callback.getStatus());
1633 
1634     /* Verify that the item is read in the backfill */
1635     EXPECT_EQ(numItems, stream->getNumBackfillItems());
1636 
1637     /* Verify have the backfill item sitting in the readyQ */
1638     EXPECT_EQ(numItems, stream->public_readyQ().size());
1639 }
1640 
1641 /*
1642  * Tests the callback member function of the CacheCallback class.  This
1643  * particular test should result in the CacheCallback having a status of
1644  * ENGINE_SUCCESS.
1645  */
TEST_P(CacheCallbackTest, CacheCallback_engine_success)1646 TEST_P(CacheCallbackTest, CacheCallback_engine_success) {
1647     CacheCallback callback(*engine, stream);
1648 
1649     stream->transitionStateToBackfilling();
1650     // Passing in wrong BySeqno - should be 1, but passing in 0
1651     CacheLookup lookup(docKey, /*BySeqno*/ 0, vbid, {});
1652     callback.callback(lookup);
1653 
1654     /* Invoking callback should result in backfillReceived NOT being called on
1655      * activeStream, and hence the callback status should be set to
1656      * ENGINE_SUCCESS.
1657      */
1658     EXPECT_EQ(ENGINE_SUCCESS, callback.getStatus());
1659 
1660     /* Verify that the item is not read in the backfill */
1661     EXPECT_EQ(0, stream->getNumBackfillItems());
1662 
1663     /* Verify do not have the backfill item sitting in the readyQ */
1664     EXPECT_EQ(0, stream->public_readyQ().size());
1665 }
1666 
1667 /*
1668  * Tests the callback member function of the CacheCallback class.  Due to the
1669  * key being evicted the test should result in the CacheCallback having a status
1670  * of ENGINE_SUCCESS.
1671  */
TEST_P(CacheCallbackTest, CacheCallback_engine_success_not_resident)1672 TEST_P(CacheCallbackTest, CacheCallback_engine_success_not_resident) {
1673     if (bucketType == "ephemeral") {
1674         /* The test relies on being able to evict a key from memory.
1675          * Eviction is not supported with empherial buckets.
1676          */
1677         return;
1678     }
1679     CacheCallback callback(*engine, stream);
1680 
1681     stream->transitionStateToBackfilling();
1682     CacheLookup lookup(docKey, /*BySeqno*/ 1, vbid, {});
1683     // Make the key non-resident by evicting the key
1684     const char* msg;
1685     engine->evictKey(docKey, vbid, &msg);
1686     callback.callback(lookup);
1687 
1688     /* With the key evicted, invoking callback should result in backfillReceived
1689      * NOT being called on activeStream, and hence the callback status should be
1690      * set to ENGINE_SUCCESS
1691      */
1692     EXPECT_EQ(ENGINE_SUCCESS, callback.getStatus());
1693 
1694     /* Verify that the item is not read in the backfill */
1695     EXPECT_EQ(0, stream->getNumBackfillItems());
1696 
1697     /* Verify do not have the backfill item sitting in the readyQ */
1698     EXPECT_EQ(0, stream->public_readyQ().size());
1699 }
1700 
1701 /*
1702  * Tests the callback member function of the CacheCallback class.  This
1703  * particular test should result in the CacheCallback having a status of
1704  * ENGINE_ENOMEM.
1705  */
TEST_P(CacheCallbackTest, CacheCallback_engine_enomem)1706 TEST_P(CacheCallbackTest, CacheCallback_engine_enomem) {
1707     /*
1708      * Ensure that DcpProducer::recordBackfillManagerBytesRead returns false
1709      * by setting the backfill buffer size to zero, and then setting bytes read
1710      * to one.
1711      */
1712     producer->setBackfillBufferSize(0);
1713     producer->bytesForceRead(1);
1714 
1715     CacheCallback callback(*engine, stream);
1716 
1717     stream->transitionStateToBackfilling();
1718     CacheLookup lookup(docKey, /*BySeqno*/ 1, vbid, {});
1719     callback.callback(lookup);
1720 
1721     /* Invoking callback should result in backfillReceived being called on
1722      * activeStream, which should return false (due to
1723      * DcpProducer::recordBackfillManagerBytesRead returning false), and hence
1724      * set the callback status to ENGINE_ENOMEM.
1725      */
1726     EXPECT_EQ(ENGINE_ENOMEM, callback.getStatus());
1727 
1728     /* Verify that the item is not read in the backfill */
1729     EXPECT_EQ(0, stream->getNumBackfillItems());
1730 
1731     /* Verify do not have the backfill item sitting in the readyQ */
1732     EXPECT_EQ(0, stream->public_readyQ().size());
1733 }
1734 
1735 class ConnectionTest : public DCPTest,
1736                        public ::testing::WithParamInterface<
1737                                std::tuple<std::string, std::string>> {
1738 protected:
1739     void SetUp() override {
1740         bucketType = std::get<0>(GetParam());
1741         DCPTest::SetUp();
1742         vbid = 0;
1743         if (bucketType == "ephemeral") {
1744             engine->getConfiguration().setEphemeralFullPolicy(
1745                     std::get<1>(GetParam()));
1746         }
1747     }
1748 
set_vb_state(uint16_t vbid, vbucket_state_t state)1749     ENGINE_ERROR_CODE set_vb_state(uint16_t vbid, vbucket_state_t state) {
1750         return engine->getKVBucket()->setVBucketState(vbid, state, true);
1751     }
1752 
1753     /**
1754      * Creates a consumer conn and sends items on the conn with memory usage
1755      * near to replication threshold
1756      *
1757      * @param beyondThreshold indicates if the memory usage should above the
1758      *                        threshold or just below it
1759      */
1760     void sendConsumerMutationsNearThreshold(bool beyondThreshold);
1761 
1762     /**
1763      * Creates a consumer conn and makes the consumer processor task run with
1764      * memory usage near to replication threshold
1765      *
1766      * @param beyondThreshold indicates if the memory usage should above the
1767      *                        threshold or just below it
1768      */
1769     void processConsumerMutationsNearThreshold(bool beyondThreshold);
1770 
1771     /* vbucket associated with this connection */
1772     uint16_t vbid;
1773 };
1774 
mock_noop_return_engine_e2big( gsl::not_null<const void*> cookie, uint32_t opaque)1775 ENGINE_ERROR_CODE mock_noop_return_engine_e2big(
1776         gsl::not_null<const void*> cookie, uint32_t opaque) {
1777     return ENGINE_E2BIG;
1778 }
1779 
1780 /*
1781  * Test that the connection manager interval is a multiple of the value we
1782  * are setting the noop interval to.  This ensures we do not set the the noop
1783  * interval to a value that cannot be adhered to.  The reason is that if there
1784  * is no DCP traffic we snooze for the connection manager interval before
1785  * sending the noop.
1786  */
TEST_P(ConnectionTest, test_mb19955)1787 TEST_P(ConnectionTest, test_mb19955) {
1788     const void* cookie = create_mock_cookie();
1789     engine->getConfiguration().setConnectionManagerInterval(2);
1790 
1791     // Create a Mock Dcp producer
1792     auto producer = std::make_shared<MockDcpProducer>(
1793             *engine,
1794             cookie,
1795             "test_producer",
1796             /*flags*/ 0,
1797             cb::const_byte_buffer() /*no json*/);
1798     // "1" is not a multiple of "2" and so we should return ENGINE_EINVAL
1799     EXPECT_EQ(ENGINE_EINVAL,
1800               producer->control(0,
1801                                 "set_noop_interval",
1802                                 sizeof("set_noop_interval"),
1803                                 "1",
1804                                 sizeof("1")))
1805             << "Expected producer.control to return ENGINE_EINVAL";
1806     destroy_mock_cookie(cookie);
1807 }
1808 
TEST_P(ConnectionTest, test_maybesendnoop_buffer_full)1809 TEST_P(ConnectionTest, test_maybesendnoop_buffer_full) {
1810     const void* cookie = create_mock_cookie();
1811     // Create a Mock Dcp producer
1812     auto producer = std::make_shared<MockDcpProducer>(
1813             *engine,
1814             cookie,
1815             "test_producer",
1816             /*flags*/ 0,
1817             cb::const_byte_buffer() /*no json*/);
1818 
1819     struct dcp_message_producers producers = {nullptr,
1820                                               nullptr,
1821                                               nullptr,
1822                                               nullptr,
1823                                               nullptr,
1824                                               nullptr,
1825                                               nullptr,
1826                                               nullptr,
1827                                               nullptr,
1828                                               nullptr,
1829                                               nullptr,
1830                                               nullptr,
1831                                               nullptr,
1832                                               mock_noop_return_engine_e2big,
1833                                               nullptr,
1834                                               nullptr};
1835 
1836     producer->setNoopEnabled(true);
1837     const auto send_time = ep_current_time() + 21;
1838     producer->setNoopSendTime(send_time);
1839     ENGINE_ERROR_CODE ret = producer->maybeSendNoop(&producers);
1840     EXPECT_EQ(ENGINE_E2BIG, ret)
1841     << "maybeSendNoop not returning ENGINE_E2BIG";
1842     EXPECT_FALSE(producer->getNoopPendingRecv())
1843             << "Waiting for noop acknowledgement";
1844     EXPECT_EQ(send_time, producer->getNoopSendTime())
1845             << "SendTime has been updated";
1846     producer->cancelCheckpointCreatorTask();
1847     destroy_mock_cookie(cookie);
1848 }
1849 
TEST_P(ConnectionTest, test_maybesendnoop_send_noop)1850 TEST_P(ConnectionTest, test_maybesendnoop_send_noop) {
1851     const void* cookie = create_mock_cookie();
1852     // Create a Mock Dcp producer
1853     auto producer = std::make_shared<MockDcpProducer>(
1854             *engine,
1855             cookie,
1856             "test_producer",
1857             /*flags*/ 0,
1858             cb::const_byte_buffer() /*no json*/);
1859 
1860     std::unique_ptr<dcp_message_producers> producers(get_dcp_producers(handle, engine_v1));
1861     producer->setNoopEnabled(true);
1862     const auto send_time = ep_current_time() + 21;
1863     producer->setNoopSendTime(send_time);
1864     ENGINE_ERROR_CODE ret = producer->maybeSendNoop(producers.get());
1865     EXPECT_EQ(ENGINE_WANT_MORE, ret)
1866     << "maybeSendNoop not returning ENGINE_WANT_MORE";
1867     EXPECT_TRUE(producer->getNoopPendingRecv())
1868             << "Not waiting for noop acknowledgement";
1869     EXPECT_NE(send_time, producer->getNoopSendTime())
1870             << "SendTime has not been updated";
1871     producer->cancelCheckpointCreatorTask();
1872     destroy_mock_cookie(cookie);
1873 }
1874 
TEST_P(ConnectionTest, test_maybesendnoop_noop_already_pending)1875 TEST_P(ConnectionTest, test_maybesendnoop_noop_already_pending) {
1876     const void* cookie = create_mock_cookie();
1877     // Create a Mock Dcp producer
1878     auto producer = std::make_shared<MockDcpProducer>(
1879             *engine,
1880             cookie,
1881             "test_producer",
1882             /*flags*/ 0,
1883             cb::const_byte_buffer() /*no json*/);
1884 
1885     std::unique_ptr<dcp_message_producers> producers(
1886             get_dcp_producers(handle, engine_v1));
1887     const auto send_time = ep_current_time();
1888     TimeTraveller marty(engine->getConfiguration().getDcpIdleTimeout() + 1);
1889     producer->setNoopEnabled(true);
1890     producer->setNoopSendTime(send_time);
1891     ENGINE_ERROR_CODE ret = producer->maybeSendNoop(producers.get());
1892     // Check to see if a noop was sent i.e. returned ENGINE_WANT_MORE
1893     EXPECT_EQ(ENGINE_WANT_MORE, ret)
1894         << "maybeSendNoop not returning ENGINE_WANT_MORE";
1895     EXPECT_TRUE(producer->getNoopPendingRecv())
1896             << "Not awaiting noop acknowledgement";
1897     EXPECT_NE(send_time, producer->getNoopSendTime())
1898             << "SendTime has not been updated";
1899     ret = producer->maybeSendNoop(producers.get());
1900     // Check to see if a noop was not sent i.e. returned ENGINE_FAILED
1901     EXPECT_EQ(ENGINE_FAILED, ret)
1902         << "maybeSendNoop not returning ENGINE_FAILED";
1903     producer->setLastReceiveTime(send_time);
1904     ret = producer->maybeDisconnect();
1905     // Check to see if we want to disconnect i.e. returned ENGINE_DISCONNECT
1906     EXPECT_EQ(ENGINE_DISCONNECT, ret)
1907         << "maybeDisconnect not returning ENGINE_DISCONNECT";
1908     producer->setLastReceiveTime(
1909             send_time + engine->getConfiguration().getDcpIdleTimeout() + 1);
1910     ret = producer->maybeDisconnect();
1911     // Check to see if we don't want to disconnect i.e. returned ENGINE_FAILED
1912     EXPECT_EQ(ENGINE_FAILED, ret)
1913         << "maybeDisconnect not returning ENGINE_FAILED";
1914     EXPECT_TRUE(producer->getNoopPendingRecv())
1915             << "Not waiting for noop acknowledgement";
1916     producer->cancelCheckpointCreatorTask();
1917     destroy_mock_cookie(cookie);
1918 }
1919 
TEST_P(ConnectionTest, test_maybesendnoop_not_enabled)1920 TEST_P(ConnectionTest, test_maybesendnoop_not_enabled) {
1921     const void* cookie = create_mock_cookie();
1922     // Create a Mock Dcp producer
1923     auto producer = std::make_shared<MockDcpProducer>(
1924             *engine,
1925             cookie,
1926             "test_producer",
1927             /*flags*/ 0,
1928             cb::const_byte_buffer() /*no json*/);
1929 
1930     std::unique_ptr<dcp_message_producers> producers(get_dcp_producers(handle, engine_v1));
1931     producer->setNoopEnabled(false);
1932     const auto send_time = ep_current_time() + 21;
1933     producer->setNoopSendTime(send_time);
1934     ENGINE_ERROR_CODE ret = producer->maybeSendNoop(producers.get());
1935     EXPECT_EQ(ENGINE_FAILED, ret)
1936     << "maybeSendNoop not returning ENGINE_FAILED";
1937     EXPECT_FALSE(producer->getNoopPendingRecv())
1938             << "Waiting for noop acknowledgement";
1939     EXPECT_EQ(send_time, producer->getNoopSendTime())
1940             << "SendTime has been updated";
1941     producer->cancelCheckpointCreatorTask();
1942     destroy_mock_cookie(cookie);
1943 }
1944 
TEST_P(ConnectionTest, test_maybesendnoop_not_sufficient_time_passed)1945 TEST_P(ConnectionTest, test_maybesendnoop_not_sufficient_time_passed) {
1946     const void* cookie = create_mock_cookie();
1947     // Create a Mock Dcp producer
1948     auto producer = std::make_shared<MockDcpProducer>(
1949             *engine,
1950             cookie,
1951             "test_producer",
1952             /*flags*/ 0,
1953             cb::const_byte_buffer() /*no json*/);
1954 
1955     std::unique_ptr<dcp_message_producers> producers(get_dcp_producers(handle, engine_v1));
1956     producer->setNoopEnabled(true);
1957     rel_time_t current_time = ep_current_time();
1958     producer->setNoopSendTime(current_time);
1959     ENGINE_ERROR_CODE ret = producer->maybeSendNoop(producers.get());
1960     EXPECT_EQ(ENGINE_FAILED, ret)
1961     << "maybeSendNoop not returning ENGINE_FAILED";
1962     EXPECT_FALSE(producer->getNoopPendingRecv())
1963             << "Waiting for noop acknowledgement";
1964     EXPECT_EQ(current_time, producer->getNoopSendTime())
1965             << "SendTime has been incremented";
1966     producer->cancelCheckpointCreatorTask();
1967     destroy_mock_cookie(cookie);
1968 }
1969 
TEST_P(ConnectionTest, test_deadConnections)1970 TEST_P(ConnectionTest, test_deadConnections) {
1971     MockDcpConnMap connMap(*engine);
1972     connMap.initialize();
1973     const void *cookie = create_mock_cookie();
1974     // Create a new Dcp producer
1975     connMap.newProducer(cookie,
1976                         "test_producer",
1977                         /*flags*/ 0);
1978 
1979     // Disconnect the producer connection
1980     connMap.disconnect(cookie);
1981     EXPECT_EQ(1, connMap.getNumberOfDeadConnections())
1982         << "Unexpected number of dead connections";
1983     connMap.manageConnections();
1984     // Should be zero deadConnections
1985     EXPECT_EQ(0, connMap.getNumberOfDeadConnections())
1986         << "Dead connections still remain";
1987 }
1988 
TEST_P(ConnectionTest, test_mb23637_findByNameWithConnectionDoDisconnect)1989 TEST_P(ConnectionTest, test_mb23637_findByNameWithConnectionDoDisconnect) {
1990     MockDcpConnMap connMap(*engine);
1991     connMap.initialize();
1992     const void *cookie = create_mock_cookie();
1993     // Create a new Dcp producer
1994     connMap.newProducer(cookie,
1995                         "test_producer",
1996                         /*flags*/ 0);
1997     // should be able to find the connection
1998     ASSERT_NE(nullptr, connMap.findByName("eq_dcpq:test_producer"));
1999     // Disconnect the producer connection
2000     connMap.disconnect(cookie);
2001     ASSERT_EQ(1, connMap.getNumberOfDeadConnections())
2002         << "Unexpected number of dead connections";
2003     // should not be able to find because the connection has been marked as
2004     // wanting to disconnect
2005     EXPECT_EQ(nullptr, connMap.findByName("eq_dcpq:test_producer"));
2006     connMap.manageConnections();
2007     // Should be zero deadConnections
2008     EXPECT_EQ(0, connMap.getNumberOfDeadConnections())
2009         << "Dead connections still remain";
2010 }
2011 
TEST_P(ConnectionTest, test_mb23637_findByNameWithDuplicateConnections)2012 TEST_P(ConnectionTest, test_mb23637_findByNameWithDuplicateConnections) {
2013     MockDcpConnMap connMap(*engine);
2014     connMap.initialize();
2015     const void* cookie1 = create_mock_cookie();
2016     const void* cookie2 = create_mock_cookie();
2017     // Create a new Dcp producer
2018     DcpProducer* producer = connMap.newProducer(cookie1,
2019                                                 "test_producer",
2020                                                 /*flags*/ 0);
2021     ASSERT_NE(nullptr, producer) << "producer is null";
2022     // should be able to find the connection
2023     ASSERT_NE(nullptr, connMap.findByName("eq_dcpq:test_producer"));
2024 
2025     // Create a duplicate Dcp producer
2026     DcpProducer* duplicateproducer =
2027             connMap.newProducer(cookie2, "test_producer", /*flags*/ 0);
2028     ASSERT_TRUE(producer->doDisconnect()) << "producer doDisconnect == false";
2029     ASSERT_NE(nullptr, duplicateproducer) << "duplicateproducer is null";
2030 
2031     // should find the duplicateproducer as the first producer has been marked
2032     // as wanting to disconnect
2033     EXPECT_EQ(duplicateproducer,
2034               connMap.findByName("eq_dcpq:test_producer").get());
2035 
2036     // Disconnect the producer connection
2037     connMap.disconnect(cookie1);
2038     // Disconnect the duplicateproducer connection
2039     connMap.disconnect(cookie2);
2040     EXPECT_EQ(2, connMap.getNumberOfDeadConnections())
2041         << "Unexpected number of dead connections";
2042 
2043     connMap.manageConnections();
2044     // Should be zero deadConnections
2045     EXPECT_EQ(0, connMap.getNumberOfDeadConnections())
2046         << "Dead connections still remain";
2047 }
2048 
2049 
TEST_P(ConnectionTest, test_mb17042_duplicate_name_producer_connections)2050 TEST_P(ConnectionTest, test_mb17042_duplicate_name_producer_connections) {
2051     MockDcpConnMap connMap(*engine);
2052     connMap.initialize();
2053     const void* cookie1 = create_mock_cookie();
2054     const void* cookie2 = create_mock_cookie();
2055     // Create a new Dcp producer
2056     DcpProducer* producer = connMap.newProducer(cookie1,
2057                                                 "test_producer",
2058                                                 /*flags*/ 0);
2059     EXPECT_NE(nullptr, producer) << "producer is null";
2060 
2061     // Create a duplicate Dcp producer
2062     DcpProducer* duplicateproducer = connMap.newProducer(cookie2,
2063                                                          "test_producer",
2064                                                          /*flags*/ 0);
2065     EXPECT_TRUE(producer->doDisconnect()) << "producer doDisconnect == false";
2066     EXPECT_NE(nullptr, duplicateproducer) << "duplicateproducer is null";
2067 
2068     // Disconnect the producer connection
2069     connMap.disconnect(cookie1);
2070     // Disconnect the duplicateproducer connection
2071     connMap.disconnect(cookie2);
2072     // Cleanup the deadConnections
2073     connMap.manageConnections();
2074     // Should be zero deadConnections
2075     EXPECT_EQ(0, connMap.getNumberOfDeadConnections())
2076         << "Dead connections still remain";
2077 }
2078 
TEST_P(ConnectionTest, test_mb17042_duplicate_name_consumer_connections)2079 TEST_P(ConnectionTest, test_mb17042_duplicate_name_consumer_connections) {
2080     MockDcpConnMap connMap(*engine);
2081     connMap.initialize();
2082     struct mock_connstruct* cookie1 = (struct mock_connstruct*)create_mock_cookie();
2083     struct mock_connstruct* cookie2 = (struct mock_connstruct*)create_mock_cookie();
2084     // Create a new Dcp consumer
2085     DcpConsumer* consumer = connMap.newConsumer(cookie1, "test_consumer");
2086     EXPECT_NE(nullptr, consumer) << "consumer is null";
2087 
2088     // Create a duplicate Dcp consumer
2089     DcpConsumer* duplicateconsumer =
2090             connMap.newConsumer(cookie2, "test_consumer");
2091     EXPECT_TRUE(consumer->doDisconnect()) << "consumer doDisconnect == false";
2092     EXPECT_NE(nullptr, duplicateconsumer) << "duplicateconsumer is null";
2093 
2094     // Disconnect the consumer connection
2095     connMap.disconnect(cookie1);
2096     // Disconnect the duplicateconsumer connection
2097     connMap.disconnect(cookie2);
2098     // Cleanup the deadConnections
2099     connMap.manageConnections();
2100     // Should be zero deadConnections
2101     EXPECT_EQ(0, connMap.getNumberOfDeadConnections())
2102         << "Dead connections still remain";
2103 }
2104 
TEST_P(ConnectionTest, test_mb17042_duplicate_cookie_producer_connections)2105 TEST_P(ConnectionTest, test_mb17042_duplicate_cookie_producer_connections) {
2106     MockDcpConnMap connMap(*engine);
2107     connMap.initialize();
2108     const void* cookie = create_mock_cookie();
2109     // Create a new Dcp producer
2110     DcpProducer* producer = connMap.newProducer(cookie,
2111                                                 "test_producer1",
2112                                                 /*flags*/ 0);
2113 
2114     // Create a duplicate Dcp producer
2115     DcpProducer* duplicateproducer = connMap.newProducer(cookie,
2116                                                          "test_producer2",
2117                                                          /*flags*/ 0);
2118 
2119     EXPECT_TRUE(producer->doDisconnect()) << "producer doDisconnect == false";
2120     EXPECT_EQ(nullptr, duplicateproducer) << "duplicateproducer is not null";
2121 
2122     // Disconnect the producer connection
2123     connMap.disconnect(cookie);
2124     // Cleanup the deadConnections
2125     connMap.manageConnections();
2126     // Should be zero deadConnections
2127     EXPECT_EQ(0, connMap.getNumberOfDeadConnections())
2128         << "Dead connections still remain";
2129 }
2130 
2131 /* Checks that the DCP producer does an async stream close when the DCP client
2132    expects "DCP_STREAM_END" msg. */
TEST_P(ConnectionTest, test_producer_stream_end_on_client_close_stream)2133 TEST_P(ConnectionTest, test_producer_stream_end_on_client_close_stream) {
2134 #ifdef UNDEFINED_SANITIZER
2135     // See below MB-28739 comment for why this is skipped.
2136     std::cerr << "MB-28739[UBsan] skipping test\n";
2137     return;
2138 #endif
2139     const void* cookie = create_mock_cookie();
2140     /* Create a new Dcp producer */
2141     auto producer = std::make_shared<MockDcpProducer>(
2142             *engine,
2143             cookie,
2144             "test_producer",
2145             /*flags*/ 0,
2146             cb::const_byte_buffer() /*no json*/);
2147 
2148     /* Send a control message to the producer indicating that the DCP client
2149        expects a "DCP_STREAM_END" upon stream close */
2150     const std::string sendStreamEndOnClientStreamCloseCtrlMsg(
2151             "send_stream_end_on_client_close_stream");
2152     const std::string sendStreamEndOnClientStreamCloseCtrlValue("true");
2153     EXPECT_EQ(ENGINE_SUCCESS,
2154               producer->control(
2155                       0,
2156                       sendStreamEndOnClientStreamCloseCtrlMsg.c_str(),
2157                       sendStreamEndOnClientStreamCloseCtrlMsg.size(),
2158                       sendStreamEndOnClientStreamCloseCtrlValue.c_str(),
2159                       sendStreamEndOnClientStreamCloseCtrlValue.size()));
2160 
2161     /* Open stream */
2162     uint64_t rollbackSeqno = 0;
2163     uint32_t opaque = 0;
2164     const uint16_t vbid = 0;
2165     EXPECT_EQ(ENGINE_SUCCESS,
2166               producer->streamRequest(/*flags*/ 0,
2167                                       opaque,
2168                                       vbid,
2169                                       /*start_seqno*/ 0,
2170                                       /*end_seqno*/ ~0,
2171                                       /*vb_uuid*/ 0,
2172                                       /*snap_start*/ 0,
2173                                       /*snap_end*/ 0,
2174                                       &rollbackSeqno,
2175                                       DCPTest::fakeDcpAddFailoverLog));
2176 
2177     // MB-28739[UBSan]: The following cast is undefined behaviour - the DCP
2178     // connection map object is of type DcpConnMap; so it's undefined to cast
2179     // to MockDcpConnMap.
2180     // However, in this instance MockDcpConnMap has identical member variables
2181     // to DcpConnMap - the mock just exposes normally private data - and so
2182     // this /seems/ ok.
2183     // As such allow it in general, but skip this test under UBSan.
2184     MockDcpConnMap& mockConnMap =
2185             static_cast<MockDcpConnMap&>(engine->getDcpConnMap());
2186     mockConnMap.addConn(cookie, producer);
2187     EXPECT_TRUE(mockConnMap.doesConnHandlerExist(vbid, "test_producer"));
2188 
2189     /* Close stream */
2190     EXPECT_EQ(ENGINE_SUCCESS, producer->closeStream(opaque, vbid));
2191 
2192     /* Expect a stream end message */
2193     std::unique_ptr<dcp_message_producers> fakeProducers(
2194             get_dcp_producers(handle, engine_v1));
2195     EXPECT_EQ(ENGINE_WANT_MORE, producer->step(fakeProducers.get()));
2196     EXPECT_EQ(PROTOCOL_BINARY_CMD_DCP_STREAM_END, dcp_last_op);
2197     EXPECT_EQ(END_STREAM_CLOSED, dcp_last_flags);
2198 
2199     /* Re-open stream for the same vbucket on the conn */
2200     EXPECT_EQ(ENGINE_SUCCESS,
2201               producer->streamRequest(/*flags*/ 0,
2202                                       opaque,
2203                                       vbid,
2204                                       /*start_seqno*/ 0,
2205                                       /*end_seqno*/ ~0,
2206                                       /*vb_uuid*/ 0,
2207                                       /*snap_start*/ 0,
2208                                       /*snap_end*/ 0,
2209                                       &rollbackSeqno,
2210                                       DCPTest::fakeDcpAddFailoverLog));
2211 
2212     /* Check that the new stream is opened properly */
2213     auto stream = producer->findStream(vbid);
2214     EXPECT_TRUE(stream->isInMemory());
2215 
2216     // MB-27769: Prior to the fix, this would fail here because we would skip
2217     // adding the connhandler into the connmap vbConns vector, causing the
2218     // stream to never get notified.
2219     EXPECT_TRUE(mockConnMap.doesConnHandlerExist(vbid, "test_producer"));
2220 
2221     mockConnMap.disconnect(cookie);
2222     EXPECT_FALSE(mockConnMap.doesConnHandlerExist(vbid, "test_producer"));
2223     mockConnMap.manageConnections();
2224 }
2225 
2226 /* Checks that the DCP producer does a synchronous stream close when the DCP
2227    client does not expect "DCP_STREAM_END" msg. */
TEST_P(ConnectionTest, test_producer_no_stream_end_on_client_close_stream)2228 TEST_P(ConnectionTest, test_producer_no_stream_end_on_client_close_stream) {
2229     MockDcpConnMap connMap(*engine);
2230     connMap.initialize();
2231     const void* cookie = create_mock_cookie();
2232 
2233     /* Create a new Dcp producer */
2234     DcpProducer* producer = connMap.newProducer(cookie,
2235                                                 "test_producer",
2236                                                 /*flags*/ 0);
2237 
2238     /* Open stream */
2239     uint64_t rollbackSeqno = 0;
2240     uint32_t opaque = 0;
2241     const uint16_t vbid = 0;
2242     EXPECT_EQ(ENGINE_SUCCESS,
2243               producer->streamRequest(/*flags*/ 0,
2244                                       opaque,
2245                                       vbid,
2246                                       /*start_seqno*/ 0,
2247                                       /*end_seqno*/ ~0,
2248                                       /*vb_uuid*/ 0,
2249                                       /*snap_start*/ 0,
2250                                       /*snap_end*/ 0,
2251                                       &rollbackSeqno,
2252                                       DCPTest::fakeDcpAddFailoverLog));
2253 
2254     /* Close stream */
2255     EXPECT_EQ(ENGINE_SUCCESS, producer->closeStream(opaque, vbid));
2256 
2257     /* Don't expect a stream end message (or any other message as the stream is
2258        closed) */
2259     std::unique_ptr<dcp_message_producers> fakeProducers(
2260             get_dcp_producers(handle, engine_v1));
2261     EXPECT_EQ(ENGINE_SUCCESS, producer->step(fakeProducers.get()));
2262 
2263     /* Check that the stream is not found in the producer's stream map */
2264     EXPECT_FALSE(producer->findStream(vbid));
2265 
2266     /* Disconnect the producer connection */
2267     connMap.disconnect(cookie);
2268     /* Cleanup the deadConnections */
2269     connMap.manageConnections();
2270 }
2271 
TEST_P(ConnectionTest, test_producer_unknown_ctrl_msg)2272 TEST_P(ConnectionTest, test_producer_unknown_ctrl_msg) {
2273     const void* cookie = create_mock_cookie();
2274     /* Create a new Dcp producer */
2275     auto producer = std::make_shared<MockDcpProducer>(
2276             *engine,
2277             cookie,
2278             "test_producer",
2279             /*flags*/ 0,
2280             cb::const_byte_buffer() /*no json*/);
2281 
2282     /* Send an unkown control message to the producer and expect an error code
2283        of "ENGINE_EINVAL" */
2284     const std::string unkownCtrlMsg("unknown");
2285     const std::string unkownCtrlValue("blah");
2286     EXPECT_EQ(ENGINE_EINVAL,
2287               producer->control(0,
2288                                 unkownCtrlMsg.c_str(),
2289                                 unkownCtrlMsg.size(),
2290                                 unkownCtrlValue.c_str(),
2291                                 unkownCtrlValue.size()));
2292     destroy_mock_cookie(cookie);
2293 }
2294 
TEST_P(ConnectionTest, test_mb17042_duplicate_cookie_consumer_connections)2295 TEST_P(ConnectionTest, test_mb17042_duplicate_cookie_consumer_connections) {
2296     MockDcpConnMap connMap(*engine);
2297     connMap.initialize();
2298     const void* cookie = create_mock_cookie();
2299     // Create a new Dcp consumer
2300     DcpConsumer* consumer = connMap.newConsumer(cookie, "test_consumer1");
2301 
2302     // Create a duplicate Dcp consumer
2303     DcpConsumer* duplicateconsumer =
2304             connMap.newConsumer(cookie, "test_consumer2");
2305     EXPECT_TRUE(consumer->doDisconnect()) << "consumer doDisconnect == false";
2306     EXPECT_EQ(nullptr, duplicateconsumer) << "duplicateconsumer is not null";
2307 
2308     // Disconnect the consumer connection
2309     connMap.disconnect(cookie);
2310     // Cleanup the deadConnections
2311     connMap.manageConnections();
2312     // Should be zero deadConnections
2313     EXPECT_EQ(0, connMap.getNumberOfDeadConnections())
2314         << "Dead connections still remain";
2315 }
2316 
TEST_P(ConnectionTest, test_update_of_last_message_time_in_consumer)2317 TEST_P(ConnectionTest, test_update_of_last_message_time_in_consumer) {
2318     const void* cookie = create_mock_cookie();
2319     // Create a Mock Dcp consumer
2320     auto consumer =
2321             std::make_shared<MockDcpConsumer>(*engine, cookie, "test_consumer");
2322     consumer->setLastMessageTime(1234);
2323     consumer->addStream(/*opaque*/0, /*vbucket*/0, /*flags*/0);
2324     EXPECT_NE(1234, consumer->getLastMessageTime())
2325         << "lastMessagerTime not updated for addStream";
2326     consumer->setLastMessageTime(1234);
2327     consumer->closeStream(/*opaque*/0, /*vbucket*/0);
2328     EXPECT_NE(1234, consumer->getLastMessageTime())
2329         << "lastMessagerTime not updated for closeStream";
2330     consumer->setLastMessageTime(1234);
2331     consumer->streamEnd(/*opaque*/0, /*vbucket*/0, /*flags*/0);
2332     EXPECT_NE(1234, consumer->getLastMessageTime())
2333         << "lastMessagerTime not updated for streamEnd";
2334     const DocKey docKey{ nullptr, 0, DocNamespace::DefaultCollection};
2335     consumer->mutation(0, // opaque
2336                        docKey,
2337                        {}, // value
2338                        0, // priv bytes
2339                        PROTOCOL_BINARY_RAW_BYTES,
2340                        0, // cas
2341                        0, // vbucket
2342                        0, // flags
2343                        0, // locktime
2344                        0, // by seqno
2345                        0, // rev seqno
2346                        0, // exptime
2347                        {}, // meta
2348                        0); // nru
2349     EXPECT_NE(1234, consumer->getLastMessageTime())
2350         << "lastMessagerTime not updated for mutation";
2351     consumer->setLastMessageTime(1234);
2352     consumer->deletion(0, // opaque
2353                        docKey,
2354                        {}, // value
2355                        0, // priv bytes
2356                        PROTOCOL_BINARY_RAW_BYTES,
2357                        0, // cas
2358                        0, // vbucket
2359                        0, // by seqno
2360                        0, // rev seqno
2361                        {}); // meta
2362     EXPECT_NE(1234, consumer->getLastMessageTime())
2363         << "lastMessagerTime not updated for deletion";
2364     consumer->setLastMessageTime(1234);
2365     consumer->expiration(0, // opaque
2366                          docKey,
2367                          {}, // value
2368                          0, // priv bytes
2369                          PROTOCOL_BINARY_RAW_BYTES,
2370                          0, // cas
2371                          0, // vbucket
2372                          0, // by seqno
2373                          0, // rev seqno
2374                          {}); // meta
2375     EXPECT_NE(1234, consumer->getLastMessageTime())
2376         << "lastMessagerTime not updated for expiration";
2377     consumer->setLastMessageTime(1234);
2378     consumer->snapshotMarker(/*opaque*/0,
2379                              /*vbucket*/0,
2380                              /*start_seqno*/0,
2381                              /*end_seqno*/0,
2382                              /*flags*/0);
2383     EXPECT_NE(1234, consumer->getLastMessageTime())
2384         << "lastMessagerTime not updated for snapshotMarker";
2385     consumer->setLastMessageTime(1234);
2386     consumer->noop(/*opaque*/0);
2387     EXPECT_NE(1234, consumer->getLastMessageTime())
2388         << "lastMessagerTime not updated for noop";
2389     consumer->setLastMessageTime(1234);
2390     consumer->flush(/*opaque*/0, /*vbucket*/0);
2391     EXPECT_NE(1234, consumer->getLastMessageTime())
2392         << "lastMessagerTime not updated for flush";
2393     consumer->setLastMessageTime(1234);
2394     consumer->setVBucketState(/*opaque*/0,
2395                               /*vbucket*/0,
2396                               /*state*/vbucket_state_active);
2397     EXPECT_NE(1234, consumer->getLastMessageTime())
2398         << "lastMessagerTime not updated for setVBucketState";
2399     destroy_mock_cookie(cookie);
2400 }
2401 
TEST_P(ConnectionTest, test_consumer_add_stream)2402 TEST_P(ConnectionTest, test_consumer_add_stream) {
2403     const void* cookie = create_mock_cookie();
2404     uint16_t vbid = 0;
2405 
2406     /* Create a Mock Dcp consumer */
2407     auto consumer =
2408             std::make_shared<MockDcpConsumer>(*engine, cookie, "test_consumer");
2409 
2410     ASSERT_EQ(ENGINE_SUCCESS, set_vb_state(vbid, vbucket_state_replica));
2411     ASSERT_EQ(ENGINE_SUCCESS, consumer->addStream(/*opaque*/0, vbid,
2412                                                   /*flags*/0));
2413 
2414     /* Set the passive to dead state. Note that we want to set the stream to
2415        dead state but not erase it from the streams map in the consumer
2416        connection*/
2417     MockPassiveStream *stream = static_cast<MockPassiveStream*>
2418                                     ((consumer->getVbucketStream(vbid)).get());
2419 
2420     stream->transitionStateToDead();
2421 
2422     /* Add a passive stream on the same vb */
2423     ASSERT_EQ(ENGINE_SUCCESS, consumer->addStream(/*opaque*/0, vbid,
2424                                                   /*flags*/0));
2425 
2426     /* Expected the newly added stream to be in active state */
2427     stream = static_cast<MockPassiveStream*>
2428                                     ((consumer->getVbucketStream(vbid)).get());
2429     ASSERT_TRUE(stream->isActive());
2430 
2431     /* Close stream before deleting the connection */
2432     ASSERT_EQ(ENGINE_SUCCESS, consumer->closeStream(/*opaque*/0, vbid));
2433 
2434     destroy_mock_cookie(cookie);
2435 }
2436 
TEST_P(ConnectionTest, consumer_get_error_map)2437 TEST_P(ConnectionTest, consumer_get_error_map) {
2438     // We want to test that the Consumer processes the GetErrorMap negotiation
2439     // with the Producer correctly. I.e., the Consumer must check the
2440     // Producer's version and set internal flags accordingly.
2441     // Note: we test both the cases of pre-5.0.0 and post-5.0.0 Producer
2442     for (auto prodIsV5orHigher : {true, false}) {
2443         const void* cookie = create_mock_cookie();
2444         // GetErrorMap negotiation performed only if NOOP is enabled
2445         engine->getConfiguration().setDcpEnableNoop(true);
2446         auto producers = get_dcp_producers(handle, engine_v1);
2447 
2448         // Create a mock DcpConsumer
2449         MockDcpConsumer consumer(*engine, cookie, "test_consumer");
2450         ASSERT_EQ(1 /*PendingRequest*/,
2451                   static_cast<uint8_t>(consumer.getGetErrorMapState()));
2452         ASSERT_EQ(false, consumer.getProducerIsVersion5orHigher());
2453 
2454         // If a Flow Control Policy is enabled, then the first call to step()
2455         // will handle the Flow Control negotiation. We do not want to test that
2456         // here, so this is just to let the test to work with all EP
2457         // configurations.
2458         if (engine->getConfiguration().getDcpFlowControlPolicy() != "none") {
2459             ASSERT_EQ(ENGINE_WANT_MORE, consumer.step(producers.get()));
2460         }
2461 
2462         // The next call to step() is expected to start the GetErrorMap
2463         // negotiation
2464         ASSERT_EQ(ENGINE_WANT_MORE, consumer.step(producers.get()));
2465         ASSERT_EQ(2 /*PendingResponse*/,
2466                   static_cast<uint8_t>(consumer.getGetErrorMapState()));
2467 
2468         // At this point the consumer is waiting for a response from the
2469         // producer. I simulate the producer's response with a call to
2470         // handleResponse()
2471         protocol_binary_response_header resp{};
2472         resp.response.opcode = PROTOCOL_BINARY_CMD_GET_ERROR_MAP;
2473         resp.response.status =
2474                 prodIsV5orHigher
2475                         ? htons(PROTOCOL_BINARY_RESPONSE_SUCCESS)
2476                         : htons(PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND);
2477         ASSERT_TRUE(consumer.handleResponse(&resp));
2478         ASSERT_EQ(0 /*Skip*/,
2479                   static_cast<uint8_t>(consumer.getGetErrorMapState()));
2480         ASSERT_EQ(prodIsV5orHigher ? true : false,
2481                   consumer.getProducerIsVersion5orHigher());
2482 
2483         destroy_mock_cookie(cookie);
2484     }
2485 }
2486 
2487 // Regression test for MB 20645 - ensure that a call to addStats after a
2488 // connection has been disconnected (and closeAllStreams called) doesn't crash.
TEST_P(ConnectionTest, test_mb20645_stats_after_closeAllStreams)2489 TEST_P(ConnectionTest, test_mb20645_stats_after_closeAllStreams) {
2490     MockDcpConnMap connMap(*engine);
2491     connMap.initialize();
2492     const void *cookie = create_mock_cookie();
2493     // Create a new Dcp producer
2494     DcpProducer* producer = connMap.newProducer(cookie,
2495                                                 "test_producer",
2496                                                 /*flags*/ 0);
2497 
2498     // Disconnect the producer connection
2499     connMap.disconnect(cookie);
2500 
2501     // Try to read stats. Shouldn't crash.
2502     producer->addStats([](const char* key,
2503                           const uint16_t klen,
2504                           const char* val,
2505                           const uint32_t vlen,
2506                           gsl::not_null<const void*> cookie) {},
2507                        // Cookie is not being used in the callback, but the
2508                        // API requires it. Pass in the producer as cookie
2509                        static_cast<const void*>(producer));
2510 
2511     destroy_mock_cookie(cookie);
2512 }
2513 
2514 // Verify that when a DELETE_BUCKET event occurs, we correctly notify any
2515 // DCP connections which are currently in ewouldblock state, so the frontend
2516 // can correctly close the connection.
2517 // If we don't notify then front-end connections can hang for a long period of
2518 // time).
TEST_P(ConnectionTest, test_mb20716_connmap_notify_on_delete)2519 TEST_P(ConnectionTest, test_mb20716_connmap_notify_on_delete) {
2520     MockDcpConnMap connMap(*engine);
2521     connMap.initialize();
2522     const void *cookie = create_mock_cookie();
2523     // Create a new Dcp producer.
2524     DcpProducer* producer = connMap.newProducer(cookie,
2525                                                 "mb_20716r",
2526                                                 /*flags*/ 0);
2527 
2528     // Check preconditions.
2529     EXPECT_TRUE(producer->isPaused());
2530 
2531     // Hook into notify_io_complete.
2532     // We (ab)use the engine_specific API to pass a pointer to a count of
2533     // how many times notify_io_complete has been called.
2534     size_t notify_count = 0;
2535     SERVER_COOKIE_API* scapi = get_mock_server_api()->cookie;
2536     scapi->store_engine_specific(cookie, &notify_count);
2537     auto orig_notify_io_complete = scapi->notify_io_complete;
2538     scapi->notify_io_complete = [](gsl::not_null<const void*> cookie,
2539                                    ENGINE_ERROR_CODE status) {
2540         auto* notify_ptr = reinterpret_cast<size_t*>(
2541                 get_mock_server_api()->cookie->get_engine_specific(cookie));
2542         (*notify_ptr)++;
2543     };
2544 
2545     // 0. Should start with no notifications.
2546     ASSERT_EQ(0, notify_count);
2547 
2548     // 1. Check that the periodic connNotifier (notifyAllPausedConnections)
2549     // isn't sufficient to notify (it shouldn't be, as our connection has
2550     // no notification pending).
2551     connMap.notifyAllPausedConnections();
2552     ASSERT_EQ(0, notify_count);
2553 
2554     // 1. Simulate a bucket deletion.
2555     connMap.shutdownAllConnections();
2556 
2557     // Can also get a second notify as part of manageConnections being called
2558     // in shutdownAllConnections().
2559     EXPECT_GE(notify_count, 1)
2560         << "expected at least one notify after shutting down all connections";
2561 
2562     // Restore notify_io_complete callback.
2563     scapi->notify_io_complete = orig_notify_io_complete;
2564     destroy_mock_cookie(cookie);
2565 }
2566 
2567 // Consumer variant of above test.
TEST_P(ConnectionTest, test_mb20716_connmap_notify_on_delete_consumer)2568 TEST_P(ConnectionTest, test_mb20716_connmap_notify_on_delete_consumer) {
2569     MockDcpConnMap connMap(*engine);
2570     connMap.initialize();
2571     const void *cookie = create_mock_cookie();
2572     // Create a new Dcp consumer
2573     DcpConsumer* consumer = connMap.newConsumer(cookie, "mb_20716_consumer");
2574 
2575     // Move consumer into paused state (aka EWOULDBLOCK).
2576     std::unique_ptr<dcp_message_producers> producers(
2577             get_dcp_producers(handle, engine_v1));
2578     ENGINE_ERROR_CODE result;
2579     do {
2580         result = consumer->step(producers.get());
2581         handleProducerResponseIfStepBlocked(*consumer);
2582     } while (result == ENGINE_WANT_MORE);
2583     EXPECT_EQ(ENGINE_SUCCESS, result);
2584 
2585     // Check preconditions.
2586     EXPECT_TRUE(consumer->isPaused());
2587 
2588     // Hook into notify_io_complete.
2589     // We (ab)use the engine_specific API to pass a pointer to a count of
2590     // how many times notify_io_complete has been called.
2591     size_t notify_count = 0;
2592     SERVER_COOKIE_API* scapi = get_mock_server_api()->cookie;
2593     scapi->store_engine_specific(cookie, &notify_count);
2594     auto orig_notify_io_complete = scapi->notify_io_complete;
2595     scapi->notify_io_complete = [](gsl::not_null<const void*> cookie,
2596                                    ENGINE_ERROR_CODE status) {
2597         auto* notify_ptr = reinterpret_cast<size_t*>(
2598                 get_mock_server_api()->cookie->get_engine_specific(cookie));
2599         (*notify_ptr)++;
2600     };
2601 
2602     // 0. Should start with no notifications.
2603     ASSERT_EQ(0, notify_count);
2604 
2605     // 1. Check that the periodic connNotifier (notifyAllPausedConnections)
2606     // isn't sufficient to notify (it shouldn't be, as our connection has
2607     // no notification pending).
2608     connMap.notifyAllPausedConnections();
2609     ASSERT_EQ(0, notify_count);
2610 
2611     // 2. Simulate a bucket deletion.
2612     connMap.shutdownAllConnections();
2613 
2614     // Can also get a second notify as part of manageConnections being called
2615     // in shutdownAllConnections().
2616     EXPECT_GE(notify_count, 1)
2617         << "expected at least one notify after shutting down all connections";
2618 
2619     // Restore notify_io_complete callback.
2620     scapi->notify_io_complete = orig_notify_io_complete;
2621     destroy_mock_cookie(cookie);
2622 }
2623 
2624 /*
2625  * The following tests that once a vbucket has been put into a backfillphase
2626  * the openCheckpointID is 0.  In addition it checks that a subsequent
2627  * snapshotMarker results in a new checkpoint being created.
2628  */
TEST_P(ConnectionTest, test_mb21784)2629 TEST_P(ConnectionTest, test_mb21784) {
2630     // Make vbucket replica so can add passive stream
2631     ASSERT_EQ(ENGINE_SUCCESS, set_vb_state(vbid, vbucket_state_replica));
2632 
2633     const void *cookie = create_mock_cookie();
2634     /*
2635      * Create a Mock Dcp consumer. Since child class subobj of MockDcpConsumer
2636      *  obj are accounted for by SingleThreadedRCPtr, use the same here
2637      */
2638     auto consumer =
2639             std::make_shared<MockDcpConsumer>(*engine, cookie, "test_consumer");
2640 
2641     // Add passive stream
2642     ASSERT_EQ(ENGINE_SUCCESS, consumer->addStream(/*opaque*/0, vbid,
2643                                                   /*flags*/0));
2644     // Get the checkpointManager
2645     auto& manager =
2646             *(engine->getKVBucket()->getVBucket(vbid)->checkpointManager);
2647 
2648     // Because the vbucket was previously active it will have an
2649     // openCheckpointId of 2
2650     EXPECT_EQ(2, manager.getOpenCheckpointId());
2651 
2652     // Send a snapshotMarker to move the vbucket into a backfilling state
2653     consumer->snapshotMarker(/*opaque*/1,
2654                              /*vbucket*/0,
2655                              /*start_seqno*/0,
2656                              /*end_seqno*/0,
2657                              /*flags set to MARKER_FLAG_DISK*/0x2);
2658 
2659     // A side effect of moving the vbucket into a backfill state is that
2660     // the openCheckpointId is set to 0
2661     EXPECT_EQ(0, manager.getOpenCheckpointId());
2662 
2663     consumer->snapshotMarker(/*opaque*/1,
2664                              /*vbucket*/0,
2665                              /*start_seqno*/0,
2666                              /*end_seqno*/0,
2667                              /*flags*/0);
2668 
2669     // Check that a new checkpoint was created, which means the
2670     // opencheckpointid increases to 1
2671     EXPECT_EQ(1, manager.getOpenCheckpointId());
2672 
2673     // Close stream
2674     ASSERT_EQ(ENGINE_SUCCESS, consumer->closeStream(/*opaque*/0, vbid));
2675     destroy_mock_cookie(cookie);
2676 }
2677 
2678 class DcpConnMapTest : public ::testing::Test {
2679 protected:
2680     void SetUp() override {
2681         /* Set up the bare minimum stuff needed by the 'SynchronousEPEngine'
2682            (mock engine) */
2683         ObjectRegistry::onSwitchThread(&engine);
2684         engine.setKVBucket(engine.public_makeBucket(engine.getConfiguration()));
2685         engine.public_initializeEngineCallbacks();
2686         initialize_time_functions(get_mock_server_api()->core);
2687 
2688         /* Set up one vbucket in the bucket */
2689         engine.getKVBucket()->setVBucketState(
2690                 vbid, vbucket_state_active, false);
2691     }
2692 
2693     void TearDown() override {
2694         destroy_mock_event_callbacks();
2695         ObjectRegistry::onSwitchThread(nullptr);
2696     }
2697 
2698     /**
2699      * Fake callback emulating dcp_add_failover_log
2700      */
fakeDcpAddFailoverLog( vbucket_failover_t* entry, size_t nentries, gsl::not_null<const void*> cookie)2701     static ENGINE_ERROR_CODE fakeDcpAddFailoverLog(
2702             vbucket_failover_t* entry,
2703             size_t nentries,
2704             gsl::not_null<const void*> cookie) {
2705         return ENGINE_SUCCESS;
2706     }
2707 
2708     SynchronousEPEngine engine;
2709     const uint16_t vbid = 0;
2710 };
2711 
2712 /* Tests that there is no memory loss due to cyclic reference between connection
2713  * and other objects (like dcp streams). It is possible that connections are
2714  * deleted from the dcp connmap when dcp connmap is deleted due to abrupt
2715  * deletion of 'EventuallyPersistentEngine' obj.
2716  * This test simulates the abrupt deletion of dcp connmap object
2717  */
TEST_F(DcpConnMapTest, DeleteProducerOnUncleanDCPConnMapDelete)2718 TEST_F(DcpConnMapTest, DeleteProducerOnUncleanDCPConnMapDelete) {
2719     /* Create a new Dcp producer */
2720     const void* dummyMockCookie = create_mock_cookie();
2721     DcpProducer* producer = engine.getDcpConnMap().newProducer(dummyMockCookie,
2722                                                                "test_producer",
2723                                                                /*flags*/ 0);
2724     /* Open stream */
2725     uint64_t rollbackSeqno = 0;
2726     uint32_t opaque = 0;
2727     EXPECT_EQ(ENGINE_SUCCESS,
2728               producer->streamRequest(/*flags*/ 0,
2729                                       opaque,
2730                                       vbid,
2731                                       /*start_seqno*/ 0,
2732                                       /*end_seqno*/ ~0,
2733                                       /*vb_uuid*/ 0,
2734                                       /*snap_start*/ 0,
2735                                       /*snap_end*/ 0,
2736                                       &rollbackSeqno,
2737                                       fakeDcpAddFailoverLog));
2738 
2739     destroy_mock_cookie(dummyMockCookie);
2740 
2741     /* Delete the connmap, connection should be deleted as the owner of
2742        the connection (connmap) is deleted. Checks that there is no cyclic
2743        reference between conn (producer) and stream or any other object */
2744     engine.setDcpConnMap(nullptr);
2745 }
2746 
2747 /* Tests that there is no memory loss due to cyclic reference between a
2748  * notifier connection and a notifier stream.
2749  */
TEST_F(DcpConnMapTest, DeleteNotifierConnOnUncleanDCPConnMapDelete)2750 TEST_F(DcpConnMapTest, DeleteNotifierConnOnUncleanDCPConnMapDelete) {
2751     /* Create a new Dcp producer */
2752     const void* dummyMockCookie = create_mock_cookie();
2753     DcpProducer* producer = engine.getDcpConnMap().newProducer(
2754             dummyMockCookie, "test_producer", DCP_OPEN_NOTIFIER);
2755     /* Open notifier stream */
2756     uint64_t rollbackSeqno = 0;
2757     uint32_t opaque = 0;
2758     EXPECT_EQ(ENGINE_SUCCESS,
2759               producer->streamRequest(/*flags*/ 0,
2760                                       opaque,
2761                                       vbid,
2762                                       /*start_seqno*/ 0,
2763                                       /*end_seqno*/ ~0,
2764                                       /*vb_uuid*/ 0,
2765                                       /*snap_start*/ 0,
2766                                       /*snap_end*/ 0,
2767                                       &rollbackSeqno,
2768                                       fakeDcpAddFailoverLog));
2769 
2770     destroy_mock_cookie(dummyMockCookie);
2771 
2772     /* Delete the connmap, connection should be deleted as the owner of
2773        the connection (connmap) is deleted. Checks that there is no cyclic
2774        reference between conn (producer) and stream or any other object */
2775     engine.setDcpConnMap(nullptr);
2776 }
2777 
2778 /* Tests that there is no memory loss due to cyclic reference between a
2779  * consumer connection and a passive stream.
2780  */
TEST_F(DcpConnMapTest, DeleteConsumerConnOnUncleanDCPConnMapDelete)2781 TEST_F(DcpConnMapTest, DeleteConsumerConnOnUncleanDCPConnMapDelete) {
2782     /* Consumer stream needs a replica vbucket */
2783     engine.getKVBucket()->setVBucketState(vbid, vbucket_state_replica, false);
2784 
2785     /* Create a new Dcp consumer */
2786     const void* dummyMockCookie = create_mock_cookie();
2787     DcpConsumer* consumer = engine.getDcpConnMap().newConsumer(dummyMockCookie,
2788                                                                "test_consumer");
2789 
2790     /* Add passive stream */
2791     ASSERT_EQ(ENGINE_SUCCESS,
2792               consumer->addStream(/*opaque*/ 0,
2793                                   vbid,
2794                                   /*flags*/ 0));
2795 
2796     destroy_mock_cookie(dummyMockCookie);
2797 
2798     /* Delete the connmap, connection should be deleted as the owner of
2799        the connection (connmap) is deleted. Checks that there is no cyclic
2800        reference between conn (consumer) and stream or any other object */
2801     engine.setDcpConnMap(nullptr);
2802 }
2803 
2804 class NotifyTest : public DCPTest {
2805 protected:
SetUp()2806     void SetUp() {
2807         // The test is going to replace a server cookie API method, we must
2808         // be able to undo that
2809         scookie_api = *get_mock_server_api()->cookie;
2810         DCPTest::SetUp();
2811     }
2812 
TearDown()2813     void TearDown() {
2814         // Reset the server cookie api for other tests
2815         *get_mock_server_api()->cookie = scookie_api;
2816         DCPTest::TearDown();
2817     }
2818 
2819     SERVER_COOKIE_API scookie_api;
2820     std::unique_ptr<MockDcpConnMap> connMap;
2821     DcpProducer* producer;
2822     int callbacks;
2823 };
2824 
2825 class ConnMapNotifyTest {
2826 public:
ConnMapNotifyTest(EventuallyPersistentEngine& engine)2827     ConnMapNotifyTest(EventuallyPersistentEngine& engine)
2828         : connMap(new MockDcpConnMap(engine)),
2829           callbacks(0) {
2830         connMap->initialize();
2831 
2832         // Use 'this' instead of a mock cookie
2833         producer = connMap->newProducer(static_cast<void*>(this),
2834                                         "test_producer",
2835                                         /*flags*/ 0);
2836     }
2837 
notify()2838     void notify() {
2839         callbacks++;
2840         connMap->notifyPausedConnection(producer->shared_from_this(),
2841                                         /*schedule*/ true);
2842     }
2843 
getCallbacks()2844     int getCallbacks() {
2845         return callbacks;
2846     }
2847 
dcp_test_notify_io_complete(gsl::not_null<const void*> cookie, ENGINE_ERROR_CODE status)2848     static void dcp_test_notify_io_complete(gsl::not_null<const void*> cookie,
2849                                             ENGINE_ERROR_CODE status) {
2850         const auto* notifyTest =
2851                 reinterpret_cast<const ConnMapNotifyTest*>(cookie.get());
2852         // 3. Call notifyPausedConnection again. We're now interleaved inside
2853         //    of notifyAllPausedConnections, a second notification should occur.
2854         const_cast<ConnMapNotifyTest*>(notifyTest)->notify();
2855     }
2856 
2857     std::unique_ptr<MockDcpConnMap> connMap;
2858     DcpProducer* producer;
2859 
2860 private:
2861     int callbacks;
2862 
2863 };
2864 
2865 
TEST_F(NotifyTest, test_mb19503_connmap_notify)2866 TEST_F(NotifyTest, test_mb19503_connmap_notify) {
2867     ConnMapNotifyTest notifyTest(*engine);
2868 
2869     // Hook into notify_io_complete
2870     SERVER_COOKIE_API* scapi = get_mock_server_api()->cookie;
2871     scapi->notify_io_complete = ConnMapNotifyTest::dcp_test_notify_io_complete;
2872 
2873     // Should be 0 when we begin
2874     ASSERT_EQ(0, notifyTest.getCallbacks());
2875     ASSERT_TRUE(notifyTest.producer->isPaused());
2876     ASSERT_EQ(0, notifyTest.connMap->getPendingNotifications().size());
2877 
2878     // 1. Call notifyPausedConnection with schedule = true
2879     //    this will queue the producer
2880     notifyTest.connMap->notifyPausedConnection(
2881             notifyTest.producer->shared_from_this(),
2882             /*schedule*/ true);
2883     EXPECT_EQ(1, notifyTest.connMap->getPendingNotifications().size());
2884 
2885     // 2. Call notifyAllPausedConnections this will invoke notifyIOComplete
2886     //    which we've hooked into. For step 3 go to dcp_test_notify_io_complete
2887     notifyTest.connMap->notifyAllPausedConnections();
2888 
2889     // 2.1 One callback should of occurred, and we should still have one
2890     //     notification pending (see dcp_test_notify_io_complete).
2891     EXPECT_EQ(1, notifyTest.getCallbacks());
2892     EXPECT_EQ(1, notifyTest.connMap->getPendingNotifications().size());
2893 
2894     // 4. Call notifyAllPausedConnections again, is there a new connection?
2895     notifyTest.connMap->notifyAllPausedConnections();
2896 
2897     // 5. There should of been 2 callbacks
2898     EXPECT_EQ(2, notifyTest.getCallbacks());
2899 }
2900 
2901 // Variation on test_mb19503_connmap_notify - check that notification is correct
2902 // when notifiable is not paused.
TEST_F(NotifyTest, test_mb19503_connmap_notify_paused)2903 TEST_F(NotifyTest, test_mb19503_connmap_notify_paused) {
2904     ConnMapNotifyTest notifyTest(*engine);
2905 
2906     // Hook into notify_io_complete
2907     SERVER_COOKIE_API* scapi = get_mock_server_api()->cookie;
2908     scapi->notify_io_complete = ConnMapNotifyTest::dcp_test_notify_io_complete;
2909 
2910     // Should be 0 when we begin
2911     ASSERT_EQ(notifyTest.getCallbacks(), 0);
2912     ASSERT_TRUE(notifyTest.producer->isPaused());
2913     ASSERT_EQ(0, notifyTest.connMap->getPendingNotifications().size());
2914 
2915     // 1. Call notifyPausedConnection with schedule = true
2916     //    this will queue the producer
2917     notifyTest.connMap->notifyPausedConnection(
2918             notifyTest.producer->shared_from_this(),
2919             /*schedule*/ true);
2920     EXPECT_EQ(1, notifyTest.connMap->getPendingNotifications().size());
2921 
2922     // 2. Mark connection as not paused.
2923     notifyTest.producer->unPause();
2924 
2925     // 3. Call notifyAllPausedConnections - as the connection is not paused
2926     // this should *not* invoke notifyIOComplete.
2927     notifyTest.connMap->notifyAllPausedConnections();
2928 
2929     // 3.1 Should have not had any callbacks.
2930     EXPECT_EQ(0, notifyTest.getCallbacks());
2931     // 3.2 Should have no pending notifications.
2932     EXPECT_EQ(0, notifyTest.connMap->getPendingNotifications().size());
2933 
2934     // 4. Now mark the connection as paused.
2935     ASSERT_FALSE(notifyTest.producer->isPaused());
2936     notifyTest.producer->pause();
2937 
2938     // 4. Add another notification - should queue the producer again.
2939     notifyTest.connMap->notifyPausedConnection(
2940             notifyTest.producer->shared_from_this(),
2941             /*schedule*/ true);
2942     EXPECT_EQ(1, notifyTest.connMap->getPendingNotifications().size());
2943 
2944     // 5. Call notifyAllPausedConnections a second time - as connection is
2945     //    paused this time we *should* get a callback.
2946     notifyTest.connMap->notifyAllPausedConnections();
2947     EXPECT_EQ(1, notifyTest.getCallbacks());
2948 }
2949 
2950 // Tests that the MutationResponse created for the deletion response is of the
2951 // correct size.
TEST_P(ConnectionTest, test_mb24424_deleteResponse)2952 TEST_P(ConnectionTest, test_mb24424_deleteResponse) {
2953     const void* cookie = create_mock_cookie();
2954     uint16_t vbid = 0;
2955 
2956     auto consumer =
2957             std::make_shared<MockDcpConsumer>(*engine, cookie, "test_consumer");
2958 
2959     ASSERT_EQ(ENGINE_SUCCESS, set_vb_state(vbid, vbucket_state_replica));
2960     ASSERT_EQ(ENGINE_SUCCESS, consumer->addStream(/*opaque*/0, vbid,
2961                                                   /*flags*/0));
2962 
2963     MockPassiveStream *stream = static_cast<MockPassiveStream*>
2964                                        ((consumer->
2965                                                getVbucketStream(vbid)).get());
2966     ASSERT_TRUE(stream->isActive());
2967 
2968     std::string key = "key";
2969     const DocKey docKey{ reinterpret_cast<const uint8_t*>(key.data()),
2970         key.size(),
2971         DocNamespace::DefaultCollection};
2972     uint8_t extMeta[1] = {uint8_t(PROTOCOL_BINARY_DATATYPE_JSON)};
2973     cb::const_byte_buffer meta{extMeta, sizeof(uint8_t)};
2974 
2975     consumer->deletion(/*opaque*/ 1,
2976                        /*key*/ docKey,
2977                        /*value*/ {},
2978                        /*priv_bytes*/ 0,
2979                        /*datatype*/ PROTOCOL_BINARY_RAW_BYTES,
2980                        /*cas*/ 0,
2981                        /*vbucket*/ vbid,
2982                        /*bySeqno*/ 1,
2983                        /*revSeqno*/ 0,
2984                        /*meta*/ meta);
2985 
2986     auto messageSize = MutationResponse::deletionBaseMsgBytes + key.size() +
2987                        sizeof(extMeta);
2988 
2989     EXPECT_EQ(messageSize, stream->responseMessageSize);
2990 
2991     /* Close stream before deleting the connection */
2992     ASSERT_EQ(ENGINE_SUCCESS, consumer->closeStream(/*opaque*/0, vbid));
2993 
2994     destroy_mock_cookie(cookie);
2995 }
2996 
2997 // Tests that the MutationResponse created for the mutation response is of the
2998 // correct size.
TEST_P(ConnectionTest, test_mb24424_mutationResponse)2999 TEST_P(ConnectionTest, test_mb24424_mutationResponse) {
3000     const void* cookie = create_mock_cookie();
3001     uint16_t vbid = 0;
3002 
3003     auto consumer =
3004             std::make_shared<MockDcpConsumer>(*engine, cookie, "test_consumer");
3005 
3006     ASSERT_EQ(ENGINE_SUCCESS, set_vb_state(vbid, vbucket_state_replica));
3007     ASSERT_EQ(ENGINE_SUCCESS, consumer->addStream(/*opaque*/0, vbid,
3008                                                   /*flags*/0));
3009 
3010     MockPassiveStream *stream = static_cast<MockPassiveStream*>
3011                                        ((consumer->
3012                                                getVbucketStream(vbid)).get());
3013     ASSERT_TRUE(stream->isActive());
3014 
3015     std::string key = "key";
3016     std::string data = R"({"json":"yes"})";
3017     const DocKey docKey{ reinterpret_cast<const uint8_t*>(key.data()),
3018         key.size(),
3019         DocNamespace::DefaultCollection};
3020     cb::const_byte_buffer value{reinterpret_cast<const uint8_t*>(data.data()),
3021         data.size()};
3022     uint8_t extMeta[1] = {uint8_t(PROTOCOL_BINARY_DATATYPE_JSON)};
3023     cb::const_byte_buffer meta{extMeta, sizeof(uint8_t)};
3024 
3025     consumer->mutation(/*opaque*/1,
3026                        /*key*/docKey,
3027                        /*values*/value,
3028                        /*priv_bytes*/0,
3029                        /*datatype*/PROTOCOL_BINARY_DATATYPE_JSON,
3030                        /*cas*/0,
3031                        /*vbucket*/vbid,
3032                        /*flags*/0,
3033                        /*bySeqno*/1,
3034                        /*revSeqno*/0,
3035                        /*exptime*/0,
3036                        /*lock_time*/0,
3037                        /*meta*/meta,
3038                        /*nru*/0);
3039 
3040     auto messageSize = MutationResponse::mutationBaseMsgBytes +
3041             key.size() + data.size() + sizeof(extMeta);
3042 
3043     EXPECT_EQ(messageSize, stream->responseMessageSize);
3044 
3045     /* Close stream before deleting the connection */
3046     ASSERT_EQ(ENGINE_SUCCESS, consumer->closeStream(/*opaque*/0, vbid));
3047 
3048     destroy_mock_cookie(cookie);
3049 }
3050 
sendConsumerMutationsNearThreshold(bool beyondThreshold)3051 void ConnectionTest::sendConsumerMutationsNearThreshold(bool beyondThreshold) {
3052     const void* cookie = create_mock_cookie();
3053     const uint32_t opaque = 1;
3054     const uint64_t snapStart = 1;
3055     const uint64_t snapEnd = std::numeric_limits<uint64_t>::max();
3056     uint64_t bySeqno = snapStart;
3057 
3058     /* Set up a consumer connection */
3059     auto consumer =
3060             std::make_shared<MockDcpConsumer>(*engine, cookie, "test_consumer");
3061 
3062     /* Replica vbucket */
3063     ASSERT_EQ(ENGINE_SUCCESS, set_vb_state(vbid, vbucket_state_replica));
3064 
3065     /* Passive stream */
3066     ASSERT_EQ(ENGINE_SUCCESS,
3067               consumer->addStream(/*opaque*/ 0,
3068                                   vbid,
3069                                   /*flags*/ 0));
3070     MockPassiveStream* stream = static_cast<MockPassiveStream*>(
3071             (consumer->getVbucketStream(vbid)).get());
3072     ASSERT_TRUE(stream->isActive());
3073 
3074     /* Send a snapshotMarker before sending items for replication */
3075     EXPECT_EQ(ENGINE_SUCCESS,
3076               consumer->snapshotMarker(opaque,
3077                                        vbid,
3078                                        snapStart,
3079                                        snapEnd,
3080                                        /* in-memory snapshot */ 0x1));
3081 
3082     /* Send an item for replication */
3083     const DocKey docKey{nullptr, 0, DocNamespace::DefaultCollection};
3084     EXPECT_EQ(ENGINE_SUCCESS,
3085               consumer->mutation(opaque,
3086                                  docKey,
3087                                  {}, // value
3088                                  0, // priv bytes
3089                                  PROTOCOL_BINARY_RAW_BYTES,
3090                                  0, // cas
3091                                  vbid,
3092                                  0, // flags
3093                                  bySeqno,
3094                                  0, // rev seqno
3095                                  0, // exptime
3096                                  0, // locktime
3097                                  {}, // meta
3098                                  0)); // nru
3099 
3100     /* Set 'mem_used' beyond the 'replication threshold' */
3101     EPStats& stats = engine->getEpStats();
3102     if (beyondThreshold) {
3103         stats.setMaxDataSize(stats.getPreciseTotalMemoryUsed());
3104     } else {
3105         /* Set 'mem_used' just 1 byte less than the 'replication threshold'.
3106            That is we are below 'replication threshold', but not enough space
3107            for  the new item */
3108         stats.setMaxDataSize(stats.getPreciseTotalMemoryUsed() + 1);
3109         /* Simpler to set the replication threshold to 1 and test, rather than
3110            testing with maxData = (memUsed / replicationThrottleThreshold);
3111            that is, we are avoiding a division */
3112         engine->getConfiguration().setReplicationThrottleThreshold(100);
3113     }
3114 
3115     if ((engine->getConfiguration().getBucketType() == "ephemeral") &&
3116         (engine->getConfiguration().getEphemeralFullPolicy()) ==
3117                 "fail_new_data") {
3118         /* Expect disconnect signal in Ephemeral with "fail_new_data" policy */
3119         while (1) {
3120             /* Keep sending items till the memory usage goes above the
3121                threshold and the connection is disconnected */
3122             if (ENGINE_DISCONNECT ==
3123                 consumer->mutation(opaque,
3124                                    docKey,
3125                                    {}, // value
3126                                    0, // priv bytes
3127                                    PROTOCOL_BINARY_RAW_BYTES,
3128                                    0, // cas
3129                                    vbid,
3130                                    0, // flags
3131                                    ++bySeqno,
3132                                    0, // rev seqno
3133                                    0, // exptime
3134                                    0, // locktime
3135                                    {}, // meta
3136                                    0)) {
3137                 break;
3138             }
3139         }
3140     } else {
3141         /* In 'couchbase' buckets we buffer the replica items and indirectly
3142            throttle replication by not sending flow control acks to the
3143            producer. Hence we do not drop the connection here */
3144         EXPECT_EQ(ENGINE_SUCCESS,
3145                   consumer->mutation(opaque,
3146                                      docKey,
3147                                      {}, // value
3148                                      0, // priv bytes
3149                                      PROTOCOL_BINARY_RAW_BYTES,
3150                                      0, // cas
3151                                      vbid,
3152                                      0, // flags
3153                                      bySeqno + 1,
3154                                      0, // rev seqno
3155                                      0, // exptime
3156                                      0, // locktime
3157                                      {}, // meta
3158                                      0)); // nru
3159     }
3160 
3161     /* Close stream before deleting the connection */
3162     EXPECT_EQ(ENGINE_SUCCESS, consumer->closeStream(opaque, vbid));
3163 
3164     destroy_mock_cookie(cookie);
3165 }
3166 
3167 /* Here we test how the DCP consumer handles the scenario where the memory
3168    usage is beyond the replication throttle threshold.
3169    In case of Ephemeral buckets with 'fail_new_data' policy it is expected to
3170    indicate close of the consumer conn and in other cases it is expected to
3171    just defer processing. */
TEST_P(ConnectionTest, ReplicateAfterThrottleThreshold)3172 TEST_P(ConnectionTest, ReplicateAfterThrottleThreshold) {
3173     sendConsumerMutationsNearThreshold(true);
3174 }
3175 
3176 /* Here we test how the DCP consumer handles the scenario where the memory
3177    usage is just below the replication throttle threshold, but will go over the
3178    threshold when it adds the new mutation from the processor buffer to the
3179    hashtable.
3180    In case of Ephemeral buckets with 'fail_new_data' policy it is expected to
3181    indicate close of the consumer conn and in other cases it is expected to
3182    just defer processing. */
TEST_P(ConnectionTest, ReplicateJustBeforeThrottleThreshold)3183 TEST_P(ConnectionTest, ReplicateJustBeforeThrottleThreshold) {
3184     sendConsumerMutationsNearThreshold(false);
3185 }
3186 
processConsumerMutationsNearThreshold( bool beyondThreshold)3187 void ConnectionTest::processConsumerMutationsNearThreshold(
3188         bool beyondThreshold) {
3189     const void* cookie = create_mock_cookie();
3190     const uint32_t opaque = 1;
3191     const uint64_t snapStart = 1, snapEnd = 10;
3192     const uint64_t bySeqno = snapStart;
3193 
3194     /* Set up a consumer connection */
3195     auto consumer =
3196             std::make_shared<MockDcpConsumer>(*engine, cookie, "test_consumer");
3197 
3198     /* Replica vbucket */
3199     ASSERT_EQ(ENGINE_SUCCESS, set_vb_state(vbid, vbucket_state_replica));
3200 
3201     /* Passive stream */
3202     ASSERT_EQ(ENGINE_SUCCESS,
3203               consumer->addStream(/*opaque*/ 0,
3204                                   vbid,
3205                                   /*flags*/ 0));
3206     MockPassiveStream* stream = static_cast<MockPassiveStream*>(
3207             (consumer->getVbucketStream(vbid)).get());
3208     ASSERT_TRUE(stream->isActive());
3209 
3210     /* Send a snapshotMarker before sending items for replication */
3211     EXPECT_EQ(ENGINE_SUCCESS,
3212               consumer->snapshotMarker(opaque,
3213                                        vbid,
3214                                        snapStart,
3215                                        snapEnd,
3216                                        /* in-memory snapshot */ 0x1));
3217 
3218     /* Simulate a situation where adding a mutation temporarily fails
3219        and hence adds the mutation to a replication buffer. For that, we
3220        set vbucket::takeover_backed_up to true */
3221     engine->getKVBucket()->getVBucket(vbid)->setTakeoverBackedUpState(true);
3222 
3223     /* Send an item for replication and expect it to be buffered */
3224     const DocKey docKey{"mykey", DocNamespace::DefaultCollection};
3225     EXPECT_EQ(ENGINE_SUCCESS,
3226               consumer->mutation(opaque,
3227                                  docKey,
3228                                  {}, // value
3229                                  0, // priv bytes
3230                                  PROTOCOL_BINARY_RAW_BYTES,
3231                                  0, // cas
3232                                  vbid,
3233                                  0, // flags
3234                                  bySeqno,
3235                                  0, // rev seqno
3236                                  0, // exptime
3237                                  0, // locktime
3238                                  {}, // meta
3239                                  0)); // nru
3240     EXPECT_EQ(1, stream->getNumBufferItems());
3241 
3242     /* Set back the vbucket::takeover_backed_up to false */
3243     engine->getKVBucket()->getVBucket(vbid)->setTakeoverBackedUpState(false);
3244 
3245     /* Set 'mem_used' beyond the 'replication threshold' */
3246     EPStats& stats = engine->getEpStats();
3247     if (beyondThreshold) {
3248         /* Actually setting it well above also, as there can be a drop in memory
3249            usage during testing */
3250         stats.setMaxDataSize(stats.getEstimatedTotalMemoryUsed() / 4);
3251     } else {
3252         /* set max size to a value just over */
3253         stats.setMaxDataSize(stats.getEstimatedTotalMemoryUsed() + 1);
3254         /* Simpler to set the replication threshold to 1 and test, rather than
3255            testing with maxData = (memUsed / replicationThrottleThreshold); that
3256            is, we are avoiding a division */
3257         engine->getConfiguration().setReplicationThrottleThreshold(100);
3258     }
3259 
3260     std::unique_ptr<dcp_message_producers> dcpStepProducers(
3261             get_dcp_producers(handle, engine_v1));
3262     if ((engine->getConfiguration().getBucketType() == "ephemeral") &&
3263         (engine->getConfiguration().getEphemeralFullPolicy()) ==
3264                 "fail_new_data") {
3265         /* Make a call to the function that would be called by the processor
3266            task here */
3267         EXPECT_EQ(stop_processing, consumer->processBufferedItems());
3268 
3269         /* Expect the connection to be notified */
3270         EXPECT_FALSE(consumer->isPaused());
3271 
3272         /* Expect disconnect signal in Ephemeral with "fail_new_data" policy */
3273         EXPECT_EQ(ENGINE_DISCONNECT, consumer->step(dcpStepProducers.get()));
3274     } else {
3275         uint32_t backfoffs = consumer->getNumBackoffs();
3276 
3277         /* Make a call to the function that would be called by the processor
3278            task here */
3279         if (beyondThreshold) {
3280             EXPECT_EQ(more_to_process, consumer->processBufferedItems());
3281         } else {
3282             EXPECT_EQ(cannot_process, consumer->processBufferedItems());
3283         }
3284 
3285         EXPECT_EQ(backfoffs + 1, consumer->getNumBackoffs());
3286 
3287         /* In 'couchbase' buckets we buffer the replica items and indirectly
3288            throttle replication by not sending flow control acks to the
3289            producer. Hence we do not drop the connection here */
3290         EXPECT_EQ(ENGINE_WANT_MORE, consumer->step(dcpStepProducers.get()));
3291 
3292         /* Close stream before deleting the connection */
3293         EXPECT_EQ(ENGINE_SUCCESS, consumer->closeStream(opaque, vbid));
3294     }
3295     destroy_mock_cookie(cookie);
3296 }
3297 
3298 /* Here we test how the Processor task in DCP consumer handles the scenario
3299    where the memory usage is beyond the replication throttle threshold.
3300    In case of Ephemeral buckets with 'fail_new_data' policy it is expected to
3301    indicate close of the consumer conn and in other cases it is expected to
3302    just defer processing. */
TEST_P(ConnectionTest, ProcessReplicationBufferAfterThrottleThreshold)3303 TEST_P(ConnectionTest, ProcessReplicationBufferAfterThrottleThreshold) {
3304     processConsumerMutationsNearThreshold(true);
3305 }
3306 
3307 /* Here we test how the Processor task in DCP consumer handles the scenario
3308    where the memory usage is just below the replication throttle threshold,
3309    but will go over the threshold when it adds the new mutation from the
3310    processor buffer to the hashtable.
3311    In case of Ephemeral buckets with 'fail_new_data' policy it is expected to
3312    indicate close of the consumer conn and in other cases it is expected to
3313    just defer processing. */
TEST_P(ConnectionTest, DISABLED_ProcessReplicationBufferJustBeforeThrottleThreshold)3314 TEST_P(ConnectionTest,
3315        DISABLED_ProcessReplicationBufferJustBeforeThrottleThreshold) {
3316     /* There are sporadic failures seen while testing this. The problem is
3317        we need to have a memory usage just below max_size, so we need to
3318        start at that point. But sometimes the memory usage goes further below
3319        resulting in the test failure (a hang). Hence commenting out the test.
3320        Can be run locally as and when needed. */
3321     processConsumerMutationsNearThreshold(false);
3322 }
3323 
3324 class ActiveStreamChkptProcessorTaskTest : public SingleThreadedKVBucketTest {
3325