1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3 * Copyright 2017 Couchbase, Inc
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18 /**
19 * Tests for Collection functionality in EPStore.
20 */
21 #include "bgfetcher.h"
22 #include "dcp/dcpconnmap.h"
23 #include "kvstore.h"
24 #include "programs/engine_testapp/mock_server.h"
25 #include "tests/mock/mock_dcp.h"
26 #include "tests/mock/mock_dcp_consumer.h"
27 #include "tests/mock/mock_dcp_producer.h"
28 #include "tests/mock/mock_global_task.h"
29 #include "tests/module_tests/evp_store_single_threaded_test.h"
30 #include "tests/module_tests/evp_store_test.h"
31 #include "tests/module_tests/test_helpers.h"
32
33 #include <functional>
34 #include <thread>
35
36 extern uint8_t dcp_last_op;
37 extern std::string dcp_last_key;
38 extern uint32_t dcp_last_flags;
39
40 class CollectionsDcpTest : public SingleThreadedKVBucketTest {
41 public:
CollectionsDcpTest()42 CollectionsDcpTest()
43 : cookieC(create_mock_cookie()), cookieP(create_mock_cookie()) {
44 mock_set_collections_support(cookieP, true);
45 mock_set_collections_support(cookieC, true);
46 }
47
48 // Setup a producer/consumer ready for the test
SetUp()49 void SetUp() override {
50 config_string += "collections_prototype_enabled=true";
51 SingleThreadedKVBucketTest::SetUp();
52 // Start vbucket as active to allow us to store items directly to it.
53 store->setVBucketState(vbid, vbucket_state_active, false);
54 producers = get_dcp_producers(
55 reinterpret_cast<ENGINE_HANDLE*>(engine.get()),
56 reinterpret_cast<ENGINE_HANDLE_V1*>(engine.get()));
57 createDcpObjects({/*no filter*/}, true /*collections on*/);
58 }
getManifest(uint16_t vb) const59 std::string getManifest(uint16_t vb) const {
60 return store->getVBucket(vb)
61 ->getShard()
62 ->getRWUnderlying()
63 ->getCollectionsManifest(vbid);
64 }
65
createDcpStream()66 void createDcpStream() {
67 uint64_t rollbackSeqno;
68 ASSERT_EQ(ENGINE_SUCCESS,
69 producer->streamRequest(
70 0, // flags
71 1, // opaque
72 vbid,
73 0, // start_seqno
74 ~0ull, // end_seqno
75 0, // vbucket_uuid,
76 0, // snap_start_seqno,
77 0, // snap_end_seqno,
78 &rollbackSeqno,
79 &CollectionsDcpTest::dcpAddFailoverLog));
80 }
81
createDcpConsumer()82 void createDcpConsumer() {
83 CollectionsDcpTest::consumer = std::make_shared<MockDcpConsumer>(
84 *engine, cookieC, "test_consumer");
85 store->setVBucketState(replicaVB, vbucket_state_replica, false);
86 ASSERT_EQ(ENGINE_SUCCESS,
87 consumer->addStream(/*opaque*/ 0,
88 replicaVB,
89 /*flags*/ 0));
90 // Setup a snapshot on the consumer
91 ASSERT_EQ(ENGINE_SUCCESS,
92 consumer->snapshotMarker(/*opaque*/ 1,
93 /*vbucket*/ replicaVB,
94 /*start_seqno*/ 0,
95 /*end_seqno*/ 100,
96 /*flags*/ 0));
97 }
98
createDcpObjects(const std::string & filter,bool dcpCollectionAware)99 void createDcpObjects(const std::string& filter, bool dcpCollectionAware) {
100 createDcpConsumer();
101 producer = SingleThreadedKVBucketTest::createDcpProducer(
102 cookieP, filter, dcpCollectionAware, IncludeDeleteTime::No);
103 // Patch our local callback into the handlers
104 producers->system_event = &CollectionsDcpTest::sendSystemEvent;
105 createDcpStream();
106 }
107
TearDown()108 void TearDown() override {
109 teardown();
110 SingleThreadedKVBucketTest::TearDown();
111 }
112
teardown()113 void teardown() {
114 destroy_mock_cookie(cookieC);
115 destroy_mock_cookie(cookieP);
116 consumer->closeAllStreams();
117 consumer->cancelTask();
118 producer->closeAllStreams();
119 producer->cancelCheckpointCreatorTask();
120 producer.reset();
121 consumer.reset();
122 }
123
runCheckpointProcessor()124 void runCheckpointProcessor() {
125 SingleThreadedKVBucketTest::runCheckpointProcessor(*producer,
126 *producers);
127 }
128
notifyAndStepToCheckpoint(cb::mcbp::ClientOpcode expectedOp=cb::mcbp::ClientOpcode::DcpSnapshotMarker,bool fromMemory=true)129 void notifyAndStepToCheckpoint(
130 cb::mcbp::ClientOpcode expectedOp =
131 cb::mcbp::ClientOpcode::DcpSnapshotMarker,
132 bool fromMemory = true) {
133 // Call parent class function with our producer
134 SingleThreadedKVBucketTest::notifyAndStepToCheckpoint(
135 *producer, *producers, expectedOp, fromMemory);
136 }
137
138 void testDcpCreateDelete(int expectedCreates,
139 int expectedDeletes,
140 int expectedMutations,
141 bool fromMemory = true);
142
resetEngineAndWarmup(std::string new_config="")143 void resetEngineAndWarmup(std::string new_config = "") {
144 teardown();
145 SingleThreadedKVBucketTest::resetEngineAndWarmup(new_config);
146 producers = get_dcp_producers(
147 reinterpret_cast<ENGINE_HANDLE*>(engine.get()),
148 reinterpret_cast<ENGINE_HANDLE_V1*>(engine.get()));
149 cookieC = create_mock_cookie();
150 cookieP = create_mock_cookie();
151 }
152
153 static const uint16_t replicaVB{1};
154 static std::shared_ptr<MockDcpConsumer> consumer;
155 static mcbp::systemevent::id dcp_last_system_event;
156
157 /*
158 * DCP callback method to push SystemEvents on to the consumer
159 */
sendSystemEvent(gsl::not_null<const void * > cookie,uint32_t opaque,uint16_t vbucket,mcbp::systemevent::id event,uint64_t bySeqno,cb::const_byte_buffer key,cb::const_byte_buffer eventData)160 static ENGINE_ERROR_CODE sendSystemEvent(gsl::not_null<const void*> cookie,
161 uint32_t opaque,
162 uint16_t vbucket,
163 mcbp::systemevent::id event,
164 uint64_t bySeqno,
165 cb::const_byte_buffer key,
166 cb::const_byte_buffer eventData) {
167 (void)cookie;
168 (void)vbucket; // ignored as we are connecting VBn to VBn+1
169 dcp_last_op = PROTOCOL_BINARY_CMD_DCP_SYSTEM_EVENT;
170 dcp_last_key.assign(reinterpret_cast<const char*>(key.data()),
171 key.size());
172 dcp_last_system_event = event;
173 return consumer->systemEvent(
174 opaque, replicaVB, event, bySeqno, key, eventData);
175 }
176
dcpAddFailoverLog(vbucket_failover_t * entry,size_t nentries,gsl::not_null<const void * > cookie)177 static ENGINE_ERROR_CODE dcpAddFailoverLog(
178 vbucket_failover_t* entry,
179 size_t nentries,
180 gsl::not_null<const void*> cookie) {
181 return ENGINE_SUCCESS;
182 }
183
184 const void* cookieC;
185 const void* cookieP;
186 std::unique_ptr<dcp_message_producers> producers;
187 std::shared_ptr<MockDcpProducer> producer;
188 };
189
190 std::shared_ptr<MockDcpConsumer> CollectionsDcpTest::consumer;
191 mcbp::systemevent::id CollectionsDcpTest::dcp_last_system_event;
192
TEST_F(CollectionsDcpTest,test_dcp_consumer)193 TEST_F(CollectionsDcpTest, test_dcp_consumer) {
194 const void* cookie = create_mock_cookie();
195
196 auto consumer =
197 std::make_shared<MockDcpConsumer>(*engine, cookie, "test_consumer");
198
199 store->setVBucketState(vbid, vbucket_state_replica, false);
200 ASSERT_EQ(ENGINE_SUCCESS,
201 consumer->addStream(/*opaque*/ 0, vbid, /*flags*/ 0));
202
203 // Create meat with uid 4 as if it came from manifest uid cafef00d
204 std::string collection = "meat";
205 Collections::uid_t uid = 4;
206 Collections::uid_t manifestUid = 0xcafef00d;
207 Collections::SystemEventDCPData eventData{htonll(manifestUid), htonll(uid)};
208
209 ASSERT_EQ(ENGINE_SUCCESS,
210 consumer->snapshotMarker(/*opaque*/ 1,
211 vbid,
212 /*start_seqno*/ 0,
213 /*end_seqno*/ 100,
214 /*flags*/ 0));
215
216 VBucketPtr vb = store->getVBucket(vbid);
217
218 EXPECT_FALSE(vb->lockCollections().doesKeyContainValidCollection(
219 {"meat:bacon", DocNamespace::Collections}));
220
221 // Call the consumer function for handling DCP events
222 // create the meat collection
223 EXPECT_EQ(ENGINE_SUCCESS,
224 consumer->systemEvent(
225 /*opaque*/ 1,
226 vbid,
227 mcbp::systemevent::id::CreateCollection,
228 /*seqno*/ 1,
229 {reinterpret_cast<const uint8_t*>(collection.data()),
230 collection.size()},
231 {reinterpret_cast<const uint8_t*>(&eventData),
232 sizeof(eventData)}));
233
234 // We can now access the collection
235 EXPECT_TRUE(vb->lockCollections().doesKeyContainValidCollection(
236 {"meat:bacon", DocNamespace::Collections}));
237 EXPECT_TRUE(vb->lockCollections().isCollectionOpen("meat"));
238 EXPECT_TRUE(vb->lockCollections().isCollectionOpen(
239 Collections::Identifier{"meat", 4}));
240 EXPECT_EQ(0xcafef00d, vb->lockCollections().getManifestUid());
241
242 // Call the consumer function for handling DCP events
243 // delete the meat collection
244 EXPECT_EQ(ENGINE_SUCCESS,
245 consumer->systemEvent(
246 /*opaque*/ 1,
247 vbid,
248 mcbp::systemevent::id::DeleteCollection,
249 /*seqno*/ 2,
250 {reinterpret_cast<const uint8_t*>(collection.data()),
251 collection.size()},
252 {reinterpret_cast<const uint8_t*>(&eventData),
253 sizeof(eventData)}));
254
255 // It's gone!
256 EXPECT_FALSE(vb->lockCollections().doesKeyContainValidCollection(
257 {"meat:bacon", DocNamespace::Collections}));
258
259 consumer->closeAllStreams();
260 destroy_mock_cookie(cookie);
261 consumer->cancelTask();
262 }
263
264 /*
265 * test_dcp connects a producer and consumer to test that collections created
266 * on the producer are transferred to the consumer
267 *
268 * The test replicates VBn to VBn+1
269 */
TEST_F(CollectionsDcpTest,test_dcp)270 TEST_F(CollectionsDcpTest, test_dcp) {
271 VBucketPtr vb = store->getVBucket(vbid);
272
273 // Add a collection, then remove it. This generated events into the CP which
274 // we'll manually replicate with calls to step
275 vb->updateFromManifest({R"({"separator":":","uid":"0",
276 "collections":[{"name":"$default", "uid":"0"},
277 {"name":"meat","uid":"1"}]})"});
278
279 notifyAndStepToCheckpoint();
280
281 VBucketPtr replica = store->getVBucket(replicaVB);
282
283 // 1. Replica does not know about meat
284 EXPECT_FALSE(replica->lockCollections().doesKeyContainValidCollection(
285 {"meat:bacon", DocNamespace::Collections}));
286
287 // Now step the producer to transfer the collection creation
288 EXPECT_EQ(ENGINE_WANT_MORE, producer->step(producers.get()));
289
290 // 1. Replica now knows the collection
291 EXPECT_TRUE(replica->lockCollections().doesKeyContainValidCollection(
292 {"meat:bacon", DocNamespace::Collections}));
293
294 // remove meat
295 vb->updateFromManifest({R"({"separator":":","uid":"1",
296 "collections":[{"name":"$default", "uid":"0"}]})"});
297
298 notifyAndStepToCheckpoint();
299
300 // Now step the producer to transfer the collection deletion
301 EXPECT_EQ(ENGINE_WANT_MORE, producer->step(producers.get()));
302
303 // 3. Replica now blocking access to meat
304 EXPECT_FALSE(replica->lockCollections().doesKeyContainValidCollection(
305 {"meat:bacon", DocNamespace::Collections}));
306
307 // Now step the producer, no more collection events
308 EXPECT_EQ(ENGINE_SUCCESS, producer->step(producers.get()));
309 }
310
testDcpCreateDelete(int expectedCreates,int expectedDeletes,int expectedMutations,bool fromMemory)311 void CollectionsDcpTest::testDcpCreateDelete(int expectedCreates,
312 int expectedDeletes,
313 int expectedMutations,
314 bool fromMemory) {
315 notifyAndStepToCheckpoint(cb::mcbp::ClientOpcode::DcpSnapshotMarker,
316 fromMemory);
317
318 int creates = 0, deletes = 0, mutations = 0;
319 // step until done
320 while (ENGINE_WANT_MORE == producer->step(producers.get())) {
321 if (dcp_last_op == PROTOCOL_BINARY_CMD_DCP_SYSTEM_EVENT) {
322 switch (dcp_last_system_event) {
323 case mcbp::systemevent::id::CreateCollection:
324 creates++;
325 break;
326 case mcbp::systemevent::id::DeleteCollection:
327 deletes++;
328 break;
329 case mcbp::systemevent::id::CollectionsSeparatorChanged: {
330 EXPECT_FALSE(true);
331 break;
332 }
333 }
334 } else if (dcp_last_op == PROTOCOL_BINARY_CMD_DCP_MUTATION) {
335 mutations++;
336 }
337 }
338
339 EXPECT_EQ(expectedCreates, creates);
340 EXPECT_EQ(expectedDeletes, deletes);
341 EXPECT_EQ(expectedMutations, mutations);
342
343 // Finally check that the active and replica have the same manifest, our
344 // BeginDeleteCollection should of contained enough information to form
345 // an equivalent manifest
346 EXPECT_EQ(getManifest(vbid), getManifest(vbid + 1));
347 }
348
349 // Test that a create/delete don't dedup (collections creates new checkpoints)
TEST_F(CollectionsDcpTest,test_dcp_create_delete)350 TEST_F(CollectionsDcpTest, test_dcp_create_delete) {
351 const int items = 3;
352 {
353 VBucketPtr vb = store->getVBucket(vbid);
354 // Create dairy
355 vb->updateFromManifest({R"({"separator":":","uid":"0",
356 "collections":[{"name":"$default", "uid":"0"},
357 {"name":"fruit","uid":"1"},
358 {"name":"dairy","uid":"1"}]})"});
359
360 // Mutate dairy
361 for (int ii = 0; ii < items; ii++) {
362 std::string key = "dairy:" + std::to_string(ii);
363 store_item(vbid, {key, DocNamespace::Collections}, "value");
364 }
365
366 // Mutate fruit
367 for (int ii = 0; ii < items; ii++) {
368 std::string key = "fruit:" + std::to_string(ii);
369 store_item(vbid, {key, DocNamespace::Collections}, "value");
370 }
371
372 // Delete dairy
373 vb->updateFromManifest({R"({"separator":":","uid":"1",
374 "collections":[{"name":"$default", "uid":"0"},
375 {"name":"fruit","uid":"1"}]})"});
376
377 // Persist everything ready for warmup and check.
378 // Flusher will merge create/delete and we only flush the delete
379 flush_vbucket_to_disk(0, (2 * items) + 2);
380
381 // We will see create fruit/dairy and delete dairy (from another CP)
382 // In-memory stream will also see all 2*items mutations (ordered with
383 // create
384 // and delete)
385 testDcpCreateDelete(2, 1, (2 * items));
386 }
387
388 resetEngineAndWarmup();
389
390 createDcpObjects({}, true); // from disk
391
392 // Streamed from disk, one create (create of fruit) and items of fruit
393 testDcpCreateDelete(1, 0, items, false);
394
395 EXPECT_TRUE(store->getVBucket(vbid)->lockCollections().isCollectionOpen(
396 "fruit"));
397 }
398
399 // Test that a create/delete don't dedup (collections creates new checkpoints)
TEST_F(CollectionsDcpTest,test_dcp_create_delete_create)400 TEST_F(CollectionsDcpTest, test_dcp_create_delete_create) {
401 {
402 VBucketPtr vb = store->getVBucket(vbid);
403 // Create dairy
404 vb->updateFromManifest({R"({"separator":":","uid":"0",
405 "collections":[{"name":"$default", "uid":"0"},
406 {"name":"dairy","uid":"1"}]})"});
407
408 // Mutate dairy
409 const int items = 3;
410 for (int ii = 0; ii < items; ii++) {
411 std::string key = "dairy:" + std::to_string(ii);
412 store_item(vbid, {key, DocNamespace::Collections}, "value");
413 }
414
415 // Delete dairy
416 vb->updateFromManifest(
417 {R"({"uid":"1","separator":":","collections":[{"name":"$default", "uid":"0"}]})"});
418
419 // Create dairy (new uid)
420 vb->updateFromManifest({R"({"separator":":","uid":"2",
421 "collections":[{"name":"$default", "uid":"0"},
422 {"name":"dairy","uid":"2"}]})"});
423
424 // Persist everything ready for warmup and check.
425 // Flusher will merge create/delete and we only flush the delete
426 flush_vbucket_to_disk(0, items + 1);
427
428 // Should see 2x create dairy and 1x delete dairy
429 testDcpCreateDelete(2, 1, items);
430 }
431
432 resetEngineAndWarmup();
433
434 createDcpObjects({}, true /* from disk*/);
435
436 // Streamed from disk, we won't see the 2x create events or the intermediate
437 // delete. So check DCP sends only 1 collection create.
438 testDcpCreateDelete(1, 0, 0, false);
439
440 EXPECT_TRUE(store->getVBucket(vbid)->lockCollections().isCollectionOpen(
441 "dairy"));
442 }
443
444 // Test that a create/delete/create don't dedup
TEST_F(CollectionsDcpTest,test_dcp_create_delete_create2)445 TEST_F(CollectionsDcpTest, test_dcp_create_delete_create2) {
446 {
447 VBucketPtr vb = store->getVBucket(vbid);
448 // Create dairy
449 vb->updateFromManifest({R"({"separator":":","uid":"0",
450 "collections":[{"name":"$default", "uid":"0"},
451 {"name":"dairy","uid":"1"}]})"});
452
453 // Mutate dairy
454 const int items = 3;
455 for (int ii = 0; ii < items; ii++) {
456 std::string key = "dairy:" + std::to_string(ii);
457 store_item(vbid, {key, DocNamespace::Collections}, "value");
458 }
459
460 // Delete dairy/create dairy in one update
461 vb->updateFromManifest({R"({"separator":":","uid":"1",
462 "collections":[{"name":"$default", "uid":"0"},
463 {"name":"dairy","uid":"2"}]})"});
464
465 // Persist everything ready for warmup and check.
466 // Flusher will merge create/delete and we only flush the delete
467 flush_vbucket_to_disk(0, items + 1);
468
469 testDcpCreateDelete(2, 1, 3);
470 }
471
472 resetEngineAndWarmup();
473
474 createDcpObjects({}, true /* from disk*/);
475
476 // Streamed from disk, we won't see the first create or delete
477 testDcpCreateDelete(1, 0, 0, false);
478
479 EXPECT_TRUE(store->getVBucket(vbid)->lockCollections().isCollectionOpen(
480 "dairy"));
481 }
482
TEST_F(CollectionsDcpTest,test_dcp_separator)483 TEST_F(CollectionsDcpTest, test_dcp_separator) {
484 VBucketPtr vb = store->getVBucket(vbid);
485
486 // Change the separator
487 vb->updateFromManifest({R"({"separator":"@@","uid":"0",
488 "collections":[{"name":"$default", "uid":"0"}]})"});
489
490 // Add a collection
491 vb->updateFromManifest({R"({"separator":"@@","uid":"1",
492 "collections":[{"name":"$default", "uid":"0"},
493 {"name":"meat","uid":"1"}]})"});
494
495 // The producer should start with the old separator
496 EXPECT_EQ(":", producer->getCurrentSeparatorForStream(vbid));
497
498 notifyAndStepToCheckpoint();
499
500 VBucketPtr replica = store->getVBucket(replicaVB);
501
502 // The replica should have the old : separator
503 EXPECT_EQ(":", replica->lockCollections().getSeparator());
504
505 // Now step the producer to transfer the separator
506 EXPECT_EQ(ENGINE_WANT_MORE, producer->step(producers.get()));
507
508 // The producer should now have the new separator
509 EXPECT_EQ("@@", producer->getCurrentSeparatorForStream(vbid));
510
511 // The replica should now have the new separator
512 EXPECT_EQ("@@", replica->lockCollections().getSeparator());
513
514 // Now step the producer to transfer the collection
515 EXPECT_EQ(ENGINE_WANT_MORE, producer->step(producers.get()));
516
517 // Collection should now be live on the replica
518 EXPECT_TRUE(replica->lockCollections().doesKeyContainValidCollection(
519 {"meat@@bacon", DocNamespace::Collections}));
520
521 // And done
522 EXPECT_EQ(ENGINE_SUCCESS, producer->step(producers.get()));
523 }
524
TEST_F(CollectionsDcpTest,test_dcp_separator_many)525 TEST_F(CollectionsDcpTest, test_dcp_separator_many) {
526 auto vb = store->getVBucket(vbid);
527
528 // Change the separator
529 vb->updateFromManifest({R"({"separator": "@@","uid":"0",
530 "collections":[{"name":"$default", "uid":"0"}]})"});
531 // Change the separator
532 vb->updateFromManifest({R"({"separator": "-","uid":"1",
533 "collections":[{"name":"$default", "uid":"0"}]})"});
534 // Change the separator
535 vb->updateFromManifest({R"({"separator": ",","uid":"2",
536 "collections":[{"name":"$default", "uid":"0"}]})"});
537 // Add a collection
538 vb->updateFromManifest({R"({"separator": ",","uid":"3",
539 "collections":[{"name":"$default", "uid":"0"},
540 {"name":"meat", "uid":"1"}]})"});
541
542 // All the changes will be collapsed into one update and we will expect
543 // to see , as the separator once DCP steps through the checkpoint
544
545 // The producer should start with the initial separator
546 EXPECT_EQ(":", producer->getCurrentSeparatorForStream(vbid));
547
548 notifyAndStepToCheckpoint();
549
550 auto replica = store->getVBucket(replicaVB);
551
552 // The replica should have the old separator
553 EXPECT_EQ(":", replica->lockCollections().getSeparator());
554
555 std::array<std::string, 3> expectedData = {{"@@", "-", ","}};
556 for (auto expected : expectedData) {
557 // Now step the producer to transfer the separator
558 EXPECT_EQ(ENGINE_WANT_MORE, producer->step(producers.get()));
559
560 // The producer should now have the new separator
561 EXPECT_EQ(expected, producer->getCurrentSeparatorForStream(vbid));
562
563 // The replica should now have the new separator
564 EXPECT_EQ(expected, replica->lockCollections().getSeparator());
565 }
566
567 // Now step the producer to transfer the create "meat"
568 EXPECT_EQ(ENGINE_WANT_MORE, producer->step(producers.get()));
569
570 // Collection should now be live on the replica with the final separator
571 EXPECT_TRUE(replica->lockCollections().doesKeyContainValidCollection(
572 {"meat,bacon", DocNamespace::Collections}));
573
574 // And done
575 EXPECT_EQ(ENGINE_SUCCESS, producer->step(producers.get()));
576 }
577
578 // Test that a create/delete don't dedup (collections creates new checkpoints)
TEST_F(CollectionsDcpTest,MB_26455)579 TEST_F(CollectionsDcpTest, MB_26455) {
580 const int items = 3;
581 {
582 auto vb = store->getVBucket(vbid);
583
584 std::vector<std::string> separators = {
585 "::", "*", "**", "***", "-=-=-=-=-="};
586
587 for (size_t n = 0; n < separators.size(); n++) {
588 // change sep
589 std::string manifest =
590 R"({"uid":"0","separator":")" + separators.at(n) +
591 R"(","collections":[{"name":"$default", "uid":"0"}]})";
592 vb->updateFromManifest({manifest});
593
594 // add fruit
595 manifest = R"({"uid":"1","separator":")" + separators.at(n) +
596 R"(","collections":[{"name":"$default", "uid":"0"},)" +
597 R"({"name":"fruit", "uid":")" + std::to_string(n) +
598 R"("}]})";
599
600 vb->updateFromManifest({manifest});
601
602 // Mutate fruit
603 for (int ii = 0; ii < items; ii++) {
604 std::string key =
605 "fruit" + separators.at(n) + std::to_string(ii);
606 store_item(vbid, {key, DocNamespace::Collections}, "value");
607 }
608
609 // expect change_separator + create_collection + items
610 flush_vbucket_to_disk(vbid, 2 + items);
611
612 if (n < (separators.size() - 1)) {
613 // Drop fruit, except for the last 'generation'
614 manifest =
615 R"({"uid":"2","separator":")" + separators.at(n) +
616 R"(","collections":[{"name":"$default", "uid":"0"}]})";
617 vb->updateFromManifest({manifest});
618
619 flush_vbucket_to_disk(vbid, 1);
620 }
621 }
622 }
623
624 resetEngineAndWarmup();
625
626 // Stream again!
627 createDcpObjects({}, true);
628
629 // Streamed from disk, one create (create of fruit) and items of fruit
630 testDcpCreateDelete(1, 0, items, false /*fromMemory*/);
631
632 EXPECT_TRUE(store->getVBucket(vbid)->lockCollections().isCollectionOpen(
633 "fruit"));
634 }
635
636 class CollectionsFilteredDcpErrorTest : public SingleThreadedKVBucketTest {
637 public:
CollectionsFilteredDcpErrorTest()638 CollectionsFilteredDcpErrorTest() : cookieP(create_mock_cookie()) {
639 }
SetUp()640 void SetUp() override {
641 config_string += "collections_prototype_enabled=true";
642 SingleThreadedKVBucketTest::SetUp();
643 // Start vbucket as active to allow us to store items directly to it.
644 store->setVBucketState(vbid, vbucket_state_active, false);
645 }
646
TearDown()647 void TearDown() override {
648 destroy_mock_cookie(cookieP);
649 producer.reset();
650 SingleThreadedKVBucketTest::TearDown();
651 }
652
653 protected:
654 std::shared_ptr<MockDcpProducer> producer;
655 const void* cookieP;
656 };
657
TEST_F(CollectionsFilteredDcpErrorTest,error1)658 TEST_F(CollectionsFilteredDcpErrorTest, error1) {
659 // Set some collections
660 store->setCollections({R"({"separator": "@@","uid":"0",
661 "collections":[{"name":"$default", "uid":"0"},
662 {"name":"meat", "uid":"1"},
663 {"name":"dairy", "uid":"2"}]})"});
664
665 std::string filter = R"({"collections":["fruit"]})";
666 cb::const_byte_buffer buffer{
667 reinterpret_cast<const uint8_t*>(filter.data()), filter.size()};
668 // Can't create a filter for unknown collections
669 try {
670 MockDcpProducer mock(*engine,
671 cookieP,
672 "test_producer",
673 DCP_OPEN_COLLECTIONS,
674 buffer,
675 false /*startTask*/);
676 FAIL() << "Expected an exception";
677 } catch (const cb::engine_error& e) {
678 EXPECT_EQ(cb::engine_errc::unknown_collection, e.code());
679 }
680 }
681
682 class CollectionsFilteredDcpTest : public CollectionsDcpTest {
683 public:
CollectionsFilteredDcpTest()684 CollectionsFilteredDcpTest() : CollectionsDcpTest() {
685 }
686
SetUp()687 void SetUp() override {
688 config_string += "collections_prototype_enabled=true";
689 SingleThreadedKVBucketTest::SetUp();
690 producers = get_dcp_producers(
691 reinterpret_cast<ENGINE_HANDLE*>(engine.get()),
692 reinterpret_cast<ENGINE_HANDLE_V1*>(engine.get()));
693 // Start vbucket as active to allow us to store items directly to it.
694 store->setVBucketState(vbid, vbucket_state_active, false);
695 }
696 };
697
TEST_F(CollectionsFilteredDcpTest,filtering)698 TEST_F(CollectionsFilteredDcpTest, filtering) {
699 VBucketPtr vb = store->getVBucket(vbid);
700
701 // Perform a create of meat/dairy via the bucket level (filters are
702 // worked out from the bucket manifest)
703 store->setCollections({R"({"separator": ":","uid":"0",
704 "collections":[{"name":"$default", "uid":"0"},
705 {"name":"meat", "uid":"1"},
706 {"name":"dairy", "uid":"2"}]})"});
707 // Setup filtered DCP
708 createDcpObjects(R"({"collections":["dairy"]})", true);
709
710 notifyAndStepToCheckpoint();
711
712 // SystemEvent createCollection
713 EXPECT_EQ(ENGINE_WANT_MORE, producer->step(producers.get()));
714 EXPECT_EQ(PROTOCOL_BINARY_CMD_DCP_SYSTEM_EVENT, dcp_last_op);
715 EXPECT_EQ("dairy", dcp_last_key);
716
717 // Store collection documents
718 std::array<std::string, 2> expectedKeys = {{"dairy:one", "dairy:two"}};
719 store_item(vbid, {"meat:one", DocNamespace::Collections}, "value");
720 store_item(vbid, {expectedKeys[0], DocNamespace::Collections}, "value");
721 store_item(vbid, {"meat:two", DocNamespace::Collections}, "value");
722 store_item(vbid, {expectedKeys[1], DocNamespace::Collections}, "value");
723 store_item(vbid, {"meat:three", DocNamespace::Collections}, "value");
724
725 auto vb0Stream = producer->findStream(0);
726 ASSERT_NE(nullptr, vb0Stream.get());
727
728 notifyAndStepToCheckpoint();
729
730 // Now step DCP to transfer keys, only two keys are expected as all "meat"
731 // keys are filtered
732 for (auto& key : expectedKeys) {
733 EXPECT_EQ(ENGINE_WANT_MORE, producer->step(producers.get()));
734 EXPECT_EQ(PROTOCOL_BINARY_CMD_DCP_MUTATION, dcp_last_op);
735 EXPECT_EQ(key, dcp_last_key);
736 }
737 // And no more
738 EXPECT_EQ(ENGINE_SUCCESS, producer->step(producers.get()));
739
740 flush_vbucket_to_disk(vbid, 7);
741
742 vb.reset();
743
744 // Now stream back from disk and check filtering
745 resetEngineAndWarmup();
746
747 // In order to create a filter, a manifest needs to be set
748 store->setCollections({R"({"separator": ":","uid":"0",
749 "collections":[{"name":"$default", "uid":"0"},
750 {"name":"meat", "uid":"1"},
751 {"name":"dairy", "uid":"2"}]})"});
752
753 createDcpObjects(R"({"collections":["dairy"]})", true);
754
755 // Streamed from disk
756 // 1 create - create of dairy
757 // 2 mutations in the dairy collection
758 testDcpCreateDelete(1, 0, 2, false);
759 }
760
761 // Check that when filtering is on, we don't send snapshots for fully filtered
762 // snapshots
TEST_F(CollectionsFilteredDcpTest,MB_24572)763 TEST_F(CollectionsFilteredDcpTest, MB_24572) {
764 VBucketPtr vb = store->getVBucket(vbid);
765
766 // Perform a create of meat/dairy via the bucket level (filters are
767 // worked out from the bucket manifest)
768 store->setCollections({R"({"separator": ":","uid":"0",
769 "collections":[{"name":"$default", "uid":"0"},
770 {"name":"meat", "uid":"1"},
771 {"name":"dairy", "uid":"2"}]})"});
772 // Setup filtered DCP
773 createDcpObjects(R"({"collections":["dairy"]})", true);
774
775 // Store collection documents
776 store_item(vbid, {"meat::one", DocNamespace::Collections}, "value");
777 store_item(vbid, {"meat::two", DocNamespace::Collections}, "value");
778 store_item(vbid, {"meat::three", DocNamespace::Collections}, "value");
779
780 notifyAndStepToCheckpoint();
781
782 // SystemEvent createCollection for dairy is expected
783 EXPECT_EQ(ENGINE_WANT_MORE, producer->step(producers.get()));
784 EXPECT_EQ(PROTOCOL_BINARY_CMD_DCP_SYSTEM_EVENT, dcp_last_op);
785 EXPECT_EQ("dairy", dcp_last_key);
786
787 // And no more for this stream - no meat
788 EXPECT_EQ(ENGINE_SUCCESS, producer->step(producers.get()));
789
790 // and new mutations?
791 store_item(vbid, {"meat::one1", DocNamespace::Collections}, "value");
792 store_item(vbid, {"meat::two2", DocNamespace::Collections}, "value");
793 store_item(vbid, {"meat::three3", DocNamespace::Collections}, "value");
794 notifyAndStepToCheckpoint(cb::mcbp::ClientOpcode::Invalid);
795 }
796
TEST_F(CollectionsFilteredDcpTest,default_only)797 TEST_F(CollectionsFilteredDcpTest, default_only) {
798 VBucketPtr vb = store->getVBucket(vbid);
799
800 // Perform a create of meat/dairy via the bucket level (filters are
801 // worked out from the bucket manifest)
802 store->setCollections({R"({"separator": ":","uid":"0",
803 "collections":[{"name":"$default", "uid":"0"},
804 {"name":"meat", "uid":"1"},
805 {"name":"dairy", "uid":"2"}]})"});
806 // Setup DCP
807 createDcpObjects({/*no filter*/}, false /*don't know about collections*/);
808
809 // Store collection documents and one default collection document
810 store_item(vbid, {"meat:one", DocNamespace::Collections}, "value");
811 store_item(vbid, {"dairy:one", DocNamespace::Collections}, "value");
812 store_item(vbid, {"anykey", DocNamespace::DefaultCollection}, "value");
813 store_item(vbid, {"dairy:two", DocNamespace::Collections}, "value");
814 store_item(vbid, {"meat:three", DocNamespace::Collections}, "value");
815
816 auto vb0Stream = producer->findStream(0);
817 ASSERT_NE(nullptr, vb0Stream.get());
818
819 // Now step into the items of which we expect to see only anykey
820 notifyAndStepToCheckpoint();
821
822 EXPECT_EQ(ENGINE_WANT_MORE, producer->step(producers.get()));
823 EXPECT_EQ(PROTOCOL_BINARY_CMD_DCP_MUTATION, dcp_last_op);
824 EXPECT_EQ("anykey", dcp_last_key);
825
826 // And no more
827 EXPECT_EQ(ENGINE_SUCCESS, producer->step(producers.get()));
828 }
829
TEST_F(CollectionsFilteredDcpTest,stream_closes)830 TEST_F(CollectionsFilteredDcpTest, stream_closes) {
831 VBucketPtr vb = store->getVBucket(vbid);
832
833 // Perform a create of meat via the bucket level (filters are worked out
834 // from the bucket manifest)
835 store->setCollections({R"({"separator": ":","uid":"0",
836 "collections":[{"name":"$default", "uid":"0"},
837 {"name":"meat", "uid":"1"}]})"});
838 // Setup filtered DCP
839 createDcpObjects(R"({"collections":["meat"]})", true);
840
841 auto vb0Stream = producer->findStream(0);
842 ASSERT_NE(nullptr, vb0Stream.get());
843
844 notifyAndStepToCheckpoint();
845
846 // Now step DCP to transfer system events. We expect that the stream will
847 // close once we transfer DeleteCollection
848
849 // Now step the producer to transfer the collection creation
850 EXPECT_EQ(ENGINE_WANT_MORE, producer->step(producers.get()));
851
852 // Not dead yet...
853 EXPECT_TRUE(vb0Stream->isActive());
854
855 // Perform a delete of meat via the bucket level (filters are worked out
856 // from the bucket manifest)
857 store->setCollections({R"({"separator": ":","uid":"1",
858 "collections":[{"name":"$default", "uid":"0"}]})"});
859
860 notifyAndStepToCheckpoint();
861
862 // Now step the producer to transfer the collection deletion
863 EXPECT_EQ(ENGINE_WANT_MORE, producer->step(producers.get()));
864
865 // Done... collection deletion of meat has closed the stream
866 EXPECT_FALSE(vb0Stream->isActive());
867
868 // Now step the producer to transfer the close stream
869 EXPECT_EQ(ENGINE_WANT_MORE, producer->step(producers.get()));
870
871 // And no more
872 EXPECT_EQ(ENGINE_SUCCESS, producer->step(producers.get()));
873 }
874
875 /**
876 * Test that creation of a stream that resulted in an empty filter, because
877 * all the collection have been deleted, just goes to stream end.
878 */
TEST_F(CollectionsFilteredDcpTest,empty_filter_stream_closes)879 TEST_F(CollectionsFilteredDcpTest, empty_filter_stream_closes) {
880 VBucketPtr vb = store->getVBucket(vbid);
881
882 // Perform a create of meat via the bucket level (filters are worked out
883 // from the bucket manifest)
884 store->setCollections({R"({"separator": ":","uid":"0",
885 "collections":[{"name":"$default", "uid":"0"},
886 {"name":"meat", "uid":"1"}]})"});
887
888 producer = createDcpProducer(cookieP,
889 R"({"collections":["meat"]})",
890 true,
891 IncludeDeleteTime::No);
892 createDcpConsumer();
893
894 // Perform a delete of meat
895 store->setCollections({R"({"separator": ":","uid":"1",
896 "collections":[{"name":"$default", "uid":"0"}]})"});
897
898 createDcpStream();
899
900 auto vb0Stream = producer->findStream(0);
901 ASSERT_NE(nullptr, vb0Stream.get());
902
903 EXPECT_TRUE(vb0Stream->isActive());
904
905 // Expect a stream end marker and no data
906 notifyAndStepToCheckpoint(cb::mcbp::ClientOpcode::DcpStreamEnd);
907 EXPECT_EQ(END_STREAM_FILTER_EMPTY, dcp_last_flags);
908
909 // Done... collection deletion of meat has closed the stream
910 EXPECT_FALSE(vb0Stream->isActive());
911
912 // And no more
913 EXPECT_EQ(ENGINE_SUCCESS, producer->step(producers.get()));
914 }
915
916 /**
917 * Test that creation of a stream that resulted in an empty filter, because
918 * all the collection have been deleted, just goes to stream end.
919 * This test uses a filter with name:uid
920 */
TEST_F(CollectionsFilteredDcpTest,empty_filter_stream_closes2)921 TEST_F(CollectionsFilteredDcpTest, empty_filter_stream_closes2) {
922 VBucketPtr vb = store->getVBucket(vbid);
923
924 // Perform a create of meat via the bucket level (filters are worked out
925 // from the bucket manifest)
926 store->setCollections({R"({"separator": ":","uid":"0",
927 "collections":[{"name":"$default", "uid":"0"},
928 {"name":"meat", "uid":"1"}]})"});
929
930 // specific collection uid
931 producer =
932 createDcpProducer(cookieP,
933 R"({"collections":[{"name":"meat", "uid":"1"}]})",
934 true,
935 IncludeDeleteTime::No);
936 createDcpConsumer();
937
938 // Perform a delete of meat
939 store->setCollections({R"({"separator": ":","uid":"0",
940 "collections":[{"name":"$default", "uid":"0"}]})"});
941
942 createDcpStream();
943
944 auto vb0Stream = producer->findStream(0);
945 ASSERT_NE(nullptr, vb0Stream.get());
946
947 EXPECT_TRUE(vb0Stream->isActive());
948
949 // Expect a stream end marker and no data
950 notifyAndStepToCheckpoint(cb::mcbp::ClientOpcode::DcpStreamEnd);
951 EXPECT_EQ(END_STREAM_FILTER_EMPTY, dcp_last_flags);
952
953 // Done... collection deletion of meat:1 has closed the stream
954 EXPECT_FALSE(vb0Stream->isActive());
955
956 // And no more
957 EXPECT_EQ(ENGINE_SUCCESS, producer->step(producers.get()));
958 }
959
TEST_F(CollectionsFilteredDcpTest,legacy_stream_closes)960 TEST_F(CollectionsFilteredDcpTest, legacy_stream_closes) {
961 VBucketPtr vb = store->getVBucket(vbid);
962
963 // Perform a create of meat via the bucket level (filters are worked out
964 // from the bucket manifest)
965 store->setCollections({R"({"separator": ":","uid":"0",
966 "collections":[{"name":"$default", "uid":"0"},
967 {"name":"meat", "uid":"1"}]})"});
968 // Make cookie look like a non-collection client
969 mock_set_collections_support(cookieP, false);
970 mock_set_collections_support(cookieC, false);
971 // Setup legacy DCP, it only receives default collection mutation/deletion
972 // and should self-close if the default collection were to be deleted
973 createDcpObjects({}, false);
974
975 auto vb0Stream = producer->findStream(0);
976 ASSERT_NE(nullptr, vb0Stream.get());
977
978 // No keys have been written and no event can be sent, so expect nothing
979 // after kicking the stream into life
980 notifyAndStepToCheckpoint(cb::mcbp::ClientOpcode::Invalid);
981
982 EXPECT_TRUE(vb0Stream->isActive());
983
984 // Perform a delete of $default
985 store->setCollections({R"({"separator": ":","uid":"1",
986 "collections":[{"name":"meat", "uid":"1"}]})"});
987
988 // Expect a stream end marker
989 notifyAndStepToCheckpoint(cb::mcbp::ClientOpcode::DcpStreamEnd);
990 EXPECT_EQ(END_STREAM_OK, dcp_last_flags);
991
992 // Done... collection deletion of default has closed the stream
993 EXPECT_FALSE(vb0Stream->isActive());
994
995 // And no more
996 EXPECT_EQ(ENGINE_SUCCESS, producer->step(producers.get()));
997 }
998