1e32900fbSTrond Norbye/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2e32900fbSTrond Norbye/*
3216d722bSDave Rigby *     Copyright 2015 Couchbase, Inc.
4e32900fbSTrond Norbye *
5e32900fbSTrond Norbye *   Licensed under the Apache License, Version 2.0 (the "License");
6e32900fbSTrond Norbye *   you may not use this file except in compliance with the License.
7e32900fbSTrond Norbye *   You may obtain a copy of the License at
8e32900fbSTrond Norbye *
9e32900fbSTrond Norbye *       http://www.apache.org/licenses/LICENSE-2.0
10e32900fbSTrond Norbye *
11e32900fbSTrond Norbye *   Unless required by applicable law or agreed to in writing, software
12e32900fbSTrond Norbye *   distributed under the License is distributed on an "AS IS" BASIS,
13e32900fbSTrond Norbye *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14e32900fbSTrond Norbye *   See the License for the specific language governing permissions and
15e32900fbSTrond Norbye *   limitations under the License.
16e32900fbSTrond Norbye */
17e32900fbSTrond Norbye#include "config.h"
18e32900fbSTrond Norbye
19e32900fbSTrond Norbye#include "taskqueue.h"
201c55c549STrond Norbye#include "executorpool.h"
21b6534b8aSSundar Sridharan#include "executorthread.h"
22e32900fbSTrond Norbye
23d8577c54SDaniel Owen#include <cmath>
24d8577c54SDaniel Owen
25e32900fbSTrond NorbyeTaskQueue::TaskQueue(ExecutorPool *m, task_type_t t, const char *nm) :
26b6534b8aSSundar Sridharan    name(nm), queueType(t), manager(m), sleepers(0)
27e32900fbSTrond Norbye{
28e32900fbSTrond Norbye    // EMPTY
29e32900fbSTrond Norbye}
30e32900fbSTrond Norbye
31e32900fbSTrond NorbyeTaskQueue::~TaskQueue() {
32e32900fbSTrond Norbye    LOG(EXTENSION_LOG_INFO, "Task Queue killing %s", name.c_str());
33e32900fbSTrond Norbye}
34e32900fbSTrond Norbye
35e32900fbSTrond Norbyeconst std::string TaskQueue::getName() const {
36e32900fbSTrond Norbye    return (name+taskType2Str(queueType));
37e32900fbSTrond Norbye}
38e32900fbSTrond Norbye
39f1770e55SDave Rigbysize_t TaskQueue::getReadyQueueSize() {
40f1770e55SDave Rigby    LockHolder lh(mutex);
41f1770e55SDave Rigby    return readyQueue.size();
42f1770e55SDave Rigby}
43f1770e55SDave Rigby
44f1770e55SDave Rigbysize_t TaskQueue::getFutureQueueSize() {
45f1770e55SDave Rigby    LockHolder lh(mutex);
46f1770e55SDave Rigby    return futureQueue.size();
47f1770e55SDave Rigby}
48f1770e55SDave Rigby
49f1770e55SDave Rigbysize_t TaskQueue::getPendingQueueSize() {
50f1770e55SDave Rigby    LockHolder lh(mutex);
51f1770e55SDave Rigby    return pendingQueue.size();
52f1770e55SDave Rigby}
53f1770e55SDave Rigby
542900340fSChiyoung SeoExTask TaskQueue::_popReadyTask(void) {
55e32900fbSTrond Norbye    ExTask t = readyQueue.top();
56e32900fbSTrond Norbye    readyQueue.pop();
5780656224SSundar Sridharan    manager->lessWork(queueType);
58e32900fbSTrond Norbye    return t;
59e32900fbSTrond Norbye}
60e32900fbSTrond Norbye
61b6534b8aSSundar Sridharanvoid TaskQueue::doWake(size_t &numToWake) {
62e32900fbSTrond Norbye    LockHolder lh(mutex);
63b6534b8aSSundar Sridharan    _doWake_UNLOCKED(numToWake);
64b6534b8aSSundar Sridharan}
65e32900fbSTrond Norbye
66b6534b8aSSundar Sridharanvoid TaskQueue::_doWake_UNLOCKED(size_t &numToWake) {
67b6534b8aSSundar Sridharan    if (sleepers && numToWake)  {
68b6534b8aSSundar Sridharan        if (numToWake < sleepers) {
69b6534b8aSSundar Sridharan            for (; numToWake; --numToWake) {
70df3730beSDave Rigby                mutex.notify_one(); // cond_signal 1
71b6534b8aSSundar Sridharan            }
72b6534b8aSSundar Sridharan        } else {
73df3730beSDave Rigby            mutex.notify_all(); // cond_broadcast
74b6534b8aSSundar Sridharan            numToWake -= sleepers;
75b6534b8aSSundar Sridharan        }
76e32900fbSTrond Norbye    }
77b6534b8aSSundar Sridharan}
78e32900fbSTrond Norbye
79b9334207SDave Rigbybool TaskQueue::_doSleep(ExecutorThread &t,
80b9334207SDave Rigby                         std::unique_lock<std::mutex>& lock) {
81d8577c54SDaniel Owen    t.updateCurrentTime();
82e629ac43SDave Rigby
83e629ac43SDave Rigby    // Determine the time point to wake this thread - either "forever" if the
84e629ac43SDave Rigby    // futureQueue is empty, or the earliest wake time in the futureQueue.
85e629ac43SDave Rigby    const auto wakeTime = futureQueue.empty()
86e629ac43SDave Rigby                                  ? std::chrono::steady_clock::time_point::max()
87e629ac43SDave Rigby                                  : futureQueue.top()->getWaketime();
88e629ac43SDave Rigby
89e629ac43SDave Rigby    if (t.getCurTime() < wakeTime && manager->trySleep(queueType)) {
902bfb51a1SDave Rigby        // Atomically switch from running to sleeping; iff we were previously
912bfb51a1SDave Rigby        // running.
922bfb51a1SDave Rigby        executor_state_t expected_state = EXECUTOR_RUNNING;
932bfb51a1SDave Rigby        if (!t.state.compare_exchange_strong(expected_state,
942bfb51a1SDave Rigby                                             EXECUTOR_SLEEPING)) {
95b6534b8aSSundar Sridharan            return false;
96e32900fbSTrond Norbye        }
97b6534b8aSSundar Sridharan        sleepers++;
98b6534b8aSSundar Sridharan        // zzz....
99e629ac43SDave Rigby        const auto snooze = wakeTime - t.getCurTime();
1005f3eb257SSundar Sridharan
101d8577c54SDaniel Owen        if (snooze > std::chrono::seconds((int)round(MIN_SLEEP_TIME))) {
102b9334207SDave Rigby            mutex.wait_for(lock, MIN_SLEEP_TIME);
103b6534b8aSSundar Sridharan        } else {
104d8577c54SDaniel Owen            mutex.wait_for(lock, snooze);
105b6534b8aSSundar Sridharan        }
106b6534b8aSSundar Sridharan        // ... woke!
107b6534b8aSSundar Sridharan        sleepers--;
108b6534b8aSSundar Sridharan        manager->woke();
109b6534b8aSSundar Sridharan
1102bfb51a1SDave Rigby        // Finished our sleep, atomically switch back to running iff we were
1112bfb51a1SDave Rigby        // previously sleeping.
1122bfb51a1SDave Rigby        expected_state = EXECUTOR_SLEEPING;
1132bfb51a1SDave Rigby        if (!t.state.compare_exchange_strong(expected_state,
1142bfb51a1SDave Rigby                                             EXECUTOR_RUNNING)) {
115b6534b8aSSundar Sridharan            return false;
116b6534b8aSSundar Sridharan        }
117d8577c54SDaniel Owen        t.updateCurrentTime();
118b6534b8aSSundar Sridharan    }
119e7ed2862SDave Rigby
120b6534b8aSSundar Sridharan    return true;
121b6534b8aSSundar Sridharan}
122b6534b8aSSundar Sridharan
123b6534b8aSSundar Sridharanbool TaskQueue::_fetchNextTask(ExecutorThread &t, bool toSleep) {
124b6534b8aSSundar Sridharan    bool ret = false;
125b9334207SDave Rigby    std::unique_lock<std::mutex> lh(mutex);
126b6534b8aSSundar Sridharan
127b9334207SDave Rigby    if (toSleep && !_doSleep(t, lh)) {
128b6534b8aSSundar Sridharan        return ret; // shutting down
129b6534b8aSSundar Sridharan    }
130b6534b8aSSundar Sridharan
131d8577c54SDaniel Owen    size_t numToWake = _moveReadyTasks(t.getCurTime());
132b6534b8aSSundar Sridharan
133b6534b8aSSundar Sridharan    if (!readyQueue.empty() && readyQueue.top()->isdead()) {
134052a2cb2SDave Rigby        t.setCurrentTask(_popReadyTask()); // clean out dead tasks first
135b6534b8aSSundar Sridharan        ret = true;
136b6534b8aSSundar Sridharan    } else if (!readyQueue.empty() || !pendingQueue.empty()) {
137e7ce1c4fSJames Harrison        // we must consider any pending tasks too. To ensure prioritized run
138e7ce1c4fSJames Harrison        // order, the function below will push any pending task back into the
139e7ce1c4fSJames Harrison        // readyQueue (sorted by priority)
140e7ce1c4fSJames Harrison        _checkPendingQueue();
141e7ce1c4fSJames Harrison        ExTask tid = _popReadyTask(); // and pop out the top task
142e7ce1c4fSJames Harrison        t.setCurrentTask(tid);
143e7ce1c4fSJames Harrison        ret = true;
144e7ce1c4fSJames Harrison    } else { // Let the task continue waiting in pendingQueue
145e7ce1c4fSJames Harrison        numToWake = numToWake ? numToWake - 1 : 0; // 1 fewer task ready
146e32900fbSTrond Norbye    }
147e32900fbSTrond Norbye
148b6534b8aSSundar Sridharan    _doWake_UNLOCKED(numToWake);
149b6534b8aSSundar Sridharan    lh.unlock();
150b6534b8aSSundar Sridharan
151b6534b8aSSundar Sridharan    return ret;
152e32900fbSTrond Norbye}
153e32900fbSTrond Norbye
154b6534b8aSSundar Sridharanbool TaskQueue::fetchNextTask(ExecutorThread &thread, bool toSleep) {
1552900340fSChiyoung Seo    EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
1561b4f629dSJim Walker    bool rv = _fetchNextTask(thread, toSleep);
1572900340fSChiyoung Seo    ObjectRegistry::onSwitchThread(epe);
1582900340fSChiyoung Seo    return rv;
1592900340fSChiyoung Seo}
1602900340fSChiyoung Seo
161d8577c54SDaniel Owensize_t TaskQueue::_moveReadyTasks(const ProcessClock::time_point tv) {
162e32900fbSTrond Norbye    if (!readyQueue.empty()) {
163b6534b8aSSundar Sridharan        return 0;
164e32900fbSTrond Norbye    }
165e32900fbSTrond Norbye
166d197ec73SSundar Sridharan    size_t numReady = 0;
167e32900fbSTrond Norbye    while (!futureQueue.empty()) {
168e32900fbSTrond Norbye        ExTask tid = futureQueue.top();
169b4a6687cSabhinavdangeti        if (tid->getWaketime() <= tv) {
170d197ec73SSundar Sridharan            futureQueue.pop();
171d197ec73SSundar Sridharan            readyQueue.push(tid);
172d197ec73SSundar Sridharan            numReady++;
173e32900fbSTrond Norbye        } else {
174d197ec73SSundar Sridharan            break;
175e32900fbSTrond Norbye        }
176d197ec73SSundar Sridharan    }
177d197ec73SSundar Sridharan
17880656224SSundar Sridharan    manager->addWork(numReady, queueType);
179b6534b8aSSundar Sridharan
180b6534b8aSSundar Sridharan    // Current thread will pop one task, so wake up one less thread
181b6534b8aSSundar Sridharan    return numReady ? numReady - 1 : 0;
182e32900fbSTrond Norbye}
183e32900fbSTrond Norbye
1842900340fSChiyoung Seovoid TaskQueue::_checkPendingQueue(void) {
185495aaee4SSundar Sridharan    if (!pendingQueue.empty()) {
186495aaee4SSundar Sridharan        ExTask runnableTask = pendingQueue.front();
187d197ec73SSundar Sridharan        readyQueue.push(runnableTask);
18880656224SSundar Sridharan        manager->addWork(1, queueType);
189495aaee4SSundar Sridharan        pendingQueue.pop_front();
190495aaee4SSundar Sridharan    }
191495aaee4SSundar Sridharan}
192e32900fbSTrond Norbye
1937f156521SDave RigbyProcessClock::time_point TaskQueue::_reschedule(ExTask &task) {
194e32900fbSTrond Norbye    LockHolder lh(mutex);
195495aaee4SSundar Sridharan
196e32900fbSTrond Norbye    futureQueue.push(task);
1977f156521SDave Rigby    return futureQueue.top()->getWaketime();
198e32900fbSTrond Norbye}
199e32900fbSTrond Norbye
2007f156521SDave RigbyProcessClock::time_point TaskQueue::reschedule(ExTask &task) {
2012900340fSChiyoung Seo    EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
2027f156521SDave Rigby    auto rv = _reschedule(task);
2032900340fSChiyoung Seo    ObjectRegistry::onSwitchThread(epe);
2042900340fSChiyoung Seo    return rv;
2052900340fSChiyoung Seo}
2062900340fSChiyoung Seo
2072900340fSChiyoung Seovoid TaskQueue::_schedule(ExTask &task) {
2083e85e50aSWill Gardner    TaskQueue* sleepQ;
2093e85e50aSWill Gardner    size_t numToWake = 1;
210495aaee4SSundar Sridharan
2113e85e50aSWill Gardner    {
2123e85e50aSWill Gardner        LockHolder lh(mutex);
213495aaee4SSundar Sridharan
21466b2fdaeSJames Harrison        // If we are rescheduling a previously cancelled task, we should reset
21566b2fdaeSJames Harrison        // the task state to the initial value of running.
216fe6b9e03SDave Rigby        task->setState(TASK_RUNNING, TASK_DEAD);
21766b2fdaeSJames Harrison
2183e85e50aSWill Gardner        futureQueue.push(task);
219b6534b8aSSundar Sridharan
220d26ea7d1SDave Rigby        LOG(EXTENSION_LOG_DEBUG,
221d26ea7d1SDave Rigby            "%s: Schedule a task \"%.*s\" id %" PRIu64,
222d26ea7d1SDave Rigby            name.c_str(),
223d26ea7d1SDave Rigby            int(task->getDescription().size()),
224d26ea7d1SDave Rigby            task->getDescription().data(),
225d26ea7d1SDave Rigby            uint64_t(task->getId()));
2263e85e50aSWill Gardner
2273e85e50aSWill Gardner        sleepQ = manager->getSleepQ(queueType);
2283e85e50aSWill Gardner        _doWake_UNLOCKED(numToWake);
2293e85e50aSWill Gardner    }
2304b0aa9eaSSundar Sridharan    if (this != sleepQ) {
2314b0aa9eaSSundar Sridharan        sleepQ->doWake(numToWake);
2324b0aa9eaSSundar Sridharan    }
233495aaee4SSundar Sridharan}
234495aaee4SSundar Sridharan
2352900340fSChiyoung Seovoid TaskQueue::schedule(ExTask &task) {
2362900340fSChiyoung Seo    EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
2372900340fSChiyoung Seo    _schedule(task);
2382900340fSChiyoung Seo    ObjectRegistry::onSwitchThread(epe);
2392900340fSChiyoung Seo}
2402900340fSChiyoung Seo
2412900340fSChiyoung Seovoid TaskQueue::_wake(ExTask &task) {
242d8577c54SDaniel Owen    const ProcessClock::time_point now = ProcessClock::now();
2433e85e50aSWill Gardner    TaskQueue* sleepQ;
2443e85e50aSWill Gardner    // One task is being made ready regardless of the queue it's in.
2453e85e50aSWill Gardner    size_t readyCount = 1;
2463e85e50aSWill Gardner    {
2473e85e50aSWill Gardner        LockHolder lh(mutex);
248d26ea7d1SDave Rigby        LOG(EXTENSION_LOG_DEBUG,
249d26ea7d1SDave Rigby            "%s: Wake a task \"%.*s\" id %" PRIu64,
250d26ea7d1SDave Rigby            name.c_str(),
251d26ea7d1SDave Rigby            int(task->getDescription().size()),
252d26ea7d1SDave Rigby            task->getDescription().data(),
253d26ea7d1SDave Rigby            uint64_t(task->getId()));
2543e85e50aSWill Gardner
2553e85e50aSWill Gardner        std::queue<ExTask> notReady;
2563e85e50aSWill Gardner        // Wake thread-count-serialized tasks too
2573e85e50aSWill Gardner        for (std::list<ExTask>::iterator it = pendingQueue.begin();
2583e85e50aSWill Gardner             it != pendingQueue.end();) {
2593e85e50aSWill Gardner            ExTask tid = *it;
2603e85e50aSWill Gardner            if (tid->getId() == task->getId() || tid->isdead()) {
2613e85e50aSWill Gardner                notReady.push(tid);
2623e85e50aSWill Gardner                it = pendingQueue.erase(it);
2633e85e50aSWill Gardner            } else {
2643e85e50aSWill Gardner                it++;
2653e85e50aSWill Gardner            }
2660aa9f232SSundar Sridharan        }
2670aa9f232SSundar Sridharan
2683e85e50aSWill Gardner        futureQueue.updateWaketime(task, now);
2693e85e50aSWill Gardner        task->setState(TASK_RUNNING, TASK_SNOOZED);
27029849d03SSundar Sridharan
2713e85e50aSWill Gardner        while (!notReady.empty()) {
2723e85e50aSWill Gardner            ExTask tid = notReady.front();
2733e85e50aSWill Gardner            if (tid->getWaketime() <= now || tid->isdead()) {
2743e85e50aSWill Gardner                readyCount++;
2753e85e50aSWill Gardner            }
2763e85e50aSWill Gardner
2773e85e50aSWill Gardner            // MB-18453: Only push to the futureQueue
2783e85e50aSWill Gardner            futureQueue.push(tid);
2793e85e50aSWill Gardner            notReady.pop();
28029849d03SSundar Sridharan        }
281b6534b8aSSundar Sridharan
2823e85e50aSWill Gardner        _doWake_UNLOCKED(readyCount);
2833e85e50aSWill Gardner        sleepQ = manager->getSleepQ(queueType);
28450838e8aSJim Walker    }
285e5a74388SJim Walker    if (this != sleepQ) {
286e5a74388SJim Walker        sleepQ->doWake(readyCount);
287b6534b8aSSundar Sridharan    }
288e32900fbSTrond Norbye}
289e32900fbSTrond Norbye
2902900340fSChiyoung Seovoid TaskQueue::wake(ExTask &task) {
2912900340fSChiyoung Seo    EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
2922900340fSChiyoung Seo    _wake(task);
2932900340fSChiyoung Seo    ObjectRegistry::onSwitchThread(epe);
2942900340fSChiyoung Seo}
2952900340fSChiyoung Seo
296e32900fbSTrond Norbyeconst std::string TaskQueue::taskType2Str(task_type_t type) {
297e32900fbSTrond Norbye    switch (type) {
298e32900fbSTrond Norbye    case WRITER_TASK_IDX:
299e32900fbSTrond Norbye        return std::string("Writer");
300e32900fbSTrond Norbye    case READER_TASK_IDX:
301e32900fbSTrond Norbye        return std::string("Reader");
302e32900fbSTrond Norbye    case AUXIO_TASK_IDX:
303e32900fbSTrond Norbye        return std::string("AuxIO");
304ddb60c3dSSundar Sridharan    case NONIO_TASK_IDX:
305ddb60c3dSSundar Sridharan        return std::string("NonIO");
306e32900fbSTrond Norbye    default:
307e32900fbSTrond Norbye        return std::string("None");
308e32900fbSTrond Norbye    }
309e32900fbSTrond Norbye}
310