1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2013 Couchbase, Inc.
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
18#include "evp_store_single_threaded_test.h"
19
20#include "../mock/mock_dcp.h"
21#include "../mock/mock_dcp_consumer.h"
22#include "../mock/mock_dcp_producer.h"
23#include "../mock/mock_global_task.h"
24#include "../mock/mock_item_freq_decayer.h"
25#include "../mock/mock_stream.h"
26#include "bgfetcher.h"
27#include "checkpoint.h"
28#include "dcp/dcpconnmap.h"
29#include "ephemeral_tombstone_purger.h"
30#include "ep_time.h"
31#include "evp_store_test.h"
32#include "failover-table.h"
33#include "fakes/fake_executorpool.h"
34#include "item_freq_decayer_visitor.h"
35#include "programs/engine_testapp/mock_server.h"
36#include "taskqueue.h"
37#include "tests/module_tests/test_helpers.h"
38#include "tests/module_tests/test_task.h"
39
40#include <libcouchstore/couch_db.h>
41#include <string_utilities.h>
42#include <xattr/blob.h>
43#include <xattr/utils.h>
44
45#include <thread>
46#include <engines/ep/src/ephemeral_vb.h>
47
48ProcessClock::time_point SingleThreadedKVBucketTest::runNextTask(
49        TaskQueue& taskQ, const std::string& expectedTaskName) {
50    CheckedExecutor executor(task_executor, taskQ);
51
52    // Run the task
53    executor.runCurrentTask(expectedTaskName);
54    return executor.completeCurrentTask();
55}
56
57ProcessClock::time_point SingleThreadedKVBucketTest::runNextTask(TaskQueue& taskQ) {
58    CheckedExecutor executor(task_executor, taskQ);
59
60    // Run the task
61    executor.runCurrentTask();
62    return executor.completeCurrentTask();
63}
64
65void SingleThreadedKVBucketTest::SetUp() {
66    SingleThreadedExecutorPool::replaceExecutorPoolWithFake();
67
68    // Disable warmup - we don't want to have to run/wait for the Warmup tasks
69    // to complete (and there's nothing to warmup from anyways).
70    if (!config_string.empty()) {
71        config_string += ";";
72    }
73    config_string += "warmup=false";
74
75    KVBucketTest::SetUp();
76
77    task_executor = reinterpret_cast<SingleThreadedExecutorPool*>
78    (ExecutorPool::get());
79}
80
81void SingleThreadedKVBucketTest::TearDown() {
82    shutdownAndPurgeTasks(engine.get());
83    KVBucketTest::TearDown();
84}
85
86void SingleThreadedKVBucketTest::setVBucketStateAndRunPersistTask(uint16_t vbid,
87                                                                 vbucket_state_t
88                                                                 newState) {
89    // Change state - this should add 1 set_vbucket_state op to the
90    //VBuckets' persistence queue.
91    EXPECT_EQ(ENGINE_SUCCESS,
92              store->setVBucketState(vbid, newState, /*transfer*/false));
93
94    if (engine->getConfiguration().getBucketType() == "persistent") {
95        // Trigger the flusher to flush state to disk.
96        auto& ep = dynamic_cast<EPBucket&>(*store);
97        EXPECT_EQ(std::make_pair(false, size_t(0)), ep.flushVBucket(vbid));
98    }
99}
100
101void SingleThreadedKVBucketTest::shutdownAndPurgeTasks(
102        EventuallyPersistentEngine* ep) {
103    ep->getEpStats().isShutdown = true;
104    task_executor->cancelAndClearAll();
105
106    for (task_type_t t :
107         {WRITER_TASK_IDX, READER_TASK_IDX, AUXIO_TASK_IDX, NONIO_TASK_IDX}) {
108
109        // Define a lambda to drive all tasks from the queue, if hpTaskQ
110        // is implemented then trivial to add a second call to runTasks.
111        auto runTasks = [=](TaskQueue& queue) {
112            while (queue.getFutureQueueSize() > 0 || queue.getReadyQueueSize() > 0) {
113                runNextTask(queue);
114            }
115        };
116        runTasks(*task_executor->getLpTaskQ()[t]);
117        task_executor->stopTaskGroup(
118                ep->getTaskable().getGID(), t, ep->getEpStats().forceShutdown);
119    }
120}
121
122void SingleThreadedKVBucketTest::cancelAndPurgeTasks() {
123    task_executor->cancelAll();
124    for (task_type_t t :
125        {WRITER_TASK_IDX, READER_TASK_IDX, AUXIO_TASK_IDX, NONIO_TASK_IDX}) {
126
127        // Define a lambda to drive all tasks from the queue, if hpTaskQ
128        // is implemented then trivial to add a second call to runTasks.
129        auto runTasks = [=](TaskQueue& queue) {
130            while (queue.getFutureQueueSize() > 0 || queue.getReadyQueueSize() > 0) {
131                runNextTask(queue);
132            }
133        };
134        runTasks(*task_executor->getLpTaskQ()[t]);
135        task_executor->stopTaskGroup(engine->getTaskable().getGID(), t,
136                                     engine->getEpStats().forceShutdown);
137    }
138}
139
140void SingleThreadedKVBucketTest::runReadersUntilWarmedUp() {
141    auto& readerQueue = *task_executor->getLpTaskQ()[READER_TASK_IDX];
142    while (engine->getKVBucket()->isWarmingUp()) {
143        runNextTask(readerQueue);
144    }
145}
146
147/**
148 * Destroy engine and replace it with a new engine that can be warmed up.
149 * Finally, run warmup.
150 */
151void SingleThreadedKVBucketTest::resetEngineAndWarmup(std::string new_config) {
152    shutdownAndPurgeTasks(engine.get());
153    std::string config = config_string;
154
155    // check if warmup=false needs replacing with warmup=true
156    size_t pos;
157    std::string warmupT = "warmup=true";
158    std::string warmupF = "warmup=false";
159    if ((pos = config.find(warmupF)) != std::string::npos) {
160        config.replace(pos, warmupF.size(), warmupT);
161    } else {
162        config += warmupT;
163    }
164
165    if (new_config.length() > 0) {
166        config += ";";
167        config += new_config;
168    }
169
170    reinitialise(config);
171
172    engine->getKVBucket()->initializeWarmupTask();
173    engine->getKVBucket()->startWarmupTask();
174
175    // Now get the engine warmed up
176    runReadersUntilWarmedUp();
177}
178
179std::shared_ptr<MockDcpProducer> SingleThreadedKVBucketTest::createDcpProducer(
180        const void* cookie,
181        const std::string& filter,
182        bool dcpCollectionAware,
183        IncludeDeleteTime deleteTime) {
184    int flags = DCP_OPEN_INCLUDE_XATTRS;
185    if (dcpCollectionAware) {
186        flags |= DCP_OPEN_COLLECTIONS;
187    }
188    if (deleteTime == IncludeDeleteTime::Yes) {
189        flags |= DCP_OPEN_INCLUDE_DELETE_TIMES;
190    }
191    auto newProducer = std::make_shared<MockDcpProducer>(
192            *engine,
193            cookie,
194            "test_producer",
195            flags,
196            cb::const_byte_buffer(
197                    reinterpret_cast<const uint8_t*>(filter.data()),
198                    filter.size()),
199            false /*startTask*/);
200
201    // Create the task object, but don't schedule
202    newProducer->createCheckpointProcessorTask();
203
204    // Need to enable NOOP for XATTRS (and collections).
205    newProducer->setNoopEnabled(true);
206
207    return newProducer;
208}
209
210extern uint8_t dcp_last_op;
211void SingleThreadedKVBucketTest::notifyAndStepToCheckpoint(
212        MockDcpProducer& producer,
213        dcp_message_producers& producers,
214        cb::mcbp::ClientOpcode expectedOp,
215        bool fromMemory) {
216    auto vb = store->getVBucket(vbid);
217    ASSERT_NE(nullptr, vb.get());
218
219    if (fromMemory) {
220        producer.notifySeqnoAvailable(vbid, vb->getHighSeqno());
221        runCheckpointProcessor(producer, producers);
222    } else {
223        // Run a backfill
224        auto& lpAuxioQ = *task_executor->getLpTaskQ()[AUXIO_TASK_IDX];
225        // backfill:create()
226        runNextTask(lpAuxioQ);
227        // backfill:scan()
228        runNextTask(lpAuxioQ);
229        // backfill:complete()
230        runNextTask(lpAuxioQ);
231        if (engine->getConfiguration().getBucketType() == "persistent") {
232            // backfill:finished()
233            runNextTask(lpAuxioQ);
234        }
235    }
236
237    // Next step which will process a snapshot marker and then the caller
238    // should now be able to step through the checkpoint
239    if (expectedOp != cb::mcbp::ClientOpcode::Invalid) {
240        EXPECT_EQ(ENGINE_WANT_MORE, producer.step(&producers));
241        EXPECT_EQ(uint8_t(expectedOp), dcp_last_op);
242    } else {
243        EXPECT_EQ(ENGINE_SUCCESS, producer.step(&producers));
244    }
245}
246
247void SingleThreadedKVBucketTest::runCheckpointProcessor(
248        MockDcpProducer& producer, dcp_message_producers& producers) {
249    // Step which will notify the snapshot task
250    EXPECT_EQ(ENGINE_SUCCESS, producer.step(&producers));
251
252    EXPECT_EQ(1, producer.getCheckpointSnapshotTask().queueSize());
253
254    // Now call run on the snapshot task to move checkpoint into DCP
255    // stream
256    producer.getCheckpointSnapshotTask().run();
257}
258
259static ENGINE_ERROR_CODE dcpAddFailoverLog(vbucket_failover_t* entry,
260                                           size_t nentries,
261                                           gsl::not_null<const void*> cookie) {
262    return ENGINE_SUCCESS;
263}
264void SingleThreadedKVBucketTest::createDcpStream(MockDcpProducer& producer) {
265    createDcpStream(producer, vbid);
266}
267
268void SingleThreadedKVBucketTest::createDcpStream(MockDcpProducer& producer,
269                                                 uint16_t vbid) {
270    uint64_t rollbackSeqno;
271    ASSERT_EQ(ENGINE_SUCCESS,
272              producer.streamRequest(0, // flags
273                                     1, // opaque
274                                     vbid,
275                                     0, // start_seqno
276                                     ~0ull, // end_seqno
277                                     0, // vbucket_uuid,
278                                     0, // snap_start_seqno,
279                                     0, // snap_end_seqno,
280                                     &rollbackSeqno,
281                                     &dcpAddFailoverLog));
282}
283
284void SingleThreadedKVBucketTest::runCompaction(uint64_t purgeBeforeTime,
285                                               uint64_t purgeBeforeSeq) {
286    compaction_ctx compactreq;
287    compactreq.purge_before_ts = purgeBeforeTime;
288    compactreq.purge_before_seq = purgeBeforeSeq;
289    compactreq.drop_deletes = false;
290    compactreq.db_file_id = vbid;
291    store->scheduleCompaction(vbid, compactreq, nullptr);
292    // run the compaction task
293    runNextTask(*task_executor->getLpTaskQ()[WRITER_TASK_IDX],
294                "Compact DB file 0");
295}
296
297/*
298 * MB-31175
299 * The following test checks to see that when we call handleSlowStream in an
300 * in memory state and drop the cursor/schedule a backfill as a result, the
301 * resulting backfill checks the purgeSeqno and tells the stream to rollback
302 * if purgeSeqno > startSeqno.
303 */
304TEST_P(STParameterizedBucketTest, SlowStreamBackfillPurgeSeqnoCheck) {
305    // Make vbucket active.
306    setVBucketStateAndRunPersistTask(vbid, vbucket_state_active);
307    auto vb = store->getVBuckets().getBucket(vbid);
308    ASSERT_TRUE(vb.get());
309
310    // Store two items
311    std::array<std::string, 2> initialKeys = {{"k1", "k2"}};
312    for (const auto& key : initialKeys) {
313        store_item(vbid, makeStoredDocKey(key), key);
314    }
315    flushVBucketToDiskIfPersistent(vbid, initialKeys.size());
316
317    // Delete the items so that we can advance the purgeSeqno using
318    // compaction later
319    for (const auto& key : initialKeys) {
320        delete_item(vbid, makeStoredDocKey(key));
321    }
322    flushVBucketToDiskIfPersistent(vbid, initialKeys.size());
323
324    auto& ckpt_mgr = *vb->checkpointManager;
325
326    // Create a Mock Dcp producer
327    // Create the Mock Active Stream with a startSeqno of 1
328    // as a startSeqno is always valid
329    auto producer = std::make_shared<MockDcpProducer>(
330            *engine,
331            cookie,
332            "test_producer",
333            /*notifyOnly*/ false,
334            cb::const_byte_buffer() /*no json*/);
335            // Create a Mock Active Stream
336            auto mock_stream = std::make_shared<MockActiveStream>(
337            static_cast<EventuallyPersistentEngine*>(engine.get()),
338            producer,
339            /*flags*/ 0,
340            /*opaque*/ 0,
341            *vb,
342            /*st_seqno*/ 1,
343            /*en_seqno*/ ~0,
344            /*vb_uuid*/ 0xabcd,
345            /*snap_start_seqno*/ 0,
346            /*snap_end_seqno*/ ~0,
347            IncludeValue::Yes,
348            IncludeXattrs::Yes);
349
350    producer->createCheckpointProcessorTask();
351    producer->scheduleCheckpointProcessorTask();
352
353    mock_stream->transitionStateToBackfilling();
354    ASSERT_TRUE(mock_stream->isInMemory())
355    << "stream state should have transitioned to InMemory";
356
357    // Check number of expected cursors (might not have persistence cursor)
358    int expectedCursors = persistent() ? 2 : 1;
359    EXPECT_EQ(expectedCursors, ckpt_mgr.getNumOfCursors());
360
361    EXPECT_TRUE(mock_stream->handleSlowStream());
362    EXPECT_TRUE(mock_stream->public_getPendingBackfill());
363
364    // Might not have persistence cursor
365    expectedCursors = persistent() ? 1 : 0;
366    EXPECT_EQ(expectedCursors, ckpt_mgr.getNumOfCursors())
367    << "stream cursor should have been dropped";
368
369    // This will schedule the backfill
370    mock_stream->transitionStateToBackfilling();
371    ASSERT_TRUE(mock_stream->isBackfilling());
372
373    // Advance the purgeSeqno
374    if (persistent()) {
375        runCompaction(~0, 3);
376    } else {
377        EphemeralVBucket::HTTombstonePurger purger(0);
378        auto vbptr = store->getVBucket(vbid);
379        EphemeralVBucket* evb = dynamic_cast<EphemeralVBucket*>(vbptr.get());
380        purger.setCurrentVBucket(*evb);
381        evb->ht.visit(purger);
382        evb->purgeStaleItems();
383    }
384
385    ASSERT_EQ(3, vb->getPurgeSeqno());
386
387    // Run the backfill we scheduled when we transitioned to the backfilling
388    // state
389    auto& bfm = producer->getBFM();
390    bfm.backfill();
391
392    // The backfill should have set the stream state to dead because
393    // purgeSeqno > startSeqno
394    EXPECT_TRUE(mock_stream->isDead());
395
396    // Stop Producer checkpoint processor task
397    producer->cancelCheckpointCreatorTask();
398
399    cancelAndPurgeTasks();
400}
401
402/*
403 * The following test checks to see if we call handleSlowStream when in a
404 * backfilling state, but the backfillTask is not running, we
405 * drop the existing cursor and set pendingBackfill to true.
406 */
407TEST_F(SingleThreadedEPBucketTest, MB22421_backfilling_but_task_finished) {
408    // Make vbucket active.
409     setVBucketStateAndRunPersistTask(vbid, vbucket_state_active);
410     auto vb = store->getVBuckets().getBucket(vbid);
411     ASSERT_NE(nullptr, vb.get());
412     auto& ckpt_mgr = *vb->checkpointManager;
413
414     // Create a Mock Dcp producer
415     auto producer = std::make_shared<MockDcpProducer>(
416             *engine,
417             cookie,
418             "test_producer",
419             /*notifyOnly*/ false,
420             cb::const_byte_buffer() /*no json*/);
421     // Create a Mock Active Stream
422     auto mock_stream = std::make_shared<MockActiveStream>(
423             static_cast<EventuallyPersistentEngine*>(engine.get()),
424             producer,
425             /*flags*/ 0,
426             /*opaque*/ 0,
427             *vb,
428             /*st_seqno*/ 0,
429             /*en_seqno*/ ~0,
430             /*vb_uuid*/ 0xabcd,
431             /*snap_start_seqno*/ 0,
432             /*snap_end_seqno*/ ~0,
433             IncludeValue::Yes,
434             IncludeXattrs::Yes);
435
436     mock_stream->transitionStateToBackfilling();
437     ASSERT_TRUE(mock_stream->isInMemory())
438         << "stream state should have transitioned to InMemory";
439     // Have a persistence cursor and DCP cursor
440     ASSERT_EQ(2, ckpt_mgr.getNumOfCursors());
441     // Set backfilling task to true so can transition to Backfilling State
442     mock_stream->public_setBackfillTaskRunning(true);
443     mock_stream->transitionStateToBackfilling();
444     ASSERT_TRUE(mock_stream->isBackfilling())
445            << "stream state should not have transitioned to Backfilling";
446     // Set backfilling task to false for test
447     mock_stream->public_setBackfillTaskRunning(false);
448     mock_stream->handleSlowStream();
449     // The call to handleSlowStream should result in setting pendingBackfill
450     // flag to true and the DCP cursor being dropped
451     EXPECT_TRUE(mock_stream->public_getPendingBackfill());
452     EXPECT_EQ(1, ckpt_mgr.getNumOfCursors());
453
454     // Stop Producer checkpoint processor task
455     producer->cancelCheckpointCreatorTask();
456}
457
458/*
459 * The following test checks to see if a cursor is re-registered after it is
460 * dropped in handleSlowStream. In particular the test is for when
461 * scheduleBackfill_UNLOCKED is called however the backfill task does not need
462 * to be scheduled and therefore the cursor is not re-registered in
463 * markDiskSnapshot.  The cursor must therefore be registered from within
464 * scheduleBackfill_UNLOCKED.
465 *
466 * At the end of the test we should have 2 cursors: 1 persistence cursor and 1
467 * DCP stream cursor.
468 */
469TEST_F(SingleThreadedEPBucketTest, MB22421_reregister_cursor) {
470    // Make vbucket active.
471    setVBucketStateAndRunPersistTask(vbid, vbucket_state_active);
472    auto vb = store->getVBuckets().getBucket(vbid);
473    ASSERT_NE(nullptr, vb.get());
474    auto& ckpt_mgr = *vb->checkpointManager;
475
476    // Create a Mock Dcp producer
477    auto producer = std::make_shared<MockDcpProducer>(
478            *engine,
479            cookie,
480            "test_producer",
481            /*flags*/ 0,
482            cb::const_byte_buffer() /*no json*/);
483    // Create a Mock Active Stream
484    auto mock_stream = std::make_shared<MockActiveStream>(
485            static_cast<EventuallyPersistentEngine*>(engine.get()),
486            producer,
487            /*flags*/ 0,
488            /*opaque*/ 0,
489            *vb,
490            /*st_seqno*/ 0,
491            /*en_seqno*/ ~0,
492            /*vb_uuid*/ 0xabcd,
493            /*snap_start_seqno*/ 0,
494            /*snap_end_seqno*/ ~0,
495            IncludeValue::Yes,
496            IncludeXattrs::Yes);
497
498    mock_stream->transitionStateToBackfilling();
499    EXPECT_TRUE(mock_stream->isInMemory())
500        << "stream state should have transitioned to StreamInMemory";
501    // Have a persistence cursor and DCP cursor
502    EXPECT_EQ(2, ckpt_mgr.getNumOfCursors());
503
504    mock_stream->public_setBackfillTaskRunning(true);
505    mock_stream->transitionStateToBackfilling();
506    EXPECT_TRUE(mock_stream->isBackfilling())
507           << "stream state should not have transitioned to StreamBackfilling";
508    mock_stream->handleSlowStream();
509    // The call to handleSlowStream should result in setting pendingBackfill
510    // flag to true and the DCP cursor being dropped
511    EXPECT_TRUE(mock_stream->public_getPendingBackfill());
512    EXPECT_EQ(1, ckpt_mgr.getNumOfCursors());
513
514    mock_stream->public_setBackfillTaskRunning(false);
515
516    //schedule a backfill
517    mock_stream->next();
518    // Calling scheduleBackfill_UNLOCKED(reschedule == true) will not actually
519    // schedule a backfill task because backfillStart (is lastReadSeqno + 1) is
520    // 1 and backfillEnd is 0, however the cursor still needs to be
521    // re-registered.
522    EXPECT_EQ(2, ckpt_mgr.getNumOfCursors());
523
524    // Stop Producer checkpoint processor task
525    producer->cancelCheckpointCreatorTask();
526}
527
528/**
529 * The following test checks to see that if a cursor drop (and subsequent
530 * re-registration) is a safe operation in that the background checkpoint
531 * processor task cannot advance the streams cursor whilst backfilling is
532 * occurring.
533 *
534 * Check to see that cursor dropping correctly handles the following scenario:
535 *
536 * 1. vBucket is state:in-memory. Cursor dropping occurs
537 *    (ActiveStream::handleSlowStream)
538 *   a. Cursor is removed
539 *   b. pendingBackfill is set to true.
540 * 2. However, assume that ActiveStreamCheckpointProcessorTask has a pending
541 *    task for this vbid.
542 * 3. ActiveStream changes from state:in-memory to state:backfilling.
543 * 4. Backfill starts, re-registers cursor (ActiveStream::markDiskSnapshot) to
544 *    resume from after the end of the backfill.
545 * 5. ActiveStreamCheckpointProcessorTask wakes up, and finds the pending task
546 *    for this vb. At this point the newly woken task should be blocked from
547 *    doing any work (and return early).
548 */
549TEST_F(SingleThreadedEPBucketTest,
550       MB29369_CursorDroppingPendingCkptProcessorTask) {
551    // Create a Mock Dcp producer and schedule on executorpool.
552    auto producer =
553            createDcpProducer(cookie, {}, false, IncludeDeleteTime::Yes);
554    producer->scheduleCheckpointProcessorTask();
555
556    auto& lpAuxioQ = *task_executor->getLpTaskQ()[AUXIO_TASK_IDX];
557    EXPECT_EQ(1, lpAuxioQ.getFutureQueueSize()) << "Expected to have "
558                                                   "ActiveStreamCheckpointProce"
559                                                   "ssorTask in AuxIO Queue";
560
561    // Create dcp_producer_snapshot_marker_yield_limit + 1 streams -
562    // this means that we don't process all pending vBuckets on a single
563    // execution of ActiveStreamCheckpointProcessorTask - which can result
564    // in vBIDs being "left over" in ActiveStreamCheckpointProcessorTask::queue
565    // after an execution.
566    // This means that subsequently when we drop the cursor for this vb,
567    // there's a "stale" job queued for it.
568    const auto iterationLimit =
569            engine->getConfiguration().getDcpProducerSnapshotMarkerYieldLimit();
570    std::shared_ptr<MockActiveStream> stream;
571    for (size_t id = 0; id < iterationLimit + 1; id++) {
572        setVBucketStateAndRunPersistTask(id, vbucket_state_active);
573        auto vb = store->getVBucket(id);
574        stream = producer->mockActiveStreamRequest(/*flags*/ 0,
575                                                   /*opaque*/ 0,
576                                                   *vb,
577                                                   /*st_seqno*/ 0,
578                                                   /*en_seqno*/ ~0,
579                                                   /*vb_uuid*/ 0xabcd,
580                                                   /*snap_start_seqno*/ 0,
581                                                   /*snap_end_seqno*/ ~0);
582
583        // Request an item from each stream, so they all advance from
584        // backfilling to in-memory
585        auto result = stream->next();
586        EXPECT_FALSE(result);
587        EXPECT_TRUE(stream->isInMemory())
588                << "vb:" << id << " should be state:in-memory at start";
589
590        // Create an item and flush to disk (so ActiveStream::nextCheckpointItem
591        // will have data available when call next() - and will add vb to
592        // ActiveStreamCheckpointProcessorTask's queue.
593        EXPECT_TRUE(queueNewItem(*vb, "key1"));
594        EXPECT_EQ(std::make_pair(false, size_t(1)),
595                  getEPBucket().flushVBucket(id));
596
597        // And then request another item, to add the VBID to
598        // ActiveStreamCheckpointProcessorTask's queue.
599        result = stream->next();
600        EXPECT_FALSE(result);
601        EXPECT_EQ(id + 1, producer->getCheckpointSnapshotTask().queueSize())
602                << "Should have added vb:" << id << " to ProcessorTask queue";
603    }
604
605    // Should now have dcp_producer_snapshot_marker_yield_limit + 1 items
606    // in ActiveStreamCheckpointProcessorTask's pending VBs.
607    EXPECT_EQ(iterationLimit + 1,
608              producer->getCheckpointSnapshotTask().queueSize())
609            << "Should have all vBuckets in ProcessorTask queue";
610
611    // Use last Stream as the one we're going to drop the cursor on (this is
612    // also at the back of the queue).
613    auto vb = store->getVBuckets().getBucket(iterationLimit);
614    auto& ckptMgr = *vb->checkpointManager;
615
616    // 1. Now trigger cursor dropping for this stream.
617    EXPECT_TRUE(stream->handleSlowStream());
618    EXPECT_TRUE(stream->isInMemory())
619            << "should be state:in-memory immediately after handleSlowStream";
620    EXPECT_EQ(1, ckptMgr.getNumOfCursors()) << "Should only have persistence "
621                                               "cursor registered after "
622                                               "cursor dropping.";
623
624    // 2. Request next item from stream. Will transition to backfilling as part
625    // of this.
626    auto result = stream->next();
627    EXPECT_FALSE(result);
628    EXPECT_TRUE(stream->isBackfilling()) << "should be state:backfilling "
629                                            "after next() following "
630                                            "handleSlowStream";
631
632    // *Key point*:
633    //
634    // ActiveStreamCheckpointProcessorTask and Backfilling task are both
635    // waiting to run. However, ActiveStreamCheckpointProcessorTask
636    // has more than iterationLimit VBs in it, so when it runs it won't
637    // handle them all; and will sleep with the last VB remaining.
638    // If the Backfilling task then runs, which returns a disk snapshot and
639    // re-registers the cursor; we still have an
640    // ActiveStreamCheckpointProcessorTask outstanding with the vb in the queue.
641    EXPECT_EQ(2, lpAuxioQ.getFutureQueueSize());
642
643    // Run the ActiveStreamCheckpointProcessorTask; which should re-schedule
644    // due to having items outstanding.
645    runNextTask(lpAuxioQ,
646                "Process checkpoint(s) for DCP producer test_producer");
647
648    // Now run backfilling task.
649    runNextTask(lpAuxioQ, "Backfilling items for a DCP Connection");
650
651    // After Backfilltask scheduled create(); should have received a disk
652    // snapshot; which in turn calls markDiskShapshot to re-register cursor.
653    EXPECT_EQ(2, ckptMgr.getNumOfCursors()) << "Expected both persistence and "
654                                               "replication cursors after "
655                                               "markDiskShapshot";
656
657    result = stream->next();
658    ASSERT_TRUE(result);
659    EXPECT_EQ(DcpResponse::Event::SnapshotMarker, result->getEvent())
660            << "Expected Snapshot marker after running backfill task.";
661
662    // Add another item to the VBucket; after the cursor has been re-registered.
663    EXPECT_TRUE(queueNewItem(*vb, "key2"));
664
665    // Now run chkptProcessorTask to complete it's queue. With the bug, this
666    // results in us discarding the last item we just added to vBucket.
667    runNextTask(lpAuxioQ,
668                "Process checkpoint(s) for DCP producer test_producer");
669
670    // Let the backfill task complete running (it requires multiple steps to
671    // complete).
672    runNextTask(lpAuxioQ, "Backfilling items for a DCP Connection");
673    runNextTask(lpAuxioQ, "Backfilling items for a DCP Connection");
674    runNextTask(lpAuxioQ, "Backfilling items for a DCP Connection");
675
676    // Validate. We _should_ get two mutations: key1 & key2, but we have to
677    // respin the checkpoint task for key2
678    result = stream->next();
679    if (result && result->getEvent() == DcpResponse::Event::Mutation) {
680        auto* mutation = dynamic_cast<MutationResponse*>(result.get());
681        EXPECT_STREQ("key1", mutation->getItem()->getKey().c_str());
682    } else {
683        FAIL() << "Expected Event::Mutation named 'key1'";
684    }
685
686    // No items ready, but this should of rescheduled vb10
687    EXPECT_EQ(nullptr, stream->next());
688    EXPECT_EQ(1, producer->getCheckpointSnapshotTask().queueSize())
689            << "Should have 1 vBucket in ProcessorTask queue";
690
691    // Now run chkptProcessorTask to complete it's queue, this will now be able
692    // to access the checkpoint and get key2
693    runNextTask(lpAuxioQ,
694                "Process checkpoint(s) for DCP producer test_producer");
695
696    result = stream->next();
697    ASSERT_TRUE(result);
698    EXPECT_EQ(DcpResponse::Event::SnapshotMarker, result->getEvent())
699            << "Expected Snapshot marker after running snapshot task.";
700    result = stream->next();
701
702    if (result && result->getEvent() == DcpResponse::Event::Mutation) {
703        auto* mutation = dynamic_cast<MutationResponse*>(result.get());
704        EXPECT_STREQ("key2", mutation->getItem()->getKey().c_str());
705    } else {
706        FAIL() << "Expected second Event::Mutation named 'key2'";
707    }
708
709    result = stream->next();
710    EXPECT_FALSE(result) << "Expected no more than 2 mutatons.";
711
712    // Stop Producer checkpoint processor task
713    producer->cancelCheckpointCreatorTask();
714}
715
716// Test is demonstrating that if a checkpoint processor scheduled by a stream
717// that is subsequently closed/re-created, if that checkpoint processor runs
718// whilst the new stream is backfilling, it can't interfere with the new stream.
719// This issue was raised by MB-29585 but is fixed by MB-29369
720TEST_F(SingleThreadedEPBucketTest, MB29585_backfilling_whilst_snapshot_runs) {
721    auto producer =
722            createDcpProducer(cookie, {}, false, IncludeDeleteTime::Yes);
723    producer->scheduleCheckpointProcessorTask();
724    setVBucketStateAndRunPersistTask(vbid, vbucket_state_active);
725
726    auto& lpAuxioQ = *task_executor->getLpTaskQ()[AUXIO_TASK_IDX];
727    EXPECT_EQ(1, lpAuxioQ.getFutureQueueSize()) << "Expected to have "
728                                                   "ActiveStreamCheckpointProce"
729                                                   "ssorTask in AuxIO Queue";
730
731    // Create first stream
732    auto vb = store->getVBucket(vbid);
733    auto stream = producer->mockActiveStreamRequest(/*flags*/ 0,
734                                                    /*opaque*/ 0,
735                                                    *vb,
736                                                    /*st_seqno*/ 0,
737                                                    /*en_seqno*/ ~0,
738                                                    /*vb_uuid*/ 0xabcd,
739                                                    /*snap_start_seqno*/ 0,
740                                                    /*snap_end_seqno*/ ~0);
741
742    // Write an item
743    EXPECT_TRUE(queueNewItem(*vb, "key1"));
744    EXPECT_EQ(std::make_pair(false, size_t(1)),
745              getEPBucket().flushVBucket(vbid));
746
747    // Request an item from the stream, so it advances from to in-memory
748    auto result = stream->next();
749    EXPECT_FALSE(result);
750    EXPECT_TRUE(stream->isInMemory());
751
752    // Now step the in-memory stream to schedule the checkpoint task
753    result = stream->next();
754    EXPECT_FALSE(result);
755    EXPECT_EQ(1, producer->getCheckpointSnapshotTask().queueSize());
756
757    // Now close the stream
758    EXPECT_EQ(ENGINE_SUCCESS, producer->closeStream(vbid, 0 /*opaque*/));
759
760    // Next we to ensure the recreated stream really does a backfill, so drop
761    // in-memory items
762    bool newcp;
763    vb->checkpointManager->createNewCheckpoint();
764    // Force persistence into new CP
765    queueNewItem(*vb, "key2");
766    EXPECT_EQ(std::make_pair(false, size_t(1)),
767              getEPBucket().flushVBucket(vbid));
768    EXPECT_EQ(1,
769              vb->checkpointManager->removeClosedUnrefCheckpoints(*vb, newcp));
770
771    // Now store another item, without MB-29369 fix we would lose this item
772    store_item(vbid, makeStoredDocKey("key3"), "value");
773
774    // Re-create the new stream
775    stream = producer->mockActiveStreamRequest(/*flags*/ 0,
776                                               /*opaque*/ 0,
777                                               *vb,
778                                               /*st_seqno*/ 0,
779                                               /*en_seqno*/ ~0,
780                                               /*vb_uuid*/ 0xabcd,
781                                               /*snap_start_seqno*/ 0,
782                                               /*snap_end_seqno*/ ~0);
783
784    // Step the stream which will now schedule a backfill
785    result = stream->next();
786    EXPECT_FALSE(result);
787    EXPECT_TRUE(stream->isBackfilling());
788
789    // Next we must deque, but not run the snapshot task, we will interleave it
790    // with backfill later
791    CheckedExecutor checkpointTask(task_executor, lpAuxioQ);
792    EXPECT_STREQ("Process checkpoint(s) for DCP producer test_producer",
793                 checkpointTask.getTaskName().data());
794
795    // Now start the backfilling task.
796    runNextTask(lpAuxioQ, "Backfilling items for a DCP Connection");
797
798    // After Backfilltask scheduled create(); should have received a disk
799    // snapshot; which in turn calls markDiskShapshot to re-register cursor.
800    EXPECT_EQ(2, vb->checkpointManager->getNumOfCursors())
801            << "Expected persistence + replication cursors after "
802               "markDiskShapshot";
803
804    result = stream->next();
805    ASSERT_TRUE(result);
806    EXPECT_EQ(DcpResponse::Event::SnapshotMarker, result->getEvent())
807            << "Expected Snapshot marker after running backfill task.";
808
809    // Let the backfill task complete running through its various states
810    runNextTask(lpAuxioQ, "Backfilling items for a DCP Connection");
811    runNextTask(lpAuxioQ, "Backfilling items for a DCP Connection");
812    runNextTask(lpAuxioQ, "Backfilling items for a DCP Connection");
813
814    // Now run the checkpoint processor task, whilst still backfilling
815    // With MB-29369 this should be safe
816    checkpointTask.runCurrentTask(
817            "Process checkpoint(s) for DCP producer test_producer");
818    checkpointTask.completeCurrentTask();
819
820    // Poke another item in
821    store_item(vbid, makeStoredDocKey("key4"), "value");
822
823    // Finally read back all the items and we should get two snapshots and
824    // key1/key2 key3/key4
825    result = stream->next();
826    if (result && result->getEvent() == DcpResponse::Event::Mutation) {
827        auto* mutation = dynamic_cast<MutationResponse*>(result.get());
828        EXPECT_STREQ("key1", mutation->getItem()->getKey().c_str());
829    } else {
830        FAIL() << "Expected Event::Mutation named 'key1'";
831    }
832
833    result = stream->next();
834    if (result && result->getEvent() == DcpResponse::Event::Mutation) {
835        auto* mutation = dynamic_cast<MutationResponse*>(result.get());
836        EXPECT_STREQ("key2", mutation->getItem()->getKey().c_str());
837    } else {
838        FAIL() << "Expected Event::Mutation named 'key2'";
839    }
840
841    runNextTask(lpAuxioQ,
842                "Process checkpoint(s) for DCP producer test_producer");
843
844    result = stream->next();
845    ASSERT_TRUE(result);
846    EXPECT_EQ(DcpResponse::Event::SnapshotMarker, result->getEvent())
847            << "Expected Snapshot marker after running snapshot task.";
848
849    result = stream->next();
850    if (result && result->getEvent() == DcpResponse::Event::Mutation) {
851        auto* mutation = dynamic_cast<MutationResponse*>(result.get());
852        EXPECT_STREQ("key3", mutation->getItem()->getKey().c_str());
853    } else {
854        FAIL() << "Expected Event::Mutation named 'key3'";
855    }
856
857    result = stream->next();
858    if (result && result->getEvent() == DcpResponse::Event::Mutation) {
859        auto* mutation = dynamic_cast<MutationResponse*>(result.get());
860        EXPECT_STREQ("key4", mutation->getItem()->getKey().c_str());
861    } else {
862        FAIL() << "Expected Event::Mutation named 'key4'";
863    }
864
865    // Stop Producer checkpoint processor task
866    producer->cancelCheckpointCreatorTask();
867}
868
869/*
870 * The following test checks to see if data is lost after a cursor is
871 * re-registered after being dropped.
872 *
873 * It first sets-up an active stream associated with the active vbucket 0.  We
874 * then move the stream into a StreamInMemory state, which results in creating
875 * a DCP cursor (in addition to the persistence cursor created on construction
876 * of the stream).
877 *
878 * We then add two documents closing the previous checkpoint and opening a new
879 * one after each add.  This means that after adding 2 documents we have 3
880 * checkpoints, (and 2 cursors).
881 *
882 * We then call handleSlowStream which results in the DCP cursor being dropped,
883 * the steam being moved into the StreamBackfilling state and, the
884 * pendingBackfill flag being set.
885 *
886 * As the DCP cursor is dropped we can remove the first checkpoint which the
887 * persistence cursor has moved past.  As the DCP stream no longer has its own
888 * cursor it will use the persistence cursor.  Therefore we need to schedule a
889 * backfill task, which clears the pendingBackfill flag.
890 *
891 * The key part of the test is that we now move the persistence cursor on by
892 * adding two more documents, and again closing the previous checkpoint and
893 * opening a new one after each add.
894 *
895 * Now that the persistence cursor has moved on we can remove the earlier
896 * checkpoints.
897 *
898 * We now run the backfill task that we scheduled for the active stream.
899 * And the key result of the test is whether it backfills all 4 documents.
900 * If it does then we have demonstrated that data is not lost.
901 *
902 */
903
904// This callback function is called every time a backfill is performed on
905// test MB22960_cursor_dropping_data_loss.
906void MB22960callbackBeforeRegisterCursor(
907        EPBucket* store,
908        MockActiveStreamWithOverloadedRegisterCursor& mock_stream,
909        VBucketPtr vb,
910        size_t& registerCursorCount) {
911    EXPECT_LE(registerCursorCount, 1);
912    // The test performs two backfills, and the callback is only required
913    // on the first, so that it can test what happens when checkpoints are
914    // moved forward during a backfill.
915    if (registerCursorCount == 0) {
916        bool new_ckpt_created;
917        CheckpointManager& ckpt_mgr = *vb->checkpointManager;
918
919        //pendingBackfill has now been cleared
920        EXPECT_FALSE(mock_stream.public_getPendingBackfill())
921                << "pendingBackfill is not false";
922        // we are now in backfill mode
923        EXPECT_TRUE(mock_stream.public_isBackfillTaskRunning())
924                << "isBackfillRunning is not true";
925
926        // This method is bypassing store->set to avoid a test only lock
927        // inversion with collections read locks
928        queued_item qi1(new Item(makeStoredDocKey("key3"),
929                                 0,
930                                 0,
931                                 "v",
932                                 1,
933                                 PROTOCOL_BINARY_RAW_BYTES,
934                                 0,
935                                 -1,
936                                 vb->getId()));
937
938        // queue an Item and close previous checkpoint
939        vb->checkpointManager->queueDirty(*vb,
940                                          qi1,
941                                          GenerateBySeqno::Yes,
942                                          GenerateCas::Yes,
943                                          /*preLinkDocCtx*/ nullptr);
944
945        EXPECT_EQ(std::make_pair(false, size_t(1)),
946                  store->flushVBucket(vb->getId()));
947        ckpt_mgr.createNewCheckpoint();
948        EXPECT_EQ(3, ckpt_mgr.getNumCheckpoints());
949        EXPECT_EQ(1, ckpt_mgr.getNumOfCursors());
950
951        // Now remove the earlier checkpoint
952        EXPECT_EQ(1, ckpt_mgr.removeClosedUnrefCheckpoints(
953                *vb, new_ckpt_created));
954        EXPECT_EQ(2, ckpt_mgr.getNumCheckpoints());
955        EXPECT_EQ(1, ckpt_mgr.getNumOfCursors());
956
957        queued_item qi2(new Item(makeStoredDocKey("key3"),
958                                 0,
959                                 0,
960                                 "v",
961                                 1,
962                                 PROTOCOL_BINARY_RAW_BYTES,
963                                 0,
964                                 -1,
965                                 vb->getId()));
966
967        // queue an Item and close previous checkpoint
968        vb->checkpointManager->queueDirty(*vb,
969                                          qi2,
970                                          GenerateBySeqno::Yes,
971                                          GenerateCas::Yes,
972                                          /*preLinkDocCtx*/ nullptr);
973
974        EXPECT_EQ(std::make_pair(false, size_t(1)),
975                  store->flushVBucket(vb->getId()));
976        ckpt_mgr.createNewCheckpoint();
977        EXPECT_EQ(3, ckpt_mgr.getNumCheckpoints());
978        EXPECT_EQ(1, ckpt_mgr.getNumOfCursors());
979
980        // Now remove the earlier checkpoint
981        EXPECT_EQ(1, ckpt_mgr.removeClosedUnrefCheckpoints(
982                *vb, new_ckpt_created));
983        EXPECT_EQ(2, ckpt_mgr.getNumCheckpoints());
984        EXPECT_EQ(1, ckpt_mgr.getNumOfCursors());
985    }
986}
987
988TEST_F(SingleThreadedEPBucketTest, MB22960_cursor_dropping_data_loss) {
989    // Records the number of times ActiveStream::registerCursor is invoked.
990    size_t registerCursorCount = 0;
991    // Make vbucket active.
992    setVBucketStateAndRunPersistTask(vbid, vbucket_state_active);
993    auto vb = store->getVBuckets().getBucket(vbid);
994    ASSERT_NE(nullptr, vb.get());
995    auto& ckpt_mgr = *vb->checkpointManager;
996    EXPECT_EQ(1, ckpt_mgr.getNumCheckpoints());
997    EXPECT_EQ(1, ckpt_mgr.getNumOfCursors());
998
999    // Create a Mock Dcp producer
1000    auto producer = std::make_shared<MockDcpProducer>(
1001            *engine,
1002            cookie,
1003            "test_producer",
1004            /*flags*/ 0,
1005            cb::const_byte_buffer() /*no json*/);
1006
1007    // Since we are creating a mock active stream outside of
1008    // DcpProducer::streamRequest(), and we want the checkpt processor task,
1009    // create it explicitly here
1010    producer->createCheckpointProcessorTask();
1011    producer->scheduleCheckpointProcessorTask();
1012
1013    // Create a Mock Active Stream
1014    auto mock_stream =
1015            std::make_shared<MockActiveStreamWithOverloadedRegisterCursor>(
1016                    static_cast<EventuallyPersistentEngine*>(engine.get()),
1017                    producer,
1018                    /*flags*/ 0,
1019                    /*opaque*/ 0,
1020                    *vb,
1021                    /*st_seqno*/ 0,
1022                    /*en_seqno*/ ~0,
1023                    /*vb_uuid*/ 0xabcd,
1024                    /*snap_start_seqno*/ 0,
1025                    /*snap_end_seqno*/ ~0,
1026                    IncludeValue::Yes,
1027                    IncludeXattrs::Yes);
1028
1029    auto& mockStreamObj = *mock_stream;
1030    mock_stream->setCallbackBeforeRegisterCursor(
1031            [this, &mockStreamObj, vb, &registerCursorCount]() {
1032                MB22960callbackBeforeRegisterCursor(
1033                        &getEPBucket(), mockStreamObj, vb, registerCursorCount);
1034            });
1035
1036    mock_stream->setCallbackAfterRegisterCursor(
1037            [&mock_stream, &registerCursorCount]() {
1038                // This callback is called every time a backfill is performed.
1039                // It is called immediately after completing
1040                // ActiveStream::registerCursor.
1041                registerCursorCount++;
1042                if (registerCursorCount == 1) {
1043                    EXPECT_TRUE(mock_stream->public_getPendingBackfill());
1044                } else {
1045                    EXPECT_EQ(2, registerCursorCount);
1046                    EXPECT_FALSE(mock_stream->public_getPendingBackfill());
1047                }
1048            });
1049
1050    EXPECT_EQ(1, ckpt_mgr.getNumOfCursors());
1051    mock_stream->transitionStateToBackfilling();
1052    EXPECT_EQ(2, ckpt_mgr.getNumOfCursors());
1053    // When we call transitionStateToBackfilling going from a StreamPending
1054    // state to a StreamBackfilling state, we end up calling
1055    // scheduleBackfill_UNLOCKED and as no backfill is required we end-up in a
1056    // StreamInMemory state.
1057    EXPECT_TRUE(mock_stream->isInMemory())
1058        << "stream state should have transitioned to StreamInMemory";
1059
1060    store_item(vbid, makeStoredDocKey("key1"), "value");
1061    EXPECT_EQ(std::make_pair(false, size_t(1)),
1062              getEPBucket().flushVBucket(vbid));
1063    ckpt_mgr.createNewCheckpoint();
1064    EXPECT_EQ(2, ckpt_mgr.getNumCheckpoints());
1065
1066    store_item(vbid, makeStoredDocKey("key2"), "value");
1067    EXPECT_EQ(std::make_pair(false, size_t(1)),
1068              getEPBucket().flushVBucket(vbid));
1069    ckpt_mgr.createNewCheckpoint();
1070    EXPECT_EQ(3, ckpt_mgr.getNumCheckpoints());
1071
1072    // can't remove checkpoint because of DCP stream.
1073    bool new_ckpt_created;
1074    EXPECT_EQ(0, ckpt_mgr.removeClosedUnrefCheckpoints(*vb, new_ckpt_created));
1075    EXPECT_EQ(2, ckpt_mgr.getNumOfCursors());
1076
1077    mock_stream->handleSlowStream();
1078
1079    EXPECT_EQ(1, ckpt_mgr.getNumOfCursors());
1080    EXPECT_TRUE(mock_stream->isInMemory())
1081        << "stream state should not have changed";
1082    EXPECT_TRUE(mock_stream->public_getPendingBackfill())
1083        << "pendingBackfill is not true";
1084    EXPECT_EQ(3, ckpt_mgr.getNumCheckpoints());
1085
1086    // Because we dropped the cursor we can now remove checkpoint
1087    EXPECT_EQ(1, ckpt_mgr.removeClosedUnrefCheckpoints(*vb, new_ckpt_created));
1088    EXPECT_EQ(2, ckpt_mgr.getNumCheckpoints());
1089
1090    //schedule a backfill
1091    mock_stream->next();
1092
1093    auto& lpAuxioQ = *task_executor->getLpTaskQ()[AUXIO_TASK_IDX];
1094    EXPECT_EQ(2, lpAuxioQ.getFutureQueueSize());
1095    // backfill:create()
1096    runNextTask(lpAuxioQ);
1097    // backfill:scan()
1098    runNextTask(lpAuxioQ);
1099    // backfill:complete()
1100    runNextTask(lpAuxioQ);
1101    // backfill:finished()
1102    runNextTask(lpAuxioQ);
1103    // inMemoryPhase and pendingBackfill is true and so transitions to
1104    // backfillPhase
1105    // take snapshot marker off the ReadyQ
1106    auto resp = mock_stream->next();
1107    // backfillPhase() - take doc "key1" off the ReadyQ
1108    resp = mock_stream->next();
1109    // backfillPhase - take doc "key2" off the ReadyQ
1110    resp = mock_stream->next();
1111    runNextTask(lpAuxioQ);
1112    runNextTask(lpAuxioQ);
1113    runNextTask(lpAuxioQ);
1114    runNextTask(lpAuxioQ);
1115    // Assert that the callback (and hence backfill) was only invoked twice
1116    ASSERT_EQ(2, registerCursorCount);
1117    // take snapshot marker off the ReadyQ
1118    resp = mock_stream->next();
1119    // backfillPhase - take doc "key3" off the ReadyQ
1120    resp = mock_stream->next();
1121    // backfillPhase() - take doc "key4" off the ReadyQ
1122    // isBackfillTaskRunning is not running and ReadyQ is now empty so also
1123    // transitionState from StreamBackfilling to StreamInMemory
1124    resp = mock_stream->next();
1125    EXPECT_TRUE(mock_stream->isInMemory())
1126        << "stream state should have transitioned to StreamInMemory";
1127    // inMemoryPhase.  ReadyQ is empty and pendingBackfill is false and so
1128    // return NULL
1129    resp = mock_stream->next();
1130    EXPECT_EQ(nullptr, resp);
1131    EXPECT_EQ(2, ckpt_mgr.getNumCheckpoints());
1132    EXPECT_EQ(2, ckpt_mgr.getNumOfCursors());
1133
1134    // BackfillManagerTask
1135    runNextTask(lpAuxioQ);
1136
1137    // Stop Producer checkpoint processor task
1138    producer->cancelCheckpointCreatorTask();
1139}
1140
1141/* The following is a regression test for MB25056, which came about due the fix
1142 * for MB22960 having a bug where it is set pendingBackfill to true too often.
1143 *
1144 * To demonstrate the issue we need:
1145 *
1146 * 1. vbucket state to be replica
1147 *
1148 * 2. checkpoint state to be similar to the following:
1149 * CheckpointManager[0x10720d908] with numItems:3 checkpoints:1
1150 *   Checkpoint[0x10723d2a0] with seqno:{2,4} state:CHECKPOINT_OPEN items:[
1151 *   {1,empty,dummy_key}
1152 *   {2,checkpoint_start,checkpoint_start}
1153 *   {2,set,key2}
1154 *   {4,set,key3}
1155 * ]
1156 *   connCursors:[
1157 *       persistence: CheckpointCursor[0x7fff5ca0cf98] with name:persistence
1158 *       currentCkpt:{id:1 state:CHECKPOINT_OPEN} currentPos:2 offset:2
1159 *       ckptMetaItemsRead:1
1160 *
1161 *       test_producer: CheckpointCursor[0x7fff5ca0cf98] with name:test_producer
1162 *       currentCkpt:{id:1 state:CHECKPOINT_OPEN} currentPos:1 offset:0
1163 *       ckptMetaItemsRead:0
1164 *   ]
1165 *
1166 * 3. active stream to the vbucket requesting start seqno=0 and end seqno=4
1167 *
1168 * The test behaviour is that we perform a backfill.  In markDiskSnapshot (which
1169 * is invoked when we perform a backfill) we merge items in the open checkpoint.
1170 * In the test below this means the snapshot {start, end} is originally {0, 2}
1171 * but is extended to {0, 4}.
1172 *
1173 * We then call registerCursor with the lastProcessedSeqno of 2, which then
1174 * calls through to registerCursorBySeqno and returns 4.  Given that
1175 * 4 - 1 > 2 in the original fix for MB25056 we incorrectly set pendingBackfill
1176 * to true.  However by checking if the seqno returned is the first in the
1177 * checkpoint we can confirm whether a backfill is actually required, and hence
1178 * whether pendingBackfill should be set to true.
1179 *
1180 * In this test the result is not the first seqno in the checkpoint and so
1181 * pendingBackfill should be false.
1182 */
1183
1184TEST_F(SingleThreadedEPBucketTest, MB25056_do_not_set_pendingBackfill_to_true) {
1185    // Records the number of times registerCursor is invoked.
1186    size_t registerCursorCount = 0;
1187    // Make vbucket a replica.
1188    setVBucketStateAndRunPersistTask(vbid, vbucket_state_replica);
1189    auto vb = store->getVBuckets().getBucket(vbid);
1190    ASSERT_NE(nullptr, vb.get());
1191    auto& ckpt_mgr = *vb->checkpointManager;
1192    EXPECT_EQ(1, ckpt_mgr.getNumCheckpoints());
1193    EXPECT_EQ(1, ckpt_mgr.getNumOfCursors());
1194
1195    // Add an item and flush to vbucket
1196    auto item = make_item(vbid, makeStoredDocKey("key1"), "value");
1197    item.setCas(1);
1198    uint64_t seqno;
1199    store->setWithMeta(std::ref(item),
1200                       0,
1201                       &seqno,
1202                       cookie,
1203                       {vbucket_state_replica},
1204                       CheckConflicts::No,
1205                       /*allowExisting*/ true);
1206    getEPBucket().flushVBucket(vbid);
1207
1208    // Close the first checkpoint and create a second one
1209    ckpt_mgr.createNewCheckpoint();
1210
1211    // Remove the first checkpoint
1212    bool new_ckpt_created;
1213    ckpt_mgr.removeClosedUnrefCheckpoints(*vb, new_ckpt_created);
1214
1215    // Add a second item and flush to bucket
1216    auto item2 = make_item(vbid, makeStoredDocKey("key2"), "value");
1217    item2.setCas(1);
1218    store->setWithMeta(std::ref(item2),
1219                       0,
1220                       &seqno,
1221                       cookie,
1222                       {vbucket_state_replica},
1223                       CheckConflicts::No,
1224                       /*allowExisting*/ true);
1225    getEPBucket().flushVBucket(vbid);
1226
1227    // Add 2 further items to the second checkpoint.  As both have the key
1228    // "key3" the first of the two items will be de-duplicated away.
1229    // Do NOT flush to vbucket.
1230    for (int ii = 0; ii < 2; ii++) {
1231        auto item = make_item(vbid, makeStoredDocKey("key3"), "value");
1232        item.setCas(1);
1233        store->setWithMeta(std::ref(item),
1234                           0,
1235                           &seqno,
1236                           cookie,
1237                           {vbucket_state_replica},
1238                           CheckConflicts::No,
1239                           /*allowExisting*/ true);
1240    }
1241
1242    // Create a Mock Dcp producer
1243    const std::string testName("test_producer");
1244    auto producer = std::make_shared<MockDcpProducer>(
1245            *engine,
1246            cookie,
1247            testName,
1248            /*flags*/ 0,
1249            cb::const_byte_buffer() /*no json*/);
1250
1251    // Since we are creating a mock active stream outside of
1252    // DcpProducer::streamRequest(), and we want the checkpt processor task,
1253    // create it explicitly here
1254    producer->createCheckpointProcessorTask();
1255    producer->scheduleCheckpointProcessorTask();
1256
1257    // Create a Mock Active Stream
1258    auto mock_stream =
1259            std::make_shared<MockActiveStreamWithOverloadedRegisterCursor>(
1260                    static_cast<EventuallyPersistentEngine*>(engine.get()),
1261                    producer,
1262                    /*flags*/ 0,
1263                    /*opaque*/ 0,
1264                    *vb,
1265                    /*st_seqno*/ 0,
1266                    /*en_seqno*/ 4,
1267                    /*vb_uuid*/ 0xabcd,
1268                    /*snap_start_seqno*/ 0,
1269                    /*snap_end_seqno*/ ~0,
1270                    IncludeValue::Yes,
1271                    IncludeXattrs::Yes);
1272
1273    mock_stream->setCallbackBeforeRegisterCursor(
1274            [this, vb, &registerCursorCount]() {
1275                // This callback function is called every time a backfill is
1276                // performed. It is called immediately prior to executing
1277                // ActiveStream::registerCursor.
1278                EXPECT_EQ(0, registerCursorCount);
1279            });
1280
1281    mock_stream->setCallbackAfterRegisterCursor(
1282            [&mock_stream, &registerCursorCount]() {
1283                // This callback function is called every time a backfill is
1284                // performed. It is called immediately after completing
1285                // ActiveStream::registerCursor.
1286                // The key point of the test is pendingBackfill is set to false
1287                registerCursorCount++;
1288                EXPECT_EQ(1, registerCursorCount);
1289                EXPECT_FALSE(mock_stream->public_getPendingBackfill());
1290            });
1291
1292    // transitioning to Backfilling results in calling
1293    // scheduleBackfill_UNLOCKED(false)
1294    mock_stream->transitionStateToBackfilling();
1295    // schedule the backfill
1296    mock_stream->next();
1297
1298    auto& lpAuxioQ = *task_executor->getLpTaskQ()[AUXIO_TASK_IDX];
1299    EXPECT_EQ(2, lpAuxioQ.getFutureQueueSize());
1300    // backfill:create()
1301    runNextTask(lpAuxioQ, "Backfilling items for a DCP Connection");
1302    // backfill:scan()
1303    runNextTask(lpAuxioQ, "Backfilling items for a DCP Connection");
1304    // backfill:complete()
1305    runNextTask(lpAuxioQ, "Backfilling items for a DCP Connection");
1306    // backfill:finished()
1307    runNextTask(lpAuxioQ, "Backfilling items for a DCP Connection");
1308    // inMemoryPhase and pendingBackfill is true and so transitions to
1309    // backfillPhase
1310    // take snapshot marker off the ReadyQ
1311    std::unique_ptr<DcpResponse> resp =
1312            static_cast< std::unique_ptr<DcpResponse> >(mock_stream->next());
1313    EXPECT_EQ(DcpResponse::Event::SnapshotMarker, resp->getEvent());
1314
1315    // backfillPhase() - take doc "key1" off the ReadyQ
1316    resp = mock_stream->next();
1317    EXPECT_EQ(DcpResponse::Event::Mutation, resp->getEvent());
1318    EXPECT_EQ(std::string("key1"),
1319              dynamic_cast<MutationResponse*>(resp.get())->
1320              getItem()->getKey().c_str());
1321
1322    // backfillPhase - take doc "key2" off the ReadyQ
1323    resp = mock_stream->next();
1324    EXPECT_EQ(DcpResponse::Event::Mutation, resp->getEvent());
1325    EXPECT_EQ(std::string("key2"),
1326              dynamic_cast<MutationResponse*>(resp.get())->
1327              getItem()->getKey().c_str());
1328
1329    EXPECT_TRUE(mock_stream->isInMemory())
1330            << "stream state should have transitioned to StreamInMemory";
1331
1332    resp = mock_stream->next();
1333    EXPECT_FALSE(resp);
1334
1335    EXPECT_EQ(1, ckpt_mgr.getNumCheckpoints());
1336    EXPECT_EQ(2, ckpt_mgr.getNumOfCursors());
1337    // Assert that registerCursor (and hence backfill) was only invoked once
1338    ASSERT_EQ(1, registerCursorCount);
1339
1340    // ActiveStreamCheckpointProcessorTask
1341    runNextTask(lpAuxioQ, "Process checkpoint(s) for DCP producer " + testName);
1342    // BackfillManagerTask
1343    runNextTask(lpAuxioQ, "Backfilling items for a DCP Connection");
1344
1345    // Stop Producer checkpoint processor task
1346    producer->cancelCheckpointCreatorTask();
1347}
1348
1349/**
1350 * Regression test for MB-22451: When handleSlowStream is called and in
1351 * StreamBackfilling state and currently have a backfill scheduled (or running)
1352 * ensure that when the backfill completes pendingBackfill remains true,
1353 * isBackfillTaskRunning is false and, the stream state remains set to
1354 * StreamBackfilling.
1355 */
1356TEST_F(SingleThreadedEPBucketTest, test_mb22451) {
1357    // Make vbucket active.
1358    setVBucketStateAndRunPersistTask(vbid, vbucket_state_active);
1359    // Store a single Item
1360    store_item(vbid, makeStoredDocKey("key"), "value");
1361    // Ensure that it has persisted to disk
1362    flush_vbucket_to_disk(vbid);
1363
1364    // Create a Mock Dcp producer
1365    auto producer = std::make_shared<MockDcpProducer>(
1366            *engine,
1367            cookie,
1368            "test_producer",
1369            /*flags*/ 0,
1370            cb::const_byte_buffer() /*no json*/);
1371    // Create a Mock Active Stream
1372    auto vb = store->getVBucket(vbid);
1373    ASSERT_NE(nullptr, vb.get());
1374    auto mock_stream = std::make_shared<MockActiveStream>(
1375            static_cast<EventuallyPersistentEngine*>(engine.get()),
1376            producer,
1377            /*flags*/ 0,
1378            /*opaque*/ 0,
1379            *vb,
1380            /*st_seqno*/ 0,
1381            /*en_seqno*/ ~0,
1382            /*vb_uuid*/ 0xabcd,
1383            /*snap_start_seqno*/ 0,
1384            /*snap_end_seqno*/ ~0,
1385            IncludeValue::Yes,
1386            IncludeXattrs::Yes);
1387
1388    /**
1389      * The core of the test follows:
1390      * Call completeBackfill whilst we are in the state of StreamBackfilling
1391      * and the pendingBackfill flag is set to true.
1392      * We expect that on leaving completeBackfill the isBackfillRunning flag is
1393      * set to true.
1394      */
1395    mock_stream->public_setBackfillTaskRunning(true);
1396    mock_stream->transitionStateToBackfilling();
1397    mock_stream->handleSlowStream();
1398    // The call to handleSlowStream should result in setting pendingBackfill
1399    // flag to true
1400    EXPECT_TRUE(mock_stream->public_getPendingBackfill())
1401        << "handleSlowStream should set pendingBackfill to True";
1402    mock_stream->completeBackfill();
1403    EXPECT_FALSE(mock_stream->public_isBackfillTaskRunning())
1404        << "completeBackfill should set isBackfillTaskRunning to False";
1405    EXPECT_TRUE(mock_stream->isBackfilling())
1406        << "stream state should not have changed";
1407    // Required to ensure that the backfillMgr is deleted
1408    producer->closeAllStreams();
1409
1410    // Stop Producer checkpoint processor task
1411    producer->cancelCheckpointCreatorTask();
1412}
1413
1414/* Regression / reproducer test for MB-19815 - an exception is thrown
1415 * (and connection disconnected) if a couchstore file hasn't been re-created
1416 * yet when doDcpVbTakeoverStats() is called.
1417 */
1418TEST_F(SingleThreadedEPBucketTest, MB19815_doDcpVbTakeoverStats) {
1419    auto* task_executor = reinterpret_cast<SingleThreadedExecutorPool*>
1420        (ExecutorPool::get());
1421
1422    // Should start with no tasks registered on any queues.
1423    for (auto& queue : task_executor->getLpTaskQ()) {
1424        ASSERT_EQ(0, queue->getFutureQueueSize());
1425        ASSERT_EQ(0, queue->getReadyQueueSize());
1426    }
1427
1428    // [[1] Set our state to replica.
1429    setVBucketStateAndRunPersistTask(vbid, vbucket_state_replica);
1430
1431    // [[2]] Perform a vbucket reset. This will perform some work synchronously,
1432    // but also creates the task that will delete the VB.
1433    //   * vbucket memory and disk deletion (AUXIO)
1434    // MB-19695: If we try to get the number of persisted deletes between
1435    // steps [[2]] and [[3]] running then an exception is thrown (and client
1436    // disconnected).
1437    EXPECT_TRUE(store->resetVBucket(vbid));
1438    auto& lpAuxioQ = *task_executor->getLpTaskQ()[AUXIO_TASK_IDX];
1439    runNextTask(lpAuxioQ, "Removing (dead) vb:0 from memory and disk");
1440
1441    // [[3]] Ok, let's see if we can get DCP takeover stats.
1442    // Dummy callback to pass into the stats function below.
1443    auto dummy_cb = [](const char* key,
1444                       const uint16_t klen,
1445                       const char* val,
1446                       const uint32_t vlen,
1447                       gsl::not_null<const void*> cookie) {};
1448    std::string key{"MB19815_doDCPVbTakeoverStats"};
1449
1450    // We can't call stats with a nullptr as the cookie. Given that
1451    // the callback don't use the cookie "at all" we can just use the key
1452    // as the cookie
1453    EXPECT_NO_THROW(engine->public_doDcpVbTakeoverStats(
1454            static_cast<const void*>(key.c_str()), dummy_cb, key, vbid));
1455
1456    // Cleanup - run flusher.
1457    EXPECT_EQ(std::make_pair(false, size_t(0)),
1458              getEPBucket().flushVBucket(vbid));
1459}
1460
1461/*
1462 * Test that
1463 * 1. We cannot create a stream against a dead vb (MB-17230)
1464 * 2. No tasks are scheduled as a side-effect of the streamRequest attempt.
1465 */
1466TEST_F(SingleThreadedEPBucketTest, MB19428_no_streams_against_dead_vbucket) {
1467    setVBucketStateAndRunPersistTask(vbid, vbucket_state_active);
1468
1469    store_item(vbid, makeStoredDocKey("key"), "value");
1470
1471    // Directly flush the vbucket
1472    EXPECT_EQ(std::make_pair(false, size_t(1)),
1473              getEPBucket().flushVBucket(vbid));
1474
1475    setVBucketStateAndRunPersistTask(vbid, vbucket_state_dead);
1476    auto& lpAuxioQ = *task_executor->getLpTaskQ()[AUXIO_TASK_IDX];
1477
1478    {
1479        // Create a Mock Dcp producer
1480        auto producer = std::make_shared<MockDcpProducer>(
1481                *engine,
1482                cookie,
1483                "test_producer",
1484                /*flags*/ 0,
1485                cb::const_byte_buffer() /*no json*/);
1486
1487        // Creating a producer will not create an
1488        // ActiveStreamCheckpointProcessorTask until a stream is created.
1489        EXPECT_EQ(0, lpAuxioQ.getFutureQueueSize());
1490
1491        uint64_t rollbackSeqno;
1492        auto err = producer->streamRequest(
1493                /*flags*/ 0,
1494                /*opaque*/ 0,
1495                /*vbucket*/ vbid,
1496                /*start_seqno*/ 0,
1497                /*end_seqno*/ -1,
1498                /*vb_uuid*/ 0,
1499                /*snap_start*/ 0,
1500                /*snap_end*/ 0,
1501                &rollbackSeqno,
1502                SingleThreadedEPBucketTest::fakeDcpAddFailoverLog);
1503
1504        EXPECT_EQ(ENGINE_NOT_MY_VBUCKET, err) << "Unexpected error code";
1505
1506        // The streamRequest failed and should not of created anymore tasks than
1507        // ActiveStreamCheckpointProcessorTask.
1508        EXPECT_EQ(1, lpAuxioQ.getFutureQueueSize());
1509
1510        // Stop Producer checkpoint processor task
1511        producer->cancelCheckpointCreatorTask();
1512    }
1513}
1514
1515/*
1516 * Test that TaskQueue::wake results in a sensible ExecutorPool work count
1517 * Incorrect counting can result in the run loop spinning for many threads.
1518 */
1519TEST_F(SingleThreadedEPBucketTest, MB20235_wake_and_work_count) {
1520    class TestTask : public GlobalTask {
1521    public:
1522        TestTask(EventuallyPersistentEngine *e, double s) :
1523                 GlobalTask(e, TaskId::ActiveStreamCheckpointProcessorTask, s) {}
1524        bool run() {
1525            return false;
1526        }
1527
1528        std::string getDescription() {
1529            return "Test MB20235";
1530        }
1531
1532        std::chrono::microseconds maxExpectedDuration() {
1533            return std::chrono::seconds(0);
1534        }
1535    };
1536
1537    auto& lpAuxioQ = *task_executor->getLpTaskQ()[AUXIO_TASK_IDX];
1538
1539    // New task with a massive sleep
1540    ExTask task = std::make_shared<TestTask>(engine.get(), 99999.0);
1541    EXPECT_EQ(0, lpAuxioQ.getFutureQueueSize());
1542
1543    // schedule the task, futureQueue grows
1544    task_executor->schedule(task);
1545    EXPECT_EQ(lpAuxioQ.getReadyQueueSize(), task_executor->getTotReadyTasks());
1546    EXPECT_EQ(lpAuxioQ.getReadyQueueSize(),
1547              task_executor->getNumReadyTasks(AUXIO_TASK_IDX));
1548    EXPECT_EQ(1, lpAuxioQ.getFutureQueueSize());
1549
1550    // Wake task, but stays in futureQueue (fetch can now move it)
1551    task_executor->wake(task->getId());
1552    EXPECT_EQ(lpAuxioQ.getReadyQueueSize(), task_executor->getTotReadyTasks());
1553    EXPECT_EQ(lpAuxioQ.getReadyQueueSize(),
1554              task_executor->getNumReadyTasks(AUXIO_TASK_IDX));
1555    EXPECT_EQ(1, lpAuxioQ.getFutureQueueSize());
1556    EXPECT_EQ(0, lpAuxioQ.getReadyQueueSize());
1557
1558    runNextTask(lpAuxioQ);
1559    EXPECT_EQ(lpAuxioQ.getReadyQueueSize(), task_executor->getTotReadyTasks());
1560    EXPECT_EQ(lpAuxioQ.getReadyQueueSize(),
1561              task_executor->getNumReadyTasks(AUXIO_TASK_IDX));
1562    EXPECT_EQ(0, lpAuxioQ.getFutureQueueSize());
1563    EXPECT_EQ(0, lpAuxioQ.getReadyQueueSize());
1564}
1565
1566// Check that in-progress disk backfills (`CouchKVStore::backfill`) are
1567// correctly deleted when we delete a bucket. If not then we leak vBucket file
1568// descriptors, which can prevent ns_server from cleaning up old vBucket files
1569// and consequently re-adding a node to the cluster.
1570//
1571TEST_F(SingleThreadedEPBucketTest, MB19892_BackfillNotDeleted) {
1572    // Make vbucket active.
1573    setVBucketStateAndRunPersistTask(vbid, vbucket_state_active);
1574
1575    // Perform one SET, then close it's checkpoint. This means that we no
1576    // longer have all sequence numbers in memory checkpoints, forcing the
1577    // DCP stream request to go to disk (backfill).
1578    store_item(vbid, makeStoredDocKey("key"), "value");
1579
1580    // Force a new checkpoint.
1581    auto vb = store->getVBuckets().getBucket(vbid);
1582    auto& ckpt_mgr = *vb->checkpointManager;
1583    ckpt_mgr.createNewCheckpoint();
1584
1585    // Directly flush the vbucket, ensuring data is on disk.
1586    //  (This would normally also wake up the checkpoint remover task, but
1587    //   as that task was never registered with the ExecutorPool in this test
1588    //   environment, we need to manually remove the prev checkpoint).
1589    EXPECT_EQ(std::make_pair(false, size_t(1)),
1590              getEPBucket().flushVBucket(vbid));
1591
1592    bool new_ckpt_created;
1593    EXPECT_EQ(1, ckpt_mgr.removeClosedUnrefCheckpoints(*vb, new_ckpt_created));
1594
1595    // Create a DCP producer, and start a stream request.
1596    std::string name{"test_producer"};
1597    EXPECT_EQ(ENGINE_SUCCESS,
1598              engine->dcpOpen(cookie,
1599                              /*opaque:unused*/ {},
1600                              /*seqno:unused*/ {},
1601                              DCP_OPEN_PRODUCER,
1602                              name,
1603                              {}));
1604
1605    uint64_t rollbackSeqno;
1606    auto dummy_dcp_add_failover_cb = [](vbucket_failover_t* entry,
1607                                        size_t nentries,
1608                                        gsl::not_null<const void*> cookie) {
1609        return ENGINE_SUCCESS;
1610    };
1611
1612    // Actual stream request method (EvpDcpStreamReq) is static, so access via
1613    // the engine_interface.
1614    EXPECT_EQ(ENGINE_SUCCESS,
1615              engine.get()->dcp.stream_req(&engine.get()->interface,
1616                                           cookie,
1617                                           /*flags*/ 0,
1618                                           /*opaque*/ 0,
1619                                           /*vbucket*/ vbid,
1620                                           /*start_seqno*/ 0,
1621                                           /*end_seqno*/ -1,
1622                                           /*vb_uuid*/ 0,
1623                                           /*snap_start*/ 0,
1624                                           /*snap_end*/ 0,
1625                                           &rollbackSeqno,
1626                                           dummy_dcp_add_failover_cb));
1627}
1628
1629/*
1630 * Test that the DCP processor returns a 'yield' return code when
1631 * working on a large enough buffer size.
1632 */
1633TEST_F(SingleThreadedEPBucketTest, MB18452_yield_dcp_processor) {
1634
1635    // We need a replica VB
1636    setVBucketStateAndRunPersistTask(vbid, vbucket_state_replica);
1637
1638    // Create a MockDcpConsumer
1639    auto consumer = std::make_shared<MockDcpConsumer>(*engine, cookie, "test");
1640
1641    // Add the stream
1642    EXPECT_EQ(ENGINE_SUCCESS,
1643              consumer->addStream(/*opaque*/0, vbid, /*flags*/0));
1644
1645    // The processBufferedItems should yield every "yield * batchSize"
1646    // So add '(n * (yield * batchSize)) + 1' messages and we should see
1647    // processBufferedMessages return 'more_to_process' 'n' times and then
1648    // 'all_processed' once.
1649    const int n = 4;
1650    const int yield = engine->getConfiguration().getDcpConsumerProcessBufferedMessagesYieldLimit();
1651    const int batchSize = engine->getConfiguration().getDcpConsumerProcessBufferedMessagesBatchSize();
1652    const int messages = n * (batchSize * yield);
1653
1654    // Force the stream to buffer rather than process messages immediately
1655    const ssize_t queueCap = engine->getEpStats().replicationThrottleWriteQueueCap;
1656    engine->getEpStats().replicationThrottleWriteQueueCap = 0;
1657
1658    // 1. Add the first message, a snapshot marker.
1659    consumer->snapshotMarker(/*opaque*/1, vbid, /*startseq*/0,
1660                             /*endseq*/messages, /*flags*/0);
1661
1662    // 2. Now add the rest as mutations.
1663    for (int ii = 0; ii <= messages; ii++) {
1664        const std::string key = "key" + std::to_string(ii);
1665        const DocKey docKey{key, DocNamespace::DefaultCollection};
1666        std::string value = "value";
1667
1668        consumer->mutation(1/*opaque*/,
1669                           docKey,
1670                           {(const uint8_t*)value.c_str(), value.length()},
1671                           0, // privileged bytes
1672                           PROTOCOL_BINARY_RAW_BYTES, // datatype
1673                           0, // cas
1674                           vbid, // vbucket
1675                           0, // flags
1676                           ii, // bySeqno
1677                           0, // revSeqno
1678                           0, // exptime
1679                           0, // locktime
1680                           {}, // meta
1681                           0); // nru
1682    }
1683
1684    // Set the throttle back to the original value
1685    engine->getEpStats().replicationThrottleWriteQueueCap = queueCap;
1686
1687    // Get our target stream ready.
1688    static_cast<MockDcpConsumer*>(consumer.get())->public_notifyVbucketReady(vbid);
1689
1690    // 3. processBufferedItems returns more_to_process n times
1691    for (int ii = 0; ii < n; ii++) {
1692        EXPECT_EQ(more_to_process, consumer->processBufferedItems());
1693    }
1694
1695    // 4. processBufferedItems returns a final all_processed
1696    EXPECT_EQ(all_processed, consumer->processBufferedItems());
1697
1698    // Drop the stream
1699    consumer->closeStream(/*opaque*/0, vbid);
1700}
1701
1702/**
1703 * MB-29861: Ensure that a delete time is generated for a document
1704 * that is received on the consumer side as a result of a disk
1705 * backfill
1706 */
1707TEST_F(SingleThreadedEPBucketTest, MB_29861) {
1708    // We need a replica VB
1709    setVBucketStateAndRunPersistTask(vbid, vbucket_state_replica);
1710
1711    // Create a MockDcpConsumer
1712    auto consumer = std::make_shared<MockDcpConsumer>(*engine, cookie, "test");
1713
1714    // Add the stream
1715    EXPECT_EQ(ENGINE_SUCCESS,
1716              consumer->addStream(/*opaque*/ 0, vbid, /*flags*/ 0));
1717
1718    // 1. Add the first message, a snapshot marker to ensure that the
1719    //    vbucket goes to the backfill state
1720    consumer->snapshotMarker(/*opaque*/ 1,
1721                             vbid,
1722                             /*startseq*/ 0,
1723                             /*endseq*/ 2,
1724                             /*flags*/ MARKER_FLAG_DISK);
1725
1726    // 2. Now add a deletion.
1727    consumer->deletion(/*opaque*/ 1,
1728                       {"key1", DocNamespace::DefaultCollection},
1729                       /*value*/ {},
1730                       /*priv_bytes*/ 0,
1731                       /*datatype*/ PROTOCOL_BINARY_RAW_BYTES,
1732                       /*cas*/ 0,
1733                       /*vbucket*/ vbid,
1734                       /*bySeqno*/ 1,
1735                       /*revSeqno*/ 0,
1736                       /*meta*/ {});
1737
1738    EXPECT_EQ(std::make_pair(false, size_t(1)),
1739              getEPBucket().flushVBucket(vbid));
1740
1741    // Drop the stream
1742    consumer->closeStream(/*opaque*/ 0, vbid);
1743
1744    setVBucketStateAndRunPersistTask(vbid, vbucket_state_active);
1745    // Now read back and verify key1 has a non-zero delete time
1746    ItemMetaData metadata;
1747    uint32_t deleted = 0;
1748    uint8_t datatype = 0;
1749    EXPECT_EQ(ENGINE_EWOULDBLOCK,
1750              store->getMetaData(makeStoredDocKey("key1"),
1751                                 vbid,
1752                                 cookie,
1753                                 metadata,
1754                                 deleted,
1755                                 datatype));
1756
1757    // Manually run the bgfetch task.
1758    MockGlobalTask mockTask(engine->getTaskable(), TaskId::MultiBGFetcherTask);
1759    store->getVBucket(vbid)->getShard()->getBgFetcher()->run(&mockTask);
1760    EXPECT_EQ(ENGINE_SUCCESS,
1761              store->getMetaData(makeStoredDocKey("key1"),
1762                                 vbid,
1763                                 cookie,
1764                                 metadata,
1765                                 deleted,
1766                                 datatype));
1767    EXPECT_EQ(1, deleted);
1768    EXPECT_EQ(PROTOCOL_BINARY_RAW_BYTES, datatype);
1769    EXPECT_NE(0, metadata.exptime); // A locally created deleteTime
1770}
1771
1772/*
1773 * Test that the consumer will use the delete time given
1774 */
1775TEST_P(STParameterizedBucketTest, MB_27457) {
1776    // We need a replica VB
1777    setVBucketStateAndRunPersistTask(vbid, vbucket_state_replica);
1778
1779    // Create a MockDcpConsumer
1780    auto consumer = std::make_shared<MockDcpConsumer>(*engine, cookie, "test");
1781
1782    // Bump forwards so ep_current_time cannot be 0
1783    TimeTraveller biff(64000);
1784
1785    // Add the stream
1786    EXPECT_EQ(ENGINE_SUCCESS,
1787              consumer->addStream(/*opaque*/ 0, vbid, /*flags*/ 0));
1788
1789    // 1. Add the first message, a snapshot marker.
1790    consumer->snapshotMarker(/*opaque*/ 1,
1791                             vbid,
1792                             /*startseq*/ 0,
1793                             /*endseq*/ 2,
1794                             /*flags*/ 0);
1795    // 2. Now add two deletions, one without deleteTime, one with
1796    consumer->deletionV2(/*opaque*/ 1,
1797                         {"key1", DocNamespace::DefaultCollection},
1798                         /*values*/ {},
1799                         /*priv_bytes*/ 0,
1800                         /*datatype*/ PROTOCOL_BINARY_RAW_BYTES,
1801                         /*cas*/ 1,
1802                         /*vbucket*/ vbid,
1803                         /*bySeqno*/ 1,
1804                         /*revSeqno*/ 0,
1805                         /*deleteTime*/ 0);
1806
1807    const uint32_t deleteTime = 1958601165;
1808    consumer->deletionV2(/*opaque*/ 1,
1809                         {"key2", DocNamespace::DefaultCollection},
1810                         /*value*/ {},
1811                         /*priv_bytes*/ 0,
1812                         /*datatype*/ PROTOCOL_BINARY_RAW_BYTES,
1813                         /*cas*/ 2,
1814                         /*vbucket*/ vbid,
1815                         /*bySeqno*/ 2,
1816                         /*revSeqno*/ 0,
1817                         deleteTime);
1818
1819    flushVBucketToDiskIfPersistent(vbid, 2);
1820
1821    // Drop the stream
1822    consumer->closeStream(/*opaque*/ 0, vbid);
1823
1824    setVBucketStateAndRunPersistTask(vbid, vbucket_state_active);
1825    // Now read back and verify key2 has our test deleteTime of 10
1826    ItemMetaData metadata;
1827    uint32_t deleted = 0;
1828    uint8_t datatype = 0;
1829    uint64_t tombstoneTime;
1830    if (persistent()) {
1831        EXPECT_EQ(ENGINE_EWOULDBLOCK,
1832                  store->getMetaData(makeStoredDocKey("key1"),
1833                                     vbid,
1834                                     cookie,
1835                                     metadata,
1836                                     deleted,
1837                                     datatype));
1838
1839        // Manually run the bgfetch task.
1840        MockGlobalTask mockTask(engine->getTaskable(),
1841                                TaskId::MultiBGFetcherTask);
1842        store->getVBucket(vbid)->getShard()->getBgFetcher()->run(&mockTask);
1843
1844        EXPECT_EQ(ENGINE_SUCCESS,
1845                  store->getMetaData(makeStoredDocKey("key1"),
1846                                     vbid,
1847                                     cookie,
1848                                     metadata,
1849                                     deleted,
1850                                     datatype));
1851        tombstoneTime = uint64_t(metadata.exptime);
1852    } else {
1853        //  Ephemeral tombstone time is not in the expiry field, we can only
1854        // check the value by directly peeking at the StoredValue
1855        auto vb = store->getVBucket(vbid);
1856        auto* sv = vb->ht.find({"key1", DocNamespace::DefaultCollection},
1857                               TrackReference::No,
1858                               WantsDeleted::Yes);
1859        ASSERT_NE(nullptr, sv);
1860        deleted = sv->isDeleted();
1861        tombstoneTime = uint64_t(sv->toOrderedStoredValue()->getDeletedTime());
1862    }
1863
1864    EXPECT_EQ(1, deleted);
1865    EXPECT_EQ(PROTOCOL_BINARY_RAW_BYTES, datatype);
1866    EXPECT_GE(tombstoneTime, biff.get())
1867            << "Expected a tombstone to have been set which is equal or "
1868               "greater than our time traveller jump";
1869
1870    deleted = 0;
1871    datatype = 0;
1872    if (persistent()) {
1873        EXPECT_EQ(ENGINE_EWOULDBLOCK,
1874                  store->getMetaData(makeStoredDocKey("key2"),
1875                                     vbid,
1876                                     cookie,
1877                                     metadata,
1878                                     deleted,
1879                                     datatype));
1880        // Manually run the bgfetch task.
1881        MockGlobalTask mockTask(engine->getTaskable(),
1882                                TaskId::MultiBGFetcherTask);
1883        store->getVBucket(vbid)->getShard()->getBgFetcher()->run(&mockTask);
1884
1885        EXPECT_EQ(ENGINE_SUCCESS,
1886                  store->getMetaData(makeStoredDocKey("key2"),
1887                                     vbid,
1888                                     cookie,
1889                                     metadata,
1890                                     deleted,
1891                                     datatype));
1892
1893        tombstoneTime = uint64_t(metadata.exptime);
1894    } else {
1895        auto vb = store->getVBucket(vbid);
1896        auto* sv = vb->ht.find({"key2", DocNamespace::DefaultCollection},
1897                               TrackReference::No,
1898                               WantsDeleted::Yes);
1899        ASSERT_NE(nullptr, sv);
1900        deleted = sv->isDeleted();
1901        tombstoneTime =
1902                ep_abs_time((sv->toOrderedStoredValue()->getDeletedTime()));
1903    }
1904    EXPECT_EQ(1, deleted);
1905    EXPECT_EQ(PROTOCOL_BINARY_RAW_BYTES, datatype);
1906    EXPECT_EQ(deleteTime, tombstoneTime)
1907            << "key2 did not have our replicated deleteTime:" << deleteTime;
1908}
1909
1910/*
1911 * Background thread used by MB20054_onDeleteItem_during_bucket_deletion
1912 */
1913static void MB20054_run_backfill_task(EventuallyPersistentEngine* engine,
1914                                      CheckedExecutor& backfill,
1915                                      SyncObject& backfill_cv,
1916                                      SyncObject& destroy_cv,
1917                                      TaskQueue* lpAuxioQ) {
1918    std::unique_lock<std::mutex> destroy_lh(destroy_cv);
1919    ObjectRegistry::onSwitchThread(engine);
1920
1921    // Run the BackfillManagerTask task to push items to readyQ. In sherlock
1922    // upwards this runs multiple times - so should return true.
1923    backfill.runCurrentTask("Backfilling items for a DCP Connection");
1924
1925    // Notify the main thread that it can progress with destroying the
1926    // engine [A].
1927    {
1928        // if we can get the lock, then we know the main thread is waiting
1929        std::lock_guard<std::mutex> backfill_lock(backfill_cv);
1930        backfill_cv.notify_one(); // move the main thread along
1931    }
1932
1933    // Now wait ourselves for destroy to be completed [B].
1934    destroy_cv.wait(destroy_lh);
1935
1936    // This is the only "hacky" part of the test - we need to somehow
1937    // keep the DCPBackfill task 'running' - i.e. not call
1938    // completeCurrentTask - until the main thread is in
1939    // ExecutorPool::_stopTaskGroup. However we have no way from the test
1940    // to properly signal that we are *inside* _stopTaskGroup -
1941    // called from EVPStore's destructor.
1942    // Best we can do is spin on waiting for the DCPBackfill task to be
1943    // set to 'dead' - and only then completeCurrentTask; which will
1944    // cancel the task.
1945    while (!backfill.getCurrentTask()->isdead()) {
1946        // spin.
1947    }
1948    backfill.completeCurrentTask();
1949}
1950
1951static ENGINE_ERROR_CODE dummy_dcp_add_failover_cb(
1952        vbucket_failover_t* entry,
1953        size_t nentries,
1954        gsl::not_null<const void*> cookie) {
1955    return ENGINE_SUCCESS;
1956}
1957
1958// Test performs engine deletion interleaved with tasks so redefine TearDown
1959// for this tests needs.
1960class MB20054_SingleThreadedEPStoreTest : public SingleThreadedEPBucketTest {
1961public:
1962    void SetUp() {
1963        SingleThreadedEPBucketTest::SetUp();
1964        engine->initializeConnmap();
1965    }
1966
1967    void TearDown() {
1968        ExecutorPool::shutdown();
1969    }
1970};
1971
1972// Check that if onDeleteItem() is called during bucket deletion, we do not
1973// abort due to not having a valid thread-local 'engine' pointer. This
1974// has been observed when we have a DCPBackfill task which is deleted during
1975// bucket shutdown, which has a non-zero number of Items which are destructed
1976// (and call onDeleteItem).
1977TEST_F(MB20054_SingleThreadedEPStoreTest, MB20054_onDeleteItem_during_bucket_deletion) {
1978
1979    // [[1] Set our state to active.
1980    setVBucketStateAndRunPersistTask(vbid, vbucket_state_active);
1981
1982    // Perform one SET, then close it's checkpoint. This means that we no
1983    // longer have all sequence numbers in memory checkpoints, forcing the
1984    // DCP stream request to go to disk (backfill).
1985    store_item(vbid, makeStoredDocKey("key"), "value");
1986
1987    // Force a new checkpoint.
1988    VBucketPtr vb = store->getVBuckets().getBucket(vbid);
1989    CheckpointManager& ckpt_mgr = *vb->checkpointManager;
1990    ckpt_mgr.createNewCheckpoint();
1991    auto lpWriterQ = task_executor->getLpTaskQ()[WRITER_TASK_IDX];
1992    EXPECT_EQ(0, lpWriterQ->getFutureQueueSize());
1993    EXPECT_EQ(0, lpWriterQ->getReadyQueueSize());
1994
1995    auto lpAuxioQ = task_executor->getLpTaskQ()[AUXIO_TASK_IDX];
1996    EXPECT_EQ(0, lpAuxioQ->getFutureQueueSize());
1997    EXPECT_EQ(0, lpAuxioQ->getReadyQueueSize());
1998
1999    // Directly flush the vbucket, ensuring data is on disk.
2000    //  (This would normally also wake up the checkpoint remover task, but
2001    //   as that task was never registered with the ExecutorPool in this test
2002    //   environment, we need to manually remove the prev checkpoint).
2003    EXPECT_EQ(std::make_pair(false, size_t(1)),
2004              getEPBucket().flushVBucket(vbid));
2005
2006    bool new_ckpt_created;
2007    EXPECT_EQ(1, ckpt_mgr.removeClosedUnrefCheckpoints(*vb, new_ckpt_created));
2008    vb.reset();
2009
2010    EXPECT_EQ(0, lpAuxioQ->getFutureQueueSize());
2011    EXPECT_EQ(0, lpAuxioQ->getReadyQueueSize());
2012
2013    // Create a DCP producer, and start a stream request.
2014    std::string name("test_producer");
2015    EXPECT_EQ(ENGINE_SUCCESS,
2016              engine->dcpOpen(cookie,
2017                              /*opaque:unused*/ {},
2018                              /*seqno:unused*/ {},
2019                              DCP_OPEN_PRODUCER,
2020                              name,
2021                              {}));
2022
2023    // ActiveStreamCheckpointProcessorTask and DCPBackfill task are created
2024    // when the first DCP stream is created.
2025    EXPECT_EQ(0, lpAuxioQ->getFutureQueueSize());
2026    EXPECT_EQ(0, lpAuxioQ->getReadyQueueSize());
2027
2028    uint64_t rollbackSeqno;
2029    // Actual stream request method (EvpDcpStreamReq) is static, so access via
2030    // the engine_interface.
2031    EXPECT_EQ(ENGINE_SUCCESS,
2032              engine->dcp.stream_req(&engine->interface,
2033                                     cookie,
2034                                     /*flags*/ 0,
2035                                     /*opaque*/ 0,
2036                                     /*vbucket*/ vbid,
2037                                     /*start_seqno*/ 0,
2038                                     /*end_seqno*/ -1,
2039                                     /*vb_uuid*/ 0,
2040                                     /*snap_start*/ 0,
2041                                     /*snap_end*/ 0,
2042                                     &rollbackSeqno,
2043                                     dummy_dcp_add_failover_cb));
2044
2045    // FutureQ should now have an additional DCPBackfill task.
2046    EXPECT_EQ(2, lpAuxioQ->getFutureQueueSize());
2047    EXPECT_EQ(0, lpAuxioQ->getReadyQueueSize());
2048
2049    // Create an executor 'thread' to obtain shared ownership of the next
2050    // AuxIO task (which should be BackfillManagerTask). As long as this
2051    // object has it's currentTask set to BackfillManagerTask, the task
2052    // will not be deleted.
2053    // Essentially we are simulating a concurrent thread running this task.
2054    CheckedExecutor backfill(task_executor, *lpAuxioQ);
2055
2056    // This is the one action we really need to perform 'concurrently' - delete
2057    // the engine while a DCPBackfill task is still running. We spin up a
2058    // separate thread which will run the DCPBackfill task
2059    // concurrently with destroy - specifically DCPBackfill must start running
2060    // (and add items to the readyQ) before destroy(), it must then continue
2061    // running (stop after) _stopTaskGroup is invoked.
2062    // To achieve this we use a couple of condition variables to synchronise
2063    // between the two threads - the timeline needs to look like:
2064    //
2065    //  auxIO thread:  [------- DCPBackfill ----------]
2066    //   main thread:          [destroy()]       [ExecutorPool::_stopTaskGroup]
2067    //
2068    //  --------------------------------------------------------> time
2069    //
2070    SyncObject backfill_cv;
2071    SyncObject destroy_cv;
2072    std::thread concurrent_task_thread;
2073
2074    {
2075        // scope for the backfill lock
2076        std::unique_lock<std::mutex> backfill_lh(backfill_cv);
2077
2078        concurrent_task_thread = std::thread(MB20054_run_backfill_task,
2079                                             engine.get(),
2080                                             std::ref(backfill),
2081                                             std::ref(backfill_cv),
2082                                             std::ref(destroy_cv),
2083                                             lpAuxioQ);
2084        // [A] Wait for DCPBackfill to complete.
2085        backfill_cv.wait(backfill_lh);
2086    }
2087
2088    ObjectRegistry::onSwitchThread(engine.get());
2089    // 'Destroy' the engine - this doesn't delete the object, just shuts down
2090    // connections, marks streams as dead etc.
2091    engine->destroy(/*force*/false);
2092
2093    {
2094        // If we can get the lock we know the thread is waiting for destroy.
2095        std::lock_guard<std::mutex> lh(destroy_cv);
2096        destroy_cv.notify_one(); // move the thread on.
2097    }
2098
2099    // Force all tasks to cancel (so we can shutdown)
2100    cancelAndPurgeTasks();
2101
2102    // Mark the connection as dead for clean shutdown
2103    destroy_mock_cookie(cookie);
2104    engine->getDcpConnMap().manageConnections();
2105
2106    // Nullify TLS engine and reset the smart pointer to force destruction.
2107    // We need null as the engine to stop ~CheckedExecutor path from trying
2108    // to touch the engine
2109    ObjectRegistry::onSwitchThread(nullptr);
2110    engine.reset();
2111    destroy_mock_event_callbacks();
2112    concurrent_task_thread.join();
2113}
2114
2115/*
2116 * MB-18953 is triggered by the executorpool wake path moving tasks directly
2117 * into the readyQueue, thus allowing for high-priority tasks to dominiate
2118 * a taskqueue.
2119 */
2120TEST_F(SingleThreadedEPBucketTest, MB18953_taskWake) {
2121    auto& lpNonioQ = *task_executor->getLpTaskQ()[NONIO_TASK_IDX];
2122
2123    ExTask hpTask = std::make_shared<TestTask>(engine->getTaskable(),
2124                                               TaskId::PendingOpsNotification);
2125    task_executor->schedule(hpTask);
2126
2127    ExTask lpTask = std::make_shared<TestTask>(engine->getTaskable(),
2128                                               TaskId::DefragmenterTask);
2129    task_executor->schedule(lpTask);
2130
2131    runNextTask(lpNonioQ, "TestTask PendingOpsNotification"); // hptask goes first
2132    // Ensure that a wake to the hpTask doesn't mean the lpTask gets ignored
2133    lpNonioQ.wake(hpTask);
2134
2135    // Check 1 task is ready
2136    EXPECT_EQ(1, task_executor->getTotReadyTasks());
2137    EXPECT_EQ(1, task_executor->getNumReadyTasks(NONIO_TASK_IDX));
2138
2139    runNextTask(lpNonioQ, "TestTask DefragmenterTask"); // lptask goes second
2140
2141    // Run the tasks again to check that coming from ::reschedule our
2142    // expectations are still met.
2143    runNextTask(lpNonioQ, "TestTask PendingOpsNotification"); // hptask goes first
2144
2145    // Ensure that a wake to the hpTask doesn't mean the lpTask gets ignored
2146    lpNonioQ.wake(hpTask);
2147
2148    // Check 1 task is ready
2149    EXPECT_EQ(1, task_executor->getTotReadyTasks());
2150    EXPECT_EQ(1, task_executor->getNumReadyTasks(NONIO_TASK_IDX));
2151    runNextTask(lpNonioQ, "TestTask DefragmenterTask"); // lptask goes second
2152}
2153
2154/*
2155 * MB-20735 waketime is not correctly picked up on reschedule
2156 */
2157TEST_F(SingleThreadedEPBucketTest, MB20735_rescheduleWaketime) {
2158    auto& lpNonioQ = *task_executor->getLpTaskQ()[NONIO_TASK_IDX];
2159
2160    class SnoozingTestTask : public TestTask {
2161    public:
2162        SnoozingTestTask(Taskable& t, TaskId id) : TestTask(t, id) {
2163        }
2164
2165        bool run() override {
2166            snooze(0.1); // snooze for 100milliseconds only
2167            // Rescheduled to run 100 milliseconds later..
2168            return true;
2169        }
2170    };
2171
2172    auto task = std::make_shared<SnoozingTestTask>(
2173            engine->getTaskable(), TaskId::PendingOpsNotification);
2174    ExTask hpTask = task;
2175    task_executor->schedule(hpTask);
2176
2177    ProcessClock::time_point waketime = runNextTask(lpNonioQ,
2178                                    "TestTask PendingOpsNotification");
2179    EXPECT_EQ(waketime, task->getWaketime()) <<
2180                           "Rescheduled to much later time!";
2181}
2182
2183/*
2184 * Tests that we stream from only active vbuckets for DCP clients with that
2185 * preference
2186 */
2187TEST_F(SingleThreadedEPBucketTest, stream_from_active_vbucket_only) {
2188    std::map<vbucket_state_t, bool> states;
2189    states[vbucket_state_active] = true; /* Positive test case */
2190    states[vbucket_state_replica] = false; /* Negative test case */
2191    states[vbucket_state_pending] = false; /* Negative test case */
2192    states[vbucket_state_dead] = false; /* Negative test case */
2193
2194    for (auto& it : states) {
2195        setVBucketStateAndRunPersistTask(vbid, it.first);
2196
2197        /* Create a Mock Dcp producer */
2198        auto producer = std::make_shared<MockDcpProducer>(
2199                *engine,
2200                cookie,
2201                "test_producer",
2202                /*flags*/ 0,
2203                cb::const_byte_buffer() /*no json*/);
2204
2205        /* Try to open stream on replica vb with
2206           DCP_ADD_STREAM_ACTIVE_VB_ONLY flag */
2207        uint64_t rollbackSeqno;
2208        auto err = producer->streamRequest(/*flags*/
2209                                           DCP_ADD_STREAM_ACTIVE_VB_ONLY,
2210                                           /*opaque*/ 0,
2211                                           /*vbucket*/ vbid,
2212                                           /*start_seqno*/ 0,
2213                                           /*end_seqno*/ -1,
2214                                           /*vb_uuid*/ 0,
2215                                           /*snap_start*/ 0,
2216                                           /*snap_end*/ 0,
2217                                           &rollbackSeqno,
2218                                           SingleThreadedEPBucketTest::
2219                                                   fakeDcpAddFailoverLog);
2220
2221        if (it.second) {
2222            EXPECT_EQ(ENGINE_SUCCESS, err) << "Unexpected error code";
2223            producer->closeStream(/*opaque*/0, /*vbucket*/vbid);
2224        } else {
2225            EXPECT_EQ(ENGINE_NOT_MY_VBUCKET, err) << "Unexpected error code";
2226        }
2227
2228        // Stop Producer checkpoint processor task
2229        producer->cancelCheckpointCreatorTask();
2230    }
2231}
2232
2233class XattrSystemUserTest : public SingleThreadedEPBucketTest,
2234                            public ::testing::WithParamInterface<bool> {
2235};
2236
2237TEST_P(XattrSystemUserTest, pre_expiry_xattrs) {
2238    auto& kvbucket = *engine->getKVBucket();
2239
2240    setVBucketStateAndRunPersistTask(vbid, vbucket_state_active);
2241
2242    auto xattr_data = createXattrValue("value", GetParam());
2243
2244    auto itm = store_item(vbid,
2245                          makeStoredDocKey("key"),
2246                          xattr_data,
2247                          1,
2248                          {cb::engine_errc::success},
2249                          PROTOCOL_BINARY_DATATYPE_XATTR);
2250
2251    ItemMetaData metadata;
2252    uint32_t deleted;
2253    uint8_t datatype;
2254    kvbucket.getMetaData(makeStoredDocKey("key"), vbid, cookie, metadata,
2255                         deleted, datatype);
2256    auto prev_revseqno = metadata.revSeqno;
2257    EXPECT_EQ(1, prev_revseqno) << "Unexpected revision sequence number";
2258    itm.setRevSeqno(1);
2259    kvbucket.deleteExpiredItem(itm, ep_real_time() + 1, ExpireBy::Pager);
2260
2261    get_options_t options = static_cast<get_options_t>(QUEUE_BG_FETCH |
2262                                                       HONOR_STATES |
2263                                                       TRACK_REFERENCE |
2264                                                       DELETE_TEMP |
2265                                                       HIDE_LOCKED_CAS |
2266                                                       TRACK_STATISTICS |
2267                                                       GET_DELETED_VALUE);
2268    GetValue gv = kvbucket.get(makeStoredDocKey("key"), vbid, cookie, options);
2269    EXPECT_EQ(ENGINE_SUCCESS, gv.getStatus());
2270
2271    auto get_itm = gv.item.get();
2272    auto get_data = const_cast<char*>(get_itm->getData());
2273
2274    cb::char_buffer value_buf{get_data, get_itm->getNBytes()};
2275    cb::xattr::Blob new_blob(value_buf, false);
2276
2277    // If testing with system xattrs
2278    if (GetParam()) {
2279        const std::string& cas_str{"{\"cas\":\"0xdeadbeefcafefeed\"}"};
2280        const std::string& sync_str = to_string(new_blob.get("_sync"));
2281
2282        EXPECT_EQ(cas_str, sync_str) << "Unexpected system xattrs";
2283    }
2284    EXPECT_TRUE(new_blob.get("user").empty())
2285            << "The user attribute should be gone";
2286    EXPECT_TRUE(new_blob.get("meta").empty())
2287            << "The meta attribute should be gone";
2288
2289    kvbucket.getMetaData(makeStoredDocKey("key"), vbid, cookie, metadata,
2290                         deleted, datatype);
2291    EXPECT_EQ(prev_revseqno + 1, metadata.revSeqno) <<
2292             "Unexpected revision sequence number";
2293
2294}
2295
2296class WarmupTest : public SingleThreadedKVBucketTest {
2297public:
2298    /**
2299     * Test is currently using couchstore API directly to make the VB appear old
2300     */
2301    static void rewriteVBStateAs25x(uint16_t vbucket) {
2302        std::string filename = std::string(test_dbname) + "/" +
2303                               std::to_string(vbucket) + ".couch.1";
2304        Db* handle;
2305        couchstore_error_t err = couchstore_open_db(
2306                filename.c_str(), COUCHSTORE_OPEN_FLAG_CREATE, &handle);
2307
2308        ASSERT_EQ(COUCHSTORE_SUCCESS, err) << "Failed to open new database";
2309
2310        // Create a 2.5 _local/vbstate
2311        std::string vbstate2_5_x =
2312                "{\"state\": \"active\","
2313                " \"checkpoint_id\": \"1\","
2314                " \"max_deleted_seqno\": \"0\"}";
2315        LocalDoc vbstate;
2316        vbstate.id.buf = (char*)"_local/vbstate";
2317        vbstate.id.size = sizeof("_local/vbstate") - 1;
2318        vbstate.json.buf = (char*)vbstate2_5_x.c_str();
2319        vbstate.json.size = vbstate2_5_x.size();
2320        vbstate.deleted = 0;
2321
2322        err = couchstore_save_local_document(handle, &vbstate);
2323        ASSERT_EQ(COUCHSTORE_SUCCESS, err) << "Failed to write local document";
2324        couchstore_commit(handle);
2325        couchstore_close_file(handle);
2326        couchstore_free_db(handle);
2327    }
2328
2329    void resetEngineAndWarmup(std::string new_config = "") {
2330        resetEngineAndEnableWarmup(new_config);
2331        // Now get the engine warmed up
2332        runReadersUntilWarmedUp();
2333    }
2334
2335    /**
2336     * Destroy engine and replace it with a new engine that can be warmed up.
2337     * Finally, run warmup.
2338     */
2339    void resetEngineAndEnableWarmup(std::string new_config = "") {
2340        shutdownAndPurgeTasks(engine.get());
2341        std::string config = config_string;
2342
2343        // check if warmup=false needs replacing with warmup=true
2344        size_t pos;
2345        std::string warmupT = "warmup=true";
2346        std::string warmupF = "warmup=false";
2347        if ((pos = config.find(warmupF)) != std::string::npos) {
2348            config.replace(pos, warmupF.size(), warmupT);
2349        } else {
2350            config += warmupT;
2351        }
2352
2353        if (new_config.length() > 0) {
2354            config += ";";
2355            config += new_config;
2356        }
2357
2358        reinitialise(config);
2359
2360        engine->getKVBucket()->initializeWarmupTask();
2361        engine->getKVBucket()->startWarmupTask();
2362    }
2363};
2364
2365// Test that the FreqSaturatedCallback of a vbucket is initialized and after
2366// warmup is set to the "wakeup" function of ItemFreqDecayerTask.
2367TEST_F(WarmupTest, setFreqSaturatedCallback) {
2368    setVBucketStateAndRunPersistTask(vbid, vbucket_state_active);
2369    // The FreqSaturatedCallback should be initialised
2370    {
2371        auto vb = engine->getKVBucket()->getVBucket(vbid);
2372        EXPECT_TRUE(vb->ht.getFreqSaturatedCallback());
2373    }
2374    // Store an item, then make the VB appear old ready for warmup
2375    store_item(vbid, makeStoredDocKey("key1"), "value");
2376    flush_vbucket_to_disk(vbid);
2377    rewriteVBStateAs25x(vbid);
2378
2379    // Resetting the engine and running warmup will result in the
2380    // Warmup::createVBuckets being invoked for vbid.
2381    resetEngineAndWarmup();
2382
2383    dynamic_cast<MockEPBucket*>(store)->createItemFreqDecayerTask();
2384    auto itemFreqTask =
2385            dynamic_cast<MockEPBucket*>(store)->getMockItemFreqDecayerTask();
2386    auto vb = engine->getKVBucket()->getVBucket(vbid);
2387    // The FreqSaturatedCallback should be initialised
2388    EXPECT_TRUE(vb->ht.getFreqSaturatedCallback());
2389    ASSERT_FALSE(itemFreqTask->wakeupCalled);
2390    // We now invoke the FreqSaturatedCallback function.
2391    vb->ht.getFreqSaturatedCallback()();
2392    // This should have resulted in calling the wakeup function of the
2393    // MockItemFreqDecayerTask.
2394    EXPECT_TRUE(itemFreqTask->wakeupCalled);
2395}
2396
2397TEST_F(WarmupTest, hlcEpoch) {
2398    setVBucketStateAndRunPersistTask(vbid, vbucket_state_active);
2399
2400    // Store an item, then make the VB appear old ready for warmup
2401    store_item(vbid, makeStoredDocKey("key1"), "value");
2402    flush_vbucket_to_disk(vbid);
2403    rewriteVBStateAs25x(vbid);
2404
2405    resetEngineAndWarmup();
2406
2407    {
2408        auto vb = engine->getKVBucket()->getVBucket(vbid);
2409        // We've warmed up from a down-level vbstate, so expect epoch to be
2410        // HlcCasSeqnoUninitialised
2411        ASSERT_EQ(HlcCasSeqnoUninitialised, vb->getHLCEpochSeqno());
2412
2413        // Store a new key, the flush will change hlc_epoch to be the next seqno
2414        // (2)
2415        store_item(vbid, makeStoredDocKey("key2"), "value");
2416        flush_vbucket_to_disk(vbid);
2417
2418        EXPECT_EQ(2, vb->getHLCEpochSeqno());
2419
2420        // Store a 3rd item
2421        store_item(vbid, makeStoredDocKey("key3"), "value");
2422        flush_vbucket_to_disk(vbid);
2423    }
2424
2425    // Warmup again, hlcEpoch will still be 2
2426    resetEngineAndWarmup();
2427    auto vb = engine->getKVBucket()->getVBucket(vbid);
2428    EXPECT_EQ(2, vb->getHLCEpochSeqno());
2429
2430    // key1 stored before we established the epoch should have cas_is_hlc==false
2431    auto item1 = store->get(makeStoredDocKey("key1"), vbid, nullptr, {});
2432    ASSERT_EQ(ENGINE_SUCCESS, item1.getStatus());
2433    auto info1 = engine->getItemInfo(*item1.item);
2434    EXPECT_FALSE(info1.cas_is_hlc);
2435
2436    // key2 should have a CAS generated from the HLC
2437    auto item2 = store->get(makeStoredDocKey("key2"), vbid, nullptr, {});
2438    ASSERT_EQ(ENGINE_SUCCESS, item2.getStatus());
2439    auto info2 = engine->getItemInfo(*item2.item);
2440    EXPECT_TRUE(info2.cas_is_hlc);
2441}
2442
2443TEST_F(WarmupTest, fetchDocInDifferentCompressionModes) {
2444    setVBucketStateAndRunPersistTask(vbid, vbucket_state_active);
2445
2446    std::string valueData("{\"product\": \"car\",\"price\": \"100\"},"
2447                          "{\"product\": \"bus\",\"price\": \"1000\"},"
2448                          "{\"product\": \"Train\",\"price\": \"100000\"}");
2449
2450    //Store an item, then make the VB appear old ready for warmup
2451    store_item(vbid, makeStoredDocKey("key1"), valueData);
2452    flush_vbucket_to_disk(vbid);
2453
2454    resetEngineAndWarmup("compression_mode=off");
2455    auto item1 = store->get(makeStoredDocKey("key1"), vbid, nullptr, {});
2456    ASSERT_EQ(ENGINE_SUCCESS, item1.getStatus());
2457    auto info1 = engine->getItemInfo(*item1.item);
2458    EXPECT_EQ(PROTOCOL_BINARY_DATATYPE_JSON, info1.datatype);
2459
2460    resetEngineAndWarmup("compression_mode=passive");
2461    item1 = store->get(makeStoredDocKey("key1"), vbid, nullptr, {});
2462    ASSERT_EQ(ENGINE_SUCCESS, item1.getStatus());
2463    info1 = engine->getItemInfo(*item1.item);
2464    EXPECT_EQ((PROTOCOL_BINARY_DATATYPE_JSON | PROTOCOL_BINARY_DATATYPE_SNAPPY),
2465              info1.datatype);
2466
2467    resetEngineAndWarmup("compression_mode=active");
2468    item1 = store->get(makeStoredDocKey("key1"), vbid, nullptr, {});
2469    ASSERT_EQ(ENGINE_SUCCESS, item1.getStatus());
2470    info1 = engine->getItemInfo(*item1.item);
2471    EXPECT_EQ((PROTOCOL_BINARY_DATATYPE_JSON | PROTOCOL_BINARY_DATATYPE_SNAPPY),
2472              info1.datatype);
2473}
2474
2475TEST_F(WarmupTest, mightContainXattrs) {
2476    setVBucketStateAndRunPersistTask(vbid, vbucket_state_active);
2477
2478    // Store an item, then make the VB appear old ready for warmup
2479    store_item(vbid, makeStoredDocKey("key1"), "value");
2480    flush_vbucket_to_disk(vbid);
2481    rewriteVBStateAs25x(vbid);
2482
2483    resetEngineAndWarmup();
2484    {
2485        auto vb = engine->getKVBucket()->getVBucket(vbid);
2486        EXPECT_FALSE(vb->mightContainXattrs());
2487
2488        auto xattr_data = createXattrValue("value");
2489
2490        auto itm = store_item(vbid,
2491                              makeStoredDocKey("key"),
2492                              xattr_data,
2493                              1,
2494                              {cb::engine_errc::success},
2495                              PROTOCOL_BINARY_DATATYPE_XATTR);
2496
2497        EXPECT_TRUE(vb->mightContainXattrs());
2498
2499        flush_vbucket_to_disk(vbid);
2500    }
2501    // Warmup - we should have xattr dirty
2502    resetEngineAndWarmup();
2503
2504    EXPECT_TRUE(engine->getKVBucket()->getVBucket(vbid)->mightContainXattrs());
2505}
2506
2507/**
2508 * Performs the following operations
2509 * 1. Store an item
2510 * 2. Delete the item
2511 * 3. Recreate the item
2512 * 4. Perform a warmup
2513 * 5. Get meta data of the key to verify the revision seq no is
2514 *    equal to number of updates on it
2515 */
2516TEST_F(WarmupTest, MB_27162) {
2517    setVBucketStateAndRunPersistTask(vbid, vbucket_state_active);
2518
2519    auto key = makeStoredDocKey("key");
2520
2521    store_item(vbid, key, "value");
2522    flush_vbucket_to_disk(vbid);
2523
2524    delete_item(vbid, key);
2525    flush_vbucket_to_disk(vbid);
2526
2527    store_item(vbid, key, "value");
2528    flush_vbucket_to_disk(vbid);
2529
2530    resetEngineAndWarmup();
2531
2532    ItemMetaData itemMeta;
2533    uint32_t deleted = 0;
2534    uint8_t datatype = 0;
2535    auto doGetMetaData = [&]() {
2536        return store->getMetaData(key, vbid, cookie, itemMeta, deleted,
2537                                  datatype);
2538    };
2539    auto engineResult = doGetMetaData();
2540
2541    ASSERT_EQ(ENGINE_SUCCESS, engineResult);
2542    EXPECT_EQ(3, itemMeta.revSeqno);
2543}
2544
2545TEST_F(WarmupTest, MB_25197) {
2546    setVBucketStateAndRunPersistTask(vbid, vbucket_state_active);
2547
2548    // Store an item, then make the VB appear old ready for warmup
2549    store_item(vbid, makeStoredDocKey("key1"), "value");
2550    flush_vbucket_to_disk(vbid);
2551
2552    resetEngineAndEnableWarmup();
2553
2554    // Manually run the reader queue so that the warmup tasks execute whilst we
2555    // perform setVbucketState calls
2556    auto& readerQueue = *task_executor->getLpTaskQ()[READER_TASK_IDX];
2557    EXPECT_EQ(nullptr, store->getVBuckets().getBucket(vbid));
2558    auto notifications = get_number_of_mock_cookie_io_notifications(cookie);
2559    while (engine->getKVBucket()->shouldSetVBStateBlock(cookie)) {
2560        CheckedExecutor executor(task_executor, readerQueue);
2561        // Do a setVBState but don't flush it through. This call should be
2562        // failed ewouldblock whilst warmup has yet to attempt to create VBs.
2563        EXPECT_EQ(ENGINE_EWOULDBLOCK,
2564                  store->setVBucketState(vbid,
2565                                         vbucket_state_active,
2566                                         /*transfer*/ false,
2567                                         cookie));
2568        executor.runCurrentTask();
2569    }
2570
2571    EXPECT_GT(get_number_of_mock_cookie_io_notifications(cookie),
2572              notifications);
2573    EXPECT_NE(nullptr, store->getVBuckets().getBucket(vbid));
2574    EXPECT_EQ(ENGINE_SUCCESS,
2575              store->setVBucketState(vbid,
2576                                     vbucket_state_active,
2577                                     /*transfer*/ false));
2578
2579    // finish warmup so the test can exit
2580    while (engine->getKVBucket()->isWarmingUp()) {
2581        CheckedExecutor executor(task_executor, readerQueue);
2582        executor.runCurrentTask();
2583    }
2584}
2585
2586// KV-engine should warmup dead vbuckets
2587TEST_F(WarmupTest, MB_35599) {
2588    setVBucketStateAndRunPersistTask(vbid, vbucket_state_active);
2589
2590    store_item(vbid, makeStoredDocKey("key"), "value");
2591    flush_vbucket_to_disk(vbid);
2592
2593    setVBucketStateAndRunPersistTask(vbid, vbucket_state_dead);
2594
2595    resetEngineAndWarmup();
2596
2597    // We should be able to switch the vbucket back to active/replica and read
2598    // the value
2599    setVBucketStateAndRunPersistTask(vbid, vbucket_state_replica);
2600
2601    auto gv = store->getReplica(makeStoredDocKey("key"), vbid, cookie, {});
2602
2603    ASSERT_EQ(ENGINE_SUCCESS, gv.getStatus());
2604    EXPECT_EQ(0, memcmp("value", gv.item->getData(), 5));
2605}
2606
2607// Test that we can push a DCP_DELETION which pretends to be from a delete
2608// with xattrs, i.e. the delete has a value containing only system xattrs
2609// The MB was created because this code would actually trigger an exception
2610TEST_F(SingleThreadedEPBucketTest, mb25273) {
2611    // We need a replica VB
2612    setVBucketStateAndRunPersistTask(vbid, vbucket_state_replica);
2613
2614    auto consumer =
2615            std::make_shared<MockDcpConsumer>(*engine, cookie, "test_consumer");
2616    int opaque = 1;
2617    ASSERT_EQ(ENGINE_SUCCESS, consumer->addStream(opaque, vbid, /*flags*/ 0));
2618
2619    std::string key = "key";
2620    std::string body = "body";
2621
2622    // Manually manage the xattr blob - later we will prune user keys
2623    cb::xattr::Blob blob;
2624
2625    blob.set("key1", "{\"author\":\"bubba\"}");
2626    blob.set("_sync", "{\"cas\":\"0xdeadbeefcafefeed\"}");
2627
2628    auto xattr_value = blob.finalize();
2629
2630    std::string data;
2631    std::copy(xattr_value.buf,
2632              xattr_value.buf + xattr_value.len,
2633              std::back_inserter(data));
2634    std::copy(
2635            body.c_str(), body.c_str() + body.size(), std::back_inserter(data));
2636
2637    const DocKey docKey{key, DocNamespace::DefaultCollection};
2638    cb::const_byte_buffer value{reinterpret_cast<const uint8_t*>(data.data()),
2639                                data.size()};
2640
2641    // Send mutation in a single seqno snapshot
2642    int64_t bySeqno = 1;
2643    EXPECT_EQ(ENGINE_SUCCESS,
2644              consumer->snapshotMarker(
2645                      opaque, vbid, bySeqno, bySeqno, MARKER_FLAG_CHK));
2646    EXPECT_EQ(ENGINE_SUCCESS,
2647              consumer->mutation(opaque,
2648                                 docKey,
2649                                 value,
2650                                 0, // priv bytes
2651                                 PROTOCOL_BINARY_DATATYPE_XATTR,
2652                                 2, // cas
2653                                 vbid,
2654                                 0xf1a95, // flags
2655                                 bySeqno,
2656                                 0, // rev seqno
2657                                 0, // exptime
2658                                 0, // locktime
2659                                 {}, // meta
2660                                 0)); // nru
2661    EXPECT_EQ(std::make_pair(false, size_t(1)),
2662              getEPBucket().flushVBucket(vbid));
2663    bySeqno++;
2664
2665    // Send deletion in a single seqno snapshot and send a doc with only system
2666    // xattrs to simulate what an active would send
2667    blob.prune_user_keys();
2668    auto finalizedXttr = blob.finalize();
2669    value = {reinterpret_cast<const uint8_t*>(finalizedXttr.data()),
2670             finalizedXttr.size()};
2671    EXPECT_NE(0, value.size());
2672    EXPECT_EQ(ENGINE_SUCCESS,
2673              consumer->snapshotMarker(
2674                      opaque, vbid, bySeqno, bySeqno, MARKER_FLAG_CHK));
2675    EXPECT_EQ(ENGINE_SUCCESS,
2676              consumer->deletion(opaque,
2677                                 docKey,
2678                                 value,
2679                                 /*priv_bytes*/ 0,
2680                                 PROTOCOL_BINARY_DATATYPE_XATTR,
2681                                 /*cas*/ 3,
2682                                 vbid,
2683                                 bySeqno,
2684                                 /*revSeqno*/ 0,
2685                                 /*meta*/ {}));
2686    EXPECT_EQ(std::make_pair(false, size_t(1)),
2687              getEPBucket().flushVBucket(vbid));
2688    /* Close stream before deleting the connection */
2689    ASSERT_EQ(ENGINE_SUCCESS, consumer->closeStream(opaque, vbid));
2690
2691    setVBucketStateAndRunPersistTask(vbid, vbucket_state_active);
2692
2693    get_options_t options = static_cast<get_options_t>(
2694            QUEUE_BG_FETCH | HONOR_STATES | TRACK_REFERENCE | DELETE_TEMP |
2695            HIDE_LOCKED_CAS | TRACK_STATISTICS | GET_DELETED_VALUE);
2696    auto gv = store->get(docKey, vbid, cookie, options);
2697    EXPECT_EQ(ENGINE_EWOULDBLOCK, gv.getStatus());
2698
2699    // Manually run the bgfetch task.
2700    MockGlobalTask mockTask(engine->getTaskable(), TaskId::MultiBGFetcherTask);
2701    store->getVBucket(vbid)->getShard()->getBgFetcher()->run(&mockTask);
2702    gv = store->get(docKey, vbid, cookie, GET_DELETED_VALUE);
2703    ASSERT_EQ(ENGINE_SUCCESS, gv.getStatus());
2704
2705    // check it's there and deleted with the expected value length
2706    EXPECT_TRUE(gv.item->isDeleted());
2707    EXPECT_EQ(0, gv.item->getFlags()); // flags also still zero
2708    EXPECT_EQ(3, gv.item->getCas());
2709    EXPECT_EQ(value.size(), gv.item->getValue()->valueSize());
2710}
2711
2712// Test the item freq decayer task.  A mock version of the task is used,
2713// which has the ChunkDuration reduced to 0ms which mean as long as the
2714// number of documents is greater than
2715// ProgressTracker:INITIAL_VISIT_COUNT_CHECK the task will require multiple
2716// runs to complete.  If the task takes less than or more than two passes to
2717// complete then an error will be reported.
2718TEST_F(SingleThreadedEPBucketTest, ItemFreqDecayerTaskTest) {
2719    setVBucketStateAndRunPersistTask(vbid, vbucket_state_active);
2720
2721    // ProgressTracker:INITIAL_VISIT_COUNT_CHECK = 100 and therefore
2722    // add 110 documents to the hash table to ensure all documents cannot be
2723    // visited in a single pass.
2724    for (uint32_t ii = 1; ii < 110; ii++) {
2725        auto key = makeStoredDocKey("DOC_" + std::to_string(ii));
2726        store_item(vbid, key, "value");
2727    }
2728
2729    auto& lpNonioQ = *task_executor->getLpTaskQ()[NONIO_TASK_IDX];
2730    auto itemFreqDecayerTask =
2731            std::make_shared<MockItemFreqDecayerTask>(engine.get(), 50);
2732
2733    EXPECT_EQ(0, lpNonioQ.getFutureQueueSize());
2734    task_executor->schedule(itemFreqDecayerTask);
2735    EXPECT_EQ(1, lpNonioQ.getFutureQueueSize());
2736    itemFreqDecayerTask->wakeup();
2737
2738    EXPECT_FALSE(itemFreqDecayerTask->isCompleted());
2739    runNextTask(lpNonioQ, "Item frequency count decayer task");
2740    EXPECT_FALSE(itemFreqDecayerTask->isCompleted());
2741    runNextTask(lpNonioQ, "Item frequency count decayer task");
2742    // The item freq decayer task should have completed.
2743    EXPECT_TRUE(itemFreqDecayerTask->isCompleted());
2744}
2745
2746// Test to confirm that the ItemFreqDecayerTask gets created on kv_bucket
2747// initialisation.  The task should be runnable.  However once run should
2748// enter a "snoozed" state.
2749TEST_F(SingleThreadedEPBucketTest, CreatedItemFreqDecayerTask) {
2750    store->initialize();
2751    EXPECT_FALSE(isItemFreqDecayerTaskSnoozed());
2752    store->runItemFreqDecayerTask();
2753    EXPECT_TRUE(isItemFreqDecayerTaskSnoozed());
2754}
2755
2756extern uint32_t dcp_last_delete_time;
2757extern std::string dcp_last_key;
2758TEST_P(STParameterizedBucketTest, produce_delete_times) {
2759    setVBucketStateAndRunPersistTask(vbid, vbucket_state_active);
2760    auto t1 = ep_real_time();
2761    storeAndDeleteItem(
2762            vbid, {"KEY1", DocNamespace::DefaultCollection}, "value");
2763    auto t2 = ep_real_time();
2764
2765    // Clear checkpoint so DCP will goto backfill
2766    auto vb = engine->getKVBucket()->getVBucket(vbid);
2767    vb->checkpointManager->clear(*vb, 2);
2768
2769    auto cookie = create_mock_cookie();
2770    auto producer =
2771            createDcpProducer(cookie, {}, false, IncludeDeleteTime::Yes);
2772    auto producers = get_dcp_producers(
2773            reinterpret_cast<ENGINE_HANDLE*>(engine.get()),
2774            reinterpret_cast<ENGINE_HANDLE_V1*>(engine.get()));
2775
2776    createDcpStream(*producer);
2777
2778    // noop off as we will play with time travel
2779    producer->setNoopEnabled(false);
2780
2781    auto step = [this, producer, &producers](bool inMemory) {
2782        notifyAndStepToCheckpoint(*producer,
2783                                  *producers,
2784                                  cb::mcbp::ClientOpcode::DcpSnapshotMarker,
2785                                  inMemory);
2786
2787        // Now step the producer to transfer the delete/tombstone
2788        EXPECT_EQ(ENGINE_WANT_MORE, producer->step(producers.get()));
2789    };
2790
2791    step(false);
2792    EXPECT_NE(0, dcp_last_delete_time);
2793    EXPECT_GE(dcp_last_delete_time, t1);
2794    EXPECT_LE(dcp_last_delete_time, t2);
2795    EXPECT_EQ(PROTOCOL_BINARY_CMD_DCP_DELETION, dcp_last_op);
2796    EXPECT_EQ("KEY1", dcp_last_key);
2797    size_t expectedBytes = SnapshotMarker::baseMsgBytes +
2798                           MutationResponse::deletionV2BaseMsgBytes +
2799                           (sizeof("KEY1") - 1);
2800    EXPECT_EQ(expectedBytes, producer->getBytesOutstanding());
2801
2802    // Now a new delete, in-memory will also have a delete time
2803    t1 = ep_real_time();
2804    storeAndDeleteItem(
2805            vbid, {"KEY2", DocNamespace::DefaultCollection}, "value");
2806    t2 = ep_real_time();
2807
2808    step(true);
2809
2810    EXPECT_NE(0, dcp_last_delete_time);
2811    EXPECT_GE(dcp_last_delete_time, t1);
2812    EXPECT_LE(dcp_last_delete_time, t2);
2813    EXPECT_EQ(PROTOCOL_BINARY_CMD_DCP_DELETION, dcp_last_op);
2814    EXPECT_EQ("KEY2", dcp_last_key);
2815    expectedBytes += SnapshotMarker::baseMsgBytes +
2816                     MutationResponse::deletionV2BaseMsgBytes +
2817                     (sizeof("KEY2") - 1);
2818    EXPECT_EQ(expectedBytes, producer->getBytesOutstanding());
2819
2820    // Finally expire a key and check that the delete_time we receive is not the
2821    // expiry time, the delete time should always be re-created by the server to
2822    // ensure old/future expiry times don't disrupt tombstone purging (MB-33919)
2823    auto expiryTime = ep_real_time() + 32000;
2824    store_item(vbid,
2825               {"KEY3", DocNamespace::DefaultCollection},
2826               "value",
2827               expiryTime);
2828
2829    step(true);
2830    expectedBytes += SnapshotMarker::baseMsgBytes +
2831                     MutationResponse::mutationBaseMsgBytes +
2832                     (sizeof("value") - 1) + (sizeof("KEY3") - 1);
2833    EXPECT_EQ(expectedBytes, producer->getBytesOutstanding());
2834
2835    EXPECT_EQ(PROTOCOL_BINARY_CMD_DCP_MUTATION, dcp_last_op);
2836    TimeTraveller arron(64000);
2837
2838    // Trigger expiry on a GET
2839    auto gv = store->get(
2840            {"KEY3", DocNamespace::DefaultCollection}, vbid, cookie, NONE);
2841    EXPECT_EQ(ENGINE_KEY_ENOENT, gv.getStatus());
2842
2843    step(true);
2844
2845    EXPECT_NE(expiryTime, dcp_last_delete_time);
2846    EXPECT_EQ(PROTOCOL_BINARY_CMD_DCP_DELETION, dcp_last_op);
2847    EXPECT_EQ("KEY3", dcp_last_key);
2848    expectedBytes += SnapshotMarker::baseMsgBytes +
2849                     MutationResponse::deletionV2BaseMsgBytes +
2850                     (sizeof("KEY3") - 1);
2851    EXPECT_EQ(expectedBytes, producer->getBytesOutstanding());
2852
2853    destroy_mock_cookie(cookie);
2854    producer->closeAllStreams();
2855    producer->cancelCheckpointCreatorTask();
2856    producer.reset();
2857}
2858
2859TEST_P(XattrSystemUserTest, MB_29040) {
2860    auto& kvbucket = *engine->getKVBucket();
2861    setVBucketStateAndRunPersistTask(vbid, vbucket_state_active);
2862    store_item(vbid,
2863               {"key", DocNamespace::DefaultCollection},
2864               createXattrValue("{}", GetParam()),
2865               ep_real_time() + 1 /*1 second TTL*/,
2866               {cb::engine_errc::success},
2867
2868               PROTOCOL_BINARY_DATATYPE_XATTR | PROTOCOL_BINARY_DATATYPE_JSON);
2869
2870    EXPECT_EQ(std::make_pair(false, size_t(1)),
2871              getEPBucket().flushVBucket(vbid));
2872    TimeTraveller ted(64000);
2873    runCompaction();
2874    // An expired item should of been pushed to the checkpoint
2875    EXPECT_EQ(std::make_pair(false, size_t(1)),
2876              getEPBucket().flushVBucket(vbid));
2877    get_options_t options = static_cast<get_options_t>(
2878            QUEUE_BG_FETCH | HONOR_STATES | TRACK_REFERENCE | DELETE_TEMP |
2879            HIDE_LOCKED_CAS | TRACK_STATISTICS | GET_DELETED_VALUE);
2880    GetValue gv = kvbucket.get(
2881            {"key", DocNamespace::DefaultCollection}, vbid, cookie, options);
2882    EXPECT_EQ(ENGINE_EWOULDBLOCK, gv.getStatus());
2883
2884    MockGlobalTask mockTask(engine->getTaskable(), TaskId::MultiBGFetcherTask);
2885    store->getVBucket(vbid)->getShard()->getBgFetcher()->run(&mockTask);
2886
2887    gv = kvbucket.get(
2888            {"key", DocNamespace::DefaultCollection}, vbid, cookie, options);
2889    ASSERT_EQ(ENGINE_SUCCESS, gv.getStatus());
2890
2891    auto get_itm = gv.item.get();
2892    auto get_data = const_cast<char*>(get_itm->getData());
2893
2894    cb::char_buffer value_buf{get_data, get_itm->getNBytes()};
2895    cb::xattr::Blob new_blob(value_buf, false);
2896
2897    // If testing with system xattrs
2898    if (GetParam()) {
2899        const std::string& cas_str{"{\"cas\":\"0xdeadbeefcafefeed\"}"};
2900        const std::string& sync_str = to_string(new_blob.get("_sync"));
2901
2902        EXPECT_EQ(cas_str, sync_str) << "Unexpected system xattrs";
2903        EXPECT_EQ(PROTOCOL_BINARY_DATATYPE_XATTR, get_itm->getDataType())
2904                << "Wrong datatype Item:" << *get_itm;
2905    } else {
2906        EXPECT_EQ(PROTOCOL_BINARY_RAW_BYTES, get_itm->getDataType())
2907                << "Wrong datatype Item:" << *get_itm;
2908    }
2909
2910    // Non-system xattrs should be removed
2911    EXPECT_TRUE(new_blob.get("user").empty())
2912            << "The user attribute should be gone";
2913    EXPECT_TRUE(new_blob.get("meta").empty())
2914            << "The meta attribute should be gone";
2915}
2916
2917extern uint8_t dcp_last_op;
2918class MB_29287 : public SingleThreadedEPBucketTest {
2919public:
2920    void SetUp() override {
2921        SingleThreadedEPBucketTest::SetUp();
2922        cookie = create_mock_cookie();
2923        setVBucketStateAndRunPersistTask(vbid, vbucket_state_active);
2924
2925        // 1. Mock producer
2926        producer = std::make_shared<MockDcpProducer>(
2927                *engine, cookie, "test_producer", 0, cb::const_byte_buffer{});
2928        producer->createCheckpointProcessorTask();
2929
2930        producers = get_dcp_producers(
2931                reinterpret_cast<ENGINE_HANDLE*>(engine.get()),
2932                reinterpret_cast<ENGINE_HANDLE_V1*>(engine.get()));
2933        auto vb = store->getVBuckets().getBucket(vbid);
2934        ASSERT_NE(nullptr, vb.get());
2935        // 2. Mock active stream
2936        producer->mockActiveStreamRequest(0, // flags
2937                                          1, // opaque
2938                                          *vb,
2939                                          0, // start_seqno
2940                                          ~0, // end_seqno
2941                                          0, // vbucket_uuid,
2942                                          0, // snap_start_seqno,
2943                                          0); // snap_end_seqno,
2944
2945        store_item(vbid, makeStoredDocKey("1"), "value1");
2946        store_item(vbid, makeStoredDocKey("2"), "value2");
2947        store_item(vbid, makeStoredDocKey("3"), "value3");
2948        flush_vbucket_to_disk(vbid, 3);
2949        notifyAndStepToCheckpoint(*producer, *producers);
2950
2951        for (int i = 0; i < 3; i++) { // 1, 2 and 3
2952            EXPECT_EQ(ENGINE_WANT_MORE, producer->step(producers.get()));
2953            EXPECT_EQ(PROTOCOL_BINARY_CMD_DCP_MUTATION, dcp_last_op);
2954        }
2955
2956        store_item(vbid, makeStoredDocKey("4"), "value4");
2957
2958        auto stream = producer->findStream(vbid);
2959        auto* mockStream = static_cast<MockActiveStream*>(stream.get());
2960        mockStream->preGetOutstandingItemsCallback =
2961                std::bind(&MB_29287::closeAndRecreateStream, this);
2962
2963        // call next - get success (nothing ready, but task has been scheduled)
2964        EXPECT_EQ(ENGINE_SUCCESS, producer->step(producers.get()));
2965
2966        // Run the snapshot task and step (triggering
2967        // preGetOutstandingItemsCallback)
2968        notifyAndStepToCheckpoint(*producer, *producers);
2969    }
2970
2971    void TearDown() override {
2972        destroy_mock_cookie(cookie);
2973        producer->closeAllStreams();
2974        producer->cancelCheckpointCreatorTask();
2975        producer.reset();
2976        SingleThreadedEPBucketTest::TearDown();
2977    }
2978
2979    void closeAndRecreateStream() {
2980        // Without the fix, 5 will be lost
2981        store_item(vbid, makeStoredDocKey("5"), "don't lose me");
2982        producer->closeStream(1, 0);
2983        auto vb = store->getVBuckets().getBucket(vbid);
2984        ASSERT_NE(nullptr, vb.get());
2985        producer->mockActiveStreamRequest(DCP_ADD_STREAM_FLAG_TAKEOVER,
2986                                          1, // opaque
2987                                          *vb,
2988                                          3, // start_seqno
2989                                          ~0, // end_seqno
2990                                          vb->failovers->getLatestUUID(),
2991                                          3, // snap_start_seqno
2992                                          ~0); // snap_end_seqno
2993    }
2994
2995    const void* cookie = nullptr;
2996    std::shared_ptr<MockDcpProducer> producer;
2997    std::unique_ptr<dcp_message_producers> producers;
2998};
2999
3000// NEXT two test are TEMP disabled as this commit will cause a deadlock
3001// because the same thread is calling back with streamMutex held onto a function
3002// which wants to acquire...
3003
3004// Stream takeover with no more writes
3005TEST_F(MB_29287, DISABLED_dataloss_end) {
3006    auto stream = producer->findStream(vbid);
3007    auto* as = static_cast<ActiveStream*>(stream.get());
3008
3009    EXPECT_TRUE(stream->isTakeoverSend());
3010    EXPECT_EQ(ENGINE_WANT_MORE, producer->step(producers.get()));
3011    EXPECT_EQ(PROTOCOL_BINARY_CMD_DCP_MUTATION, dcp_last_op);
3012    dcp_last_op = 0;
3013    EXPECT_EQ("4", dcp_last_key);
3014
3015    EXPECT_EQ(ENGINE_WANT_MORE, producer->step(producers.get()));
3016    EXPECT_EQ(PROTOCOL_BINARY_CMD_DCP_MUTATION, dcp_last_op);
3017    dcp_last_op = 0;
3018    EXPECT_EQ("5", dcp_last_key);
3019
3020    // Snapshot received
3021    as->snapshotMarkerAckReceived();
3022
3023    // set-vb-state now underway
3024    EXPECT_EQ(ENGINE_WANT_MORE, producer->step(producers.get()));
3025    EXPECT_EQ(PROTOCOL_BINARY_CMD_DCP_SET_VBUCKET_STATE, dcp_last_op);
3026
3027    // Move stream to pending and vb to dead
3028    as->setVBucketStateAckRecieved();
3029
3030    // Cannot store anymore items
3031    store_item(vbid,
3032               makeStoredDocKey("K6"),
3033               "value6",
3034               0,
3035               {cb::engine_errc::not_my_vbucket});
3036
3037    EXPECT_EQ(ENGINE_WANT_MORE, producer->step(producers.get()));
3038    EXPECT_EQ(PROTOCOL_BINARY_CMD_DCP_SET_VBUCKET_STATE, dcp_last_op);
3039    as->setVBucketStateAckRecieved();
3040    EXPECT_TRUE(!stream->isActive());
3041
3042    auto vb = store->getVBuckets().getBucket(vbid);
3043    ASSERT_NE(nullptr, vb.get());
3044    // Have persistence cursor only (dcp now closed down)
3045    EXPECT_EQ(1, vb->checkpointManager->getNumOfCursors());
3046}
3047
3048// takeover when more writes occur
3049TEST_F(MB_29287, DISABLED_dataloss_hole) {
3050    auto stream = producer->findStream(vbid);
3051    auto* as = static_cast<ActiveStream*>(stream.get());
3052
3053    store_item(vbid, makeStoredDocKey("6"), "value6");
3054
3055    EXPECT_TRUE(stream->isTakeoverSend());
3056    EXPECT_EQ(ENGINE_WANT_MORE, producer->step(producers.get()));
3057    EXPECT_EQ(PROTOCOL_BINARY_CMD_DCP_MUTATION, dcp_last_op);
3058    dcp_last_op = 0;
3059    EXPECT_EQ("4", dcp_last_key);
3060
3061    EXPECT_EQ(ENGINE_WANT_MORE, producer->step(producers.get()));
3062    EXPECT_EQ(PROTOCOL_BINARY_CMD_DCP_MUTATION, dcp_last_op);
3063    dcp_last_op = 0;
3064    EXPECT_EQ("5", dcp_last_key);
3065
3066    // Snapshot received
3067    as->snapshotMarkerAckReceived();
3068
3069    // More data in the checkpoint (key 6)
3070
3071    // call next - get success (nothing ready, but task has been scheduled)
3072    EXPECT_EQ(ENGINE_SUCCESS, producer->step(producers.get()));
3073
3074    // Run the snapshot task and step
3075    notifyAndStepToCheckpoint(*producer, *producers);
3076
3077    EXPECT_EQ(ENGINE_WANT_MORE, producer->step(producers.get()));
3078    EXPECT_EQ(PROTOCOL_BINARY_CMD_DCP_MUTATION, dcp_last_op);
3079    EXPECT_EQ("6", dcp_last_key);
3080
3081    // Snapshot received
3082    as->snapshotMarkerAckReceived();
3083
3084    // Now send
3085    EXPECT_TRUE(stream->isTakeoverSend());
3086
3087    // set-vb-state now underway
3088    EXPECT_EQ(ENGINE_WANT_MORE, producer->step(producers.get()));
3089    EXPECT_EQ(PROTOCOL_BINARY_CMD_DCP_SET_VBUCKET_STATE, dcp_last_op);
3090    dcp_last_op = 0;
3091
3092    // Move stream to pending and vb to dead
3093    as->setVBucketStateAckRecieved();
3094
3095    // Cannot store anymore items
3096    store_item(vbid,
3097               makeStoredDocKey("K6"),
3098               "value6",
3099               0,
3100               {cb::engine_errc::not_my_vbucket});
3101
3102    EXPECT_EQ(ENGINE_WANT_MORE, producer->step(producers.get()));
3103    EXPECT_EQ(PROTOCOL_BINARY_CMD_DCP_SET_VBUCKET_STATE, dcp_last_op);
3104    as->setVBucketStateAckRecieved();
3105    EXPECT_TRUE(!stream->isActive());
3106
3107    auto vb = store->getVBuckets().getBucket(vbid);
3108    ASSERT_NE(nullptr, vb.get());
3109    // Have persistence cursor only (dcp now closed down)
3110    EXPECT_EQ(1, vb->checkpointManager->getNumOfCursors());
3111}
3112
3113class XattrCompressedTest
3114    : public SingleThreadedEPBucketTest,
3115      public ::testing::WithParamInterface<::testing::tuple<bool, bool>> {
3116public:
3117    bool isXattrSystem() const {
3118        return ::testing::get<0>(GetParam());
3119    }
3120    bool isSnappy() const {
3121        return ::testing::get<1>(GetParam());
3122    }
3123};
3124
3125// Create a replica VB and consumer, then send it an xattr value which should
3126// of been stripped at the source, but wasn't because of MB29040. Then check
3127// the consumer sanitises the document. Run the test with user/system xattrs
3128// and snappy on/off
3129TEST_P(XattrCompressedTest, MB_29040_sanitise_input) {
3130    setVBucketStateAndRunPersistTask(vbid, vbucket_state_replica);
3131
3132    auto consumer = std::make_shared<MockDcpConsumer>(
3133            *engine, cookie, "MB_29040_sanitise_input");
3134    int opaque = 1;
3135    ASSERT_EQ(ENGINE_SUCCESS, consumer->addStream(opaque, vbid, /*flags*/ 0));
3136
3137    std::string body;
3138    if (!isXattrSystem()) {
3139        body.assign("value");
3140    }
3141    auto value = createXattrValue(body, isXattrSystem(), isSnappy());
3142
3143    // Send deletion in a single seqno snapshot
3144    int64_t bySeqno = 1;
3145    EXPECT_EQ(ENGINE_SUCCESS,
3146              consumer->snapshotMarker(
3147                      opaque, vbid, bySeqno, bySeqno, MARKER_FLAG_CHK));
3148
3149    cb::const_byte_buffer valueBuf{
3150            reinterpret_cast<const uint8_t*>(value.data()), value.size()};
3151    EXPECT_EQ(
3152            ENGINE_SUCCESS,
3153            consumer->deletion(
3154                    opaque,
3155                    {"key", DocNamespace::DefaultCollection},
3156                    valueBuf,
3157                    /*priv_bytes*/ 0,
3158                    PROTOCOL_BINARY_DATATYPE_XATTR |
3159                            (isSnappy() ? PROTOCOL_BINARY_DATATYPE_SNAPPY : 0),
3160                    /*cas*/ 3,
3161                    vbid,
3162                    bySeqno,
3163                    /*revSeqno*/ 0,
3164                    /*meta*/ {}));
3165
3166    EXPECT_EQ(std::make_pair(false, size_t(1)),
3167              getEPBucket().flushVBucket(vbid));
3168
3169    ASSERT_EQ(ENGINE_SUCCESS, consumer->closeStream(opaque, vbid));
3170
3171    // Switch to active
3172    setVBucketStateAndRunPersistTask(vbid, vbucket_state_active);
3173
3174    get_options_t options = static_cast<get_options_t>(
3175            QUEUE_BG_FETCH | HONOR_STATES | TRACK_REFERENCE | DELETE_TEMP |
3176            HIDE_LOCKED_CAS | TRACK_STATISTICS | GET_DELETED_VALUE);
3177    auto gv = store->get(
3178            {"key", DocNamespace::DefaultCollection}, vbid, cookie, options);
3179    EXPECT_EQ(ENGINE_EWOULDBLOCK, gv.getStatus());
3180
3181    // Manually run the bgfetch task.
3182    MockGlobalTask mockTask(engine->getTaskable(), TaskId::MultiBGFetcherTask);
3183    store->getVBucket(vbid)->getShard()->getBgFetcher()->run(&mockTask);
3184    gv = store->get({"key", DocNamespace::DefaultCollection},
3185                    vbid,
3186                    cookie,
3187                    GET_DELETED_VALUE);
3188    ASSERT_EQ(ENGINE_SUCCESS, gv.getStatus());
3189
3190    // This is the only system key test_helpers::createXattrValue gives us
3191    cb::xattr::Blob blob;
3192    blob.set("_sync", "{\"cas\":\"0xdeadbeefcafefeed\"}");
3193
3194    EXPECT_TRUE(gv.item->isDeleted());
3195    EXPECT_EQ(0, gv.item->getFlags());
3196    EXPECT_EQ(3, gv.item->getCas());
3197    EXPECT_EQ(isXattrSystem() ? blob.size() : 0,
3198              gv.item->getValue()->valueSize());
3199    EXPECT_EQ(isXattrSystem() ? PROTOCOL_BINARY_DATATYPE_XATTR
3200                              : PROTOCOL_BINARY_RAW_BYTES,
3201              gv.item->getDataType());
3202}
3203
3204// Create a replica VB and consumer, then send it an delete with value which
3205// should never of been created on the source.
3206TEST_F(SingleThreadedEPBucketTest, MB_31141_sanitise_input) {
3207    setVBucketStateAndRunPersistTask(vbid, vbucket_state_replica);
3208
3209    auto consumer = std::make_shared<MockDcpConsumer>(
3210            *engine, cookie, "MB_31141_sanitise_input");
3211    int opaque = 1;
3212    ASSERT_EQ(ENGINE_SUCCESS, consumer->addStream(opaque, vbid, /*flags*/ 0));
3213
3214    std::string body = "value";
3215
3216    // Send deletion in a single seqno snapshot
3217    int64_t bySeqno = 1;
3218    EXPECT_EQ(ENGINE_SUCCESS,
3219              consumer->snapshotMarker(
3220                      opaque, vbid, bySeqno, bySeqno, MARKER_FLAG_CHK));
3221
3222    EXPECT_EQ(ENGINE_SUCCESS,
3223              consumer->deletion(opaque,
3224                                 {"key", DocNamespace::DefaultCollection},
3225                                 {reinterpret_cast<const uint8_t*>(body.data()),
3226                                  body.size()},
3227                                 /*priv_bytes*/ 0,
3228                                 PROTOCOL_BINARY_DATATYPE_SNAPPY |
3229                                         PROTOCOL_BINARY_RAW_BYTES,
3230                                 /*cas*/ 3,
3231                                 vbid,
3232                                 bySeqno,
3233                                 /*revSeqno*/ 0,
3234                                 /*meta*/ {}));
3235
3236    EXPECT_EQ(std::make_pair(false, size_t(1)),
3237              getEPBucket().flushVBucket(vbid));
3238
3239    ASSERT_EQ(ENGINE_SUCCESS, consumer->closeStream(opaque, vbid));
3240
3241    // Switch to active
3242    setVBucketStateAndRunPersistTask(vbid, vbucket_state_active);
3243
3244    get_options_t options = static_cast<get_options_t>(
3245            QUEUE_BG_FETCH | HONOR_STATES | TRACK_REFERENCE | DELETE_TEMP |
3246            HIDE_LOCKED_CAS | TRACK_STATISTICS | GET_DELETED_VALUE);
3247    auto gv = store->get(
3248            {"key", DocNamespace::DefaultCollection}, vbid, cookie, options);
3249    EXPECT_EQ(ENGINE_EWOULDBLOCK, gv.getStatus());
3250
3251    // Manually run the bgfetch task.
3252    MockGlobalTask mockTask(engine->getTaskable(), TaskId::MultiBGFetcherTask);
3253    store->getVBucket(vbid)->getShard()->getBgFetcher()->run(&mockTask);
3254    gv = store->get({"key", DocNamespace::DefaultCollection},
3255                    vbid,
3256                    cookie,
3257                    GET_DELETED_VALUE);
3258    ASSERT_EQ(ENGINE_SUCCESS, gv.getStatus());
3259
3260    EXPECT_TRUE(gv.item->isDeleted());
3261    EXPECT_EQ(0, gv.item->getFlags());
3262    EXPECT_EQ(3, gv.item->getCas());
3263    EXPECT_EQ(0, gv.item->getValue()->valueSize());
3264    EXPECT_EQ(PROTOCOL_BINARY_RAW_BYTES, gv.item->getDataType());
3265}
3266
3267// Test highlighting MB_29480 - this is not demonstrating the issue is fixed.
3268TEST_F(SingleThreadedEPBucketTest, MB_29480) {
3269    // Make vbucket active.
3270    setVBucketStateAndRunPersistTask(vbid, vbucket_state_active);
3271    auto vb = store->getVBuckets().getBucket(vbid);
3272    ASSERT_NE(nullptr, vb.get());
3273
3274    // Create a Mock Dcp producer
3275    auto producer = std::make_shared<MockDcpProducer>(
3276            *engine,
3277            cookie,
3278            "test_producer",
3279            /*flags*/ 0,
3280            cb::const_byte_buffer() /*no json*/);
3281
3282    producer->createCheckpointProcessorTask();
3283
3284    auto producers = get_dcp_producers(
3285            reinterpret_cast<ENGINE_HANDLE*>(engine.get()),
3286            reinterpret_cast<ENGINE_HANDLE_V1*>(engine.get()));
3287
3288    producer->mockActiveStreamRequest(0, // flags
3289                                      1, // opaque
3290                                      *vb,
3291                                      0, // start_seqno
3292                                      ~0, // end_seqno
3293                                      0, // vbucket_uuid,
3294                                      0, // snap_start_seqno,
3295                                      0); // snap_end_seqno,
3296
3297    // 1) First store 5 keys
3298    std::array<std::string, 2> initialKeys = {{"k1", "k2"}};
3299    for (const auto& key : initialKeys) {
3300        store_item(vbid, makeStoredDocKey(key), key);
3301    }
3302    flush_vbucket_to_disk(vbid, initialKeys.size());
3303
3304    // 2) And receive them, client knows of k1,k2,k3,k4,k5
3305    notifyAndStepToCheckpoint(*producer, *producers);
3306    for (const auto& key : initialKeys) {
3307        EXPECT_EQ(ENGINE_WANT_MORE, producer->step(producers.get()));
3308        EXPECT_EQ(PROTOCOL_BINARY_CMD_DCP_MUTATION, dcp_last_op);
3309        EXPECT_EQ(key, dcp_last_key);
3310        dcp_last_op = 0;
3311    }
3312
3313    auto stream = producer->findStream(vbid);
3314    auto* mock_stream = static_cast<MockActiveStream*>(stream.get());
3315
3316    // 3) Next delete k1/k2, compact (purging the tombstone)
3317    // NOTE: compaction will not purge a tombstone if it is the highest item
3318    // in the seqno index, hence why k1 will be purged but k2 won't
3319    for (const auto& key : initialKeys) {
3320        delete_item(vbid, makeStoredDocKey(key));
3321    }
3322    flush_vbucket_to_disk(vbid, initialKeys.size());
3323
3324    // 4) Compact drop tombstones less than time=maxint and below seqno 3
3325    // as per earlier comment, only seqno 1 will be purged...
3326    runCompaction(~0, 3);
3327
3328    // 5) Begin cursor dropping
3329    mock_stream->handleSlowStream();
3330
3331    // Kick the stream into backfill
3332    EXPECT_EQ(ENGINE_SUCCESS, producer->step(producers.get()));
3333
3334    // 6) Store more items (don't flush these)
3335    std::array<std::string, 2> extraKeys = {{"k3", "k4"}};
3336    for (const auto& key : extraKeys) {
3337        store_item(vbid, makeStoredDocKey(key), key);
3338    }
3339
3340    auto vb0Stream = producer->findStream(0);
3341    ASSERT_NE(nullptr, vb0Stream.get());
3342
3343    EXPECT_TRUE(vb0Stream->isBackfilling());
3344
3345    // 7) Backfill now starts up, but should quickly cancel
3346    runNextTask(*task_executor->getLpTaskQ()[AUXIO_TASK_IDX]);
3347
3348    // Stream is now dead
3349    EXPECT_FALSE(vb0Stream->isActive());
3350
3351    EXPECT_EQ(ENGINE_WANT_MORE, producer->step(producers.get()));
3352    EXPECT_EQ(PROTOCOL_BINARY_CMD_DCP_STREAM_END, dcp_last_op);
3353
3354    // Stop Producer checkpoint processor task
3355    producer->cancelCheckpointCreatorTask();
3356}
3357
3358// MB-29512: Ensure if compaction ran in between stream-request and backfill
3359// starting, we don't backfill from before the purge-seqno.
3360TEST_F(SingleThreadedEPBucketTest, MB_29512) {
3361    // Make vbucket active.
3362    setVBucketStateAndRunPersistTask(vbid, vbucket_state_active);
3363    auto vb = store->getVBuckets().getBucket(vbid);
3364    ASSERT_NE(nullptr, vb.get());
3365
3366    // Create a Mock Dcp producer
3367    auto producer = std::make_shared<MockDcpProducer>(
3368            *engine,
3369            cookie,
3370            "test_producer",
3371            /*flags*/ 0,
3372            cb::const_byte_buffer() /*no json*/);
3373
3374    producer->createCheckpointProcessorTask();
3375
3376    auto producers = get_dcp_producers(
3377            reinterpret_cast<ENGINE_HANDLE*>(engine.get()),
3378            reinterpret_cast<ENGINE_HANDLE_V1*>(engine.get()));
3379
3380    // 1) First store k1/k2 (creating seq 1 and seq 2)
3381    std::array<std::string, 2> initialKeys = {{"k1", "k2"}};
3382    for (const auto& key : initialKeys) {
3383        store_item(vbid, makeStoredDocKey(key), key);
3384    }
3385    flush_vbucket_to_disk(vbid, initialKeys.size());
3386
3387    // Assume the DCP client connects here and receives seq 1 and 2 then drops
3388
3389    // 2) delete k1/k2 (creating seq 3 and seq 4)
3390    for (const auto& key : initialKeys) {
3391        delete_item(vbid, makeStoredDocKey(key));
3392    }
3393    flush_vbucket_to_disk(vbid, initialKeys.size());
3394
3395    // Disk index now has two items, seq3 and seq4 (deletes of k1/k2)
3396
3397    // 3) Force all memory items out so DCP will definitely go to disk and
3398    //    not memory.
3399    bool newcp;
3400    vb->checkpointManager->createNewCheckpoint();
3401    // Force persistence into new CP
3402    store_item(vbid, makeStoredDocKey("k3"), "k3");
3403    flush_vbucket_to_disk(vbid, 1);
3404    EXPECT_EQ(2,
3405              vb->checkpointManager->removeClosedUnrefCheckpoints(*vb, newcp));
3406
3407    // 4) Stream request picking up where we left off.
3408    uint64_t rollbackSeqno = 0;
3409    EXPECT_EQ(ENGINE_SUCCESS,
3410              producer->streamRequest(0, // flags
3411                                      1, // opaque
3412                                      vb->getId(),
3413                                      2, // start_seqno
3414                                      ~0, // end_seqno
3415                                      vb->failovers->getLatestUUID(),
3416                                      0, // snap_start_seqno,
3417                                      2,
3418                                      &rollbackSeqno,
3419                                      &dcpAddFailoverLog)); // snap_end_seqno,
3420
3421    // 5) Now compaction kicks in, which will purge the deletes of k1/k2 setting
3422    //    the purgeSeqno to seq 4 (the last purged seqno)
3423    runCompaction(~0, 5);
3424
3425    EXPECT_EQ(vb->getPurgeSeqno(), 4);
3426
3427    auto vb0Stream = producer->findStream(0);
3428    ASSERT_NE(nullptr, vb0Stream.get());
3429
3430    EXPECT_TRUE(vb0Stream->isBackfilling());
3431
3432    // 6) Backfill now starts up, but should quickly cancel
3433    runNextTask(*task_executor->getLpTaskQ()[AUXIO_TASK_IDX]);
3434
3435    EXPECT_FALSE(vb0Stream->isActive());
3436
3437    EXPECT_EQ(ENGINE_WANT_MORE, producer->step(producers.get()));
3438    EXPECT_EQ(PROTOCOL_BINARY_CMD_DCP_STREAM_END, dcp_last_op);
3439
3440    // Stop Producer checkpoint processor task
3441    producer->cancelCheckpointCreatorTask();
3442}
3443
3444TEST_F(SingleThreadedEPBucketTest, MB_29541) {
3445    setVBucketStateAndRunPersistTask(vbid, vbucket_state_active);
3446
3447    // 1) First store 2 keys which we will backfill
3448    std::array<std::string, 2> keys = {{"k1", "k2"}};
3449    for (const auto& key : keys) {
3450        store_item(vbid, makeStoredDocKey(key), key);
3451    }
3452    flush_vbucket_to_disk(vbid, keys.size());
3453
3454    // Simplest way to ensure DCP has todo a backfill - 'wipe memory'
3455    resetEngineAndWarmup();
3456
3457    // Setup DCP, 1 producer and we will do a takeover of the vbucket
3458    auto producer = std::make_shared<MockDcpProducer>(
3459            *engine,
3460            cookie,
3461            "mb-29541",
3462            /*flags*/ 0,
3463            cb::const_byte_buffer() /*no json*/);
3464
3465    producer->createCheckpointProcessorTask();
3466
3467    auto producers = get_dcp_producers(
3468            reinterpret_cast<ENGINE_HANDLE*>(engine.get()),
3469            reinterpret_cast<ENGINE_HANDLE_V1*>(engine.get()));
3470
3471    uint64_t rollbackSeqno = 0;
3472    auto vb = store->getVBuckets().getBucket(vbid);
3473    ASSERT_NE(nullptr, vb.get());
3474    EXPECT_EQ(ENGINE_SUCCESS,
3475              producer->streamRequest(DCP_ADD_STREAM_FLAG_TAKEOVER, // flags
3476                                      1, // opaque
3477                                      vbid,
3478                                      0, // start_seqno
3479                                      vb->getHighSeqno(), // end_seqno
3480                                      vb->failovers->getLatestUUID(),
3481                                      0, // snap_start_seqno
3482                                      vb->getHighSeqno(), // snap_end_seqno
3483                                      &rollbackSeqno,
3484                                      &dcpAddFailoverLog));
3485
3486    // This MB also relies on the consumer draining the stream as the backfill
3487    // runs, rather than running the backfill then sequentially then draining
3488    // the readyQ, basically when backfill complete occurs we should have
3489    // shipped all items to ensure the state transition to takeover-send would
3490    // indeed block (unless we have the fix applied...)
3491
3492    // Manually drive the backfill (not using notifyAndStepToCheckpoint)
3493
3494    auto& lpAuxioQ = *task_executor->getLpTaskQ()[AUXIO_TASK_IDX];
3495    // backfill:create()
3496    runNextTask(lpAuxioQ);
3497    // backfill:scan()
3498    runNextTask(lpAuxioQ);
3499
3500    // Now drain all items before we proceed to complete
3501    EXPECT_EQ(ENGINE_WANT_MORE, producer->step(producers.get()));
3502    EXPECT_EQ(PROTOCOL_BINARY_CMD_DCP_SNAPSHOT_MARKER, dcp_last_op);
3503    for (const auto& key : keys) {
3504        EXPECT_EQ(ENGINE_WANT_MORE, producer->step(producers.get()));
3505        EXPECT_EQ(PROTOCOL_BINARY_CMD_DCP_MUTATION, dcp_last_op);
3506        EXPECT_EQ(key, dcp_last_key);
3507    }
3508
3509    // backfill:complete()
3510    runNextTask(lpAuxioQ);
3511    // backfill:finished()
3512    runNextTask(lpAuxioQ);
3513
3514    dcp_last_op = 0;
3515
3516    // Next the backfill should switch to takeover-send and progress to close
3517    // with the correct sequence of step/ack
3518
3519    auto vb0Stream = producer->findStream(0);
3520    ASSERT_NE(nullptr, vb0Stream.get());
3521    // However without the fix from MB-29541 this would return success, meaning
3522    // the front-end thread should sleep until notified the stream is ready.
3523    // However no notify will ever come if MB-29541 is not applied
3524    EXPECT_EQ(ENGINE_WANT_MORE, producer->step(producers.get()));
3525    EXPECT_EQ(PROTOCOL_BINARY_CMD_DCP_SET_VBUCKET_STATE, dcp_last_op);
3526
3527    EXPECT_TRUE(vb0Stream->isTakeoverWait());
3528
3529    // For completeness step to end
3530    // we must ack the VB state
3531    protocol_binary_response_header message;
3532    message.response.opcode = PROTOCOL_BINARY_CMD_DCP_SET_VBUCKET_STATE;
3533    message.response.opaque = 1;
3534    EXPECT_TRUE(producer->handleResponse(&message));
3535
3536    EXPECT_EQ(ENGINE_WANT_MORE, producer->step(producers.get()));
3537    EXPECT_EQ(PROTOCOL_BINARY_CMD_DCP_SET_VBUCKET_STATE, dcp_last_op);
3538
3539    EXPECT_TRUE(producer->handleResponse(&message));
3540    EXPECT_FALSE(vb0Stream->isActive());
3541    // Stop Producer checkpoint processor task
3542    producer->cancelCheckpointCreatorTask();
3543}
3544
3545/* When a backfill is activated along with a slow stream trigger,
3546 * the stream end message gets stuck in the readyQ as the stream is
3547 * never notified as ready to send it. As the stream transitions state
3548 * to InMemory as well as having sent all requested sequence numbers,
3549 * the stream is meant to end but Stream::itemsReady can cause this
3550 * to never trigger. This means that DCP consumers can hang waiting
3551 * for this closure message.
3552 * This test checks that the DCP stream actually sends the end stream
3553 * message when triggering this problematic sequence.
3554 */
3555TEST_F(SingleThreadedEPBucketTest, MB_31481) {
3556    setVBucketStateAndRunPersistTask(vbid, vbucket_state_active);
3557
3558    // 1) First store 2 keys which we will backfill
3559    std::array<std::string, 2> keys = {{"k1", "k2"}};
3560    store_item(vbid, makeStoredDocKey(keys[0]), keys[0]);
3561    store_item(vbid, makeStoredDocKey(keys[1]), keys[1]);
3562
3563    flush_vbucket_to_disk(vbid, keys.size());
3564
3565    // Simplest way to ensure DCP has to do a backfill - 'wipe memory'
3566    resetEngineAndWarmup();
3567
3568    // Setup DCP, 1 producer and we will do a takeover of the vbucket
3569    auto producer = createDcpProducer(cookie, {}, false, IncludeDeleteTime::No);
3570
3571    auto producers = get_dcp_producers(
3572            reinterpret_cast<ENGINE_HANDLE*>(engine.get()),
3573            reinterpret_cast<ENGINE_HANDLE_V1*>(engine.get()));
3574
3575    ASSERT_TRUE(producer->getReadyQueue().empty());
3576
3577    uint64_t rollbackSeqno = 0;
3578    auto vb = store->getVBuckets().getBucket(vbid);
3579    ASSERT_NE(nullptr, vb.get());
3580    EXPECT_EQ(ENGINE_SUCCESS,
3581              producer->streamRequest(0, // flags
3582                                      1, // opaque
3583                                      vbid,
3584                                      0, // start_seqno
3585                                      vb->getHighSeqno(), // end_seqno
3586                                      vb->failovers->getLatestUUID(),
3587                                      0, // snap_start_seqno
3588                                      vb->getHighSeqno(), // snap_end_seqno
3589                                      &rollbackSeqno,
3590                                      &dcpAddFailoverLog));
3591
3592    auto vb0Stream =
3593            dynamic_cast<ActiveStream*>(producer->findStream(vbid).get());
3594    ASSERT_NE(nullptr, vb0Stream);
3595
3596    // Manually drive the backfill (not using notifyAndStepToCheckpoint)
3597    auto& lpAuxioQ = *task_executor->getLpTaskQ()[AUXIO_TASK_IDX];
3598    // Trigger slow stream handle
3599    ASSERT_TRUE(vb0Stream->handleSlowStream());
3600    // backfill:create()
3601    runNextTask(lpAuxioQ);
3602    // backfill:scan()
3603    runNextTask(lpAuxioQ);
3604
3605    ASSERT_TRUE(producer->getReadyQueue().exists(vbid));
3606
3607    // Now drain all items before we proceed to complete, which triggers disk
3608    // snapshot.
3609    ASSERT_EQ(ENGINE_WANT_MORE, producer->step(producers.get()));
3610    ASSERT_EQ(PROTOCOL_BINARY_CMD_DCP_SNAPSHOT_MARKER, dcp_last_op);
3611    for (const auto& key : keys) {
3612        ASSERT_EQ(ENGINE_WANT_MORE, producer->step(producers.get()));
3613        ASSERT_EQ(PROTOCOL_BINARY_CMD_DCP_MUTATION, dcp_last_op);
3614        ASSERT_EQ(key, dcp_last_key);
3615    }
3616
3617    // Another producer step should report SUCCESS (no more data) as all items
3618    // have been backfilled.
3619    EXPECT_EQ(ENGINE_SUCCESS, producer->step(producers.get()));
3620    // Also the readyQ should be empty
3621    EXPECT_TRUE(producer->getReadyQueue().empty());
3622
3623    // backfill:complete()
3624    runNextTask(lpAuxioQ);
3625
3626    // Notified to allow stream to transition to in-memory phase.
3627    EXPECT_TRUE(producer->getReadyQueue().exists(vbid));
3628
3629    // Step should cause stream closed message, previously this would
3630    // keep the "ENGINE_SUCCESS" response due to the itemsReady flag,
3631    // which is not expected with that message already being in the readyQ.
3632    EXPECT_EQ(ENGINE_WANT_MORE, producer->step(producers.get()));
3633    EXPECT_EQ(PROTOCOL_BINARY_CMD_DCP_STREAM_END, dcp_last_op);
3634
3635    // Stepping forward should now show that stream end message has been
3636    // completed and no more messages are needed to send.
3637    EXPECT_EQ(ENGINE_SUCCESS, producer->step(producers.get()));
3638
3639    // Similarly, the readyQ should be empty again
3640    EXPECT_TRUE(producer->getReadyQueue().empty());
3641
3642    // backfill:finished() - just to cleanup.
3643    runNextTask(lpAuxioQ);
3644
3645    // vb0Stream should be closed
3646    EXPECT_FALSE(vb0Stream->isActive());
3647
3648    // Stop Producer checkpoint processor task
3649    producer->cancelCheckpointCreatorTask();
3650}
3651
3652INSTANTIATE_TEST_CASE_P(XattrSystemUserTest,
3653                        XattrSystemUserTest,
3654                        ::testing::Bool(), );
3655
3656INSTANTIATE_TEST_CASE_P(XattrCompressedTest,
3657                        XattrCompressedTest,
3658                        ::testing::Combine(::testing::Bool(),
3659                                           ::testing::Bool()), );
3660
3661static auto allConfigValues = ::testing::Values(
3662        std::make_tuple(std::string("ephemeral"), std::string("auto_delete")),
3663        std::make_tuple(std::string("ephemeral"), std::string("fail_new_data")),
3664        std::make_tuple(std::string("persistent"), std::string{}));
3665
3666// Test cases which run for persistent and ephemeral buckets
3667INSTANTIATE_TEST_CASE_P(EphemeralOrPersistent,
3668                        STParameterizedBucketTest,
3669                        allConfigValues, );
3670