1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3 * Copyright 2014 Couchbase, Inc.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17 #include "config.h"
18
19 #include "taskqueue.h"
20 #include "executorpool.h"
21 #include "executorthread.h"
22
TaskQueue(ExecutorPool *m, task_type_t t, const char *nm)23 TaskQueue::TaskQueue(ExecutorPool *m, task_type_t t, const char *nm) :
24 name(nm), queueType(t), manager(m), sleepers(0)
25 {
26 // EMPTY
27 }
28
~TaskQueue()29 TaskQueue::~TaskQueue() {
30 LOG(EXTENSION_LOG_INFO, "Task Queue killing %s", name.c_str());
31 }
32
getName() const33 const std::string TaskQueue::getName() const {
34 return (name+taskType2Str(queueType));
35 }
36
_popReadyTask(void)37 ExTask TaskQueue::_popReadyTask(void) {
38 ExTask t = readyQueue.top();
39 readyQueue.pop();
40 manager->lessWork(queueType);
41 return t;
42 }
43
doWake(size_t &numToWake)44 void TaskQueue::doWake(size_t &numToWake) {
45 LockHolder lh(mutex);
46 _doWake_UNLOCKED(numToWake);
47 }
48
_doWake_UNLOCKED(size_t &numToWake)49 void TaskQueue::_doWake_UNLOCKED(size_t &numToWake) {
50 if (sleepers && numToWake) {
51 if (numToWake < sleepers) {
52 for (; numToWake; --numToWake) {
53 mutex.notifyOne(); // cond_signal 1
54 }
55 } else {
56 mutex.notify(); // cond_broadcast
57 numToWake -= sleepers;
58 }
59 }
60 }
61
_doSleep(ExecutorThread &t)62 bool TaskQueue::_doSleep(ExecutorThread &t) {
63 gettimeofday(&t.now, NULL);
64 if (less_tv(t.now, t.waketime) && manager->trySleep(queueType)) {
65 if (t.state == EXECUTOR_RUNNING) {
66 t.state = EXECUTOR_SLEEPING;
67 } else {
68 return false;
69 }
70 sleepers++;
71 // zzz....
72 struct timeval waketime = t.now;
73 advance_tv(waketime, MIN_SLEEP_TIME); // avoid sleeping more than this
74 if (less_tv(waketime, t.waketime)) { // to prevent losing posts
75 mutex.wait(waketime);
76 } else {
77 mutex.wait(t.waketime);
78 }
79 // ... woke!
80 sleepers--;
81 manager->woke();
82
83 if (t.state == EXECUTOR_SLEEPING) {
84 t.state = EXECUTOR_RUNNING;
85 } else {
86 return false;
87 }
88 gettimeofday(&t.now, NULL);
89 }
90 set_max_tv(t.waketime);
91 return true;
92 }
93
_fetchNextTask(ExecutorThread &t, bool toSleep)94 bool TaskQueue::_fetchNextTask(ExecutorThread &t, bool toSleep) {
95 bool ret = false;
96 LockHolder lh(mutex);
97
98 if (toSleep && !_doSleep(t)) {
99 return ret; // shutting down
100 }
101
102 size_t numToWake = _moveReadyTasks(t.now);
103
104 if (!futureQueue.empty() && t.startIndex == queueType &&
105 less_tv(futureQueue.top()->waketime, t.waketime)) {
106 t.waketime = futureQueue.top()->waketime; // record earliest waketime
107 }
108
109 if (!readyQueue.empty() && readyQueue.top()->isdead()) {
110 t.currentTask = _popReadyTask(); // clean out dead tasks first
111 ret = true;
112 } else if (!readyQueue.empty() || !pendingQueue.empty()) {
113 t.curTaskType = manager->tryNewWork(queueType);
114 if (t.curTaskType != NO_TASK_TYPE) {
115 // if this TaskQueue has obtained capacity for the thread, then we must
116 // consider any pending tasks too. To ensure prioritized run order,
117 // the function below will push any pending task back into
118 // the readyQueue (sorted by priority)
119 _checkPendingQueue();
120
121 ExTask tid = _popReadyTask(); // and pop out the top task
122 t.currentTask = tid; // assign task to thread
123 ret = true;
124 } else if (!readyQueue.empty()) { // We hit limit on max # workers
125 ExTask tid = _popReadyTask(); // that can work on current Q type!
126 pendingQueue.push_back(tid);
127 numToWake = numToWake ? numToWake - 1 : 0; // 1 fewer task ready
128 } else { // Let the task continue waiting in pendingQueue
129 cb_assert(!pendingQueue.empty());
130 numToWake = numToWake ? numToWake - 1 : 0; // 1 fewer task ready
131 }
132 }
133
134 _doWake_UNLOCKED(numToWake);
135 lh.unlock();
136
137 return ret;
138 }
139
fetchNextTask(ExecutorThread &thread, bool toSleep)140 bool TaskQueue::fetchNextTask(ExecutorThread &thread, bool toSleep) {
141 EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
142 size_t rv = _fetchNextTask(thread, toSleep);
143 ObjectRegistry::onSwitchThread(epe);
144 return rv;
145 }
146
_moveReadyTasks(struct timeval tv)147 size_t TaskQueue::_moveReadyTasks(struct timeval tv) {
148 if (!readyQueue.empty()) {
149 return 0;
150 }
151
152 size_t numReady = 0;
153 while (!futureQueue.empty()) {
154 ExTask tid = futureQueue.top();
155 if (less_eq_tv(tid->waketime, tv)) {
156 futureQueue.pop();
157 readyQueue.push(tid);
158 numReady++;
159 } else {
160 break;
161 }
162 }
163
164 manager->addWork(numReady, queueType);
165
166 // Current thread will pop one task, so wake up one less thread
167 return numReady ? numReady - 1 : 0;
168 }
169
_checkPendingQueue(void)170 void TaskQueue::_checkPendingQueue(void) {
171 if (!pendingQueue.empty()) {
172 ExTask runnableTask = pendingQueue.front();
173 readyQueue.push(runnableTask);
174 manager->addWork(1, queueType);
175 pendingQueue.pop_front();
176 }
177 }
178
_reschedule(ExTask &task, task_type_t &curTaskType)179 struct timeval TaskQueue::_reschedule(ExTask &task, task_type_t &curTaskType) {
180 struct timeval waktime;
181 manager->doneWork(curTaskType);
182
183 LockHolder lh(mutex);
184
185 futureQueue.push(task);
186 if (curTaskType == queueType) {
187 waktime = futureQueue.top()->waketime;
188 } else {
189 set_max_tv(waktime);
190 }
191
192 return waktime;
193 }
194
reschedule(ExTask &task, task_type_t &curTaskType)195 struct timeval TaskQueue::reschedule(ExTask &task, task_type_t &curTaskType) {
196 EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
197 struct timeval rv = _reschedule(task, curTaskType);
198 ObjectRegistry::onSwitchThread(epe);
199 return rv;
200 }
201
_schedule(ExTask &task)202 void TaskQueue::_schedule(ExTask &task) {
203 LockHolder lh(mutex);
204
205 futureQueue.push(task);
206
207 LOG(EXTENSION_LOG_DEBUG, "%s: Schedule a task \"%s\" id %d",
208 name.c_str(), task->getDescription().c_str(), task->getId());
209
210 size_t numToWake = 1;
211 TaskQueue *sleepQ = manager->getSleepQ(queueType);
212 _doWake_UNLOCKED(numToWake);
213 lh.unlock();
214 if (this != sleepQ) {
215 sleepQ->doWake(numToWake);
216 }
217 }
218
schedule(ExTask &task)219 void TaskQueue::schedule(ExTask &task) {
220 EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
221 _schedule(task);
222 ObjectRegistry::onSwitchThread(epe);
223 }
224
_wake(ExTask &task)225 void TaskQueue::_wake(ExTask &task) {
226 struct timeval now;
227 size_t numReady = 0;
228 gettimeofday(&now, NULL);
229
230 LockHolder lh(mutex);
231 LOG(EXTENSION_LOG_DEBUG, "%s: Wake a task \"%s\" id %d", name.c_str(),
232 task->getDescription().c_str(), task->getId());
233
234 // MB-9986: Re-sort futureQueue for now. TODO: avoid this O(N) overhead
235 std::queue<ExTask> notReady;
236 while (!futureQueue.empty()) {
237 ExTask tid = futureQueue.top();
238 notReady.push(tid);
239 futureQueue.pop();
240 }
241
242 // Wake thread-count-serialized tasks too
243 for (std::list<ExTask>::iterator it = pendingQueue.begin();
244 it != pendingQueue.end();) {
245 ExTask tid = *it;
246 if (tid->getId() == task->getId() || tid->isdead()) {
247 notReady.push(tid);
248 it = pendingQueue.erase(it);
249 } else {
250 it++;
251 }
252 }
253
254 // Note that this task that we are waking may nor may not be blocked in Q
255 task->waketime = now;
256 task->setState(TASK_RUNNING, TASK_SNOOZED);
257
258 while (!notReady.empty()) {
259 ExTask tid = notReady.front();
260 if (less_eq_tv(tid->waketime, now) || tid->isdead()) {
261 readyQueue.push(tid);
262 numReady++;
263 } else {
264 futureQueue.push(tid);
265 }
266 notReady.pop();
267 }
268
269 if (numReady) {
270 manager->addWork(numReady, queueType);
271 _doWake_UNLOCKED(numReady);
272 TaskQueue *sleepQ = manager->getSleepQ(queueType);
273 lh.unlock();
274 if (this != sleepQ) {
275 sleepQ->doWake(numReady);
276 }
277 }
278 }
279
wake(ExTask &task)280 void TaskQueue::wake(ExTask &task) {
281 EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
282 _wake(task);
283 ObjectRegistry::onSwitchThread(epe);
284 }
285
taskType2Str(task_type_t type)286 const std::string TaskQueue::taskType2Str(task_type_t type) {
287 switch (type) {
288 case WRITER_TASK_IDX:
289 return std::string("Writer");
290 case READER_TASK_IDX:
291 return std::string("Reader");
292 case AUXIO_TASK_IDX:
293 return std::string("AuxIO");
294 case NONIO_TASK_IDX:
295 return std::string("NonIO");
296 default:
297 return std::string("None");
298 }
299 }
300