1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2013 Couchbase, Inc.
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17 
18 #ifndef SRC_TASKS_H_
19 #define SRC_TASKS_H_ 1
20 
#include "config.h"

#include <list>
#include <sstream>
#include <string>
#include <utility>

#include "atomic.h"
#include "priority.h"
29 
/**
 * States a GlobalTask can be in.
 */
enum task_state_t {
    TASK_RUNNING = 0,  // state every GlobalTask is constructed in
    TASK_SNOOZED = 1,  // NOTE(review): not set anywhere in this header
    TASK_DEAD    = 2   // set by GlobalTask::cancel(), tested by isdead()
};
35 
// Forward declarations. Of these, this header itself only uses BgFetcher,
// EventuallyPersistentEngine and Flusher, and only as pointer types.
class BgFetcher;
// NOTE(review): the comparators defined at the end of this header are named
// CompareByDueDate / CompareByPriority (and those are the names GlobalTask
// befriends); the two CompareTasksBy* names below are not referenced in this
// header -- confirm whether they are still needed.
class CompareTasksByDueDate;
class CompareTasksByPriority;
class EventuallyPersistentEngine;
class Flusher;
class Warmup;
42 
/**
 * Context structures describing a vbucket compaction request
 * (CompactVBucketTask keeps a compaction_ctx).
 */
46 
/**
 * One entry of compaction_ctx::expiredItems: a key together with a
 * sequence number.
 */
struct expiredItemCtx {
    uint64_t    revSeqno;  // presumably the item's revision seqno -- confirm
    std::string keyStr;    // the item's key, held as a string
};
51 
52 typedef struct {
53     uint64_t purge_before_ts;
54     uint64_t purge_before_seq;
55     uint8_t  drop_deletes;
56     uint64_t max_purged_seq;
57     uint32_t curr_time;
58     std::list<expiredItemCtx> expiredItems;
59 } compaction_ctx;
60 
61 class GlobalTask : public RCValue {
62 friend class CompareByDueDate;
63 friend class CompareByPriority;
64 friend class ExecutorPool;
65 friend class ExecutorThread;
66 friend class TaskQueue;
67 public:
GlobalTask(EventuallyPersistentEngine *e, const Priority &p, double sleeptime = 0, bool completeBeforeShutdown = true)68     GlobalTask(EventuallyPersistentEngine *e, const Priority &p,
69                double sleeptime = 0, bool completeBeforeShutdown = true) :
70           RCValue(), priority(p),
71           blockShutdown(completeBeforeShutdown),
72           state(TASK_RUNNING), taskId(nextTaskId()), engine(e) {
73         snooze(sleeptime);
74     }
75 
76     /* destructor */
~GlobalTask(void)77     virtual ~GlobalTask(void) {
78     }
79 
80     /**
81      * The invoked function when the task is executed.
82      *
83      * @return Whether or not this task should be rescheduled
84      */
85     virtual bool run(void) = 0;
86 
87     /**
88      * Gives a description of this task.
89      *
90      * @return A description of this task
91      */
92     virtual std::string getDescription(void) = 0;
93 
maxExpectedDuration(void)94     virtual int maxExpectedDuration(void) {
95         return 3600;
96     }
97 
98     /**
99      * test if a task is dead
100      */
isdead(void)101      bool isdead(void) {
102         return (state == TASK_DEAD);
103      }
104 
105 
106     /**
107      * Cancels this task by marking it dead.
108      */
cancel(void)109     void cancel(void) {
110         state = TASK_DEAD;
111     }
112 
113     /**
114      * Puts the task to sleep for a given duration.
115      */
116     void snooze(const double secs);
117 
118     /**
119      * Returns the id of this task.
120      *
121      * @return A unique task id number.
122      */
getId()123     size_t getId() { return taskId; }
124 
125     /**
126      * Returns the type id of this task.
127      *
128      * @return A type id of the task.
129      */
getTypeId()130     type_id_t getTypeId() { return priority.getTypeId(); }
131 
132     /**
133      * Gets the engine that this task was scheduled from
134      *
135      * @returns A handle to the engine
136      */
getEngine()137     EventuallyPersistentEngine* getEngine() { return engine; }
138 
getState(void)139     task_state_t getState(void) {
140         return state.load();
141     }
142 
setState(task_state_t tstate, task_state_t expected)143     void setState(task_state_t tstate, task_state_t expected) {
144         state.compare_exchange_strong(expected, tstate);
145     }
146 
147 protected:
148 
149     const Priority &priority;
150     bool blockShutdown;
151     AtomicValue<task_state_t> state;
152     const size_t taskId;
153     struct timeval waketime;
154     EventuallyPersistentEngine *engine;
155 
156     static AtomicValue<size_t> task_id_counter;
nextTaskId()157     static size_t nextTaskId() { return task_id_counter.fetch_add(1); }
158 };
159 
// Handle type through which tasks are passed around; the comparators at the
// end of this header take ExTask references and reach the task via
// operator->.
// NOTE(review): SingleThreadedRCPtr is presumably the reference-counting
// pointer that pairs with RCValue (GlobalTask's base class) -- confirm in
// atomic.h.
typedef SingleThreadedRCPtr<GlobalTask> ExTask;
161 
162 /**
163  * A task for persisting items to disk.
164  */
165 class FlusherTask : public GlobalTask {
166 public:
FlusherTask(EventuallyPersistentEngine *e, Flusher* f, const Priority &p, uint16_t shardid, bool completeBeforeShutdown = true)167     FlusherTask(EventuallyPersistentEngine *e, Flusher* f, const Priority &p,
168                 uint16_t shardid, bool completeBeforeShutdown = true) :
169                 GlobalTask(e, p, 0, completeBeforeShutdown), flusher(f) {
170         std::stringstream ss;
171         ss<<"Running a flusher loop: shard "<<shardid;
172         desc = ss.str();
173     }
174 
175     bool run();
176 
getDescription()177     std::string getDescription() {
178         return desc;
179     }
180 
181 private:
182     Flusher* flusher;
183     std::string desc;
184 };
185 
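/**
 * A task for persisting VBucket state changes to disk and creating new
 * VBucket database files.
 * sID (shard ID) names the shard whose vbucket states are snapshotted.
 * NOTE(review): this comment used to say that the shard ID is passed on to
 *     GlobalTask so that the task is serialized with other tasks that
 *     require serialization on its shard; the constructor only stores it in
 *     shardID -- confirm where that serialization is enforced.
 */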
192 class VBSnapshotTask : public GlobalTask {
193 public:
VBSnapshotTask(EventuallyPersistentEngine *e, const Priority &p, uint16_t sID = 0, bool completeBeforeShutdown = true)194     VBSnapshotTask(EventuallyPersistentEngine *e, const Priority &p,
195                 uint16_t sID = 0, bool completeBeforeShutdown = true) :
196                 GlobalTask(e, p, 0, completeBeforeShutdown), shardID(sID) {
197         std::stringstream ss;
198         ss<<"Snapshotting vbucket states for the shard: "<<shardID;
199         desc = ss.str();
200     }
201 
202     bool run();
203 
getDescription()204     std::string getDescription() {
205         return desc;
206     }
207 
208 private:
209     uint16_t shardID;
210     std::string desc;
211 };
212 
213 /**
214  * A daemon task for persisting VBucket state changes to disk periodically.
215  */
216 class DaemonVBSnapshotTask : public GlobalTask {
217 public:
218     DaemonVBSnapshotTask(EventuallyPersistentEngine *e,
219                          bool completeBeforeShutdown = true);
220 
221     bool run();
222 
getDescription()223     std::string getDescription() {
224         return desc;
225     }
226 
227 private:
228     std::string desc;
229 };
230 
231 /**
232  * A task for persisting a VBucket state to disk and creating a vbucket
233  * database file if necessary.
234  */
235 class VBStatePersistTask : public GlobalTask {
236 public:
VBStatePersistTask(EventuallyPersistentEngine *e, const Priority &p, uint16_t vbucket, bool completeBeforeShutdown = true)237     VBStatePersistTask(EventuallyPersistentEngine *e, const Priority &p,
238                        uint16_t vbucket, bool completeBeforeShutdown = true) :
239         GlobalTask(e, p, 0, completeBeforeShutdown), vbid(vbucket) {
240         std::stringstream ss;
241         ss<<"Persisting a vbucket state for vbucket: "<< vbid;
242         desc = ss.str();
243     }
244 
245     bool run();
246 
getDescription()247     std::string getDescription() {
248         return desc;
249     }
250 
251 private:
252     uint16_t vbid;
253     std::string desc;
254 };
255 
/**
 * A task for deleting VBucket files from disk and cleaning up any outstanding
 * writes for that VBucket file.
 * NOTE(review): this comment used to describe a shard ID (sid) passed on to
 *     GlobalTask so that the task is serialized with other tasks that
 *     require serialization on its shard; the constructor takes no shard ID,
 *     so that remark looks stale -- confirm.
 */
262 class VBDeleteTask : public GlobalTask {
263 public:
VBDeleteTask(EventuallyPersistentEngine *e, uint16_t vbid, const void* c, const Priority &p, bool completeBeforeShutdown = true)264     VBDeleteTask(EventuallyPersistentEngine *e, uint16_t vbid, const void* c,
265                  const Priority &p, bool completeBeforeShutdown = true) :
266         GlobalTask(e, p, 0, completeBeforeShutdown),
267         vbucketId(vbid), cookie(c) { }
268 
269     bool run();
270 
getDescription()271     std::string getDescription() {
272         std::stringstream ss;
273         ss<<"Deleting VBucket:"<<vbucketId;
274         return ss.str();
275     }
276 
277 private:
278     uint16_t vbucketId;
279     const void* cookie;
280 };
281 
282 /**
283  * A task for compacting a vbucket db file
284  */
285 class CompactVBucketTask : public GlobalTask {
286 public:
CompactVBucketTask(EventuallyPersistentEngine *e, const Priority &p, uint16_t vbucket, compaction_ctx c, const void *ck, bool completeBeforeShutdown = true)287     CompactVBucketTask(EventuallyPersistentEngine *e, const Priority &p,
288                 uint16_t vbucket, compaction_ctx c, const void *ck,
289                 bool completeBeforeShutdown = true) :
290                 GlobalTask(e, p, 0, completeBeforeShutdown),
291                            vbid(vbucket), compactCtx(c), cookie(ck)
292     {
293         std::stringstream ss;
294         ss<<"Compact VBucket "<<vbid;
295         desc = ss.str();
296     }
297 
298     bool run();
299 
getDescription()300     std::string getDescription() {
301         return desc;
302     }
303 
304 private:
305     uint16_t vbid;
306     compaction_ctx compactCtx;
307     const void* cookie;
308     std::string desc;
309 };
310 
311 /**
312  * A task that periodically takes a snapshot of the stats and persists them to
313  * disk.
314  */
315 class StatSnap : public GlobalTask {
316 public:
StatSnap(EventuallyPersistentEngine *e, const Priority &p, bool runOneTimeOnly = false, bool sleeptime = 0, bool shutdown = false)317     StatSnap(EventuallyPersistentEngine *e, const Priority &p,
318              bool runOneTimeOnly = false, bool sleeptime = 0,
319              bool shutdown = false) :
320         GlobalTask(e, p, sleeptime, shutdown), runOnce(runOneTimeOnly) { }
321 
322     bool run();
323 
getDescription()324     std::string getDescription() {
325         std::string rv("Updating stat snapshot on disk");
326         return rv;
327     }
328 
329 private:
330     bool runOnce;
331 };
332 
333 /**
334  * A task for fetching items from disk.
335  */
336 class BgFetcherTask : public GlobalTask {
337 public:
BgFetcherTask(EventuallyPersistentEngine *e, BgFetcher *b, const Priority &p, bool sleeptime = 0, bool shutdown = false)338     BgFetcherTask(EventuallyPersistentEngine *e, BgFetcher *b,
339                   const Priority &p, bool sleeptime = 0, bool shutdown = false)
340         : GlobalTask(e, p, sleeptime, shutdown), bgfetcher(b) { }
341 
342     bool run();
343 
getDescription()344     std::string getDescription() {
345         return std::string("Batching background fetch");
346     }
347 
348 private:
349     BgFetcher *bgfetcher;
350 };
351 
352 /**
353  * A task for performing disk fetches for "stats vkey".
354  */
355 class VKeyStatBGFetchTask : public GlobalTask {
356 public:
VKeyStatBGFetchTask(EventuallyPersistentEngine *e, const std::string &k, uint16_t vbid, uint64_t s, const void *c, const Priority &p, int sleeptime = 0, bool shutdown = false)357     VKeyStatBGFetchTask(EventuallyPersistentEngine *e, const std::string &k,
358                         uint16_t vbid, uint64_t s, const void *c,
359                         const Priority &p, int sleeptime = 0,
360                         bool shutdown = false) :
361         GlobalTask(e, p, sleeptime, shutdown), key(k),
362                    vbucket(vbid), bySeqNum(s), cookie(c) { }
363 
364     bool run();
365 
getDescription()366     std::string getDescription() {
367         std::stringstream ss;
368         ss << "Fetching item from disk for vkey stat:  " << key<<" vbucket "
369            <<vbucket;
370         return ss.str();
371     }
372 
373 private:
374     std::string                      key;
375     uint16_t                         vbucket;
376     uint64_t                         bySeqNum;
377     const void                      *cookie;
378 };
379 
380 /**
381  * A task that performs disk fetches for non-resident get requests.
382  */
383 class BGFetchTask : public GlobalTask {
384 public:
BGFetchTask(EventuallyPersistentEngine *e, const std::string &k, uint16_t vbid, uint64_t s, const void *c, bool isMeta, const Priority &p, int sleeptime = 0, bool shutdown = false)385     BGFetchTask(EventuallyPersistentEngine *e, const std::string &k,
386             uint16_t vbid, uint64_t s, const void *c, bool isMeta,
387             const Priority &p, int sleeptime = 0, bool shutdown = false) :
388         GlobalTask(e, p, sleeptime, shutdown), key(k), vbucket(vbid),
389         seqNum(s), cookie(c), metaFetch(isMeta), init(gethrtime()) { }
390 
391     bool run();
392 
getDescription()393     std::string getDescription() {
394         std::stringstream ss;
395         ss << "Fetching item from disk:  " << key<<" vbucket "<<vbucket;
396         return ss.str();
397     }
398 
399 private:
400     const std::string          key;
401     uint16_t                   vbucket;
402     uint64_t                   seqNum;
403     const void                *cookie;
404     bool                       metaFetch;
405     hrtime_t                   init;
406 };
407 
408 /**
409  * A task that monitors if a bucket is read-heavy, write-heavy, or mixed.
410  */
411 class WorkLoadMonitor : public GlobalTask {
412 public:
413     WorkLoadMonitor(EventuallyPersistentEngine *e,
414                     bool completeBeforeShutdown = false);
415 
416     bool run();
417 
getDescription()418     std::string getDescription() {
419         return desc;
420     }
421 
422 private:
423 
424     size_t getNumMutations();
425     size_t getNumGets();
426 
427     size_t prevNumMutations;
428     size_t prevNumGets;
429     std::string desc;
430 };
431 
432 /**
433  * Order tasks by their priority and taskId (try to ensure FIFO)
434  */
435 class CompareByPriority {
436 public:
operator ()(ExTask &t1, ExTask &t2)437     bool operator()(ExTask &t1, ExTask &t2) {
438         return (t1->priority == t2->priority) ?
439                (t1->taskId   > t2->taskId)    :
440                (t1->priority < t2->priority);
441     }
442 };
443 
444 /**
445  * Order tasks by their ready date.
446  */
447 class CompareByDueDate {
448 public:
operator ()(ExTask &t1, ExTask &t2)449     bool operator()(ExTask &t1, ExTask &t2) {
450         return less_tv(t2->waketime, t1->waketime);
451     }
452 };
453 
454 #endif  // SRC_TASKS_H_
455