1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2016 Couchbase, Inc.
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17 
18 #pragma once
19 
20 #include "atomic.h"
21 #include "config.h"
22 #include "task_type.h"
23 
24 #include <platform/processclock.h>
25 #include <platform/sized_buffer.h>
26 #include <array>
27 
/**
 * Lifecycle state of a task.
 *
 * The enumerators are given their implicit values explicitly so the numeric
 * mapping is visible at a glance.
 */
enum task_state_t {
    TASK_RUNNING = 0,
    TASK_SNOOZED = 1,
    TASK_DEAD = 2
};

/**
 * Convert a task state to a string.
 *
 * Only declared here; the definition (and therefore the exact text produced
 * for each state) lives elsewhere.
 */
std::string to_string(task_state_t state);
35 
36 enum class TaskId : int {
37 #define TASK(name, type, prio) name,
38 #include "tasks.def.h"
39 #undef TASK
40     TASK_COUNT
41 };
42 
/// Plain integer type used to express a task's priority in a queue.
using queue_priority_t = int;
44 
45 enum class TaskPriority : int {
46 #define TASK(name, type, prio) name = prio,
47 #include "tasks.def.h"
48 #undef TASK
49     PRIORITY_COUNT
50 };
51 
// Forward declarations: GlobalTask only stores a pointer / reference to these
// types, so their full definitions are not needed in this header.
class EventuallyPersistentEngine;
class Taskable;
54 
55 class GlobalTask {
56     friend class CompareByDueDate;
57     friend class CompareByPriority;
58     friend class ExecutorPool;
59     friend class ExecutorThread;
60 public:
61 
62     GlobalTask(Taskable& t,
63                TaskId taskId,
64                double sleeptime = 0,
65                bool completeBeforeShutdown = true);
66 
67     GlobalTask(EventuallyPersistentEngine *e,
68                TaskId taskId,
69                double sleeptime = 0,
70                bool completeBeforeShutdown = true);
71 
72     /* destructor */
~GlobalTask(void)73     virtual ~GlobalTask(void) {
74     }
75 
76     /**
77      * The invoked function when the task is executed.
78      *
79      * If the task wants to run again, it should return true - it will be
80      * added back onto the ready queue and scheduled according to it's
81      * priority. To run again but at a later time, call snooze() specifying
82      * how long to sleep before it should be scheduled again.
83      * If the task is complete (and should never be run again), return false.
84      *
85      * @return Whether or not this task should be rescheduled
86      */
87     virtual bool run() = 0;
88 
89     /**
90      * Gives a description of this task.
91      *
92      * @return A description of this task
93      */
94     virtual std::string getDescription() = 0;
95 
96     /**
97      * The maximum expected duration of a single execution of this task - i.e.
98      * how long should run() take.
99      * Any task executions taking longer than this to run will be logged as
100      * "slow".
101      *
102      * Exact values will vary significantly depending on the class of task;
103      * however here are some general principles:
104      *
105      *   1. Our scheduler is non-preemptive, so a long-running task *cannot* be
106      *      interrupted to allow another (possibly higher priority) task to run.
107      *      As such, tasks in general should aim to only run for a brief
108      *      duration at a time; for example at most 25ms (typical OS scheduler
109      *      time-slice).
110      *   2. Select a suitable limit for the given task - if a task is expected
111      *      to complete in 1us; it isn't very useful to specify a limit of 25ms
112      *      as we will fail to log any executions which are abnormally slow
113      *      (even if they arn't causing scheduling issues for other tasks).
114      *   3. Tasks which block other tasks while they run should aim to minimize
115      *      their runtime - 25ms would be a significant delay to add to
116      *      front-end operations if a particular task (e.g. HashTableResizer)
117      *      blocks FE operations while running.
118      *   4. One-off, startup tasks (e.g. Warmup) can take as long as necessary -
119      *      given they must run before we can operate their duration isn't
120      *      critical.
121      */
122     virtual std::chrono::microseconds maxExpectedDuration() = 0;
123 
124     /**
125      * test if a task is dead
126      */
isdead(void)127      bool isdead(void) {
128         return (state == TASK_DEAD);
129      }
130 
131 
132     /**
133      * Cancels this task by marking it dead.
134      */
cancel(void)135     void cancel(void) {
136         state = TASK_DEAD;
137     }
138 
139     /**
140      * Puts the task to sleep for a given duration.
141      */
142     virtual void snooze(const double secs);
143 
144     /// Wake up a task, setting it to run as soon as possible.
145     void wakeUp();
146 
147     /**
148      * Returns the id of this task.
149      *
150      * @return A unique task id number.
151      */
getId() const152     size_t getId() const { return uid; }
153 
154     /**
155      * Returns the id of this task.
156      *
157      * @return The id of this task.
158      */
getTaskId()159     TaskId getTaskId() {
160         return taskId;
161     }
162 
163     /**
164      * Gets the engine that this task was scheduled from
165      *
166      * @returns A handle to the engine
167      */
getEngine()168     EventuallyPersistentEngine* getEngine() { return engine; }
169 
getState(void)170     task_state_t getState(void) {
171         return state.load();
172     }
173 
setState(task_state_t tstate, task_state_t expected)174     void setState(task_state_t tstate, task_state_t expected) {
175         state.compare_exchange_strong(expected, tstate);
176     }
177 
getTaskable() const178     Taskable& getTaskable() const {
179         return taskable;
180     }
181 
getWaketime() const182     ProcessClock::time_point getWaketime() const {
183         const auto waketime_chrono = std::chrono::nanoseconds(waketime);
184         return ProcessClock::time_point(waketime_chrono);
185     }
186 
updateWaketime(ProcessClock::time_point tp)187     void updateWaketime(ProcessClock::time_point tp) {
188         waketime = to_ns_since_epoch(tp).count();
189     }
190 
updateWaketimeIfLessThan(ProcessClock::time_point tp)191     void updateWaketimeIfLessThan(ProcessClock::time_point tp) {
192         const int64_t tp_ns = to_ns_since_epoch(tp).count();
193         atomic_setIfBigger(waketime, tp_ns);
194     }
195 
getLastStartTime() const196     ProcessClock::time_point getLastStartTime() const {
197         const auto waketime_chrono = std::chrono::nanoseconds(lastStartTime);
198         return ProcessClock::time_point(waketime_chrono);
199     }
200 
updateLastStartTime(ProcessClock::time_point tp)201     void updateLastStartTime(ProcessClock::time_point tp) {
202         lastStartTime = to_ns_since_epoch(tp).count();
203     }
204 
getTotalRuntime() const205     ProcessClock::duration getTotalRuntime() const {
206         return std::chrono::nanoseconds(totalRuntime);
207     }
208 
getPrevRuntime() const209     ProcessClock::duration getPrevRuntime() const {
210         return std::chrono::nanoseconds(previousRuntime);
211     }
212 
updateRuntime(ProcessClock::duration tp)213     void updateRuntime(ProcessClock::duration tp) {
214         int64_t nanoseconds =
215                 std::chrono::duration_cast<std::chrono::nanoseconds>(tp)
216                         .count();
217         totalRuntime += nanoseconds;
218         previousRuntime = nanoseconds;
219     }
220 
getQueuePriority() const221     queue_priority_t getQueuePriority() const {
222         return static_cast<queue_priority_t>(priority);
223     }
224 
225     /*
226      * Lookup the task name for TaskId id.
227      * The data used is generated from tasks.def.h
228      */
229     static const char* getTaskName(TaskId id);
230 
231     /*
232      * Lookup the task priority for TaskId id.
233      * The data used is generated from tasks.def.h
234      */
235     static TaskPriority getTaskPriority(TaskId id);
236 
237     /*
238      * Lookup the task type for TaskId id.
239      * The data used is generated from tasks.def.h
240      */
241     static task_type_t getTaskType(TaskId id);
242 
243     /*
244      * A vector of all TaskId generated from tasks.def.h
245      */
246     static std::array<TaskId, static_cast<int>(TaskId::TASK_COUNT)> allTaskIds;
247 
248 protected:
249     /**
250      * We are using a int64_t as opposed to ProcessTime::time_point because we
251      * want the access to be atomic without the use of a mutex. The reason for
252      * this is that we update these timepoints in locations which have been
253      * shown to be pretty hot (e.g. CompareByDueDate) and we want to avoid
254      * the overhead of acquiring mutexes.
255      */
256     using atomic_time_point = std::atomic<int64_t>;
257     using atomic_duration = std::atomic<int64_t>;
258     bool blockShutdown;
259     std::atomic<task_state_t> state;
260     const size_t uid;
261     const TaskId taskId;
262     TaskPriority priority;
263     EventuallyPersistentEngine *engine;
264     Taskable& taskable;
265 
266     static std::atomic<size_t> task_id_counter;
nextTaskId()267     static size_t nextTaskId() { return task_id_counter.fetch_add(1); }
268 
269     atomic_duration totalRuntime;
270     atomic_duration previousRuntime;
271     atomic_time_point lastStartTime;
272 
273 private:
274     atomic_time_point waketime; // used for priority_queue
275 };
276 
277 typedef std::shared_ptr<GlobalTask> ExTask;
278 
279 /**
280  * Order tasks by their priority and taskId (try to ensure FIFO)
281  * @return true if t2 should have priority over t1
282  */
283 class CompareByPriority {
284 public:
operator ()(ExTask &t1, ExTask &t2)285     bool operator()(ExTask &t1, ExTask &t2) {
286         return (t1->getQueuePriority() == t2->getQueuePriority()) ?
287                (t1->uid > t2->uid) :
288                (t1->getQueuePriority() > t2->getQueuePriority());
289     }
290 };
291 
292 /**
293  * Order tasks by their ready date.
294  * @return true if t2 should have priority over t1
295  */
296 class CompareByDueDate {
297 public:
operator ()(ExTask &t1, ExTask &t2)298     bool operator()(ExTask &t1, ExTask &t2) {
299         return t2->waketime < t1->waketime;
300     }
301 };
302