1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2017 Couchbase, Inc
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17 
18 /**
19  * Tests for Collection functionality in EPStore.
20  */
21 #include "bgfetcher.h"
22 #include "dcp/dcpconnmap.h"
23 #include "kvstore.h"
24 #include "programs/engine_testapp/mock_server.h"
25 #include "tests/mock/mock_dcp.h"
26 #include "tests/mock/mock_dcp_consumer.h"
27 #include "tests/mock/mock_dcp_producer.h"
28 #include "tests/mock/mock_global_task.h"
29 #include "tests/module_tests/evp_store_single_threaded_test.h"
30 #include "tests/module_tests/evp_store_test.h"
31 #include "tests/module_tests/test_helpers.h"
32 
33 #include <functional>
34 #include <thread>
35 
// Globals recording details of the most recent DCP message. They are defined
// in another translation unit (presumably the mock DCP producers declared in
// tests/mock/mock_dcp.h - confirm).

// Opcode of the last DCP message; sendSystemEvent() below sets it to
// PROTOCOL_BINARY_CMD_DCP_SYSTEM_EVENT and the tests compare against it.
extern uint8_t dcp_last_op;
// Key of the last DCP message; sendSystemEvent() stores the event key here.
extern std::string dcp_last_key;
// Presumably the flags of the last DCP message - not referenced in the
// visible part of this file.
extern uint32_t dcp_last_flags;
39 
40 class CollectionsDcpTest : public SingleThreadedKVBucketTest {
41 public:
CollectionsDcpTest()42     CollectionsDcpTest()
43         : cookieC(create_mock_cookie()), cookieP(create_mock_cookie()) {
44         mock_set_collections_support(cookieP, true);
45         mock_set_collections_support(cookieC, true);
46     }
47 
48     // Setup a producer/consumer ready for the test
SetUp()49     void SetUp() override {
50         config_string += "collections_prototype_enabled=true";
51         SingleThreadedKVBucketTest::SetUp();
52         // Start vbucket as active to allow us to store items directly to it.
53         store->setVBucketState(vbid, vbucket_state_active, false);
54         producers = get_dcp_producers(
55                 reinterpret_cast<ENGINE_HANDLE*>(engine.get()),
56                 reinterpret_cast<ENGINE_HANDLE_V1*>(engine.get()));
57         createDcpObjects({/*no filter*/}, true /*collections on*/);
58     }
getManifest(uint16_t vb) const59     std::string getManifest(uint16_t vb) const {
60         return store->getVBucket(vb)
61                 ->getShard()
62                 ->getRWUnderlying()
63                 ->getCollectionsManifest(vbid);
64     }
65 
createDcpStream()66     void createDcpStream() {
67         uint64_t rollbackSeqno;
68         ASSERT_EQ(ENGINE_SUCCESS,
69                   producer->streamRequest(
70                           0, // flags
71                           1, // opaque
72                           vbid,
73                           0, // start_seqno
74                           ~0ull, // end_seqno
75                           0, // vbucket_uuid,
76                           0, // snap_start_seqno,
77                           0, // snap_end_seqno,
78                           &rollbackSeqno,
79                           &CollectionsDcpTest::dcpAddFailoverLog));
80     }
81 
createDcpConsumer()82     void createDcpConsumer() {
83         CollectionsDcpTest::consumer = std::make_shared<MockDcpConsumer>(
84                 *engine, cookieC, "test_consumer");
85         store->setVBucketState(replicaVB, vbucket_state_replica, false);
86         ASSERT_EQ(ENGINE_SUCCESS,
87                   consumer->addStream(/*opaque*/ 0,
88                                       replicaVB,
89                                       /*flags*/ 0));
90         // Setup a snapshot on the consumer
91         ASSERT_EQ(ENGINE_SUCCESS,
92                   consumer->snapshotMarker(/*opaque*/ 1,
93                                            /*vbucket*/ replicaVB,
94                                            /*start_seqno*/ 0,
95                                            /*end_seqno*/ 100,
96                                            /*flags*/ 0));
97     }
98 
createDcpObjects(const std::string & filter,bool dcpCollectionAware)99     void createDcpObjects(const std::string& filter, bool dcpCollectionAware) {
100         createDcpConsumer();
101         producer = SingleThreadedKVBucketTest::createDcpProducer(
102                 cookieP, filter, dcpCollectionAware, IncludeDeleteTime::No);
103         // Patch our local callback into the handlers
104         producers->system_event = &CollectionsDcpTest::sendSystemEvent;
105         createDcpStream();
106     }
107 
TearDown()108     void TearDown() override {
109         teardown();
110         SingleThreadedKVBucketTest::TearDown();
111     }
112 
teardown()113     void teardown() {
114         destroy_mock_cookie(cookieC);
115         destroy_mock_cookie(cookieP);
116         consumer->closeAllStreams();
117         consumer->cancelTask();
118         producer->closeAllStreams();
119         producer->cancelCheckpointCreatorTask();
120         producer.reset();
121         consumer.reset();
122     }
123 
runCheckpointProcessor()124     void runCheckpointProcessor() {
125         SingleThreadedKVBucketTest::runCheckpointProcessor(*producer,
126                                                            *producers);
127     }
128 
notifyAndStepToCheckpoint(cb::mcbp::ClientOpcode expectedOp=cb::mcbp::ClientOpcode::DcpSnapshotMarker,bool fromMemory=true)129     void notifyAndStepToCheckpoint(
130             cb::mcbp::ClientOpcode expectedOp =
131                     cb::mcbp::ClientOpcode::DcpSnapshotMarker,
132             bool fromMemory = true) {
133         // Call parent class function with our producer
134         SingleThreadedKVBucketTest::notifyAndStepToCheckpoint(
135                 *producer, *producers, expectedOp, fromMemory);
136     }
137 
138     void testDcpCreateDelete(int expectedCreates,
139                              int expectedDeletes,
140                              int expectedMutations,
141                              bool fromMemory = true);
142 
resetEngineAndWarmup(std::string new_config="")143     void resetEngineAndWarmup(std::string new_config = "") {
144         teardown();
145         SingleThreadedKVBucketTest::resetEngineAndWarmup(new_config);
146         producers = get_dcp_producers(
147                 reinterpret_cast<ENGINE_HANDLE*>(engine.get()),
148                 reinterpret_cast<ENGINE_HANDLE_V1*>(engine.get()));
149         cookieC = create_mock_cookie();
150         cookieP = create_mock_cookie();
151     }
152 
153     static const uint16_t replicaVB{1};
154     static std::shared_ptr<MockDcpConsumer> consumer;
155     static mcbp::systemevent::id dcp_last_system_event;
156 
157     /*
158      * DCP callback method to push SystemEvents on to the consumer
159      */
sendSystemEvent(gsl::not_null<const void * > cookie,uint32_t opaque,uint16_t vbucket,mcbp::systemevent::id event,uint64_t bySeqno,cb::const_byte_buffer key,cb::const_byte_buffer eventData)160     static ENGINE_ERROR_CODE sendSystemEvent(gsl::not_null<const void*> cookie,
161                                              uint32_t opaque,
162                                              uint16_t vbucket,
163                                              mcbp::systemevent::id event,
164                                              uint64_t bySeqno,
165                                              cb::const_byte_buffer key,
166                                              cb::const_byte_buffer eventData) {
167         (void)cookie;
168         (void)vbucket; // ignored as we are connecting VBn to VBn+1
169         dcp_last_op = PROTOCOL_BINARY_CMD_DCP_SYSTEM_EVENT;
170         dcp_last_key.assign(reinterpret_cast<const char*>(key.data()),
171                             key.size());
172         dcp_last_system_event = event;
173         return consumer->systemEvent(
174                 opaque, replicaVB, event, bySeqno, key, eventData);
175     }
176 
dcpAddFailoverLog(vbucket_failover_t * entry,size_t nentries,gsl::not_null<const void * > cookie)177     static ENGINE_ERROR_CODE dcpAddFailoverLog(
178             vbucket_failover_t* entry,
179             size_t nentries,
180             gsl::not_null<const void*> cookie) {
181         return ENGINE_SUCCESS;
182     }
183 
184     const void* cookieC;
185     const void* cookieP;
186     std::unique_ptr<dcp_message_producers> producers;
187     std::shared_ptr<MockDcpProducer> producer;
188 };
189 
// Out-of-class definitions of CollectionsDcpTest's static data members.
// consumer starts out empty and is assigned by createDcpConsumer().
std::shared_ptr<MockDcpConsumer> CollectionsDcpTest::consumer;
// Written by sendSystemEvent() each time a system event is forwarded.
mcbp::systemevent::id CollectionsDcpTest::dcp_last_system_event;
192 
TEST_F(CollectionsDcpTest,test_dcp_consumer)193 TEST_F(CollectionsDcpTest, test_dcp_consumer) {
194     const void* cookie = create_mock_cookie();
195 
196     auto consumer =
197             std::make_shared<MockDcpConsumer>(*engine, cookie, "test_consumer");
198 
199     store->setVBucketState(vbid, vbucket_state_replica, false);
200     ASSERT_EQ(ENGINE_SUCCESS,
201               consumer->addStream(/*opaque*/ 0, vbid, /*flags*/ 0));
202 
203     // Create meat with uid 4 as if it came from manifest uid cafef00d
204     std::string collection = "meat";
205     Collections::uid_t uid = 4;
206     Collections::uid_t manifestUid = 0xcafef00d;
207     Collections::SystemEventDCPData eventData{htonll(manifestUid), htonll(uid)};
208 
209     ASSERT_EQ(ENGINE_SUCCESS,
210               consumer->snapshotMarker(/*opaque*/ 1,
211                                        vbid,
212                                        /*start_seqno*/ 0,
213                                        /*end_seqno*/ 100,
214                                        /*flags*/ 0));
215 
216     VBucketPtr vb = store->getVBucket(vbid);
217 
218     EXPECT_FALSE(vb->lockCollections().doesKeyContainValidCollection(
219             {"meat:bacon", DocNamespace::Collections}));
220 
221     // Call the consumer function for handling DCP events
222     // create the meat collection
223     EXPECT_EQ(ENGINE_SUCCESS,
224               consumer->systemEvent(
225                       /*opaque*/ 1,
226                       vbid,
227                       mcbp::systemevent::id::CreateCollection,
228                       /*seqno*/ 1,
229                       {reinterpret_cast<const uint8_t*>(collection.data()),
230                        collection.size()},
231                       {reinterpret_cast<const uint8_t*>(&eventData),
232                        sizeof(eventData)}));
233 
234     // We can now access the collection
235     EXPECT_TRUE(vb->lockCollections().doesKeyContainValidCollection(
236             {"meat:bacon", DocNamespace::Collections}));
237     EXPECT_TRUE(vb->lockCollections().isCollectionOpen("meat"));
238     EXPECT_TRUE(vb->lockCollections().isCollectionOpen(
239             Collections::Identifier{"meat", 4}));
240     EXPECT_EQ(0xcafef00d, vb->lockCollections().getManifestUid());
241 
242     // Call the consumer function for handling DCP events
243     // delete the meat collection
244     EXPECT_EQ(ENGINE_SUCCESS,
245               consumer->systemEvent(
246                       /*opaque*/ 1,
247                       vbid,
248                       mcbp::systemevent::id::DeleteCollection,
249                       /*seqno*/ 2,
250                       {reinterpret_cast<const uint8_t*>(collection.data()),
251                        collection.size()},
252                       {reinterpret_cast<const uint8_t*>(&eventData),
253                        sizeof(eventData)}));
254 
255     // It's gone!
256     EXPECT_FALSE(vb->lockCollections().doesKeyContainValidCollection(
257             {"meat:bacon", DocNamespace::Collections}));
258 
259     consumer->closeAllStreams();
260     destroy_mock_cookie(cookie);
261     consumer->cancelTask();
262 }
263 
264 /*
265  * test_dcp connects a producer and consumer to test that collections created
266  * on the producer are transferred to the consumer
267  *
268  * The test replicates VBn to VBn+1
269  */
TEST_F(CollectionsDcpTest,test_dcp)270 TEST_F(CollectionsDcpTest, test_dcp) {
271     VBucketPtr vb = store->getVBucket(vbid);
272 
273     // Add a collection, then remove it. This generated events into the CP which
274     // we'll manually replicate with calls to step
275     vb->updateFromManifest({R"({"separator":":","uid":"0",
276               "collections":[{"name":"$default", "uid":"0"},
277                              {"name":"meat","uid":"1"}]})"});
278 
279     notifyAndStepToCheckpoint();
280 
281     VBucketPtr replica = store->getVBucket(replicaVB);
282 
283     // 1. Replica does not know about meat
284     EXPECT_FALSE(replica->lockCollections().doesKeyContainValidCollection(
285             {"meat:bacon", DocNamespace::Collections}));
286 
287     // Now step the producer to transfer the collection creation
288     EXPECT_EQ(ENGINE_WANT_MORE, producer->step(producers.get()));
289 
290     // 1. Replica now knows the collection
291     EXPECT_TRUE(replica->lockCollections().doesKeyContainValidCollection(
292             {"meat:bacon", DocNamespace::Collections}));
293 
294     // remove meat
295     vb->updateFromManifest({R"({"separator":":","uid":"1",
296               "collections":[{"name":"$default", "uid":"0"}]})"});
297 
298     notifyAndStepToCheckpoint();
299 
300     // Now step the producer to transfer the collection deletion
301     EXPECT_EQ(ENGINE_WANT_MORE, producer->step(producers.get()));
302 
303     // 3. Replica now blocking access to meat
304     EXPECT_FALSE(replica->lockCollections().doesKeyContainValidCollection(
305             {"meat:bacon", DocNamespace::Collections}));
306 
307     // Now step the producer, no more collection events
308     EXPECT_EQ(ENGINE_SUCCESS, producer->step(producers.get()));
309 }
310 
testDcpCreateDelete(int expectedCreates,int expectedDeletes,int expectedMutations,bool fromMemory)311 void CollectionsDcpTest::testDcpCreateDelete(int expectedCreates,
312                                              int expectedDeletes,
313                                              int expectedMutations,
314                                              bool fromMemory) {
315     notifyAndStepToCheckpoint(cb::mcbp::ClientOpcode::DcpSnapshotMarker,
316                               fromMemory);
317 
318     int creates = 0, deletes = 0, mutations = 0;
319     // step until done
320     while (ENGINE_WANT_MORE == producer->step(producers.get())) {
321         if (dcp_last_op == PROTOCOL_BINARY_CMD_DCP_SYSTEM_EVENT) {
322             switch (dcp_last_system_event) {
323             case mcbp::systemevent::id::CreateCollection:
324                 creates++;
325                 break;
326             case mcbp::systemevent::id::DeleteCollection:
327                 deletes++;
328                 break;
329             case mcbp::systemevent::id::CollectionsSeparatorChanged: {
330                 EXPECT_FALSE(true);
331                 break;
332             }
333             }
334         } else if (dcp_last_op == PROTOCOL_BINARY_CMD_DCP_MUTATION) {
335             mutations++;
336         }
337     }
338 
339     EXPECT_EQ(expectedCreates, creates);
340     EXPECT_EQ(expectedDeletes, deletes);
341     EXPECT_EQ(expectedMutations, mutations);
342 
343     // Finally check that the active and replica have the same manifest, our
344     // BeginDeleteCollection should of contained enough information to form
345     // an equivalent manifest
346     EXPECT_EQ(getManifest(vbid), getManifest(vbid + 1));
347 }
348 
349 // Test that a create/delete don't dedup (collections creates new checkpoints)
TEST_F(CollectionsDcpTest,test_dcp_create_delete)350 TEST_F(CollectionsDcpTest, test_dcp_create_delete) {
351     const int items = 3;
352     {
353         VBucketPtr vb = store->getVBucket(vbid);
354         // Create dairy
355         vb->updateFromManifest({R"({"separator":":","uid":"0",
356               "collections":[{"name":"$default", "uid":"0"},
357                              {"name":"fruit","uid":"1"},
358                              {"name":"dairy","uid":"1"}]})"});
359 
360         // Mutate dairy
361         for (int ii = 0; ii < items; ii++) {
362             std::string key = "dairy:" + std::to_string(ii);
363             store_item(vbid, {key, DocNamespace::Collections}, "value");
364         }
365 
366         // Mutate fruit
367         for (int ii = 0; ii < items; ii++) {
368             std::string key = "fruit:" + std::to_string(ii);
369             store_item(vbid, {key, DocNamespace::Collections}, "value");
370         }
371 
372         // Delete dairy
373         vb->updateFromManifest({R"({"separator":":","uid":"1",
374               "collections":[{"name":"$default", "uid":"0"},
375                              {"name":"fruit","uid":"1"}]})"});
376 
377         // Persist everything ready for warmup and check.
378         // Flusher will merge create/delete and we only flush the delete
379         flush_vbucket_to_disk(0, (2 * items) + 2);
380 
381         // We will see create fruit/dairy and delete dairy (from another CP)
382         // In-memory stream will also see all 2*items mutations (ordered with
383         // create
384         // and delete)
385         testDcpCreateDelete(2, 1, (2 * items));
386     }
387 
388     resetEngineAndWarmup();
389 
390     createDcpObjects({}, true); // from disk
391 
392     // Streamed from disk, one create (create of fruit) and items of fruit
393     testDcpCreateDelete(1, 0, items, false);
394 
395     EXPECT_TRUE(store->getVBucket(vbid)->lockCollections().isCollectionOpen(
396             "fruit"));
397 }
398 
399 // Test that a create/delete don't dedup (collections creates new checkpoints)
TEST_F(CollectionsDcpTest,test_dcp_create_delete_create)400 TEST_F(CollectionsDcpTest, test_dcp_create_delete_create) {
401     {
402         VBucketPtr vb = store->getVBucket(vbid);
403         // Create dairy
404         vb->updateFromManifest({R"({"separator":":","uid":"0",
405               "collections":[{"name":"$default", "uid":"0"},
406                              {"name":"dairy","uid":"1"}]})"});
407 
408         // Mutate dairy
409         const int items = 3;
410         for (int ii = 0; ii < items; ii++) {
411             std::string key = "dairy:" + std::to_string(ii);
412             store_item(vbid, {key, DocNamespace::Collections}, "value");
413         }
414 
415         // Delete dairy
416         vb->updateFromManifest(
417                 {R"({"uid":"1","separator":":","collections":[{"name":"$default", "uid":"0"}]})"});
418 
419         // Create dairy (new uid)
420         vb->updateFromManifest({R"({"separator":":","uid":"2",
421               "collections":[{"name":"$default", "uid":"0"},
422                              {"name":"dairy","uid":"2"}]})"});
423 
424         // Persist everything ready for warmup and check.
425         // Flusher will merge create/delete and we only flush the delete
426         flush_vbucket_to_disk(0, items + 1);
427 
428         // Should see 2x create dairy and 1x delete dairy
429         testDcpCreateDelete(2, 1, items);
430     }
431 
432     resetEngineAndWarmup();
433 
434     createDcpObjects({}, true /* from disk*/);
435 
436     // Streamed from disk, we won't see the 2x create events or the intermediate
437     // delete. So check DCP sends only 1 collection create.
438     testDcpCreateDelete(1, 0, 0, false);
439 
440     EXPECT_TRUE(store->getVBucket(vbid)->lockCollections().isCollectionOpen(
441             "dairy"));
442 }
443 
444 // Test that a create/delete/create don't dedup
TEST_F(CollectionsDcpTest,test_dcp_create_delete_create2)445 TEST_F(CollectionsDcpTest, test_dcp_create_delete_create2) {
446     {
447         VBucketPtr vb = store->getVBucket(vbid);
448         // Create dairy
449         vb->updateFromManifest({R"({"separator":":","uid":"0",
450               "collections":[{"name":"$default", "uid":"0"},
451                              {"name":"dairy","uid":"1"}]})"});
452 
453         // Mutate dairy
454         const int items = 3;
455         for (int ii = 0; ii < items; ii++) {
456             std::string key = "dairy:" + std::to_string(ii);
457             store_item(vbid, {key, DocNamespace::Collections}, "value");
458         }
459 
460         // Delete dairy/create dairy in one update
461         vb->updateFromManifest({R"({"separator":":","uid":"1",
462               "collections":[{"name":"$default", "uid":"0"},
463                              {"name":"dairy","uid":"2"}]})"});
464 
465         // Persist everything ready for warmup and check.
466         // Flusher will merge create/delete and we only flush the delete
467         flush_vbucket_to_disk(0, items + 1);
468 
469         testDcpCreateDelete(2, 1, 3);
470     }
471 
472     resetEngineAndWarmup();
473 
474     createDcpObjects({}, true /* from disk*/);
475 
476     // Streamed from disk, we won't see the first create or delete
477     testDcpCreateDelete(1, 0, 0, false);
478 
479     EXPECT_TRUE(store->getVBucket(vbid)->lockCollections().isCollectionOpen(
480             "dairy"));
481 }
482 
// A separator change followed by a collection create is transferred to the
// replica one step at a time, in that order
TEST_F(CollectionsDcpTest, test_dcp_separator) {
    VBucketPtr active = store->getVBucket(vbid);

    // Switch the separator to "@@"
    active->updateFromManifest({R"({"separator":"@@","uid":"0",
              "collections":[{"name":"$default", "uid":"0"}]})"});

    // Then add the meat collection
    active->updateFromManifest({R"({"separator":"@@","uid":"1",
              "collections":[{"name":"$default", "uid":"0"},
                             {"name":"meat","uid":"1"}]})"});

    // Nothing has been streamed, so the producer still has the old separator
    EXPECT_EQ(":", producer->getCurrentSeparatorForStream(vbid));

    notifyAndStepToCheckpoint();

    VBucketPtr replica = store->getVBucket(replicaVB);

    // ... and so does the replica
    EXPECT_EQ(":", replica->lockCollections().getSeparator());

    // First step: the separator change reaches producer and replica
    EXPECT_EQ(ENGINE_WANT_MORE, producer->step(producers.get()));
    EXPECT_EQ("@@", producer->getCurrentSeparatorForStream(vbid));
    EXPECT_EQ("@@", replica->lockCollections().getSeparator());

    // Second step: the collection create makes meat live on the replica
    EXPECT_EQ(ENGINE_WANT_MORE, producer->step(producers.get()));
    EXPECT_TRUE(replica->lockCollections().doesKeyContainValidCollection(
            {"meat@@bacon", DocNamespace::Collections}));

    // Third step: nothing left to send
    EXPECT_EQ(ENGINE_SUCCESS, producer->step(producers.get()));
}
524 
// Several separator changes followed by a collection create: each separator
// is expected to be observed in turn as the producer is stepped, and the
// collection becomes usable with the final separator
TEST_F(CollectionsDcpTest, test_dcp_separator_many) {
    auto active = store->getVBucket(vbid);

    // Separator -> "@@"
    active->updateFromManifest({R"({"separator": "@@","uid":"0",
              "collections":[{"name":"$default", "uid":"0"}]})"});
    // Separator -> "-"
    active->updateFromManifest({R"({"separator": "-","uid":"1",
              "collections":[{"name":"$default", "uid":"0"}]})"});
    // Separator -> ","
    active->updateFromManifest({R"({"separator": ",","uid":"2",
              "collections":[{"name":"$default", "uid":"0"}]})"});
    // Add the meat collection
    active->updateFromManifest({R"({"separator": ",","uid":"3",
              "collections":[{"name":"$default", "uid":"0"},
                             {"name":"meat", "uid":"1"}]})"});

    // Before anything is streamed the producer has the initial separator
    EXPECT_EQ(":", producer->getCurrentSeparatorForStream(vbid));

    notifyAndStepToCheckpoint();

    auto replica = store->getVBucket(replicaVB);

    // The replica also starts with the initial separator
    EXPECT_EQ(":", replica->lockCollections().getSeparator());

    // Every step transfers one separator change; producer and replica must
    // agree on it afterwards
    const std::array<std::string, 3> separatorsInOrder = {{"@@", "-", ","}};
    for (const auto& separator : separatorsInOrder) {
        EXPECT_EQ(ENGINE_WANT_MORE, producer->step(producers.get()));
        EXPECT_EQ(separator, producer->getCurrentSeparatorForStream(vbid));
        EXPECT_EQ(separator, replica->lockCollections().getSeparator());
    }

    // The next step transfers the create of "meat"
    EXPECT_EQ(ENGINE_WANT_MORE, producer->step(producers.get()));

    // meat is now live on the replica, addressed using the final separator
    EXPECT_TRUE(replica->lockCollections().doesKeyContainValidCollection(
            {"meat,bacon", DocNamespace::Collections}));

    // Nothing left to send
    EXPECT_EQ(ENGINE_SUCCESS, producer->step(producers.get()));
}
577 
578 // Test that a create/delete don't dedup (collections creates new checkpoints)
TEST_F(CollectionsDcpTest,MB_26455)579 TEST_F(CollectionsDcpTest, MB_26455) {
580     const int items = 3;
581     {
582         auto vb = store->getVBucket(vbid);
583 
584         std::vector<std::string> separators = {
585                 "::", "*", "**", "***", "-=-=-=-=-="};
586 
587         for (size_t n = 0; n < separators.size(); n++) {
588             // change sep
589             std::string manifest =
590                     R"({"uid":"0","separator":")" + separators.at(n) +
591                     R"(","collections":[{"name":"$default", "uid":"0"}]})";
592             vb->updateFromManifest({manifest});
593 
594             // add fruit
595             manifest = R"({"uid":"1","separator":")" + separators.at(n) +
596                        R"(","collections":[{"name":"$default", "uid":"0"},)" +
597                        R"({"name":"fruit", "uid":")" + std::to_string(n) +
598                        R"("}]})";
599 
600             vb->updateFromManifest({manifest});
601 
602             // Mutate fruit
603             for (int ii = 0; ii < items; ii++) {
604                 std::string key =
605                         "fruit" + separators.at(n) + std::to_string(ii);
606                 store_item(vbid, {key, DocNamespace::Collections}, "value");
607             }
608 
609             // expect change_separator + create_collection + items
610             flush_vbucket_to_disk(vbid, 2 + items);
611 
612             if (n < (separators.size() - 1)) {
613                 // Drop fruit, except for the last 'generation'
614                 manifest =
615                         R"({"uid":"2","separator":")" + separators.at(n) +
616                         R"(","collections":[{"name":"$default", "uid":"0"}]})";
617                 vb->updateFromManifest({manifest});
618 
619                 flush_vbucket_to_disk(vbid, 1);
620             }
621         }
622     }
623 
624     resetEngineAndWarmup();
625 
626     // Stream again!
627     createDcpObjects({}, true);
628 
629     // Streamed from disk, one create (create of fruit) and items of fruit
630     testDcpCreateDelete(1, 0, items, false /*fromMemory*/);
631 
632     EXPECT_TRUE(store->getVBucket(vbid)->lockCollections().isCollectionOpen(
633             "fruit"));
634 }
635 
636 class CollectionsFilteredDcpErrorTest : public SingleThreadedKVBucketTest {
637 public:
CollectionsFilteredDcpErrorTest()638     CollectionsFilteredDcpErrorTest() : cookieP(create_mock_cookie()) {
639     }
SetUp()640     void SetUp() override {
641         config_string += "collections_prototype_enabled=true";
642         SingleThreadedKVBucketTest::SetUp();
643         // Start vbucket as active to allow us to store items directly to it.
644         store->setVBucketState(vbid, vbucket_state_active, false);
645     }
646 
TearDown()647     void TearDown() override {
648         destroy_mock_cookie(cookieP);
649         producer.reset();
650         SingleThreadedKVBucketTest::TearDown();
651     }
652 
653 protected:
654     std::shared_ptr<MockDcpProducer> producer;
655     const void* cookieP;
656 };
657 
// Constructing a producer whose filter names a collection which is not in
// the bucket's manifest must throw cb::engine_error(unknown_collection)
TEST_F(CollectionsFilteredDcpErrorTest, error1) {
    // The bucket knows about meat and dairy only
    store->setCollections({R"({"separator": "@@","uid":"0",
              "collections":[{"name":"$default", "uid":"0"},
                             {"name":"meat", "uid":"1"},
                             {"name":"dairy", "uid":"2"}]})"});

    // The filter asks for fruit, which does not exist
    std::string filterJson = R"({"collections":["fruit"]})";
    cb::const_byte_buffer filterBytes{
            reinterpret_cast<const uint8_t*>(filterJson.data()),
            filterJson.size()};

    try {
        MockDcpProducer mock(*engine,
                             cookieP,
                             "test_producer",
                             DCP_OPEN_COLLECTIONS,
                             filterBytes,
                             false /*startTask*/);
        // Reaching this line means construction succeeded
        FAIL() << "Expected an exception";
    } catch (const cb::engine_error& error) {
        EXPECT_EQ(cb::engine_errc::unknown_collection, error.code());
    }
}
681 
682 class CollectionsFilteredDcpTest : public CollectionsDcpTest {
683 public:
CollectionsFilteredDcpTest()684     CollectionsFilteredDcpTest() : CollectionsDcpTest() {
685     }
686 
SetUp()687     void SetUp() override {
688         config_string += "collections_prototype_enabled=true";
689         SingleThreadedKVBucketTest::SetUp();
690         producers = get_dcp_producers(
691                 reinterpret_cast<ENGINE_HANDLE*>(engine.get()),
692                 reinterpret_cast<ENGINE_HANDLE_V1*>(engine.get()));
693         // Start vbucket as active to allow us to store items directly to it.
694         store->setVBucketState(vbid, vbucket_state_active, false);
695     }
696 };
697 
// With a DCP filter for "dairy" only dairy's create event and dairy's
// documents are streamed, both from memory and (after a warmup) from disk
TEST_F(CollectionsFilteredDcpTest, filtering) {
    VBucketPtr active = store->getVBucket(vbid);

    // Create meat/dairy via the bucket level (filters are worked out from
    // the bucket manifest)
    auto setBucketManifest = [this]() {
        store->setCollections({R"({"separator": ":","uid":"0",
              "collections":[{"name":"$default", "uid":"0"},
                             {"name":"meat", "uid":"1"},
                             {"name":"dairy", "uid":"2"}]})"});
    };
    // Create the DCP objects with a filter naming only dairy
    auto createDairyFilteredDcp = [this]() {
        createDcpObjects(R"({"collections":["dairy"]})", true);
    };

    setBucketManifest();
    createDairyFilteredDcp();

    notifyAndStepToCheckpoint();

    // The only system event sent is the create of dairy
    EXPECT_EQ(ENGINE_WANT_MORE, producer->step(producers.get()));
    EXPECT_EQ(PROTOCOL_BINARY_CMD_DCP_SYSTEM_EVENT, dcp_last_op);
    EXPECT_EQ("dairy", dcp_last_key);

    // Store documents in both collections, interleaving meat and dairy
    const std::array<std::string, 2> dairyKeys = {{"dairy:one", "dairy:two"}};
    const std::array<std::string, 5> keysInWriteOrder = {
            {"meat:one", dairyKeys[0], "meat:two", dairyKeys[1], "meat:three"}};
    for (const auto& key : keysInWriteOrder) {
        store_item(vbid, {key, DocNamespace::Collections}, "value");
    }

    // The producer must have a stream for vbucket 0
    auto vb0Stream = producer->findStream(0);
    ASSERT_NE(nullptr, vb0Stream.get());

    notifyAndStepToCheckpoint();

    // Step DCP to transfer keys: only the dairy keys are expected, in the
    // order they were stored, as all "meat" keys are filtered
    for (const auto& expectedKey : dairyKeys) {
        EXPECT_EQ(ENGINE_WANT_MORE, producer->step(producers.get()));
        EXPECT_EQ(PROTOCOL_BINARY_CMD_DCP_MUTATION, dcp_last_op);
        EXPECT_EQ(expectedKey, dcp_last_key);
    }
    // And no more
    EXPECT_EQ(ENGINE_SUCCESS, producer->step(producers.get()));

    flush_vbucket_to_disk(vbid, 7);

    // Drop our vbucket reference before the engine is restarted
    active.reset();

    // Now stream back from disk and check filtering
    resetEngineAndWarmup();

    // In order to create a filter, a manifest needs to be set
    setBucketManifest();
    createDairyFilteredDcp();

    // Streamed from disk
    // 1 create - create of dairy
    // 2 mutations in the dairy collection
    testDcpCreateDelete(1, 0, 2, false);
}
760 
761 // Check that when filtering is on, we don't send snapshots for fully filtered
762 // snapshots
TEST_F(CollectionsFilteredDcpTest,MB_24572)763 TEST_F(CollectionsFilteredDcpTest, MB_24572) {
764     VBucketPtr vb = store->getVBucket(vbid);
765 
766     // Perform a create of meat/dairy via the bucket level (filters are
767     // worked out from the bucket manifest)
768     store->setCollections({R"({"separator": ":","uid":"0",
769               "collections":[{"name":"$default", "uid":"0"},
770                              {"name":"meat", "uid":"1"},
771                              {"name":"dairy", "uid":"2"}]})"});
772     // Setup filtered DCP
773     createDcpObjects(R"({"collections":["dairy"]})", true);
774 
775     // Store collection documents
776     store_item(vbid, {"meat::one", DocNamespace::Collections}, "value");
777     store_item(vbid, {"meat::two", DocNamespace::Collections}, "value");
778     store_item(vbid, {"meat::three", DocNamespace::Collections}, "value");
779 
780     notifyAndStepToCheckpoint();
781 
782     // SystemEvent createCollection for dairy is expected
783     EXPECT_EQ(ENGINE_WANT_MORE, producer->step(producers.get()));
784     EXPECT_EQ(PROTOCOL_BINARY_CMD_DCP_SYSTEM_EVENT, dcp_last_op);
785     EXPECT_EQ("dairy", dcp_last_key);
786 
787     // And no more for this stream - no meat
788     EXPECT_EQ(ENGINE_SUCCESS, producer->step(producers.get()));
789 
790     // and new mutations?
791     store_item(vbid, {"meat::one1", DocNamespace::Collections}, "value");
792     store_item(vbid, {"meat::two2", DocNamespace::Collections}, "value");
793     store_item(vbid, {"meat::three3", DocNamespace::Collections}, "value");
794     notifyAndStepToCheckpoint(cb::mcbp::ClientOpcode::Invalid);
795 }
796 
TEST_F(CollectionsFilteredDcpTest, default_only) {
    VBucketPtr vbucket = store->getVBucket(vbid);

    // Create meat and dairy through the bucket-level manifest (DCP filters
    // are worked out from the bucket manifest).
    store->setCollections({R"({"separator": ":","uid":"0",
              "collections":[{"name":"$default", "uid":"0"},
                             {"name":"meat", "uid":"1"},
                             {"name":"dairy", "uid":"2"}]})"});

    // DCP objects with no filter, for a client that does not know about
    // collections.
    createDcpObjects({}, false);

    // Helper for the documents that live in a named collection.
    const auto storeInCollection = [&](const std::string& docKey) {
        store_item(vbid, {docKey, DocNamespace::Collections}, "value");
    };

    // Four collection documents with a single default-collection document
    // ("anykey") written in the middle of them.
    storeInCollection("meat:one");
    storeInCollection("dairy:one");
    store_item(vbid, {"anykey", DocNamespace::DefaultCollection}, "value");
    storeInCollection("dairy:two");
    storeInCollection("meat:three");

    auto stream = producer->findStream(0);
    ASSERT_NE(nullptr, stream.get());

    // Step into the items; of everything stored, only "anykey" should be
    // seen by this client.
    notifyAndStepToCheckpoint();

    const auto mutationStatus = producer->step(producers.get());
    EXPECT_EQ(ENGINE_WANT_MORE, mutationStatus);
    EXPECT_EQ(PROTOCOL_BINARY_CMD_DCP_MUTATION, dcp_last_op);
    EXPECT_EQ("anykey", dcp_last_key);

    // Nothing further is queued for the client.
    EXPECT_EQ(ENGINE_SUCCESS, producer->step(producers.get()));
}
829 
TEST_F(CollectionsFilteredDcpTest, stream_closes) {
    VBucketPtr vbucket = store->getVBucket(vbid);

    // Create meat through the bucket-level manifest (DCP filters are worked
    // out from the bucket manifest).
    store->setCollections({R"({"separator": ":","uid":"0",
              "collections":[{"name":"$default", "uid":"0"},
                             {"name":"meat", "uid":"1"}]})"});

    // DCP objects whose filter names only the meat collection.
    createDcpObjects(R"({"collections":["meat"]})", true);

    auto stream = producer->findStream(0);
    ASSERT_NE(nullptr, stream.get());

    notifyAndStepToCheckpoint();

    // From here the system events are transferred one step at a time; the
    // stream is expected to close once DeleteCollection has been transferred.

    // Step: transfer the creation of meat.
    const auto createStatus = producer->step(producers.get());
    EXPECT_EQ(ENGINE_WANT_MORE, createStatus);

    // The stream must still be alive at this point.
    EXPECT_TRUE(stream->isActive());

    // Delete meat by applying a bucket-level manifest that no longer lists it.
    store->setCollections({R"({"separator": ":","uid":"1",
              "collections":[{"name":"$default", "uid":"0"}]})"});

    notifyAndStepToCheckpoint();

    // Step: transfer the deletion of meat.
    const auto deleteStatus = producer->step(producers.get());
    EXPECT_EQ(ENGINE_WANT_MORE, deleteStatus);

    // Deleting the only filtered collection has closed the stream.
    EXPECT_FALSE(stream->isActive());

    // Step: transfer the close-stream message.
    const auto closeStatus = producer->step(producers.get());
    EXPECT_EQ(ENGINE_WANT_MORE, closeStatus);

    // Nothing further is queued for the client.
    EXPECT_EQ(ENGINE_SUCCESS, producer->step(producers.get()));
}
874 
875 /**
876  * Test that creation of a stream that resulted in an empty filter, because
877  * all the collection have been deleted, just goes to stream end.
878  */
TEST_F(CollectionsFilteredDcpTest,empty_filter_stream_closes)879 TEST_F(CollectionsFilteredDcpTest, empty_filter_stream_closes) {
880     VBucketPtr vb = store->getVBucket(vbid);
881 
882     // Perform a create of meat via the bucket level (filters are worked out
883     // from the bucket manifest)
884     store->setCollections({R"({"separator": ":","uid":"0",
885               "collections":[{"name":"$default", "uid":"0"},
886                              {"name":"meat", "uid":"1"}]})"});
887 
888     producer = createDcpProducer(cookieP,
889                                  R"({"collections":["meat"]})",
890                                  true,
891                                  IncludeDeleteTime::No);
892     createDcpConsumer();
893 
894     // Perform a delete of meat
895     store->setCollections({R"({"separator": ":","uid":"1",
896                             "collections":[{"name":"$default", "uid":"0"}]})"});
897 
898     createDcpStream();
899 
900     auto vb0Stream = producer->findStream(0);
901     ASSERT_NE(nullptr, vb0Stream.get());
902 
903     EXPECT_TRUE(vb0Stream->isActive());
904 
905     // Expect a stream end marker and no data
906     notifyAndStepToCheckpoint(cb::mcbp::ClientOpcode::DcpStreamEnd);
907     EXPECT_EQ(END_STREAM_FILTER_EMPTY, dcp_last_flags);
908 
909     // Done... collection deletion of meat has closed the stream
910     EXPECT_FALSE(vb0Stream->isActive());
911 
912     // And no more
913     EXPECT_EQ(ENGINE_SUCCESS, producer->step(producers.get()));
914 }
915 
916 /**
917  * Test that creation of a stream that resulted in an empty filter, because
918  * all the collection have been deleted, just goes to stream end.
919  * This test uses a filter with name:uid
920  */
TEST_F(CollectionsFilteredDcpTest,empty_filter_stream_closes2)921 TEST_F(CollectionsFilteredDcpTest, empty_filter_stream_closes2) {
922     VBucketPtr vb = store->getVBucket(vbid);
923 
924     // Perform a create of meat via the bucket level (filters are worked out
925     // from the bucket manifest)
926     store->setCollections({R"({"separator": ":","uid":"0",
927               "collections":[{"name":"$default", "uid":"0"},
928                              {"name":"meat", "uid":"1"}]})"});
929 
930     // specific collection uid
931     producer =
932             createDcpProducer(cookieP,
933                               R"({"collections":[{"name":"meat", "uid":"1"}]})",
934                               true,
935                               IncludeDeleteTime::No);
936     createDcpConsumer();
937 
938     // Perform a delete of meat
939     store->setCollections({R"({"separator": ":","uid":"0",
940                             "collections":[{"name":"$default", "uid":"0"}]})"});
941 
942     createDcpStream();
943 
944     auto vb0Stream = producer->findStream(0);
945     ASSERT_NE(nullptr, vb0Stream.get());
946 
947     EXPECT_TRUE(vb0Stream->isActive());
948 
949     // Expect a stream end marker and no data
950     notifyAndStepToCheckpoint(cb::mcbp::ClientOpcode::DcpStreamEnd);
951     EXPECT_EQ(END_STREAM_FILTER_EMPTY, dcp_last_flags);
952 
953     // Done... collection deletion of meat:1 has closed the stream
954     EXPECT_FALSE(vb0Stream->isActive());
955 
956     // And no more
957     EXPECT_EQ(ENGINE_SUCCESS, producer->step(producers.get()));
958 }
959 
TEST_F(CollectionsFilteredDcpTest, legacy_stream_closes) {
    VBucketPtr vbucket = store->getVBucket(vbid);

    // Create meat through the bucket-level manifest (DCP filters are worked
    // out from the bucket manifest).
    store->setCollections({R"({"separator": ":","uid":"0",
              "collections":[{"name":"$default", "uid":"0"},
                             {"name":"meat", "uid":"1"}]})"});

    // Switch collections support off on both cookies so that they look like
    // a non-collection client.
    mock_set_collections_support(cookieP, false);
    mock_set_collections_support(cookieC, false);

    // Legacy DCP: it only receives default-collection mutations/deletions and
    // should close itself if the default collection is deleted.
    createDcpObjects({}, false);

    auto stream = producer->findStream(0);
    ASSERT_NE(nullptr, stream.get());

    // Kick the stream into life. No keys have been written and no event can
    // be sent, so nothing is expected to come out.
    notifyAndStepToCheckpoint(cb::mcbp::ClientOpcode::Invalid);

    EXPECT_TRUE(stream->isActive());

    // Delete $default by applying a manifest that lists only meat.
    store->setCollections({R"({"separator": ":","uid":"1",
              "collections":[{"name":"meat", "uid":"1"}]})"});

    // A stream-end marker is expected next, carrying the OK end reason.
    notifyAndStepToCheckpoint(cb::mcbp::ClientOpcode::DcpStreamEnd);
    EXPECT_EQ(END_STREAM_OK, dcp_last_flags);

    // The deletion of the default collection has closed the stream.
    EXPECT_FALSE(stream->isActive());

    // Nothing further is queued for the client.
    EXPECT_EQ(ENGINE_SUCCESS, producer->step(producers.get()));
}
998