xref: /6.0.3/kv_engine/engines/ep/src/taskqueue.cc (revision e7ed2862)
1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2015 Couchbase, Inc.
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17#include "config.h"
18
19#include "taskqueue.h"
20#include "executorpool.h"
21#include "executorthread.h"
22
23#include <cmath>
24
25TaskQueue::TaskQueue(ExecutorPool *m, task_type_t t, const char *nm) :
26    name(nm), queueType(t), manager(m), sleepers(0)
27{
28    // EMPTY
29}
30
31TaskQueue::~TaskQueue() {
32    LOG(EXTENSION_LOG_INFO, "Task Queue killing %s", name.c_str());
33}
34
35const std::string TaskQueue::getName() const {
36    return (name+taskType2Str(queueType));
37}
38
39size_t TaskQueue::getReadyQueueSize() {
40    LockHolder lh(mutex);
41    return readyQueue.size();
42}
43
44size_t TaskQueue::getFutureQueueSize() {
45    LockHolder lh(mutex);
46    return futureQueue.size();
47}
48
49size_t TaskQueue::getPendingQueueSize() {
50    LockHolder lh(mutex);
51    return pendingQueue.size();
52}
53
54ExTask TaskQueue::_popReadyTask(void) {
55    ExTask t = readyQueue.top();
56    readyQueue.pop();
57    manager->lessWork(queueType);
58    return t;
59}
60
61void TaskQueue::doWake(size_t &numToWake) {
62    LockHolder lh(mutex);
63    _doWake_UNLOCKED(numToWake);
64}
65
66void TaskQueue::_doWake_UNLOCKED(size_t &numToWake) {
67    if (sleepers && numToWake)  {
68        if (numToWake < sleepers) {
69            for (; numToWake; --numToWake) {
70                mutex.notify_one(); // cond_signal 1
71            }
72        } else {
73            mutex.notify_all(); // cond_broadcast
74            numToWake -= sleepers;
75        }
76    }
77}
78
79bool TaskQueue::_doSleep(ExecutorThread &t,
80                         std::unique_lock<std::mutex>& lock) {
81    t.updateCurrentTime();
82
83    // Determine the time point to wake this thread - either "forever" if the
84    // futureQueue is empty, or the earliest wake time in the futureQueue.
85    const auto wakeTime = futureQueue.empty()
86                                  ? std::chrono::steady_clock::time_point::max()
87                                  : futureQueue.top()->getWaketime();
88
89    if (t.getCurTime() < wakeTime && manager->trySleep(queueType)) {
90        // Atomically switch from running to sleeping; iff we were previously
91        // running.
92        executor_state_t expected_state = EXECUTOR_RUNNING;
93        if (!t.state.compare_exchange_strong(expected_state,
94                                             EXECUTOR_SLEEPING)) {
95            return false;
96        }
97        sleepers++;
98        // zzz....
99        const auto snooze = wakeTime - t.getCurTime();
100
101        if (snooze > std::chrono::seconds((int)round(MIN_SLEEP_TIME))) {
102            mutex.wait_for(lock, MIN_SLEEP_TIME);
103        } else {
104            mutex.wait_for(lock, snooze);
105        }
106        // ... woke!
107        sleepers--;
108        manager->woke();
109
110        // Finished our sleep, atomically switch back to running iff we were
111        // previously sleeping.
112        expected_state = EXECUTOR_SLEEPING;
113        if (!t.state.compare_exchange_strong(expected_state,
114                                             EXECUTOR_RUNNING)) {
115            return false;
116        }
117        t.updateCurrentTime();
118    }
119
120    return true;
121}
122
123bool TaskQueue::_fetchNextTask(ExecutorThread &t, bool toSleep) {
124    bool ret = false;
125    std::unique_lock<std::mutex> lh(mutex);
126
127    if (toSleep && !_doSleep(t, lh)) {
128        return ret; // shutting down
129    }
130
131    size_t numToWake = _moveReadyTasks(t.getCurTime());
132
133    if (!readyQueue.empty() && readyQueue.top()->isdead()) {
134        t.setCurrentTask(_popReadyTask()); // clean out dead tasks first
135        ret = true;
136    } else if (!readyQueue.empty() || !pendingQueue.empty()) {
137        // we must consider any pending tasks too. To ensure prioritized run
138        // order, the function below will push any pending task back into the
139        // readyQueue (sorted by priority)
140        _checkPendingQueue();
141        ExTask tid = _popReadyTask(); // and pop out the top task
142        t.setCurrentTask(tid);
143        ret = true;
144    } else { // Let the task continue waiting in pendingQueue
145        numToWake = numToWake ? numToWake - 1 : 0; // 1 fewer task ready
146    }
147
148    _doWake_UNLOCKED(numToWake);
149    lh.unlock();
150
151    return ret;
152}
153
154bool TaskQueue::fetchNextTask(ExecutorThread &thread, bool toSleep) {
155    EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
156    bool rv = _fetchNextTask(thread, toSleep);
157    ObjectRegistry::onSwitchThread(epe);
158    return rv;
159}
160
161size_t TaskQueue::_moveReadyTasks(const ProcessClock::time_point tv) {
162    if (!readyQueue.empty()) {
163        return 0;
164    }
165
166    size_t numReady = 0;
167    while (!futureQueue.empty()) {
168        ExTask tid = futureQueue.top();
169        if (tid->getWaketime() <= tv) {
170            futureQueue.pop();
171            readyQueue.push(tid);
172            numReady++;
173        } else {
174            break;
175        }
176    }
177
178    manager->addWork(numReady, queueType);
179
180    // Current thread will pop one task, so wake up one less thread
181    return numReady ? numReady - 1 : 0;
182}
183
184void TaskQueue::_checkPendingQueue(void) {
185    if (!pendingQueue.empty()) {
186        ExTask runnableTask = pendingQueue.front();
187        readyQueue.push(runnableTask);
188        manager->addWork(1, queueType);
189        pendingQueue.pop_front();
190    }
191}
192
193ProcessClock::time_point TaskQueue::_reschedule(ExTask &task) {
194    LockHolder lh(mutex);
195
196    futureQueue.push(task);
197    return futureQueue.top()->getWaketime();
198}
199
200ProcessClock::time_point TaskQueue::reschedule(ExTask &task) {
201    EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
202    auto rv = _reschedule(task);
203    ObjectRegistry::onSwitchThread(epe);
204    return rv;
205}
206
207void TaskQueue::_schedule(ExTask &task) {
208    TaskQueue* sleepQ;
209    size_t numToWake = 1;
210
211    {
212        LockHolder lh(mutex);
213
214        // If we are rescheduling a previously cancelled task, we should reset
215        // the task state to the initial value of running.
216        task->setState(TASK_RUNNING, TASK_DEAD);
217
218        futureQueue.push(task);
219
220        LOG(EXTENSION_LOG_DEBUG,
221            "%s: Schedule a task \"%.*s\" id %" PRIu64,
222            name.c_str(),
223            int(task->getDescription().size()),
224            task->getDescription().data(),
225            uint64_t(task->getId()));
226
227        sleepQ = manager->getSleepQ(queueType);
228        _doWake_UNLOCKED(numToWake);
229    }
230    if (this != sleepQ) {
231        sleepQ->doWake(numToWake);
232    }
233}
234
235void TaskQueue::schedule(ExTask &task) {
236    EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
237    _schedule(task);
238    ObjectRegistry::onSwitchThread(epe);
239}
240
241void TaskQueue::_wake(ExTask &task) {
242    const ProcessClock::time_point now = ProcessClock::now();
243    TaskQueue* sleepQ;
244    // One task is being made ready regardless of the queue it's in.
245    size_t readyCount = 1;
246    {
247        LockHolder lh(mutex);
248        LOG(EXTENSION_LOG_DEBUG,
249            "%s: Wake a task \"%.*s\" id %" PRIu64,
250            name.c_str(),
251            int(task->getDescription().size()),
252            task->getDescription().data(),
253            uint64_t(task->getId()));
254
255        std::queue<ExTask> notReady;
256        // Wake thread-count-serialized tasks too
257        for (std::list<ExTask>::iterator it = pendingQueue.begin();
258             it != pendingQueue.end();) {
259            ExTask tid = *it;
260            if (tid->getId() == task->getId() || tid->isdead()) {
261                notReady.push(tid);
262                it = pendingQueue.erase(it);
263            } else {
264                it++;
265            }
266        }
267
268        futureQueue.updateWaketime(task, now);
269        task->setState(TASK_RUNNING, TASK_SNOOZED);
270
271        while (!notReady.empty()) {
272            ExTask tid = notReady.front();
273            if (tid->getWaketime() <= now || tid->isdead()) {
274                readyCount++;
275            }
276
277            // MB-18453: Only push to the futureQueue
278            futureQueue.push(tid);
279            notReady.pop();
280        }
281
282        _doWake_UNLOCKED(readyCount);
283        sleepQ = manager->getSleepQ(queueType);
284    }
285    if (this != sleepQ) {
286        sleepQ->doWake(readyCount);
287    }
288}
289
290void TaskQueue::wake(ExTask &task) {
291    EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
292    _wake(task);
293    ObjectRegistry::onSwitchThread(epe);
294}
295
296const std::string TaskQueue::taskType2Str(task_type_t type) {
297    switch (type) {
298    case WRITER_TASK_IDX:
299        return std::string("Writer");
300    case READER_TASK_IDX:
301        return std::string("Reader");
302    case AUXIO_TASK_IDX:
303        return std::string("AuxIO");
304    case NONIO_TASK_IDX:
305        return std::string("NonIO");
306    default:
307        return std::string("None");
308    }
309}
310