1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2013 Couchbase, Inc.
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
18#include "evp_store_test.h"
19
20#include "fakes/fake_executorpool.h"
21#include "taskqueue.h"
22#include "../mock/mock_dcp_producer.h"
23#include "../mock/mock_dcp_consumer.h"
24#include "../mock/mock_stream.h"
25#include "programs/engine_testapp/mock_server.h"
26
27#include <thread>
28
29/*
30 * A subclass of EventuallyPersistentStoreTest which uses a fake ExecutorPool,
31 * which will not spawn ExecutorThreads and hence not run any tasks
32 * automatically in the background. All tasks must be manually run().
33 */
34class SingleThreadedEPStoreTest : public EventuallyPersistentStoreTest {
35    void TearDown() {
36        shutdownAndPurgeTasks();
37        EventuallyPersistentStoreTest::TearDown();
38    }
39
40public:
41    /*
42     * Run the next task from the taskQ
43     * The task must match the expectedTaskName parameter
44     */
45    void runNextTask(TaskQueue& taskQ, const std::string& expectedTaskName) {
46        CheckedExecutor executor(task_executor, taskQ);
47
48        // Run the task
49        executor.runCurrentTask(expectedTaskName);
50        executor.completeCurrentTask();
51    }
52
53    /*
54     * Run the next task from the taskQ
55     */
56    void runNextTask(TaskQueue& taskQ) {
57        CheckedExecutor executor(task_executor, taskQ);
58
59        // Run the task
60        executor.runCurrentTask();
61        executor.completeCurrentTask();
62    }
63
64protected:
65    void SetUp() {
66        SingleThreadedExecutorPool::replaceExecutorPoolWithFake();
67        EventuallyPersistentStoreTest::SetUp();
68
69        task_executor = reinterpret_cast<SingleThreadedExecutorPool*>
70            (ExecutorPool::get());
71    }
72
73    /*
74     * Change the vbucket state and run the VBStatePeristTask
75     * On return the state will be changed and the task completed.
76     */
77    void setVBucketStateAndRunPersistTask(uint16_t vbid, vbucket_state_t newState) {
78        // Change state - this should add 1 set_vbucket_state op to the
79        //VBuckets' persistence queue.
80        EXPECT_EQ(ENGINE_SUCCESS,
81                  store->setVBucketState(vbid, newState, /*transfer*/false));
82
83        // Trigger the flusher to flush state to disk.
84        EXPECT_EQ(0, store->flushVBucket(vbid));
85    }
86
87    /*
88     * Set the stats isShutdown and attempt to drive all tasks to cancel
89     */
90    void shutdownAndPurgeTasks() {
91        engine->getEpStats().isShutdown = true;
92        task_executor->cancelAll();
93
94        for (task_type_t t :
95             {WRITER_TASK_IDX, READER_TASK_IDX, AUXIO_TASK_IDX, NONIO_TASK_IDX}) {
96
97            // Define a lambda to drive all tasks from the queue, if hpTaskQ
98            // is implemented then trivial to add a second call to runTasks.
99            auto runTasks = [=](TaskQueue& queue) {
100                while (queue.getFutureQueueSize() > 0 || queue.getReadyQueueSize() > 0) {
101                    runNextTask(queue);
102                }
103            };
104            runTasks(*task_executor->getLpTaskQ()[t]);
105            task_executor->stopTaskGroup(engine->getTaskable().getGID(), t,
106                                         engine->getEpStats().forceShutdown);
107        }
108    }
109
110    /*
111     * Fake callback emulating dcp_add_failover_log
112     */
113    static ENGINE_ERROR_CODE fakeDcpAddFailoverLog(vbucket_failover_t* entry,
114                                                   size_t nentries,
115                                                   const void *cookie) {
116        return ENGINE_SUCCESS;
117    }
118
119    SingleThreadedExecutorPool* task_executor;
120};
121
122
/**
 * Regression test for MB-22451: When handleSlowStream is called and in
 * STREAM_BACKFILLING state and currently have a backfill scheduled (or running)
 * ensure that when the backfill completes the new backfill is scheduled and
 * the backfilling flag remains true.
 */
TEST_F(SingleThreadedEPStoreTest, test_mb22451) {
    // Make vbucket active.
    setVBucketStateAndRunPersistTask(vbid, vbucket_state_active);
    // Store a single Item
    store_item(vbid, "key", "value");
    // Ensure that it has persisted to disk
    flush_vbucket_to_disk(vbid);

    // Create a Mock Dcp producer
    dcp_producer_t producer = new MockDcpProducer(*engine,
                                                  cookie,
                                                  "test_producer",
                                                  /*notifyOnly*/false);
    // Create a Mock Active Stream spanning the full seqno range
    // (~0 is used as "max seqno").
    stream_t stream = new MockActiveStream(
            static_cast<EventuallyPersistentEngine*>(engine.get()),
            producer,
            producer->getName(),
            /*flags*/0,
            /*opaque*/0, vbid,
            /*st_seqno*/0,
            /*en_seqno*/~0,
            /*vb_uuid*/0xabcd,
            /*snap_start_seqno*/0,
            /*snap_end_seqno*/~0);

    MockActiveStream* mock_stream =
            static_cast<MockActiveStream*>(stream.get());

    /**
      * The core of the test follows:
      * Call completeBackfill whilst we are in the state of STREAM_BACKFILLING
      * and the pendingBackfill flag is set to true.
      * We expect that on leaving completeBackfill the isBackfillRunning flag is
      * set to true.
      */
    mock_stream->public_setBackfillTaskRunning(true);
    mock_stream->public_transitionState(STREAM_BACKFILLING);
    mock_stream->handleSlowStream();
    // The call to handleSlowStream should result in setting pendingBackfill
    // flag to true
    EXPECT_TRUE(mock_stream->public_getPendingBackfill())
        << "pendingBackfill is not true";
    mock_stream->completeBackfill();
    EXPECT_TRUE(mock_stream->public_isBackfillTaskRunning())
        << "isBackfillRunning is not true";

    // Required to ensure that the backfillMgr is deleted
    producer->closeAllStreams();
}
179
/* Regression / reproducer test for MB-19695 - an exception is thrown
 * (and connection disconnected) if a couchstore file hasn't been re-created
 * yet when doTapVbTakeoverStats() is called as part of
 * tapNotify / TAP_OPAQUE_INITIAL_VBUCKET_STREAM.
 */
TEST_F(SingleThreadedEPStoreTest, MB19695_doTapVbTakeoverStats) {
    auto* task_executor = reinterpret_cast<SingleThreadedExecutorPool*>
        (ExecutorPool::get());

    // Should start with no tasks registered on any queues.
    for (auto& queue : task_executor->getLpTaskQ()) {
        ASSERT_EQ(0, queue->getFutureQueueSize());
        ASSERT_EQ(0, queue->getReadyQueueSize());
    }

    // [[1]] Set our state to replica.
    setVBucketStateAndRunPersistTask(vbid, vbucket_state_replica);

    auto& lpWriterQ = *task_executor->getLpTaskQ()[WRITER_TASK_IDX];
    auto& lpNonioQ = *task_executor->getLpTaskQ()[NONIO_TASK_IDX];

    // [[2]] Perform a vbucket reset. This will perform some work synchronously,
    // but also creates 2 tasks and notifies the flusher:
    //   1. vbucket memory deletion (NONIO)
    //   2. vbucket disk deletion (WRITER)
    //   3. FlusherTask notified (WRITER)
    // MB-19695: If we try to get the number of persisted deletes between
    // steps (2) and (3) running then an exception is thrown (and client
    // disconnected).
    EXPECT_TRUE(store->resetVBucket(vbid));

    // Manually run the two reset tasks (memory then disk deletion).
    runNextTask(lpNonioQ, "Removing (dead) vb:0 from memory");
    runNextTask(lpWriterQ, "Deleting VBucket:0");

    // [[3]] Ok, let's see if we can get TAP takeover stats. This will
    // fail with MB-19695.
    // Dummy callback to pass into the stats function below.
    auto dummy_cb = [](const char *key, const uint16_t klen,
                          const char *val, const uint32_t vlen,
                          const void *cookie) {};
    std::string key{"MB19695_doTapVbTakeoverStats"};
    EXPECT_NO_THROW(engine->public_doTapVbTakeoverStats
                    (nullptr, dummy_cb, key, vbid));

    // Also check DCP variant (MB-19815)
    EXPECT_NO_THROW(engine->public_doDcpVbTakeoverStats
                    (nullptr, dummy_cb, key, vbid));

    // Cleanup - run flusher.
    EXPECT_EQ(0, store->flushVBucket(vbid));
}
231
/*
 * Test that
 * 1. We cannot create a stream against a dead vb (MB-17230)
 * 2. No tasks are scheduled as a side-effect of the streamRequest attempt.
 */
TEST_F(SingleThreadedEPStoreTest, MB19428_no_streams_against_dead_vbucket) {
    setVBucketStateAndRunPersistTask(vbid, vbucket_state_active);

    store_item(vbid, "key", "value");

    // Directly flush the vbucket
    EXPECT_EQ(1, store->flushVBucket(vbid));

    // Now mark the vbucket dead - stream requests against it must fail.
    setVBucketStateAndRunPersistTask(vbid, vbucket_state_dead);
    auto* task_executor = reinterpret_cast<SingleThreadedExecutorPool*>
        (ExecutorPool::get());
    auto& lpAuxioQ = *task_executor->getLpTaskQ()[AUXIO_TASK_IDX];

    {
        // Create a Mock Dcp producer
        dcp_producer_t producer = new MockDcpProducer(*engine,
                                                      cookie,
                                                      "test_producer",
                                                      /*notifyOnly*/false);

        // Creating a producer will schedule one ActiveStreamCheckpointProcessorTask
        // that task though sleeps forever, so won't run until woken.
        EXPECT_EQ(1, lpAuxioQ.getFutureQueueSize());

        uint64_t rollbackSeqno;
        auto err = producer->streamRequest(/*flags*/0,
                                           /*opaque*/0,
                                           /*vbucket*/vbid,
                                           /*start_seqno*/0,
                                           /*end_seqno*/-1,
                                           /*vb_uuid*/0xabcd,
                                           /*snap_start*/0,
                                           /*snap_end*/0,
                                           &rollbackSeqno,
                                           SingleThreadedEPStoreTest::fakeDcpAddFailoverLog);

        EXPECT_EQ(ENGINE_NOT_MY_VBUCKET, err) << "Unexpected error code";

        // The streamRequest failed and should not have created any more tasks.
        EXPECT_EQ(1, lpAuxioQ.getFutureQueueSize());
    }
}
279
// Check that in-progress disk backfills (`CouchKVStore::backfill`) are
// correctly deleted when we delete a bucket. If not then we leak vBucket file
// descriptors, which can prevent ns_server from cleaning up old vBucket files
// and consequently re-adding a node to the cluster.
//
TEST_F(SingleThreadedEPStoreTest, MB19892_BackfillNotDeleted) {
    // Make vbucket active.
    setVBucketStateAndRunPersistTask(vbid, vbucket_state_active);

    // Perform one SET, then close its checkpoint. This means that we no
    // longer have all sequence numbers in memory checkpoints, forcing the
    // DCP stream request to go to disk (backfill).
    store_item(vbid, "key", "value");

    // Force a new checkpoint.
    auto vb = store->getVbMap().getBucket(vbid);
    auto& ckpt_mgr = vb->checkpointManager;
    ckpt_mgr.createNewCheckpoint();

    // Directly flush the vbucket, ensuring data is on disk.
    //  (This would normally also wake up the checkpoint remover task, but
    //   as that task was never registered with the ExecutorPool in this test
    //   environment, we need to manually remove the prev checkpoint).
    EXPECT_EQ(1, store->flushVBucket(vbid));

    bool new_ckpt_created;
    EXPECT_EQ(1,
              ckpt_mgr.removeClosedUnrefCheckpoints(vb, new_ckpt_created));

    // Create a DCP producer, and start a stream request.
    std::string name{"test_producer"};
    EXPECT_EQ(ENGINE_SUCCESS,
              engine->dcpOpen(cookie, /*opaque:unused*/{}, /*seqno:unused*/{},
                              DCP_OPEN_PRODUCER, name.data(), name.size()));

    uint64_t rollbackSeqno;
    auto dummy_dcp_add_failover_cb = [](vbucket_failover_t* entry,
                                       size_t nentries, const void *cookie) {
        return ENGINE_SUCCESS;
    };

    // Actual stream request method (EvpDcpStreamReq) is static, so access via
    // the engine_interface.
    // NOTE(review): no explicit teardown here - the bucket is destroyed by
    // TearDown() while this backfill is outstanding, which per the header
    // comment is the scenario being exercised.
    EXPECT_EQ(ENGINE_SUCCESS,
              engine.get()->dcp.stream_req(
                      &engine.get()->interface, cookie, /*flags*/0,
                      /*opaque*/0, /*vbucket*/vbid, /*start_seqno*/0,
                      /*end_seqno*/-1, /*vb_uuid*/0xabcd, /*snap_start*/0,
                      /*snap_end*/0, &rollbackSeqno,
                      dummy_dcp_add_failover_cb));
}
331
/*
 * Test that the DCP processor returns a 'yield' return code when
 * working on a large enough buffer size.
 */
TEST_F(SingleThreadedEPStoreTest, MB18452_yield_dcp_processor) {

    // We need a replica VB
    setVBucketStateAndRunPersistTask(vbid, vbucket_state_replica);

    // Create a MockDcpConsumer
    dcp_consumer_t consumer = new MockDcpConsumer(*engine, cookie, "test");

    // Add the stream
    EXPECT_EQ(ENGINE_SUCCESS,
              consumer->addStream(/*opaque*/0, vbid, /*flags*/0));

    // The processBufferedItems should yield every "yield * batchSize"
    // So add '(n * (yield * batchSize)) + 1' messages and we should see
    // processBufferedMessages return 'more_to_process' 'n' times and then
    // 'all_processed' once.
    const int n = 4;
    const int yield = engine->getConfiguration().getDcpConsumerProcessBufferedMessagesYieldLimit();
    const int batchSize = engine->getConfiguration().getDcpConsumerProcessBufferedMessagesBatchSize();
    const int messages = n * (batchSize * yield);

    // Force the stream to buffer rather than process messages immediately
    const ssize_t queueCap = engine->getEpStats().replicationThrottleWriteQueueCap;
    engine->getEpStats().replicationThrottleWriteQueueCap = 0;

    // 1. Add the first message, a snapshot marker.
    consumer->snapshotMarker(/*opaque*/1, vbid, /*startseq*/0,
                             /*endseq*/messages, /*flags*/0);

    // 2. Now add the rest as mutations. Note the '<=' bound: this adds
    // messages+1 mutations (seqnos 0..messages) - the '+1' referred to in
    // the comment above.
    for (int ii = 0; ii <= messages; ii++) {
        std::string key = "key" + std::to_string(ii);
        std::string value = "value";
        consumer->mutation(/*opaque*/1, key.c_str(), key.length(),
                           value.c_str(), value.length(), /*cas*/0,
                           vbid, /*flags*/0, /*datatype*/0, /*locktime*/0,
                           /*bySeqno*/ii, /*revSeqno*/0, /*exptime*/0,
                           /*nru*/0, /*meta*/nullptr, /*nmeta*/0);
    }

    // Set the throttle back to the original value
    engine->getEpStats().replicationThrottleWriteQueueCap = queueCap;

    // Get our target stream ready.
    static_cast<MockDcpConsumer*>(consumer.get())->public_notifyVbucketReady(vbid);

    // 3. processBufferedItems returns more_to_process n times
    for (int ii = 0; ii < n; ii++) {
        EXPECT_EQ(more_to_process, consumer->processBufferedItems());
    }

    // 4. processBufferedItems returns a final all_processed
    EXPECT_EQ(all_processed, consumer->processBufferedItems());

    // Drop the stream
    consumer->closeStream(/*opaque*/0, vbid);
}
393
394/*
395 * Background thread used by MB20054_onDeleteItem_during_bucket_deletion
396 */
397static void MB20054_run_backfill_task(EventuallyPersistentEngine* engine,
398                                      CheckedExecutor& backfill,
399                                      SyncObject& backfill_cv,
400                                      SyncObject& destroy_cv,
401                                      TaskQueue* lpAuxioQ) {
402    std::unique_lock<std::mutex> destroy_lh(destroy_cv);
403    ObjectRegistry::onSwitchThread(engine);
404
405    // Run the BackfillManagerTask task to push items to readyQ. In sherlock
406    // upwards this runs multiple times - so should return true.
407    backfill.runCurrentTask("Backfilling items for a DCP Connection");
408
409    // Notify the main thread that it can progress with destroying the
410    // engine [A].
411    {
412        // if we can get the lock, then we know the main thread is waiting
413        std::lock_guard<std::mutex> backfill_lock(backfill_cv);
414        backfill_cv.notify_one(); // move the main thread along
415    }
416
417    // Now wait ourselves for destroy to be completed [B].
418    destroy_cv.wait(destroy_lh);
419
420    // This is the only "hacky" part of the test - we need to somehow
421    // keep the DCPBackfill task 'running' - i.e. not call
422    // completeCurrentTask - until the main thread is in
423    // ExecutorPool::_stopTaskGroup. However we have no way from the test
424    // to properly signal that we are *inside* _stopTaskGroup -
425    // called from EVPStore's destructor.
426    // Best we can do is spin on waiting for the DCPBackfill task to be
427    // set to 'dead' - and only then completeCurrentTask; which will
428    // cancel the task.
429    while (!backfill.getCurrentTask()->isdead()) {
430        // spin.
431    }
432    backfill.completeCurrentTask();
433}
434
/*
 * No-op dcp_add_failover_log callback: ignores the failover-log entries and
 * unconditionally reports success. Used where a test only needs
 * streamRequest to proceed, not to inspect the failover log.
 */
static ENGINE_ERROR_CODE dummy_dcp_add_failover_cb(vbucket_failover_t* entry,
                                                   size_t nentries,
                                                   const void *cookie) {
    return ENGINE_SUCCESS;
}
440
441// Test performs engine deletion interleaved with tasks so redefine TearDown
442// for this tests needs.
443class MB20054_SingleThreadedEPStoreTest : public SingleThreadedEPStoreTest {
444public:
445    void SetUp() {
446        SingleThreadedEPStoreTest::SetUp();
447    }
448
449    void TearDown() {
450        ExecutorPool::shutdown();
451    }
452};
453
// Check that if onDeleteItem() is called during bucket deletion, we do not
// abort due to not having a valid thread-local 'engine' pointer. This
// has been observed when we have a DCPBackfill task which is deleted during
// bucket shutdown, which has a non-zero number of Items which are destructed
// (and call onDeleteItem).
TEST_F(MB20054_SingleThreadedEPStoreTest, MB20054_onDeleteItem_during_bucket_deletion) {
    // Should start with no tasks registered on any queues.
    TaskQ& lp_task_q = task_executor->getLpTaskQ();
    for (auto& queue : lp_task_q) {
        ASSERT_EQ(0, queue->getFutureQueueSize());
        ASSERT_EQ(0, queue->getReadyQueueSize());
    }

    // [[1]] Set our state to active.
    setVBucketStateAndRunPersistTask(vbid, vbucket_state_active);

    // Perform one SET, then close its checkpoint. This means that we no
    // longer have all sequence numbers in memory checkpoints, forcing the
    // DCP stream request to go to disk (backfill).
    store_item(vbid, "key", "value");

    // Force a new checkpoint.
    RCPtr<VBucket> vb = store->getVbMap().getBucket(vbid);
    CheckpointManager& ckpt_mgr = vb->checkpointManager;
    ckpt_mgr.createNewCheckpoint();
    auto lpWriterQ = task_executor->getLpTaskQ()[WRITER_TASK_IDX];
    EXPECT_EQ(0, lpWriterQ->getFutureQueueSize());
    EXPECT_EQ(0, lpWriterQ->getReadyQueueSize());

    auto lpAuxioQ = task_executor->getLpTaskQ()[AUXIO_TASK_IDX];
    EXPECT_EQ(0, lpAuxioQ->getFutureQueueSize());
    EXPECT_EQ(0, lpAuxioQ->getReadyQueueSize());

    // Directly flush the vbucket, ensuring data is on disk.
    //  (This would normally also wake up the checkpoint remover task, but
    //   as that task was never registered with the ExecutorPool in this test
    //   environment, we need to manually remove the prev checkpoint).
    EXPECT_EQ(1, store->flushVBucket(vbid));

    bool new_ckpt_created;
    EXPECT_EQ(1,
              ckpt_mgr.removeClosedUnrefCheckpoints(vb, new_ckpt_created));
    // Drop our (ref-counted) vbucket reference before the engine is
    // destroyed later in the test.
    vb.reset();

    EXPECT_EQ(0, lpAuxioQ->getFutureQueueSize());
    EXPECT_EQ(0, lpAuxioQ->getReadyQueueSize());

    // Create a DCP producer, and start a stream request.
    std::string name("test_producer");
    EXPECT_EQ(ENGINE_SUCCESS,
              engine->dcpOpen(cookie, /*opaque:unused*/{}, /*seqno:unused*/{},
                              DCP_OPEN_PRODUCER, name.data(), name.size()));

    // Expect to have an ActiveStreamCheckpointProcessorTask, which is
    // initially snoozed (so we can't run it).
    EXPECT_EQ(1, lpAuxioQ->getFutureQueueSize());
    EXPECT_EQ(0, lpAuxioQ->getReadyQueueSize());

    uint64_t rollbackSeqno;
    // Actual stream request method (EvpDcpStreamReq) is static, so access via
    // the engine_interface.
    EXPECT_EQ(ENGINE_SUCCESS,
              engine->dcp.stream_req(&engine->interface, cookie, /*flags*/0,
                                     /*opaque*/0, /*vbucket*/vbid,
                                     /*start_seqno*/0, /*end_seqno*/-1,
                                     /*vb_uuid*/0xabcd, /*snap_start*/0,
                                     /*snap_end*/0, &rollbackSeqno,
                                     dummy_dcp_add_failover_cb));

    // FutureQ should now have an additional DCPBackfill task.
    EXPECT_EQ(2, lpAuxioQ->getFutureQueueSize());
    EXPECT_EQ(0, lpAuxioQ->getReadyQueueSize());

    // Create an executor 'thread' to obtain shared ownership of the next
    // AuxIO task (which should be BackfillManagerTask). As long as this
    // object has its currentTask set to BackfillManagerTask, the task
    // will not be deleted.
    // Essentially we are simulating a concurrent thread running this task.
    CheckedExecutor backfill(task_executor, *lpAuxioQ);

    // This is the one action we really need to perform 'concurrently' - delete
    // the engine while a DCPBackfill task is still running. We spin up a
    // separate thread which will run the DCPBackfill task
    // concurrently with destroy - specifically DCPBackfill must start running
    // (and add items to the readyQ) before destroy(), it must then continue
    // running (stop after) _stopTaskGroup is invoked.
    // To achieve this we use a couple of condition variables to synchronise
    // between the two threads - the timeline needs to look like:
    //
    //  auxIO thread:  [------- DCPBackfill ----------]
    //   main thread:          [destroy()]       [ExecutorPool::_stopTaskGroup]
    //
    //  --------------------------------------------------------> time
    //
    SyncObject backfill_cv;
    SyncObject destroy_cv;
    std::thread concurrent_task_thread;

    {
        // scope for the backfill lock
        std::unique_lock<std::mutex> backfill_lh(backfill_cv);

        concurrent_task_thread = std::thread(MB20054_run_backfill_task,
                                             engine.get(),
                                             std::ref(backfill),
                                             std::ref(backfill_cv),
                                             std::ref(destroy_cv),
                                             lpAuxioQ);
        // [A] Wait for DCPBackfill to complete.
        backfill_cv.wait(backfill_lh);
    }

    ObjectRegistry::onSwitchThread(engine.get());
    // 'Destroy' the engine - this doesn't delete the object, just shuts down
    // connections, marks streams as dead etc.
    engine->destroy(/*force*/false);

    {
        // If we can get the lock we know the thread is waiting for destroy.
        std::lock_guard<std::mutex> lh(destroy_cv);
        destroy_cv.notify_one(); // move the thread on.
    }

    // Force the one last task out of here
    task_executor->cancelByName("Process checkpoint(s) for DCP producer");
    runNextTask(*lpAuxioQ);

    // Mark the connection as dead for clean shutdown
    destroy_mock_cookie(cookie);
    engine->getDcpConnMap().manageConnections();

    // Nullify TLS engine and reset the smart pointer to force destruction.
    // We need null as the engine to stop ~CheckedExecutor path from trying
    // to touch the engine
    ObjectRegistry::onSwitchThread(nullptr);
    engine.reset();
    destroy_mock_event_callbacks();
    concurrent_task_thread.join();
}
593
/*
 * MB-18953 is triggered by the executorpool wake path moving tasks directly
 * into the readyQueue, thus allowing for high-priority tasks to dominate
 * a taskqueue.
 */
TEST_F(SingleThreadedEPStoreTest, MB18953_taskWake) {
    auto& lpNonioQ = *task_executor->getLpTaskQ()[NONIO_TASK_IDX];

    // Minimal task used only to exercise the schedule/wake/reschedule paths.
    class TestTask : public GlobalTask {
    public:
        TestTask(EventuallyPersistentEngine* e, TaskId id)
          : GlobalTask(e, id, 0.0, false) {}

        // returning true will also drive the ExecutorPool::reschedule path.
        bool run() { return true; }

        std::string getDescription() {
            return std::string("TestTask ") + GlobalTask::getTaskName(getTypeId());
        }
    };

    ExTask hpTask = new TestTask(engine.get(),
                                 TaskId::PendingOpsNotification);
    task_executor->schedule(hpTask, NONIO_TASK_IDX);

    ExTask lpTask = new TestTask(engine.get(),
                                 TaskId::DefragmenterTask);
    task_executor->schedule(lpTask, NONIO_TASK_IDX);

    runNextTask(lpNonioQ, "TestTask PendingOpsNotification"); // hptask goes first
    // Ensure that a wake to the hpTask doesn't mean the lpTask gets ignored
    lpNonioQ.wake(hpTask);

    // Check 1 task is ready
    EXPECT_EQ(1, task_executor->getTotReadyTasks());
    EXPECT_EQ(1, task_executor->getNumReadyTasks(NONIO_TASK_IDX));

    runNextTask(lpNonioQ, "TestTask DefragmenterTask"); // lptask goes second

    // Run the tasks again to check that coming from ::reschedule our
    // expectations are still met.
    runNextTask(lpNonioQ, "TestTask PendingOpsNotification"); // hptask goes first

    // Ensure that a wake to the hpTask doesn't mean the lpTask gets ignored
    lpNonioQ.wake(hpTask);

    // Check 1 task is ready
    EXPECT_EQ(1, task_executor->getTotReadyTasks());
    EXPECT_EQ(1, task_executor->getNumReadyTasks(NONIO_TASK_IDX));
    runNextTask(lpNonioQ, "TestTask DefragmenterTask"); // lptask goes second
}
645