xref: /4.6.4/ep-engine/src/tasks.h (revision e6286bdc)
1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2015 Couchbase, Inc.
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
18#ifndef SRC_TASKS_H_
19#define SRC_TASKS_H_ 1
20
21#include "config.h"
22
23#include <platform/processclock.h>
24
25#include <array>
26#include <chrono>
27#include <string>
28#include "atomic.h"
29#include "kvstore.h"
30#include "daemon/buffer.h"
31
32enum task_state_t {
33    TASK_RUNNING,
34    TASK_SNOOZED,
35    TASK_DEAD
36};
37
38enum class TaskId : int {
39#define TASK(name, prio) name,
40#include "tasks.def.h"
41#undef TASK
42    TASK_COUNT
43};
44
45typedef int queue_priority_t;
46
47enum class TaskPriority : int {
48#define TASK(name, prio) name = prio,
49#include "tasks.def.h"
50#undef TASK
51    PRIORITY_COUNT
52};
53
54class BgFetcher;
55class CompareTasksByDueDate;
56class CompareTasksByPriority;
57class EventuallyPersistentEngine;
58class Flusher;
59class Warmup;
60class Taskable;
61class VBucket;
62
63class GlobalTask : public RCValue {
64friend class CompareByDueDate;
65friend class CompareByPriority;
66friend class ExecutorPool;
67friend class ExecutorThread;
68friend class TaskQueue;
69public:
70
71    GlobalTask(Taskable& t,
72               TaskId taskId,
73               double sleeptime = 0,
74               bool completeBeforeShutdown = true);
75
76    GlobalTask(EventuallyPersistentEngine *e,
77               TaskId taskId,
78               double sleeptime = 0,
79               bool completeBeforeShutdown = true);
80
81    /* destructor */
82    virtual ~GlobalTask(void) {}
83
84    /**
85     * The invoked function when the task is executed.
86     *
87     * @return Whether or not this task should be rescheduled
88     */
89    virtual bool run(void) = 0;
90
91    /**
92     * Gives a description of this task.
93     *
94     * @return A description of this task
95     */
96    virtual std::string getDescription(void) = 0;
97
98    /**
99     * The maximum expected duration of this task. Any tasks taking longer than
100     * this to run will be logged as "slow".
101     */
102    virtual std::chrono::microseconds maxExpectedDuration() {
103        return std::chrono::microseconds(3600);
104    }
105
106    /**
107     * test if a task is dead
108     */
109    bool isdead(void) {
110        return (state == TASK_DEAD);
111    }
112
113    /**
114     * Cancels this task by marking it dead.
115     */
116    void cancel(void) {
117        state = TASK_DEAD;
118    }
119
120    /**
121     * Puts the task to sleep for a given duration.
122     */
123    virtual void snooze(const double secs);
124
125    /**
126     * Returns the id of this task.
127     *
128     * @return A unique task id number.
129     */
130    size_t getId() const { return uid; }
131
132    /**
133     * Returns the type id of this task.
134     *
135     * @return A type id of the task.
136     */
137    TaskId getTypeId() const { return typeId; }
138
139    /**
140     * Gets the engine that this task was scheduled from
141     *
142     * @returns A handle to the engine
143     */
144    EventuallyPersistentEngine* getEngine() { return engine; }
145
146    task_state_t getState(void) {
147        return state.load();
148    }
149
150    void setState(task_state_t tstate, task_state_t expected) {
151        state.compare_exchange_strong(expected, tstate);
152    }
153
154    Taskable& getTaskable() {
155        return taskable;
156    }
157
158    queue_priority_t getQueuePriority() const {
159        return static_cast<queue_priority_t>(priority);
160    }
161
162    /*
163     * Lookup the task name for TaskId id.
164     * The data used is generated from tasks.def.h
165     */
166    static const char* getTaskName(TaskId id);
167
168    /*
169     * Lookup the task priority for TaskId id.
170     * The data used is generated from tasks.def.h
171     */
172    static TaskPriority getTaskPriority(TaskId id);
173
174    /*
175     * A vector of all TaskId generated from tasks.def.h
176     */
177    static std::array<TaskId, static_cast<int>(TaskId::TASK_COUNT)> allTaskIds;
178
179protected:
180    bool blockShutdown;
181    AtomicValue<task_state_t> state;
182    const size_t uid;
183    TaskId typeId;
184    TaskPriority priority;
185    EventuallyPersistentEngine *engine;
186    Taskable& taskable;
187
188    static AtomicValue<size_t> task_id_counter;
189    static size_t nextTaskId() { return task_id_counter.fetch_add(1); }
190
191    ProcessClock::time_point getWaketime() const {
192        const auto waketime_chrono = std::chrono::nanoseconds(waketime);
193        return ProcessClock::time_point(waketime_chrono);
194    }
195
196    void updateWaketime(const ProcessClock::time_point tp) {
197        waketime = to_ns_since_epoch(tp).count();
198    }
199
200    void updateWaketimeIfLessThan(const ProcessClock::time_point tp) {
201        const auto tp_ns = to_ns_since_epoch(tp).count();
202        atomic_setIfBigger(waketime, tp_ns);
203    }
204
205private:
206    /**
207     * We are using a uint64_t as opposed to ProcessTime::time_point because
208     * was want the access to be atomic without the use of a mutex.
209     * The reason for this is that the CompareByDueDate function has been shown
210     * to be pretty hot and we want to avoid the overhead of acquiring
211     * two mutexes (one for ExTask 1 and one for ExTask 2) for every invocation
212     * of the function.
213     */
214    std::atomic<int64_t> waketime; // used for priority_queue
215};
216
217typedef SingleThreadedRCPtr<GlobalTask> ExTask;
218
219/**
220 * A task for persisting items to disk.
221 */
222class FlusherTask : public GlobalTask {
223public:
224    FlusherTask(EventuallyPersistentEngine *e, Flusher* f, uint16_t shardid,
225                bool completeBeforeShutdown = true)
226        : GlobalTask(e, TaskId::FlusherTask, 0, completeBeforeShutdown),
227          flusher(f) {
228        std::stringstream ss;
229        ss<<"Running a flusher loop: shard "<<shardid;
230        desc = ss.str();
231    }
232
233    bool run();
234
235    std::string getDescription() {
236        return desc;
237    }
238
239private:
240    Flusher* flusher;
241    std::string desc;
242};
243
244/**
245 * A task for deleting VBucket files from disk and cleaning up any outstanding
246 * writes for that VBucket file.
247 * sid (shard ID) passed on to GlobalTask indicates that task needs to be
248 *     serialized with other tasks that require serialization on its shard
249 */
250class VBDeleteTask : public GlobalTask {
251public:
252    VBDeleteTask(EventuallyPersistentEngine *e, uint16_t vbid, const void* c,
253                 bool completeBeforeShutdown = true)
254        : GlobalTask(e, TaskId::VBDeleteTask, 0, completeBeforeShutdown),
255          vbucketId(vbid), cookie(c) {}
256
257    bool run();
258
259    std::string getDescription() {
260        std::stringstream ss;
261        ss<<"Deleting VBucket:"<<vbucketId;
262        return ss.str();
263    }
264
265private:
266    uint16_t vbucketId;
267    const void* cookie;
268};
269
270/**
271 * A task for compacting a vbucket db file
272 */
273class CompactTask : public GlobalTask {
274public:
275    CompactTask(EventuallyPersistentEngine *e,
276                compaction_ctx c, const void *ck,
277                bool completeBeforeShutdown = false) :
278                GlobalTask(e, TaskId::CompactVBucketTask, 0, completeBeforeShutdown),
279                           compactCtx(c), cookie(ck) {
280        desc = "Compact DB file " + std::to_string(c.db_file_id);
281    }
282
283    bool run();
284
285    std::string getDescription() {
286        return desc;
287    }
288
289private:
290    compaction_ctx compactCtx;
291    const void* cookie;
292    std::string desc;
293};
294
295/**
296 * A task that periodically takes a snapshot of the stats and persists them to
297 * disk.
298 */
299class StatSnap : public GlobalTask {
300public:
301    StatSnap(EventuallyPersistentEngine *e, bool runOneTimeOnly = false,
302             bool sleeptime = 0, bool completeBeforeShutdown = false)
303        : GlobalTask(e, TaskId::StatSnap, sleeptime, completeBeforeShutdown),
304          runOnce(runOneTimeOnly) {}
305
306    bool run();
307
308    std::string getDescription() {
309        std::string rv("Updating stat snapshot on disk");
310        return rv;
311    }
312
313private:
314    bool runOnce;
315};
316
317/**
318 * A task for fetching items from disk.
319 * This task is used if EventuallyPersistentStore::multiBGFetchEnabled is true.
320 */
321class MultiBGFetcherTask : public GlobalTask {
322public:
323    MultiBGFetcherTask(EventuallyPersistentEngine *e, BgFetcher *b, bool sleeptime = 0,
324                        bool completeBeforeShutdown = false)
325        : GlobalTask(e, TaskId::MultiBGFetcherTask, sleeptime, completeBeforeShutdown),
326          bgfetcher(b) {}
327
328    bool run();
329
330    std::string getDescription() {
331        return std::string("Batching background fetch");
332    }
333
334private:
335    BgFetcher *bgfetcher;
336};
337
338/**
339 * A task that performs the bucket flush operation.
340 */
341class FlushAllTask : public GlobalTask {
342public:
343    FlushAllTask(EventuallyPersistentEngine *e, double when)
344        : GlobalTask(e, TaskId::FlushAllTask, when, false) {}
345
346    bool run();
347
348    std::string getDescription() {
349        std::stringstream ss;
350        ss << "Performing flush_all operation.";
351        return ss.str();
352    }
353};
354
355/**
356 * A task for performing disk fetches for "stats vkey".
357 */
358class VKeyStatBGFetchTask : public GlobalTask {
359public:
360    VKeyStatBGFetchTask(EventuallyPersistentEngine *e, const std::string &k,
361                        uint16_t vbid, uint64_t s, const void *c, int sleeptime = 0,
362                        bool completeBeforeShutdown = false)
363        : GlobalTask(e, TaskId::VKeyStatBGFetchTask, sleeptime, completeBeforeShutdown),
364          key(k),
365          vbucket(vbid),
366          bySeqNum(s),
367          cookie(c) {}
368
369    bool run();
370
371    std::string getDescription() {
372        std::stringstream ss;
373        ss << "Fetching item from disk for vkey stat:  " << key<<" vbucket "
374           <<vbucket;
375        return ss.str();
376    }
377
378private:
379    std::string                      key;
380    uint16_t                         vbucket;
381    uint64_t                         bySeqNum;
382    const void                      *cookie;
383};
384
385/**
386 * A task that performs disk fetches for non-resident get requests.
387 * This task is used if EventuallyPersistentStore::multiBGFetchEnabled is false.
388 */
389class SingleBGFetcherTask : public GlobalTask {
390public:
391    SingleBGFetcherTask(EventuallyPersistentEngine *e, const const_sized_buffer k,
392                       uint16_t vbid, const void *c, bool isMeta,
393                       int sleeptime = 0, bool completeBeforeShutdown = false)
394        : GlobalTask(e, TaskId::SingleBGFetcherTask, sleeptime, completeBeforeShutdown),
395          key(k.data(), k.size()),
396          vbucket(vbid),
397          cookie(c),
398          metaFetch(isMeta),
399          init(gethrtime()) {}
400
401    bool run();
402
403    std::string getDescription() {
404        std::stringstream ss;
405        ss << "Fetching item from disk:  " << key<<" vbucket "<<vbucket;
406        return ss.str();
407    }
408
409private:
410    const std::string          key;
411    uint16_t                   vbucket;
412    const void                *cookie;
413    bool                       metaFetch;
414    hrtime_t                   init;
415};
416
417/*
418 * This is a NONIO task called as part of VB deletion.  The task is responsible
419 * for clearing all the VBucket's pending operations and for clearing the
420 * VBucket's hash table.
421 */
422class VBucketMemoryDeletionTask : public GlobalTask {
423public:
424    VBucketMemoryDeletionTask(EventuallyPersistentEngine& eng,
425                              RCPtr<VBucket>& vb,
426                              double delay);
427
428    std::string getDescription() {
429        return desc;
430    }
431
432    bool run();
433
434private:
435    EventuallyPersistentEngine& e;
436    RCPtr<VBucket> vbucket;
437    std::string desc;
438};
439
440/**
441 * A task that monitors if a bucket is read-heavy, write-heavy, or mixed.
442 */
443class WorkLoadMonitor : public GlobalTask {
444public:
445    WorkLoadMonitor(EventuallyPersistentEngine *e,
446                    bool completeBeforeShutdown = false);
447
448    bool run();
449
450    std::string getDescription() {
451        return desc;
452    }
453
454private:
455
456    size_t getNumMutations();
457    size_t getNumGets();
458
459    size_t prevNumMutations;
460    size_t prevNumGets;
461    std::string desc;
462};
463
464/**
465 * Order tasks by their priority and taskId (try to ensure FIFO)
466 * @return true if t2 should have priority over t1
467 */
468class CompareByPriority {
469public:
470    bool operator()(ExTask &t1, ExTask &t2) {
471        return (t1->getQueuePriority() == t2->getQueuePriority()) ?
472               (t1->uid > t2->uid) :
473               (t1->getQueuePriority() > t2->getQueuePriority());
474    }
475};
476
477/**
478 * Order tasks by their ready date.
479 * @return true if t2 should have priority over t1
480 */
481class CompareByDueDate {
482public:
483    bool operator()(ExTask &t1, ExTask &t2) {
484        return t2->waketime < t1->waketime;
485    }
486};
487
488#endif  // SRC_TASKS_H_
489