1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2013 Couchbase, Inc.
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
18#include "evp_store_test.h"
19
20#include "fakes/fake_executorpool.h"
21#include "taskqueue.h"
22#include "../mock/mock_dcp_producer.h"
23#include "../mock/mock_dcp_consumer.h"
24#include "../mock/mock_stream.h"
25#include "programs/engine_testapp/mock_server.h"
26
27#include <thread>
28
29/*
30 * A subclass of EventuallyPersistentStoreTest which uses a fake ExecutorPool,
31 * which will not spawn ExecutorThreads and hence not run any tasks
32 * automatically in the background. All tasks must be manually run().
33 */
34class SingleThreadedEPStoreTest : public EventuallyPersistentStoreTest {
35    void TearDown() {
36        shutdownAndPurgeTasks();
37        EventuallyPersistentStoreTest::TearDown();
38    }
39
40public:
41    /*
42     * Run the next task from the taskQ
43     * The task must match the expectedTaskName parameter
44     */
45    void runNextTask(TaskQueue& taskQ, const std::string& expectedTaskName) {
46        CheckedExecutor executor(task_executor, taskQ);
47
48        // Run the task
49        executor.runCurrentTask(expectedTaskName);
50        executor.completeCurrentTask();
51    }
52
53    /*
54     * Run the next task from the taskQ
55     */
56    void runNextTask(TaskQueue& taskQ) {
57        CheckedExecutor executor(task_executor, taskQ);
58
59        // Run the task
60        executor.runCurrentTask();
61        executor.completeCurrentTask();
62    }
63
64protected:
65    void SetUp() {
66        SingleThreadedExecutorPool::replaceExecutorPoolWithFake();
67        EventuallyPersistentStoreTest::SetUp();
68
69        task_executor = reinterpret_cast<SingleThreadedExecutorPool*>
70            (ExecutorPool::get());
71    }
72
73    /*
74     * Change the vbucket state and run the VBStatePeristTask
75     * On return the state will be changed and the task completed.
76     */
77    void setVBucketStateAndRunPersistTask(uint16_t vbid, vbucket_state_t newState) {
78        // Change state - this should add 1 set_vbucket_state op to the
79        //VBuckets' persistence queue.
80        EXPECT_EQ(ENGINE_SUCCESS,
81                  store->setVBucketState(vbid, newState, /*transfer*/false));
82
83        // Trigger the flusher to flush state to disk.
84        EXPECT_EQ(0, store->flushVBucket(vbid));
85    }
86
87    /*
88     * Set the stats isShutdown and attempt to drive all tasks to cancel
89     */
90    void shutdownAndPurgeTasks() {
91        engine->getEpStats().isShutdown = true;
92        task_executor->cancelAll();
93
94        for (task_type_t t :
95             {WRITER_TASK_IDX, READER_TASK_IDX, AUXIO_TASK_IDX, NONIO_TASK_IDX}) {
96
97            // Define a lambda to drive all tasks from the queue, if hpTaskQ
98            // is implemented then trivial to add a second call to runTasks.
99            auto runTasks = [=](TaskQueue& queue) {
100                while (queue.getFutureQueueSize() > 0 || queue.getReadyQueueSize() > 0) {
101                    runNextTask(queue);
102                }
103            };
104            runTasks(*task_executor->getLpTaskQ()[t]);
105            task_executor->stopTaskGroup(engine->getTaskable().getGID(), t,
106                                         engine->getEpStats().forceShutdown);
107        }
108    }
109
110    /*
111     * Fake callback emulating dcp_add_failover_log
112     */
113    static ENGINE_ERROR_CODE fakeDcpAddFailoverLog(vbucket_failover_t* entry,
114                                                   size_t nentries,
115                                                   const void *cookie) {
116        return ENGINE_SUCCESS;
117    }
118
119    SingleThreadedExecutorPool* task_executor;
120};
121
122
123/**
124 * Regression test for MB-22451: When handleSlowStream is called and in
125 * STREAM_BACKFILLING state and currently have a backfill scheduled (or running)
126 * ensure that when the backfill completes pendingBackfill remains true,
127 * isBackfillTaskRunning is false and, the stream state remains set to
128 * STREAM_BACKFILLING.
129 */
TEST_F(SingleThreadedEPStoreTest, test_mb22451) {
    // Make vbucket active.
    setVBucketStateAndRunPersistTask(vbid, vbucket_state_active);
    // Store a single Item
    store_item(vbid, "key", "value");
    // Ensure that it has persisted to disk
    flush_vbucket_to_disk(vbid);

    // Create a Mock Dcp producer
    dcp_producer_t producer = new MockDcpProducer(*engine,
                                                  cookie,
                                                  "test_producer",
                                                  /*notifyOnly*/false);
    // Create a Mock Active Stream. The full seqno range (0..~0) means the
    // stream covers everything, including data only available via backfill.
    stream_t stream = new MockActiveStream(
            static_cast<EventuallyPersistentEngine*>(engine.get()),
            producer,
            producer->getName(),
            /*flags*/0,
            /*opaque*/0, vbid,
            /*st_seqno*/0,
            /*en_seqno*/~0,
            /*vb_uuid*/0xabcd,
            /*snap_start_seqno*/0,
            /*snap_end_seqno*/~0);

    // Downcast to the mock type so the public_ test hooks are reachable.
    MockActiveStream* mock_stream =
            static_cast<MockActiveStream*>(stream.get());

    /**
      * The core of the test follows:
      * Call completeBackfill whilst we are in the state of STREAM_BACKFILLING
      * and the pendingBackfill flag is set to true.
      * We expect that on leaving completeBackfill the isBackfillRunning flag is
      * set to true.
      */
    mock_stream->public_setBackfillTaskRunning(true);
    mock_stream->public_transitionState(STREAM_BACKFILLING);
    mock_stream->handleSlowStream();
    // The call to handleSlowStream should result in setting pendingBackfill
    // flag to true
    EXPECT_TRUE(mock_stream->public_getPendingBackfill())
        << "handleSlowStream should set pendingBackfill to True";
    mock_stream->completeBackfill();
    EXPECT_FALSE(mock_stream->public_isBackfillTaskRunning())
        << "completeBackfill should set isBackfillTaskRunning to False";
    EXPECT_EQ(STREAM_BACKFILLING, mock_stream->getState())
            << "stream state should not have changed";
    // Required to ensure that the backfillMgr is deleted
    producer->closeAllStreams();
}
181
182/* Regression / reproducer test for MB-19695 - an exception is thrown
183 * (and connection disconnected) if a couchstore file hasn't been re-created
184 * yet when doTapVbTakeoverStats() is called as part of
185 * tapNotify / TAP_OPAQUE_INITIAL_VBUCKET_STREAM.
186 */
187TEST_F(SingleThreadedEPStoreTest, MB19695_doTapVbTakeoverStats) {
188    auto* task_executor = reinterpret_cast<SingleThreadedExecutorPool*>
189        (ExecutorPool::get());
190
191    // Should start with no tasks registered on any queues.
192    for (auto& queue : task_executor->getLpTaskQ()) {
193        ASSERT_EQ(0, queue->getFutureQueueSize());
194        ASSERT_EQ(0, queue->getReadyQueueSize());
195    }
196
197    // [[1] Set our state to replica.
198    setVBucketStateAndRunPersistTask(vbid, vbucket_state_replica);
199
200    auto& lpWriterQ = *task_executor->getLpTaskQ()[WRITER_TASK_IDX];
201    auto& lpNonioQ = *task_executor->getLpTaskQ()[NONIO_TASK_IDX];
202
203    // [[2]] Perform a vbucket reset. This will perform some work synchronously,
204    // but also created 2 tasks and notifies the flusher:
205    //   1. vbucket memory deletion (NONIO)
206    //   2. vbucket disk deletion (WRITER)
207    //   3. FlusherTask notified (WRITER)
208    // MB-19695: If we try to get the number of persisted deletes between
209    // steps (2) and (3) running then an exception is thrown (and client
210    // disconnected).
211    EXPECT_TRUE(store->resetVBucket(vbid));
212
213    runNextTask(lpNonioQ, "Removing (dead) vb:0 from memory");
214    runNextTask(lpWriterQ, "Deleting VBucket:0");
215
216    // [[2]] Ok, let's see if we can get TAP takeover stats. This will
217    // fail with MB-19695.
218    // Dummy callback to pass into the stats function below.
219    auto dummy_cb = [](const char *key, const uint16_t klen,
220                          const char *val, const uint32_t vlen,
221                          const void *cookie) {};
222    std::string key{"MB19695_doTapVbTakeoverStats"};
223    EXPECT_NO_THROW(engine->public_doTapVbTakeoverStats
224                    (nullptr, dummy_cb, key, vbid));
225
226    // Also check DCP variant (MB-19815)
227    EXPECT_NO_THROW(engine->public_doDcpVbTakeoverStats
228                    (nullptr, dummy_cb, key, vbid));
229
230    // Cleanup - run flusher.
231    EXPECT_EQ(0, store->flushVBucket(vbid));
232}
233
234/*
235 * Test that
236 * 1. We cannot create a stream against a dead vb (MB-17230)
237 * 2. No tasks are scheduled as a side-effect of the streamRequest attempt.
238 */
239TEST_F(SingleThreadedEPStoreTest, MB19428_no_streams_against_dead_vbucket) {
240    setVBucketStateAndRunPersistTask(vbid, vbucket_state_active);
241
242    store_item(vbid, "key", "value");
243
244    // Directly flush the vbucket
245    EXPECT_EQ(1, store->flushVBucket(vbid));
246
247    setVBucketStateAndRunPersistTask(vbid, vbucket_state_dead);
248    auto* task_executor = reinterpret_cast<SingleThreadedExecutorPool*>
249        (ExecutorPool::get());
250    auto& lpAuxioQ = *task_executor->getLpTaskQ()[AUXIO_TASK_IDX];
251
252    {
253        // Create a Mock Dcp producer
254        dcp_producer_t producer = new MockDcpProducer(*engine,
255                                                      cookie,
256                                                      "test_producer",
257                                                      /*notifyOnly*/false);
258
259        // Creating a producer will schedule one ActiveStreamCheckpointProcessorTask
260        // that task though sleeps forever, so won't run until woken.
261        EXPECT_EQ(1, lpAuxioQ.getFutureQueueSize());
262
263        uint64_t rollbackSeqno;
264        auto err = producer->streamRequest(/*flags*/0,
265                                           /*opaque*/0,
266                                           /*vbucket*/vbid,
267                                           /*start_seqno*/0,
268                                           /*end_seqno*/-1,
269                                           /*vb_uuid*/0xabcd,
270                                           /*snap_start*/0,
271                                           /*snap_end*/0,
272                                           &rollbackSeqno,
273                                           SingleThreadedEPStoreTest::fakeDcpAddFailoverLog);
274
275        EXPECT_EQ(ENGINE_NOT_MY_VBUCKET, err) << "Unexpected error code";
276
277        // The streamRequest failed and should not of created anymore tasks.
278        EXPECT_EQ(1, lpAuxioQ.getFutureQueueSize());
279    }
280}
281
282// Check that in-progress disk backfills (`CouchKVStore::backfill`) are
283// correctly deleted when we delete a bucket. If not then we leak vBucket file
284// descriptors, which can prevent ns_server from cleaning up old vBucket files
285// and consequently re-adding a node to the cluster.
286//
TEST_F(SingleThreadedEPStoreTest, MB19892_BackfillNotDeleted) {
    // Make vbucket active.
    setVBucketStateAndRunPersistTask(vbid, vbucket_state_active);

    // Perform one SET, then close its checkpoint. This means that we no
    // longer have all sequence numbers in memory checkpoints, forcing the
    // DCP stream request to go to disk (backfill).
    store_item(vbid, "key", "value");

    // Force a new checkpoint.
    auto vb = store->getVbMap().getBucket(vbid);
    auto& ckpt_mgr = vb->checkpointManager;
    ckpt_mgr.createNewCheckpoint();

    // Directly flush the vbucket, ensuring data is on disk.
    //  (This would normally also wake up the checkpoint remover task, but
    //   as that task was never registered with the ExecutorPool in this test
    //   environment, we need to manually remove the prev checkpoint).
    EXPECT_EQ(1, store->flushVBucket(vbid));

    bool new_ckpt_created;
    EXPECT_EQ(1,
              ckpt_mgr.removeClosedUnrefCheckpoints(vb, new_ckpt_created));

    // Create a DCP producer, and start a stream request.
    std::string name{"test_producer"};
    EXPECT_EQ(ENGINE_SUCCESS,
              engine->dcpOpen(cookie, /*opaque:unused*/{}, /*seqno:unused*/{},
                              DCP_OPEN_PRODUCER, name.data(), name.size()));

    uint64_t rollbackSeqno;
    // Failover-log callback which just reports success; the log content is
    // irrelevant here.
    auto dummy_dcp_add_failover_cb = [](vbucket_failover_t* entry,
                                       size_t nentries, const void *cookie) {
        return ENGINE_SUCCESS;
    };

    // Actual stream request method (EvpDcpStreamReq) is static, so access via
    // the engine_interface.
    // NOTE(review): /*end_seqno*/-1 wraps to the max uint64_t, presumably
    // meaning "stream everything" - confirm against the stream_req contract.
    EXPECT_EQ(ENGINE_SUCCESS,
              engine.get()->dcp.stream_req(
                      &engine.get()->interface, cookie, /*flags*/0,
                      /*opaque*/0, /*vbucket*/vbid, /*start_seqno*/0,
                      /*end_seqno*/-1, /*vb_uuid*/0xabcd, /*snap_start*/0,
                      /*snap_end*/0, &rollbackSeqno,
                      dummy_dcp_add_failover_cb));
}
333
334/*
335 * Test that the DCP processor returns a 'yield' return code when
336 * working on a large enough buffer size.
337 */
TEST_F(SingleThreadedEPStoreTest, MB18452_yield_dcp_processor) {

    // We need a replica VB
    setVBucketStateAndRunPersistTask(vbid, vbucket_state_replica);

    // Create a MockDcpConsumer
    dcp_consumer_t consumer = new MockDcpConsumer(*engine, cookie, "test");

    // Add the stream
    EXPECT_EQ(ENGINE_SUCCESS,
              consumer->addStream(/*opaque*/0, vbid, /*flags*/0));

    // The processBufferedItems should yield every "yield * batchSize"
    // So add '(n * (yield * batchSize)) + 1' messages and we should see
    // processBufferedMessages return 'more_to_process' 'n' times and then
    // 'all_processed' once.
    const int n = 4;
    const int yield = engine->getConfiguration().getDcpConsumerProcessBufferedMessagesYieldLimit();
    const int batchSize = engine->getConfiguration().getDcpConsumerProcessBufferedMessagesBatchSize();
    const int messages = n * (batchSize * yield);

    // Force the stream to buffer rather than process messages immediately
    // (save the original cap so it can be restored below).
    const ssize_t queueCap = engine->getEpStats().replicationThrottleWriteQueueCap;
    engine->getEpStats().replicationThrottleWriteQueueCap = 0;

    // 1. Add the first message, a snapshot marker.
    consumer->snapshotMarker(/*opaque*/1, vbid, /*startseq*/0,
                             /*endseq*/messages, /*flags*/0);

    // 2. Now add the rest as mutations. Note '<=' - this adds messages+1
    // mutations, giving the '+1' from the formula in the comment above.
    for (int ii = 0; ii <= messages; ii++) {
        std::string key = "key" + std::to_string(ii);
        std::string value = "value";
        consumer->mutation(/*opaque*/1, key.c_str(), key.length(),
                           value.c_str(), value.length(), /*cas*/0,
                           vbid, /*flags*/0, /*datatype*/0, /*locktime*/0,
                           /*bySeqno*/ii, /*revSeqno*/0, /*exptime*/0,
                           /*nru*/0, /*meta*/nullptr, /*nmeta*/0);
    }

    // Set the throttle back to the original value
    engine->getEpStats().replicationThrottleWriteQueueCap = queueCap;

    // Get our target stream ready.
    static_cast<MockDcpConsumer*>(consumer.get())->public_notifyVbucketReady(vbid);

    // 3. processBufferedItems returns more_to_process n times
    for (int ii = 0; ii < n; ii++) {
        EXPECT_EQ(more_to_process, consumer->processBufferedItems());
    }

    // 4. processBufferedItems returns a final all_processed
    EXPECT_EQ(all_processed, consumer->processBufferedItems());

    // Drop the stream
    consumer->closeStream(/*opaque*/0, vbid);
}
395
396/*
397 * Background thread used by MB20054_onDeleteItem_during_bucket_deletion
398 */
399static void MB20054_run_backfill_task(EventuallyPersistentEngine* engine,
400                                      CheckedExecutor& backfill,
401                                      SyncObject& backfill_cv,
402                                      SyncObject& destroy_cv,
403                                      TaskQueue* lpAuxioQ) {
404    std::unique_lock<std::mutex> destroy_lh(destroy_cv);
405    ObjectRegistry::onSwitchThread(engine);
406
407    // Run the BackfillManagerTask task to push items to readyQ. In sherlock
408    // upwards this runs multiple times - so should return true.
409    backfill.runCurrentTask("Backfilling items for a DCP Connection");
410
411    // Notify the main thread that it can progress with destroying the
412    // engine [A].
413    {
414        // if we can get the lock, then we know the main thread is waiting
415        std::lock_guard<std::mutex> backfill_lock(backfill_cv);
416        backfill_cv.notify_one(); // move the main thread along
417    }
418
419    // Now wait ourselves for destroy to be completed [B].
420    destroy_cv.wait(destroy_lh);
421
422    // This is the only "hacky" part of the test - we need to somehow
423    // keep the DCPBackfill task 'running' - i.e. not call
424    // completeCurrentTask - until the main thread is in
425    // ExecutorPool::_stopTaskGroup. However we have no way from the test
426    // to properly signal that we are *inside* _stopTaskGroup -
427    // called from EVPStore's destructor.
428    // Best we can do is spin on waiting for the DCPBackfill task to be
429    // set to 'dead' - and only then completeCurrentTask; which will
430    // cancel the task.
431    while (!backfill.getCurrentTask()->isdead()) {
432        // spin.
433    }
434    backfill.completeCurrentTask();
435}
436
437static ENGINE_ERROR_CODE dummy_dcp_add_failover_cb(vbucket_failover_t* entry,
438                                                   size_t nentries,
439                                                   const void *cookie) {
440    return ENGINE_SUCCESS;
441}
442
443// Test performs engine deletion interleaved with tasks so redefine TearDown
// for this test's needs.
445class MB20054_SingleThreadedEPStoreTest : public SingleThreadedEPStoreTest {
446public:
447    void SetUp() {
448        SingleThreadedEPStoreTest::SetUp();
449    }
450
451    void TearDown() {
452        ExecutorPool::shutdown();
453    }
454};
455
456// Check that if onDeleteItem() is called during bucket deletion, we do not
457// abort due to not having a valid thread-local 'engine' pointer. This
458// has been observed when we have a DCPBackfill task which is deleted during
459// bucket shutdown, which has a non-zero number of Items which are destructed
460// (and call onDeleteItem).
TEST_F(MB20054_SingleThreadedEPStoreTest, MB20054_onDeleteItem_during_bucket_deletion) {
    // Should start with no tasks registered on any queues.
    TaskQ& lp_task_q = task_executor->getLpTaskQ();
    for (auto& queue : lp_task_q) {
        ASSERT_EQ(0, queue->getFutureQueueSize());
        ASSERT_EQ(0, queue->getReadyQueueSize());
    }

    // [[1]] Set our state to active.
    setVBucketStateAndRunPersistTask(vbid, vbucket_state_active);

    // Perform one SET, then close its checkpoint. This means that we no
    // longer have all sequence numbers in memory checkpoints, forcing the
    // DCP stream request to go to disk (backfill).
    store_item(vbid, "key", "value");

    // Force a new checkpoint.
    RCPtr<VBucket> vb = store->getVbMap().getBucket(vbid);
    CheckpointManager& ckpt_mgr = vb->checkpointManager;
    ckpt_mgr.createNewCheckpoint();
    auto lpWriterQ = task_executor->getLpTaskQ()[WRITER_TASK_IDX];
    EXPECT_EQ(0, lpWriterQ->getFutureQueueSize());
    EXPECT_EQ(0, lpWriterQ->getReadyQueueSize());

    auto lpAuxioQ = task_executor->getLpTaskQ()[AUXIO_TASK_IDX];
    EXPECT_EQ(0, lpAuxioQ->getFutureQueueSize());
    EXPECT_EQ(0, lpAuxioQ->getReadyQueueSize());

    // Directly flush the vbucket, ensuring data is on disk.
    //  (This would normally also wake up the checkpoint remover task, but
    //   as that task was never registered with the ExecutorPool in this test
    //   environment, we need to manually remove the prev checkpoint).
    EXPECT_EQ(1, store->flushVBucket(vbid));

    bool new_ckpt_created;
    EXPECT_EQ(1,
              ckpt_mgr.removeClosedUnrefCheckpoints(vb, new_ckpt_created));
    // Release our VBucket reference before the engine is destroyed below.
    vb.reset();

    EXPECT_EQ(0, lpAuxioQ->getFutureQueueSize());
    EXPECT_EQ(0, lpAuxioQ->getReadyQueueSize());

    // Create a DCP producer, and start a stream request.
    std::string name("test_producer");
    EXPECT_EQ(ENGINE_SUCCESS,
              engine->dcpOpen(cookie, /*opaque:unused*/{}, /*seqno:unused*/{},
                              DCP_OPEN_PRODUCER, name.data(), name.size()));

    // Expect to have an ActiveStreamCheckpointProcessorTask, which is
    // initially snoozed (so we can't run it).
    EXPECT_EQ(1, lpAuxioQ->getFutureQueueSize());
    EXPECT_EQ(0, lpAuxioQ->getReadyQueueSize());

    uint64_t rollbackSeqno;
    // Actual stream request method (EvpDcpStreamReq) is static, so access via
    // the engine_interface.
    EXPECT_EQ(ENGINE_SUCCESS,
              engine->dcp.stream_req(&engine->interface, cookie, /*flags*/0,
                                     /*opaque*/0, /*vbucket*/vbid,
                                     /*start_seqno*/0, /*end_seqno*/-1,
                                     /*vb_uuid*/0xabcd, /*snap_start*/0,
                                     /*snap_end*/0, &rollbackSeqno,
                                     dummy_dcp_add_failover_cb));

    // FutureQ should now have an additional DCPBackfill task.
    EXPECT_EQ(2, lpAuxioQ->getFutureQueueSize());
    EXPECT_EQ(0, lpAuxioQ->getReadyQueueSize());

    // Create an executor 'thread' to obtain shared ownership of the next
    // AuxIO task (which should be BackfillManagerTask). As long as this
    // object has its currentTask set to BackfillManagerTask, the task
    // will not be deleted.
    // Essentially we are simulating a concurrent thread running this task.
    CheckedExecutor backfill(task_executor, *lpAuxioQ);

    // This is the one action we really need to perform 'concurrently' - delete
    // the engine while a DCPBackfill task is still running. We spin up a
    // separate thread which will run the DCPBackfill task
    // concurrently with destroy - specifically DCPBackfill must start running
    // (and add items to the readyQ) before destroy(), it must then continue
    // running (stop after) _stopTaskGroup is invoked.
    // To achieve this we use a couple of condition variables to synchronise
    // between the two threads - the timeline needs to look like:
    //
    //  auxIO thread:  [------- DCPBackfill ----------]
    //   main thread:          [destroy()]       [ExecutorPool::_stopTaskGroup]
    //
    //  --------------------------------------------------------> time
    //
    SyncObject backfill_cv;
    SyncObject destroy_cv;
    std::thread concurrent_task_thread;

    {
        // scope for the backfill lock
        std::unique_lock<std::mutex> backfill_lh(backfill_cv);

        concurrent_task_thread = std::thread(MB20054_run_backfill_task,
                                             engine.get(),
                                             std::ref(backfill),
                                             std::ref(backfill_cv),
                                             std::ref(destroy_cv),
                                             lpAuxioQ);
        // [A] Wait for DCPBackfill to complete.
        backfill_cv.wait(backfill_lh);
    }

    ObjectRegistry::onSwitchThread(engine.get());
    // 'Destroy' the engine - this doesn't delete the object, just shuts down
    // connections, marks streams as dead etc.
    engine->destroy(/*force*/false);

    {
        // If we can get the lock we know the thread is waiting for destroy.
        std::lock_guard<std::mutex> lh(destroy_cv);
        destroy_cv.notify_one(); // move the thread on.
    }

    // Force the one last task out of here
    task_executor->cancelByName("Process checkpoint(s) for DCP producer");
    runNextTask(*lpAuxioQ);

    // Mark the connection as dead for clean shutdown
    destroy_mock_cookie(cookie);
    engine->getDcpConnMap().manageConnections();

    // Nullify TLS engine and reset the smart pointer to force destruction.
    // We need null as the engine to stop ~CheckedExecutor path from trying
    // to touch the engine
    ObjectRegistry::onSwitchThread(nullptr);
    engine.reset();
    destroy_mock_event_callbacks();
    concurrent_task_thread.join();
}
595
596/*
597 * MB-18953 is triggered by the executorpool wake path moving tasks directly
 * into the readyQueue, thus allowing for high-priority tasks to dominate
599 * a taskqueue.
600 */
TEST_F(SingleThreadedEPStoreTest, MB18953_taskWake) {
    auto& lpNonioQ = *task_executor->getLpTaskQ()[NONIO_TASK_IDX];

    // Minimal task used to drive scheduling; run() returns true so the
    // task is rescheduled via ExecutorPool::reschedule.
    // NOTE(review): run()/getDescription() presumably override GlobalTask
    // virtuals - confirm signatures before adding `override`.
    class TestTask : public GlobalTask {
    public:
        TestTask(EventuallyPersistentEngine* e, TaskId id)
          : GlobalTask(e, id, 0.0, false) {}

        // returning true will also drive the ExecutorPool::reschedule path.
        bool run() { return true; }

        std::string getDescription() {
            return std::string("TestTask ") + GlobalTask::getTaskName(getTypeId());
        }
    };

    // One high-priority and one low-priority task on the same queue.
    ExTask hpTask = new TestTask(engine.get(),
                                 TaskId::PendingOpsNotification);
    task_executor->schedule(hpTask, NONIO_TASK_IDX);

    ExTask lpTask = new TestTask(engine.get(),
                                 TaskId::DefragmenterTask);
    task_executor->schedule(lpTask, NONIO_TASK_IDX);

    runNextTask(lpNonioQ, "TestTask PendingOpsNotification"); // hptask goes first
    // Ensure that a wake to the hpTask doesn't mean the lpTask gets ignored
    lpNonioQ.wake(hpTask);

    // Check 1 task is ready
    EXPECT_EQ(1, task_executor->getTotReadyTasks());
    EXPECT_EQ(1, task_executor->getNumReadyTasks(NONIO_TASK_IDX));

    runNextTask(lpNonioQ, "TestTask DefragmenterTask"); // lptask goes second

    // Run the tasks again to check that coming from ::reschedule our
    // expectations are still met.
    runNextTask(lpNonioQ, "TestTask PendingOpsNotification"); // hptask goes first

    // Ensure that a wake to the hpTask doesn't mean the lpTask gets ignored
    lpNonioQ.wake(hpTask);

    // Check 1 task is ready
    EXPECT_EQ(1, task_executor->getTotReadyTasks());
    EXPECT_EQ(1, task_executor->getNumReadyTasks(NONIO_TASK_IDX));
    runNextTask(lpNonioQ, "TestTask DefragmenterTask"); // lptask goes second
}
647