xref: /4.6.4/ep-engine/src/tasks.h (revision 7b2c0f63)
1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2015 Couchbase, Inc.
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
18#ifndef SRC_TASKS_H_
19#define SRC_TASKS_H_ 1
20
21#include "config.h"
22
23#include <platform/processclock.h>
24
25#include <array>
26#include <chrono>
27#include <string>
28#include "atomic.h"
29#include "kvstore.h"
30#include "daemon/buffer.h"
31
/**
 * Lifecycle state of a task, stored atomically in GlobalTask::state.
 */
enum task_state_t {
    /// First enumerator (value 0).
    TASK_RUNNING = 0,
    /// NOTE(review): presumably a task that is waiting for its wake time -
    /// confirm against the executor code.
    TASK_SNOOZED,
    /// Assigned by GlobalTask::cancel() and tested by GlobalTask::isdead().
    TASK_DEAD
};
37
/**
 * Identifier for each kind of task.
 *
 * The enumerators are generated from tasks.def.h: every TASK(name, prio)
 * entry included below expands to one enumerator called `name`; the prio
 * argument is ignored by this expansion. No explicit values are assigned, so
 * (assuming tasks.def.h consists only of TASK(...) entries) the trailing
 * TASK_COUNT equals the number of generated task ids.
 */
enum class TaskId : int {
// Expand each TASK entry to just its name, then drop the helper macro again
// so that the next inclusion of tasks.def.h can define its own expansion.
#define TASK(name, prio) name,
#include "tasks.def.h"
#undef TASK
    TASK_COUNT
};
44
/// Integer form of a task priority, as returned by
/// GlobalTask::getQueuePriority().
using queue_priority_t = int;
46
/**
 * Priority value of each kind of task.
 *
 * Generated from tasks.def.h: every TASK(name, prio) entry included below
 * expands to an enumerator called `name` whose value is `prio`.
 *
 * Note that PRIORITY_COUNT carries no explicit value, so it is one greater
 * than the prio of the last TASK entry; it is therefore not necessarily the
 * number of distinct priorities.
 */
enum class TaskPriority : int {
// Expand each TASK entry to "name = prio", then drop the helper macro again.
#define TASK(name, prio) name = prio,
#include "tasks.def.h"
#undef TASK
    PRIORITY_COUNT
};
53
// Forward declarations of types that this header only refers to by pointer,
// by reference, or in friend declarations. The two comparator names match the
// CompareByDueDate / CompareByPriority classes defined at the end of this
// header.
class BgFetcher;
class CompareByDueDate;
class CompareByPriority;
class EventuallyPersistentEngine;
class Flusher;
class Warmup;
class Taskable;
class VBucket;
62
63class GlobalTask : public RCValue {
64friend class CompareByDueDate;
65friend class CompareByPriority;
66friend class ExecutorPool;
67friend class ExecutorThread;
68friend class TaskQueue;
69public:
70
71    GlobalTask(Taskable& t,
72               TaskId taskId,
73               double sleeptime = 0,
74               bool completeBeforeShutdown = true);
75
76    GlobalTask(EventuallyPersistentEngine *e,
77               TaskId taskId,
78               double sleeptime = 0,
79               bool completeBeforeShutdown = true);
80
81    /* destructor */
82    virtual ~GlobalTask(void) {}
83
84    /**
85     * The invoked function when the task is executed.
86     *
87     * @return Whether or not this task should be rescheduled
88     */
89    virtual bool run(void) = 0;
90
91    /**
92     * Gives a description of this task.
93     *
94     * @return A description of this task
95     */
96    virtual std::string getDescription(void) = 0;
97
98    virtual int maxExpectedDuration(void) {
99        return 3600;
100    }
101
102    /**
103     * test if a task is dead
104     */
105    bool isdead(void) {
106        return (state == TASK_DEAD);
107    }
108
109    /**
110     * Cancels this task by marking it dead.
111     */
112    void cancel(void) {
113        state = TASK_DEAD;
114    }
115
116    /**
117     * Puts the task to sleep for a given duration.
118     */
119    virtual void snooze(const double secs);
120
121    /**
122     * Returns the id of this task.
123     *
124     * @return A unique task id number.
125     */
126    size_t getId() const { return uid; }
127
128    /**
129     * Returns the type id of this task.
130     *
131     * @return A type id of the task.
132     */
133    TaskId getTypeId() const { return typeId; }
134
135    /**
136     * Gets the engine that this task was scheduled from
137     *
138     * @returns A handle to the engine
139     */
140    EventuallyPersistentEngine* getEngine() { return engine; }
141
142    task_state_t getState(void) {
143        return state.load();
144    }
145
146    void setState(task_state_t tstate, task_state_t expected) {
147        state.compare_exchange_strong(expected, tstate);
148    }
149
150    Taskable& getTaskable() {
151        return taskable;
152    }
153
154    queue_priority_t getQueuePriority() const {
155        return static_cast<queue_priority_t>(priority);
156    }
157
158    /*
159     * Lookup the task name for TaskId id.
160     * The data used is generated from tasks.def.h
161     */
162    static const char* getTaskName(TaskId id);
163
164    /*
165     * Lookup the task priority for TaskId id.
166     * The data used is generated from tasks.def.h
167     */
168    static TaskPriority getTaskPriority(TaskId id);
169
170    /*
171     * A vector of all TaskId generated from tasks.def.h
172     */
173    static std::array<TaskId, static_cast<int>(TaskId::TASK_COUNT)> allTaskIds;
174
175protected:
176    bool blockShutdown;
177    AtomicValue<task_state_t> state;
178    const size_t uid;
179    TaskId typeId;
180    TaskPriority priority;
181    EventuallyPersistentEngine *engine;
182    Taskable& taskable;
183
184    static AtomicValue<size_t> task_id_counter;
185    static size_t nextTaskId() { return task_id_counter.fetch_add(1); }
186
187    ProcessClock::time_point getWaketime() const {
188        const auto waketime_chrono = std::chrono::nanoseconds(waketime);
189        return ProcessClock::time_point(waketime_chrono);
190    }
191
192    void updateWaketime(const ProcessClock::time_point tp) {
193        waketime = to_ns_since_epoch(tp).count();
194    }
195
196    void updateWaketimeIfLessThan(const ProcessClock::time_point tp) {
197        const auto tp_ns = to_ns_since_epoch(tp).count();
198        atomic_setIfBigger(waketime, tp_ns);
199    }
200
201private:
202    /**
203     * We are using a uint64_t as opposed to ProcessTime::time_point because
204     * was want the access to be atomic without the use of a mutex.
205     * The reason for this is that the CompareByDueDate function has been shown
206     * to be pretty hot and we want to avoid the overhead of acquiring
207     * two mutexes (one for ExTask 1 and one for ExTask 2) for every invocation
208     * of the function.
209     */
210    std::atomic<int64_t> waketime; // used for priority_queue
211};
212
213typedef SingleThreadedRCPtr<GlobalTask> ExTask;
214
215/**
216 * A task for persisting items to disk.
217 */
218class FlusherTask : public GlobalTask {
219public:
220    FlusherTask(EventuallyPersistentEngine *e, Flusher* f, uint16_t shardid,
221                bool completeBeforeShutdown = true)
222        : GlobalTask(e, TaskId::FlusherTask, 0, completeBeforeShutdown),
223          flusher(f) {
224        std::stringstream ss;
225        ss<<"Running a flusher loop: shard "<<shardid;
226        desc = ss.str();
227    }
228
229    bool run();
230
231    std::string getDescription() {
232        return desc;
233    }
234
235private:
236    Flusher* flusher;
237    std::string desc;
238};
239
240/**
241 * A task for deleting VBucket files from disk and cleaning up any outstanding
242 * writes for that VBucket file.
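 * NOTE(review): the constructor below forwards no shard ID (sid) to
 *     GlobalTask, so the serialization with other tasks on the same shard
 *     that this comment used to describe is not expressed by this class -
 *     confirm where that serialization is enforced.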
245 */
246class VBDeleteTask : public GlobalTask {
247public:
248    VBDeleteTask(EventuallyPersistentEngine *e, uint16_t vbid, const void* c,
249                 bool completeBeforeShutdown = true)
250        : GlobalTask(e, TaskId::VBDeleteTask, 0, completeBeforeShutdown),
251          vbucketId(vbid), cookie(c) {}
252
253    bool run();
254
255    std::string getDescription() {
256        std::stringstream ss;
257        ss<<"Deleting VBucket:"<<vbucketId;
258        return ss.str();
259    }
260
261private:
262    uint16_t vbucketId;
263    const void* cookie;
264};
265
266/**
267 * A task for compacting a vbucket db file
268 */
269class CompactTask : public GlobalTask {
270public:
271    CompactTask(EventuallyPersistentEngine *e,
272                compaction_ctx c, const void *ck,
273                bool completeBeforeShutdown = false) :
274                GlobalTask(e, TaskId::CompactVBucketTask, 0, completeBeforeShutdown),
275                           compactCtx(c), cookie(ck) {
276        desc = "Compact DB file " + std::to_string(c.db_file_id);
277    }
278
279    bool run();
280
281    std::string getDescription() {
282        return desc;
283    }
284
285private:
286    compaction_ctx compactCtx;
287    const void* cookie;
288    std::string desc;
289};
290
291/**
292 * A task that periodically takes a snapshot of the stats and persists them to
293 * disk.
294 */
295class StatSnap : public GlobalTask {
296public:
297    StatSnap(EventuallyPersistentEngine *e, bool runOneTimeOnly = false,
298             bool sleeptime = 0, bool completeBeforeShutdown = false)
299        : GlobalTask(e, TaskId::StatSnap, sleeptime, completeBeforeShutdown),
300          runOnce(runOneTimeOnly) {}
301
302    bool run();
303
304    std::string getDescription() {
305        std::string rv("Updating stat snapshot on disk");
306        return rv;
307    }
308
309private:
310    bool runOnce;
311};
312
313/**
314 * A task for fetching items from disk.
315 * This task is used if EventuallyPersistentStore::multiBGFetchEnabled is true.
316 */
317class MultiBGFetcherTask : public GlobalTask {
318public:
319    MultiBGFetcherTask(EventuallyPersistentEngine *e, BgFetcher *b, bool sleeptime = 0,
320                        bool completeBeforeShutdown = false)
321        : GlobalTask(e, TaskId::MultiBGFetcherTask, sleeptime, completeBeforeShutdown),
322          bgfetcher(b) {}
323
324    bool run();
325
326    std::string getDescription() {
327        return std::string("Batching background fetch");
328    }
329
330private:
331    BgFetcher *bgfetcher;
332};
333
334/**
335 * A task that performs the bucket flush operation.
336 */
337class FlushAllTask : public GlobalTask {
338public:
339    FlushAllTask(EventuallyPersistentEngine *e, double when)
340        : GlobalTask(e, TaskId::FlushAllTask, when, false) {}
341
342    bool run();
343
344    std::string getDescription() {
345        std::stringstream ss;
346        ss << "Performing flush_all operation.";
347        return ss.str();
348    }
349};
350
351/**
352 * A task for performing disk fetches for "stats vkey".
353 */
354class VKeyStatBGFetchTask : public GlobalTask {
355public:
356    VKeyStatBGFetchTask(EventuallyPersistentEngine *e, const std::string &k,
357                        uint16_t vbid, uint64_t s, const void *c, int sleeptime = 0,
358                        bool completeBeforeShutdown = false)
359        : GlobalTask(e, TaskId::VKeyStatBGFetchTask, sleeptime, completeBeforeShutdown),
360          key(k),
361          vbucket(vbid),
362          bySeqNum(s),
363          cookie(c) {}
364
365    bool run();
366
367    std::string getDescription() {
368        std::stringstream ss;
369        ss << "Fetching item from disk for vkey stat:  " << key<<" vbucket "
370           <<vbucket;
371        return ss.str();
372    }
373
374private:
375    std::string                      key;
376    uint16_t                         vbucket;
377    uint64_t                         bySeqNum;
378    const void                      *cookie;
379};
380
381/**
382 * A task that performs disk fetches for non-resident get requests.
383 * This task is used if EventuallyPersistentStore::multiBGFetchEnabled is false.
384 */
385class SingleBGFetcherTask : public GlobalTask {
386public:
387    SingleBGFetcherTask(EventuallyPersistentEngine *e, const const_sized_buffer k,
388                       uint16_t vbid, const void *c, bool isMeta,
389                       int sleeptime = 0, bool completeBeforeShutdown = false)
390        : GlobalTask(e, TaskId::SingleBGFetcherTask, sleeptime, completeBeforeShutdown),
391          key(k.data(), k.size()),
392          vbucket(vbid),
393          cookie(c),
394          metaFetch(isMeta),
395          init(gethrtime()) {}
396
397    bool run();
398
399    std::string getDescription() {
400        std::stringstream ss;
401        ss << "Fetching item from disk:  " << key<<" vbucket "<<vbucket;
402        return ss.str();
403    }
404
405private:
406    const std::string          key;
407    uint16_t                   vbucket;
408    const void                *cookie;
409    bool                       metaFetch;
410    hrtime_t                   init;
411};
412
413/*
414 * This is a NONIO task called as part of VB deletion.  The task is responsible
415 * for clearing all the VBucket's pending operations and for clearing the
416 * VBucket's hash table.
417 */
418class VBucketMemoryDeletionTask : public GlobalTask {
419public:
420    VBucketMemoryDeletionTask(EventuallyPersistentEngine& eng,
421                              RCPtr<VBucket>& vb,
422                              double delay);
423
424    std::string getDescription() {
425        return desc;
426    }
427
428    bool run();
429
430private:
431    EventuallyPersistentEngine& e;
432    RCPtr<VBucket> vbucket;
433    std::string desc;
434};
435
436/**
437 * A task that monitors if a bucket is read-heavy, write-heavy, or mixed.
438 */
439class WorkLoadMonitor : public GlobalTask {
440public:
441    WorkLoadMonitor(EventuallyPersistentEngine *e,
442                    bool completeBeforeShutdown = false);
443
444    bool run();
445
446    std::string getDescription() {
447        return desc;
448    }
449
450private:
451
452    size_t getNumMutations();
453    size_t getNumGets();
454
455    size_t prevNumMutations;
456    size_t prevNumGets;
457    std::string desc;
458};
459
460/**
461 * Order tasks by their priority and taskId (try to ensure FIFO)
462 * @return true if t2 should have priority over t1
463 */
464class CompareByPriority {
465public:
466    bool operator()(ExTask &t1, ExTask &t2) {
467        return (t1->getQueuePriority() == t2->getQueuePriority()) ?
468               (t1->uid > t2->uid) :
469               (t1->getQueuePriority() > t2->getQueuePriority());
470    }
471};
472
473/**
474 * Order tasks by their ready date.
475 * @return true if t2 should have priority over t1
476 */
477class CompareByDueDate {
478public:
479    bool operator()(ExTask &t1, ExTask &t2) {
480        return t2->waketime < t1->waketime;
481    }
482};
483
484#endif  // SRC_TASKS_H_
485