1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2014 Couchbase, Inc.
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17#include "config.h"
18
19#include "taskqueue.h"
20#include "executorpool.h"
21#include "executorthread.h"
22
23TaskQueue::TaskQueue(ExecutorPool *m, task_type_t t, const char *nm) :
24    name(nm), queueType(t), manager(m), sleepers(0)
25{
26    // EMPTY
27}
28
29TaskQueue::~TaskQueue() {
30    LOG(EXTENSION_LOG_INFO, "Task Queue killing %s", name.c_str());
31}
32
33const std::string TaskQueue::getName() const {
34    return (name+taskType2Str(queueType));
35}
36
37ExTask TaskQueue::_popReadyTask(void) {
38    ExTask t = readyQueue.top();
39    readyQueue.pop();
40    manager->lessWork(queueType);
41    return t;
42}
43
44void TaskQueue::doWake(size_t &numToWake) {
45    LockHolder lh(mutex);
46    _doWake_UNLOCKED(numToWake);
47}
48
49void TaskQueue::_doWake_UNLOCKED(size_t &numToWake) {
50    if (sleepers && numToWake)  {
51        if (numToWake < sleepers) {
52            for (; numToWake; --numToWake) {
53                mutex.notifyOne(); // cond_signal 1
54            }
55        } else {
56            mutex.notify(); // cond_broadcast
57            numToWake -= sleepers;
58        }
59    }
60}
61
62bool TaskQueue::_doSleep(ExecutorThread &t) {
63    gettimeofday(&t.now, NULL);
64    if (less_tv(t.now, t.waketime) && manager->trySleep(queueType)) {
65        if (t.state == EXECUTOR_RUNNING) {
66            t.state = EXECUTOR_SLEEPING;
67        } else {
68            return false;
69        }
70        sleepers++;
71        // zzz....
72        struct timeval waketime = t.now;
73        advance_tv(waketime, MIN_SLEEP_TIME); // avoid sleeping more than this
74        if (less_tv(waketime, t.waketime)) { // to prevent losing posts
75            mutex.wait(waketime);
76        } else {
77            mutex.wait(t.waketime);
78        }
79        // ... woke!
80        sleepers--;
81        manager->woke();
82
83        if (t.state == EXECUTOR_SLEEPING) {
84            t.state = EXECUTOR_RUNNING;
85        } else {
86            return false;
87        }
88        gettimeofday(&t.now, NULL);
89    }
90    set_max_tv(t.waketime);
91    return true;
92}
93
94bool TaskQueue::_fetchNextTask(ExecutorThread &t, bool toSleep) {
95    bool ret = false;
96    LockHolder lh(mutex);
97
98    if (toSleep && !_doSleep(t)) {
99        return ret; // shutting down
100    }
101
102    size_t numToWake = _moveReadyTasks(t.now);
103
104    if (!futureQueue.empty() && t.startIndex == queueType &&
105        less_tv(futureQueue.top()->waketime, t.waketime)) {
106        t.waketime = futureQueue.top()->waketime; // record earliest waketime
107    }
108
109    if (!readyQueue.empty() && readyQueue.top()->isdead()) {
110        t.currentTask = _popReadyTask(); // clean out dead tasks first
111        ret = true;
112    } else if (!readyQueue.empty() || !pendingQueue.empty()) {
113        t.curTaskType = manager->tryNewWork(queueType);
114        if (t.curTaskType != NO_TASK_TYPE) {
115            // if this TaskQueue has obtained capacity for the thread, then we must
116            // consider any pending tasks too. To ensure prioritized run order,
117            // the function below will push any pending task back into
118            // the readyQueue (sorted by priority)
119            _checkPendingQueue();
120
121            ExTask tid = _popReadyTask(); // and pop out the top task
122            t.currentTask = tid; // assign task to thread
123            ret = true;
124        } else if (!readyQueue.empty()) { // We hit limit on max # workers
125            ExTask tid = _popReadyTask(); // that can work on current Q type!
126            pendingQueue.push_back(tid);
127            numToWake = numToWake ? numToWake - 1 : 0; // 1 fewer task ready
128        } else { // Let the task continue waiting in pendingQueue
129            cb_assert(!pendingQueue.empty());
130            numToWake = numToWake ? numToWake - 1 : 0; // 1 fewer task ready
131        }
132    }
133
134    _doWake_UNLOCKED(numToWake);
135    lh.unlock();
136
137    return ret;
138}
139
140bool TaskQueue::fetchNextTask(ExecutorThread &thread, bool toSleep) {
141    EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
142    size_t rv = _fetchNextTask(thread, toSleep);
143    ObjectRegistry::onSwitchThread(epe);
144    return rv;
145}
146
147size_t TaskQueue::_moveReadyTasks(struct timeval tv) {
148    if (!readyQueue.empty()) {
149        return 0;
150    }
151
152    size_t numReady = 0;
153    while (!futureQueue.empty()) {
154        ExTask tid = futureQueue.top();
155        if (less_eq_tv(tid->waketime, tv)) {
156            futureQueue.pop();
157            readyQueue.push(tid);
158            numReady++;
159        } else {
160            break;
161        }
162    }
163
164    manager->addWork(numReady, queueType);
165
166    // Current thread will pop one task, so wake up one less thread
167    return numReady ? numReady - 1 : 0;
168}
169
170void TaskQueue::_checkPendingQueue(void) {
171    if (!pendingQueue.empty()) {
172        ExTask runnableTask = pendingQueue.front();
173        readyQueue.push(runnableTask);
174        manager->addWork(1, queueType);
175        pendingQueue.pop_front();
176    }
177}
178
179struct timeval TaskQueue::_reschedule(ExTask &task, task_type_t &curTaskType) {
180    struct timeval waktime;
181    manager->doneWork(curTaskType);
182
183    LockHolder lh(mutex);
184
185    futureQueue.push(task);
186    if (curTaskType == queueType) {
187        waktime = futureQueue.top()->waketime;
188    } else {
189        set_max_tv(waktime);
190    }
191
192    return waktime;
193}
194
195struct timeval TaskQueue::reschedule(ExTask &task, task_type_t &curTaskType) {
196    EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
197    struct timeval rv = _reschedule(task, curTaskType);
198    ObjectRegistry::onSwitchThread(epe);
199    return rv;
200}
201
202void TaskQueue::_schedule(ExTask &task) {
203    LockHolder lh(mutex);
204
205    futureQueue.push(task);
206
207    LOG(EXTENSION_LOG_DEBUG, "%s: Schedule a task \"%s\" id %d",
208            name.c_str(), task->getDescription().c_str(), task->getId());
209
210    size_t numToWake = 1;
211    TaskQueue *sleepQ = manager->getSleepQ(queueType);
212    _doWake_UNLOCKED(numToWake);
213    lh.unlock();
214    if (this != sleepQ) {
215        sleepQ->doWake(numToWake);
216    }
217}
218
219void TaskQueue::schedule(ExTask &task) {
220    EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
221    _schedule(task);
222    ObjectRegistry::onSwitchThread(epe);
223}
224
225void TaskQueue::_wake(ExTask &task) {
226    struct  timeval    now;
227    size_t numReady = 0;
228    gettimeofday(&now, NULL);
229
230    LockHolder lh(mutex);
231    LOG(EXTENSION_LOG_DEBUG, "%s: Wake a task \"%s\" id %d", name.c_str(),
232            task->getDescription().c_str(), task->getId());
233
234    // MB-9986: Re-sort futureQueue for now. TODO: avoid this O(N) overhead
235    std::queue<ExTask> notReady;
236    while (!futureQueue.empty()) {
237        ExTask tid = futureQueue.top();
238        notReady.push(tid);
239        futureQueue.pop();
240    }
241
242    // Wake thread-count-serialized tasks too
243    for (std::list<ExTask>::iterator it = pendingQueue.begin();
244         it != pendingQueue.end();) {
245        ExTask tid = *it;
246        if (tid->getId() == task->getId() || tid->isdead()) {
247            notReady.push(tid);
248            it = pendingQueue.erase(it);
249        } else {
250            it++;
251        }
252    }
253
254    // Note that this task that we are waking may nor may not be blocked in Q
255    task->waketime = now;
256    task->setState(TASK_RUNNING, TASK_SNOOZED);
257
258    while (!notReady.empty()) {
259        ExTask tid = notReady.front();
260        if (less_eq_tv(tid->waketime, now) || tid->isdead()) {
261            readyQueue.push(tid);
262            numReady++;
263        } else {
264            futureQueue.push(tid);
265        }
266        notReady.pop();
267    }
268
269    if (numReady) {
270        manager->addWork(numReady, queueType);
271        _doWake_UNLOCKED(numReady);
272        TaskQueue *sleepQ = manager->getSleepQ(queueType);
273        lh.unlock();
274        if (this != sleepQ) {
275            sleepQ->doWake(numReady);
276        }
277    }
278}
279
280void TaskQueue::wake(ExTask &task) {
281    EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
282    _wake(task);
283    ObjectRegistry::onSwitchThread(epe);
284}
285
286const std::string TaskQueue::taskType2Str(task_type_t type) {
287    switch (type) {
288    case WRITER_TASK_IDX:
289        return std::string("Writer");
290    case READER_TASK_IDX:
291        return std::string("Reader");
292    case AUXIO_TASK_IDX:
293        return std::string("AuxIO");
294    case NONIO_TASK_IDX:
295        return std::string("NonIO");
296    default:
297        return std::string("None");
298    }
299}
300