1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2014 Couchbase, Inc.
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17 #include "config.h"
18 
19 #include "taskqueue.h"
20 #include "executorpool.h"
21 #include "executorthread.h"
22 
TaskQueue(ExecutorPool *m, task_type_t t, const char *nm)23 TaskQueue::TaskQueue(ExecutorPool *m, task_type_t t, const char *nm) :
24     name(nm), queueType(t), manager(m), sleepers(0)
25 {
26     // EMPTY
27 }
28 
~TaskQueue()29 TaskQueue::~TaskQueue() {
30     LOG(EXTENSION_LOG_INFO, "Task Queue killing %s", name.c_str());
31 }
32 
getName() const33 const std::string TaskQueue::getName() const {
34     return (name+taskType2Str(queueType));
35 }
36 
_popReadyTask(void)37 ExTask TaskQueue::_popReadyTask(void) {
38     ExTask t = readyQueue.top();
39     readyQueue.pop();
40     manager->lessWork(queueType);
41     return t;
42 }
43 
doWake(size_t &numToWake)44 void TaskQueue::doWake(size_t &numToWake) {
45     LockHolder lh(mutex);
46     _doWake_UNLOCKED(numToWake);
47 }
48 
_doWake_UNLOCKED(size_t &numToWake)49 void TaskQueue::_doWake_UNLOCKED(size_t &numToWake) {
50     if (sleepers && numToWake)  {
51         if (numToWake < sleepers) {
52             for (; numToWake; --numToWake) {
53                 mutex.notifyOne(); // cond_signal 1
54             }
55         } else {
56             mutex.notify(); // cond_broadcast
57             numToWake -= sleepers;
58         }
59     }
60 }
61 
_doSleep(ExecutorThread &t)62 bool TaskQueue::_doSleep(ExecutorThread &t) {
63     gettimeofday(&t.now, NULL);
64     if (less_tv(t.now, t.waketime) && manager->trySleep(queueType)) {
65         if (t.state == EXECUTOR_RUNNING) {
66             t.state = EXECUTOR_SLEEPING;
67         } else {
68             return false;
69         }
70         sleepers++;
71         // zzz....
72         struct timeval waketime = t.now;
73         advance_tv(waketime, MIN_SLEEP_TIME); // avoid sleeping more than this
74         if (less_tv(waketime, t.waketime)) { // to prevent losing posts
75             mutex.wait(waketime);
76         } else {
77             mutex.wait(t.waketime);
78         }
79         // ... woke!
80         sleepers--;
81         manager->woke();
82 
83         if (t.state == EXECUTOR_SLEEPING) {
84             t.state = EXECUTOR_RUNNING;
85         } else {
86             return false;
87         }
88         gettimeofday(&t.now, NULL);
89     }
90     set_max_tv(t.waketime);
91     return true;
92 }
93 
_fetchNextTask(ExecutorThread &t, bool toSleep)94 bool TaskQueue::_fetchNextTask(ExecutorThread &t, bool toSleep) {
95     bool ret = false;
96     LockHolder lh(mutex);
97 
98     if (toSleep && !_doSleep(t)) {
99         return ret; // shutting down
100     }
101 
102     size_t numToWake = _moveReadyTasks(t.now);
103 
104     if (!futureQueue.empty() && t.startIndex == queueType &&
105         less_tv(futureQueue.top()->waketime, t.waketime)) {
106         t.waketime = futureQueue.top()->waketime; // record earliest waketime
107     }
108 
109     if (!readyQueue.empty() && readyQueue.top()->isdead()) {
110         t.currentTask = _popReadyTask(); // clean out dead tasks first
111         ret = true;
112     } else if (!readyQueue.empty() || !pendingQueue.empty()) {
113         t.curTaskType = manager->tryNewWork(queueType);
114         if (t.curTaskType != NO_TASK_TYPE) {
115             // if this TaskQueue has obtained capacity for the thread, then we must
116             // consider any pending tasks too. To ensure prioritized run order,
117             // the function below will push any pending task back into
118             // the readyQueue (sorted by priority)
119             _checkPendingQueue();
120 
121             ExTask tid = _popReadyTask(); // and pop out the top task
122             t.currentTask = tid; // assign task to thread
123             ret = true;
124         } else if (!readyQueue.empty()) { // We hit limit on max # workers
125             ExTask tid = _popReadyTask(); // that can work on current Q type!
126             pendingQueue.push_back(tid);
127             numToWake = numToWake ? numToWake - 1 : 0; // 1 fewer task ready
128         } else { // Let the task continue waiting in pendingQueue
129             cb_assert(!pendingQueue.empty());
130             numToWake = numToWake ? numToWake - 1 : 0; // 1 fewer task ready
131         }
132     }
133 
134     _doWake_UNLOCKED(numToWake);
135     lh.unlock();
136 
137     return ret;
138 }
139 
fetchNextTask(ExecutorThread &thread, bool toSleep)140 bool TaskQueue::fetchNextTask(ExecutorThread &thread, bool toSleep) {
141     EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
142     size_t rv = _fetchNextTask(thread, toSleep);
143     ObjectRegistry::onSwitchThread(epe);
144     return rv;
145 }
146 
_moveReadyTasks(struct timeval tv)147 size_t TaskQueue::_moveReadyTasks(struct timeval tv) {
148     if (!readyQueue.empty()) {
149         return 0;
150     }
151 
152     size_t numReady = 0;
153     while (!futureQueue.empty()) {
154         ExTask tid = futureQueue.top();
155         if (less_eq_tv(tid->waketime, tv)) {
156             futureQueue.pop();
157             readyQueue.push(tid);
158             numReady++;
159         } else {
160             break;
161         }
162     }
163 
164     manager->addWork(numReady, queueType);
165 
166     // Current thread will pop one task, so wake up one less thread
167     return numReady ? numReady - 1 : 0;
168 }
169 
_checkPendingQueue(void)170 void TaskQueue::_checkPendingQueue(void) {
171     if (!pendingQueue.empty()) {
172         ExTask runnableTask = pendingQueue.front();
173         readyQueue.push(runnableTask);
174         manager->addWork(1, queueType);
175         pendingQueue.pop_front();
176     }
177 }
178 
_reschedule(ExTask &task, task_type_t &curTaskType)179 struct timeval TaskQueue::_reschedule(ExTask &task, task_type_t &curTaskType) {
180     struct timeval waktime;
181     manager->doneWork(curTaskType);
182 
183     LockHolder lh(mutex);
184 
185     futureQueue.push(task);
186     if (curTaskType == queueType) {
187         waktime = futureQueue.top()->waketime;
188     } else {
189         set_max_tv(waktime);
190     }
191 
192     return waktime;
193 }
194 
reschedule(ExTask &task, task_type_t &curTaskType)195 struct timeval TaskQueue::reschedule(ExTask &task, task_type_t &curTaskType) {
196     EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
197     struct timeval rv = _reschedule(task, curTaskType);
198     ObjectRegistry::onSwitchThread(epe);
199     return rv;
200 }
201 
_schedule(ExTask &task)202 void TaskQueue::_schedule(ExTask &task) {
203     LockHolder lh(mutex);
204 
205     futureQueue.push(task);
206 
207     LOG(EXTENSION_LOG_DEBUG, "%s: Schedule a task \"%s\" id %d",
208             name.c_str(), task->getDescription().c_str(), task->getId());
209 
210     size_t numToWake = 1;
211     TaskQueue *sleepQ = manager->getSleepQ(queueType);
212     _doWake_UNLOCKED(numToWake);
213     lh.unlock();
214     if (this != sleepQ) {
215         sleepQ->doWake(numToWake);
216     }
217 }
218 
schedule(ExTask &task)219 void TaskQueue::schedule(ExTask &task) {
220     EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
221     _schedule(task);
222     ObjectRegistry::onSwitchThread(epe);
223 }
224 
_wake(ExTask &task)225 void TaskQueue::_wake(ExTask &task) {
226     struct  timeval    now;
227     size_t numReady = 0;
228     gettimeofday(&now, NULL);
229 
230     LockHolder lh(mutex);
231     LOG(EXTENSION_LOG_DEBUG, "%s: Wake a task \"%s\" id %d", name.c_str(),
232             task->getDescription().c_str(), task->getId());
233 
234     // MB-9986: Re-sort futureQueue for now. TODO: avoid this O(N) overhead
235     std::queue<ExTask> notReady;
236     while (!futureQueue.empty()) {
237         ExTask tid = futureQueue.top();
238         notReady.push(tid);
239         futureQueue.pop();
240     }
241 
242     // Wake thread-count-serialized tasks too
243     for (std::list<ExTask>::iterator it = pendingQueue.begin();
244          it != pendingQueue.end();) {
245         ExTask tid = *it;
246         if (tid->getId() == task->getId() || tid->isdead()) {
247             notReady.push(tid);
248             it = pendingQueue.erase(it);
249         } else {
250             it++;
251         }
252     }
253 
254     // Note that this task that we are waking may nor may not be blocked in Q
255     task->waketime = now;
256     task->setState(TASK_RUNNING, TASK_SNOOZED);
257 
258     while (!notReady.empty()) {
259         ExTask tid = notReady.front();
260         if (less_eq_tv(tid->waketime, now) || tid->isdead()) {
261             readyQueue.push(tid);
262             numReady++;
263         } else {
264             futureQueue.push(tid);
265         }
266         notReady.pop();
267     }
268 
269     if (numReady) {
270         manager->addWork(numReady, queueType);
271         _doWake_UNLOCKED(numReady);
272         TaskQueue *sleepQ = manager->getSleepQ(queueType);
273         lh.unlock();
274         if (this != sleepQ) {
275             sleepQ->doWake(numReady);
276         }
277     }
278 }
279 
wake(ExTask &task)280 void TaskQueue::wake(ExTask &task) {
281     EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
282     _wake(task);
283     ObjectRegistry::onSwitchThread(epe);
284 }
285 
taskType2Str(task_type_t type)286 const std::string TaskQueue::taskType2Str(task_type_t type) {
287     switch (type) {
288     case WRITER_TASK_IDX:
289         return std::string("Writer");
290     case READER_TASK_IDX:
291         return std::string("Reader");
292     case AUXIO_TASK_IDX:
293         return std::string("AuxIO");
294     case NONIO_TASK_IDX:
295         return std::string("NonIO");
296     default:
297         return std::string("None");
298     }
299 }
300