/*
 *     Copyright 2014 Couchbase, Inc.
 *
 *   Licensed under the Apache License, Version 2.0 (the "License");
 *   you may not use this file except in compliance with the License.
 *   You may obtain a copy of the License at
 *
 *       http://www.apache.org/licenses/LICENSE-2.0
 *
 *   Unless required by applicable law or agreed to in writing, software
 *   distributed under the License is distributed on an "AS IS" BASIS,
 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *   See the License for the specific language governing permissions and
 *   limitations under the License.
 */

/*
 * === High-level overview of the task execution system. ===
 *
 * ExecutorPool is the core interface for users wishing to run tasks on our
 * worker threads.
 *
 * Under the covers we have a configurable number of system threads that are
 * labeled with a type (see task_type_t). These threads service all buckets.
 *
 * Each thread operates by reading from a shared TaskQueue: it wakes up,
 * fetches a task (TaskQueue::fetchNextTask) and then executes it by calling
 * GlobalTask::run().
 *
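 * As an illustration only (GlobalTask itself is not defined in this header
 * and its exact interface is assumed here), a task is a GlobalTask subclass
 * whose run() method does a slice of work and returns whether the pool
 * should run it again:
 *
 *   // Hypothetical sketch of a task body; doSomeWork() is a made-up helper.
 *   bool ExampleTask::run() {
 *       doSomeWork();   // perform one slice of work
 *       return true;    // true => reschedule this task, false => finished
 *   }
 *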
 * The pool also has the concept of high and low priority, which is achieved
 * by having two TaskQueue objects per task-type. When a thread wakes up to
 * run a task, it will service the high-priority queue more frequently than
 * the low-priority queue.
 *
 * Within a single queue each task also has its own priority, a value where
 * lower is better. When many tasks are ready for execution they are moved to
 * a ready queue and sorted by their priority, so tasks with priority 0 run
 * before tasks with priority 1. Only once the ready queue of tasks is empty
 * will we consider looking for more eligible tasks. In this context, an
 * eligible task is one that has a wakeTime <= now.
 *
 * === Important methods of the ExecutorPool ===
 *
 * ExecutorPool* ExecutorPool::get()
 *   The ExecutorPool is accessed via the static get() method. Calling get()
 *   returns the process's global ExecutorPool object, an instance that is
 *   shared between all buckets.
 *
 * size_t ExecutorPool::schedule(ExTask task)
 *   The schedule method allows a task to be scheduled for future execution
 *   by a thread of the appropriate type. The task's 'wakeTime' determines
 *   approximately when the task will be executed (no guarantees).
 *
 * ExecutorPool::wake(size_t taskId)
 *   The wake method allows a caller to request that the task matching taskId
 *   be executed by a thread of its type as soon as possible. The task's
 *   wakeTime is set to 'now' and a thread of the correct type is signaled to
 *   wake up and perform fetching. The woken task will have to wait for any
 *   currently running tasks to complete first, but it will jump ahead of
 *   other waiting tasks, as tasks that are ready to run are ordered by their
 *   priority.
 *
 * ExecutorPool::snooze(size_t taskId, double toSleep)
 *   The pool's snooze method will locate the task matching taskId and adjust
 *   its wakeTime to account for the toSleep value.
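 *
 * === Example usage (illustrative sketch) ===
 *
 * The fragment below is only a sketch of how a caller might drive the
 * methods above; 'task' is assumed to be an already-constructed ExTask and
 * is not defined in this header:
 *
 *   ExecutorPool* pool = ExecutorPool::get(); // process-wide singleton
 *   size_t taskId = pool->schedule(task);     // schedule for execution
 *   pool->wake(taskId);                       // ask for it to run ASAP
 *   pool->snooze(taskId, 10.0);               // push its wakeTime back
 *   pool->cancel(taskId);                     // cancel when no longer needed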
 */
#ifndef SRC_EXECUTORPOOL_H_
#define SRC_EXECUTORPOOL_H_ 1

#include "config.h"

#include "syncobject.h"
#include "task_type.h"
#include "taskable.h"

#include <memcached/engine.h>
#include <atomic>
#include <map>
#include <mutex>
#include <set>
#include <vector>

// Forward decl
class TaskQueue;
class ExecutorThread;

typedef std::vector<ExecutorThread *> ThreadQ;
typedef std::pair<ExTask, TaskQueue *> TaskQpair;
typedef std::vector<TaskQueue *> TaskQ;

class ExecutorPool {
public:

    void addWork(size_t newWork, task_type_t qType);

    void lessWork(task_type_t qType);

    void startWork(task_type_t taskType);

    void doneWork(task_type_t taskType);

    /**
     * If there are no ready tasks of the given type, account for the calling
     * thread going to sleep and return true; otherwise return false.
     */
    bool trySleep(task_type_t task_type) {
        if (!numReadyTasks[task_type]) {
            numSleepers++;
            return true;
        }
        return false;
    }

    /** Called by a thread when it wakes from sleeping. */
    void woke(void) {
        numSleepers--;
    }

    TaskQueue *nextTask(ExecutorThread &t, uint8_t tick);

    TaskQueue *getSleepQ(unsigned int curTaskType) {
        return isHiPrioQset ? hpTaskQ[curTaskType] : lpTaskQ[curTaskType];
    }

    bool cancel(size_t taskId, bool eraseTask=false);

    bool stopTaskGroup(task_gid_t taskGID, task_type_t qidx, bool force);

    bool wake(size_t taskId);

    /**
     * Change how many worker threads there are for a given task type,
     * stopping/starting threads to reach the desired number.
     *
     * @param type the type of task for which to adjust the workers
     * @param newCount Target number of worker threads
     */
    void adjustWorkers(task_type_t type, size_t newCount);
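
    // Illustrative example (sketch): to target 8 writer threads one might
    // call ExecutorPool::get()->adjustWorkers(WRITER_TASK_IDX, 8);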

    bool snooze(size_t taskId, double tosleep);

    void registerTaskable(Taskable& taskable);

    void unregisterTaskable(Taskable& taskable, bool force);

    void doWorkerStat(EventuallyPersistentEngine *engine, const void *cookie,
                      ADD_STAT add_stat);

    /**
     * Generates stats regarding currently running tasks, as displayed by
     * cbstats tasks.
     */
    void doTasksStat(EventuallyPersistentEngine* engine,
                     const void* cookie,
                     ADD_STAT add_stat);

    void doTaskQStat(EventuallyPersistentEngine *engine, const void *cookie,
                     ADD_STAT add_stat);

    size_t getNumWorkersStat(void) {
        LockHolder lh(tMutex);
        return threadQ.size();
    }

    size_t getNumReaders(void);

    size_t getNumWriters(void);

    size_t getNumAuxIO(void);

    size_t getNumNonIO(void);

    size_t getMaxReaders(void) {
        return numWorkers[READER_TASK_IDX];
    }

    size_t getMaxWriters(void) {
        return numWorkers[WRITER_TASK_IDX];
    }

    size_t getMaxAuxIO(void) {
        return numWorkers[AUXIO_TASK_IDX];
    }

    size_t getMaxNonIO(void) {
        return numWorkers[NONIO_TASK_IDX];
    }

    void setNumReaders(uint16_t v) {
        adjustWorkers(READER_TASK_IDX, v);
    }

    void setNumWriters(uint16_t v) {
        adjustWorkers(WRITER_TASK_IDX, v);
    }

    void setNumAuxIO(uint16_t v) {
        adjustWorkers(AUXIO_TASK_IDX, v);
    }

    void setNumNonIO(uint16_t v) {
        adjustWorkers(NONIO_TASK_IDX, v);
    }

    size_t getNumReadyTasks(void) { return totReadyTasks; }

    size_t getNumSleepers(void) { return numSleepers; }

    size_t schedule(ExTask task);

    static ExecutorPool *get(void);

    static void shutdown(void);

protected:

    ExecutorPool(size_t t, size_t nTaskSets, size_t r, size_t w, size_t a,
                 size_t n);
    virtual ~ExecutorPool(void);

    TaskQueue* _nextTask(ExecutorThread &t, uint8_t tick);
    bool _cancel(size_t taskId, bool eraseTask=false);
    bool _wake(size_t taskId);
    virtual bool _startWorkers(void);

    /**
     * Change the number of worker threads.
     *
     * @param type Thread type to change
     * @param desiredNumItems The number of threads we want to end up with.
     * @return Net number of threads created (positive) or destroyed
     *         (negative).
     */
    ssize_t _adjustWorkers(task_type_t type, size_t desiredNumItems);

    bool _snooze(size_t taskId, double tosleep);
    size_t _schedule(ExTask task);
    void _registerTaskable(Taskable& taskable);
    void _unregisterTaskable(Taskable& taskable, bool force);
    bool _stopTaskGroup(task_gid_t taskGID, task_type_t qidx, bool force);
    TaskQueue* _getTaskQueue(const Taskable& t, task_type_t qidx);
    void _stopAndJoinThreads();

    size_t numTaskSets; // safe to read lock-less; not altered after creation
    size_t maxGlobalThreads;

    std::atomic<size_t> totReadyTasks;
    SyncObject mutex; // Thread management condition var + mutex

    //! A mapping of task ids to Task, TaskQ in the thread pool
    std::map<size_t, TaskQpair> taskLocator;

    // A list of threads
    ThreadQ threadQ;

    // Global cross-bucket priority queues that tasks are scheduled into ...
    TaskQ hpTaskQ; // a vector of numTaskSets elements for high priority
    bool isHiPrioQset;

    TaskQ lpTaskQ; // a vector of numTaskSets elements for low priority
    bool isLowPrioQset;

    size_t numBuckets;

    SyncObject tMutex; // to serialize taskLocator, threadQ, numBuckets access

    std::atomic<uint16_t> numSleepers; // total number of sleeping threads
    std::vector<std::atomic<uint16_t>> curWorkers; // track # of active workers per TaskSet
    std::vector<std::atomic<uint16_t>> numWorkers; // and limit it to the value set here
    std::vector<std::atomic<size_t>> numReadyTasks; // number of ready tasks per task set

    // Set of all known task owners
    std::set<void *> taskOwners;

    // Singleton creation
    static std::mutex initGuard;
    static std::atomic<ExecutorPool*> instance;
};
#endif  // SRC_EXECUTORPOOL_H_