1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2015 Couchbase, Inc.
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17 #include "config.h"
18 
19 #include "taskqueue.h"
20 #include "executorpool.h"
21 #include "executorthread.h"
22 
23 #include <cmath>
24 
TaskQueue(ExecutorPool *m, task_type_t t, const char *nm)25 TaskQueue::TaskQueue(ExecutorPool *m, task_type_t t, const char *nm) :
26     name(nm), queueType(t), manager(m), sleepers(0)
27 {
28     // EMPTY
29 }
30 
~TaskQueue()31 TaskQueue::~TaskQueue() {
32     LOG(EXTENSION_LOG_INFO, "Task Queue killing %s", name.c_str());
33 }
34 
getName() const35 const std::string TaskQueue::getName() const {
36     return (name+taskType2Str(queueType));
37 }
38 
getReadyQueueSize()39 size_t TaskQueue::getReadyQueueSize() {
40     LockHolder lh(mutex);
41     return readyQueue.size();
42 }
43 
getFutureQueueSize()44 size_t TaskQueue::getFutureQueueSize() {
45     LockHolder lh(mutex);
46     return futureQueue.size();
47 }
48 
getPendingQueueSize()49 size_t TaskQueue::getPendingQueueSize() {
50     LockHolder lh(mutex);
51     return pendingQueue.size();
52 }
53 
_popReadyTask(void)54 ExTask TaskQueue::_popReadyTask(void) {
55     ExTask t = readyQueue.top();
56     readyQueue.pop();
57     manager->lessWork(queueType);
58     return t;
59 }
60 
doWake(size_t &numToWake)61 void TaskQueue::doWake(size_t &numToWake) {
62     LockHolder lh(mutex);
63     _doWake_UNLOCKED(numToWake);
64 }
65 
_doWake_UNLOCKED(size_t &numToWake)66 void TaskQueue::_doWake_UNLOCKED(size_t &numToWake) {
67     if (sleepers && numToWake)  {
68         if (numToWake < sleepers) {
69             for (; numToWake; --numToWake) {
70                 mutex.notify_one(); // cond_signal 1
71             }
72         } else {
73             mutex.notify_all(); // cond_broadcast
74             numToWake -= sleepers;
75         }
76     }
77 }
78 
_doSleep(ExecutorThread &t, std::unique_lock<std::mutex>& lock)79 bool TaskQueue::_doSleep(ExecutorThread &t,
80                          std::unique_lock<std::mutex>& lock) {
81     t.updateCurrentTime();
82 
83     // Determine the time point to wake this thread - either "forever" if the
84     // futureQueue is empty, or the earliest wake time in the futureQueue.
85     const auto wakeTime = futureQueue.empty()
86                                   ? std::chrono::steady_clock::time_point::max()
87                                   : futureQueue.top()->getWaketime();
88 
89     if (t.getCurTime() < wakeTime && manager->trySleep(queueType)) {
90         // Atomically switch from running to sleeping; iff we were previously
91         // running.
92         executor_state_t expected_state = EXECUTOR_RUNNING;
93         if (!t.state.compare_exchange_strong(expected_state,
94                                              EXECUTOR_SLEEPING)) {
95             return false;
96         }
97         sleepers++;
98         // zzz....
99         const auto snooze = wakeTime - t.getCurTime();
100 
101         if (snooze > std::chrono::seconds((int)round(MIN_SLEEP_TIME))) {
102             mutex.wait_for(lock, MIN_SLEEP_TIME);
103         } else {
104             mutex.wait_for(lock, snooze);
105         }
106         // ... woke!
107         sleepers--;
108         manager->woke();
109 
110         // Finished our sleep, atomically switch back to running iff we were
111         // previously sleeping.
112         expected_state = EXECUTOR_SLEEPING;
113         if (!t.state.compare_exchange_strong(expected_state,
114                                              EXECUTOR_RUNNING)) {
115             return false;
116         }
117         t.updateCurrentTime();
118     }
119 
120     return true;
121 }
122 
_fetchNextTask(ExecutorThread &t, bool toSleep)123 bool TaskQueue::_fetchNextTask(ExecutorThread &t, bool toSleep) {
124     bool ret = false;
125     std::unique_lock<std::mutex> lh(mutex);
126 
127     if (toSleep && !_doSleep(t, lh)) {
128         return ret; // shutting down
129     }
130 
131     size_t numToWake = _moveReadyTasks(t.getCurTime());
132 
133     if (!readyQueue.empty() && readyQueue.top()->isdead()) {
134         t.setCurrentTask(_popReadyTask()); // clean out dead tasks first
135         ret = true;
136     } else if (!readyQueue.empty() || !pendingQueue.empty()) {
137         // we must consider any pending tasks too. To ensure prioritized run
138         // order, the function below will push any pending task back into the
139         // readyQueue (sorted by priority)
140         _checkPendingQueue();
141         ExTask tid = _popReadyTask(); // and pop out the top task
142         t.setCurrentTask(tid);
143         ret = true;
144     } else { // Let the task continue waiting in pendingQueue
145         numToWake = numToWake ? numToWake - 1 : 0; // 1 fewer task ready
146     }
147 
148     _doWake_UNLOCKED(numToWake);
149     lh.unlock();
150 
151     return ret;
152 }
153 
fetchNextTask(ExecutorThread &thread, bool toSleep)154 bool TaskQueue::fetchNextTask(ExecutorThread &thread, bool toSleep) {
155     EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
156     bool rv = _fetchNextTask(thread, toSleep);
157     ObjectRegistry::onSwitchThread(epe);
158     return rv;
159 }
160 
_moveReadyTasks(const ProcessClock::time_point tv)161 size_t TaskQueue::_moveReadyTasks(const ProcessClock::time_point tv) {
162     if (!readyQueue.empty()) {
163         return 0;
164     }
165 
166     size_t numReady = 0;
167     while (!futureQueue.empty()) {
168         ExTask tid = futureQueue.top();
169         if (tid->getWaketime() <= tv) {
170             futureQueue.pop();
171             readyQueue.push(tid);
172             numReady++;
173         } else {
174             break;
175         }
176     }
177 
178     manager->addWork(numReady, queueType);
179 
180     // Current thread will pop one task, so wake up one less thread
181     return numReady ? numReady - 1 : 0;
182 }
183 
_checkPendingQueue(void)184 void TaskQueue::_checkPendingQueue(void) {
185     if (!pendingQueue.empty()) {
186         ExTask runnableTask = pendingQueue.front();
187         readyQueue.push(runnableTask);
188         manager->addWork(1, queueType);
189         pendingQueue.pop_front();
190     }
191 }
192 
_reschedule(ExTask &task)193 ProcessClock::time_point TaskQueue::_reschedule(ExTask &task) {
194     LockHolder lh(mutex);
195 
196     futureQueue.push(task);
197     return futureQueue.top()->getWaketime();
198 }
199 
reschedule(ExTask &task)200 ProcessClock::time_point TaskQueue::reschedule(ExTask &task) {
201     EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
202     auto rv = _reschedule(task);
203     ObjectRegistry::onSwitchThread(epe);
204     return rv;
205 }
206 
_schedule(ExTask &task)207 void TaskQueue::_schedule(ExTask &task) {
208     TaskQueue* sleepQ;
209     size_t numToWake = 1;
210 
211     {
212         LockHolder lh(mutex);
213 
214         // If we are rescheduling a previously cancelled task, we should reset
215         // the task state to the initial value of running.
216         task->setState(TASK_RUNNING, TASK_DEAD);
217 
218         futureQueue.push(task);
219 
220         LOG(EXTENSION_LOG_DEBUG,
221             "%s: Schedule a task \"%.*s\" id %" PRIu64,
222             name.c_str(),
223             int(task->getDescription().size()),
224             task->getDescription().data(),
225             uint64_t(task->getId()));
226 
227         sleepQ = manager->getSleepQ(queueType);
228         _doWake_UNLOCKED(numToWake);
229     }
230     if (this != sleepQ) {
231         sleepQ->doWake(numToWake);
232     }
233 }
234 
schedule(ExTask &task)235 void TaskQueue::schedule(ExTask &task) {
236     EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
237     _schedule(task);
238     ObjectRegistry::onSwitchThread(epe);
239 }
240 
_wake(ExTask &task)241 void TaskQueue::_wake(ExTask &task) {
242     const ProcessClock::time_point now = ProcessClock::now();
243     TaskQueue* sleepQ;
244     // One task is being made ready regardless of the queue it's in.
245     size_t readyCount = 1;
246     {
247         LockHolder lh(mutex);
248         LOG(EXTENSION_LOG_DEBUG,
249             "%s: Wake a task \"%.*s\" id %" PRIu64,
250             name.c_str(),
251             int(task->getDescription().size()),
252             task->getDescription().data(),
253             uint64_t(task->getId()));
254 
255         std::queue<ExTask> notReady;
256         // Wake thread-count-serialized tasks too
257         for (std::list<ExTask>::iterator it = pendingQueue.begin();
258              it != pendingQueue.end();) {
259             ExTask tid = *it;
260             if (tid->getId() == task->getId() || tid->isdead()) {
261                 notReady.push(tid);
262                 it = pendingQueue.erase(it);
263             } else {
264                 it++;
265             }
266         }
267 
268         futureQueue.updateWaketime(task, now);
269         task->setState(TASK_RUNNING, TASK_SNOOZED);
270 
271         while (!notReady.empty()) {
272             ExTask tid = notReady.front();
273             if (tid->getWaketime() <= now || tid->isdead()) {
274                 readyCount++;
275             }
276 
277             // MB-18453: Only push to the futureQueue
278             futureQueue.push(tid);
279             notReady.pop();
280         }
281 
282         _doWake_UNLOCKED(readyCount);
283         sleepQ = manager->getSleepQ(queueType);
284     }
285     if (this != sleepQ) {
286         sleepQ->doWake(readyCount);
287     }
288 }
289 
wake(ExTask &task)290 void TaskQueue::wake(ExTask &task) {
291     EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
292     _wake(task);
293     ObjectRegistry::onSwitchThread(epe);
294 }
295 
taskType2Str(task_type_t type)296 const std::string TaskQueue::taskType2Str(task_type_t type) {
297     switch (type) {
298     case WRITER_TASK_IDX:
299         return std::string("Writer");
300     case READER_TASK_IDX:
301         return std::string("Reader");
302     case AUXIO_TASK_IDX:
303         return std::string("AuxIO");
304     case NONIO_TASK_IDX:
305         return std::string("NonIO");
306     default:
307         return std::string("None");
308     }
309 }
310