1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2013 Couchbase, Inc.
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
18#ifndef SRC_TASKS_H_
19#define SRC_TASKS_H_ 1
20
#include "config.h"

#include <list>
#include <sstream>
#include <string>
#include <utility>

#include "atomic.h"
#include "priority.h"
29
/**
 * Life-cycle states a task can be in. GlobalTask starts in TASK_RUNNING
 * and is switched to TASK_DEAD by GlobalTask::cancel().
 */
enum task_state_t {
    TASK_RUNNING,   // initial state assigned by the GlobalTask constructor
    TASK_SNOOZED,
    TASK_DEAD       // set by GlobalTask::cancel(), tested by isdead()
};
35
// Forward declarations of classes that this header only refers to by
// pointer, reference or friend declaration.
class BgFetcher;
// The comparators that GlobalTask befriends and that are defined at the
// bottom of this header.
class CompareByDueDate;
class CompareByPriority;
// These two names are not defined or used anywhere in this header; they are
// kept so that code relying on the declarations keeps compiling.
class CompareTasksByDueDate;
class CompareTasksByPriority;
class EventuallyPersistentEngine;
class Flusher;
class Warmup;
42
/**
 * Context structures handed to a vbucket compaction (see CompactVBucketTask).
 */
46
/**
 * One entry of compaction_ctx::expiredItems: a key together with a
 * revision sequence number.
 */
struct expiredItemCtx {
    uint64_t    revSeqno;   // revision sequence number recorded for the key
    std::string keyStr;     // the key, held as a string
};
51
52typedef struct {
53    uint64_t purge_before_ts;
54    uint64_t purge_before_seq;
55    uint8_t  drop_deletes;
56    uint64_t max_purged_seq;
57    uint32_t curr_time;
58    std::list<expiredItemCtx> expiredItems;
59} compaction_ctx;
60
61class GlobalTask : public RCValue {
62friend class CompareByDueDate;
63friend class CompareByPriority;
64friend class ExecutorPool;
65friend class ExecutorThread;
66friend class TaskQueue;
67public:
68    GlobalTask(EventuallyPersistentEngine *e, const Priority &p,
69               double sleeptime = 0, bool completeBeforeShutdown = true) :
70          RCValue(), priority(p),
71          blockShutdown(completeBeforeShutdown),
72          state(TASK_RUNNING), taskId(nextTaskId()), engine(e) {
73        snooze(sleeptime);
74    }
75
76    /* destructor */
77    virtual ~GlobalTask(void) {
78    }
79
80    /**
81     * The invoked function when the task is executed.
82     *
83     * @return Whether or not this task should be rescheduled
84     */
85    virtual bool run(void) = 0;
86
87    /**
88     * Gives a description of this task.
89     *
90     * @return A description of this task
91     */
92    virtual std::string getDescription(void) = 0;
93
94    virtual int maxExpectedDuration(void) {
95        return 3600;
96    }
97
98    /**
99     * test if a task is dead
100     */
101     bool isdead(void) {
102        return (state == TASK_DEAD);
103     }
104
105
106    /**
107     * Cancels this task by marking it dead.
108     */
109    void cancel(void) {
110        state = TASK_DEAD;
111    }
112
113    /**
114     * Puts the task to sleep for a given duration.
115     */
116    void snooze(const double secs);
117
118    /**
119     * Returns the id of this task.
120     *
121     * @return A unique task id number.
122     */
123    size_t getId() { return taskId; }
124
125    /**
126     * Returns the type id of this task.
127     *
128     * @return A type id of the task.
129     */
130    type_id_t getTypeId() { return priority.getTypeId(); }
131
132    /**
133     * Gets the engine that this task was scheduled from
134     *
135     * @returns A handle to the engine
136     */
137    EventuallyPersistentEngine* getEngine() { return engine; }
138
139    task_state_t getState(void) {
140        return state.load();
141    }
142
143    void setState(task_state_t tstate, task_state_t expected) {
144        state.compare_exchange_strong(expected, tstate);
145    }
146
147protected:
148
149    const Priority &priority;
150    bool blockShutdown;
151    AtomicValue<task_state_t> state;
152    const size_t taskId;
153    struct timeval waketime;
154    EventuallyPersistentEngine *engine;
155
156    static AtomicValue<size_t> task_id_counter;
157    static size_t nextTaskId() { return task_id_counter.fetch_add(1); }
158};
159
160typedef SingleThreadedRCPtr<GlobalTask> ExTask;
161
162/**
163 * A task for persisting items to disk.
164 */
165class FlusherTask : public GlobalTask {
166public:
167    FlusherTask(EventuallyPersistentEngine *e, Flusher* f, const Priority &p,
168                uint16_t shardid, bool completeBeforeShutdown = true) :
169                GlobalTask(e, p, 0, completeBeforeShutdown), flusher(f) {
170        std::stringstream ss;
171        ss<<"Running a flusher loop: shard "<<shardid;
172        desc = ss.str();
173    }
174
175    bool run();
176
177    std::string getDescription() {
178        return desc;
179    }
180
181private:
182    Flusher* flusher;
183    std::string desc;
184};
185
186/**
187 * A task for persisting VBucket state changes to disk and creating new
188 * VBucket database files.
 * sID (shard ID) names the shard whose vbucket states are snapshotted.
 * NOTE(review): the ID is stored on this task and is not forwarded to the
 *     GlobalTask constructor; confirm how per-shard serialization is enforced.
191 */
192class VBSnapshotTask : public GlobalTask {
193public:
194    VBSnapshotTask(EventuallyPersistentEngine *e, const Priority &p,
195                uint16_t sID = 0, bool completeBeforeShutdown = true) :
196                GlobalTask(e, p, 0, completeBeforeShutdown), shardID(sID) {
197        std::stringstream ss;
198        ss<<"Snapshotting vbucket states for the shard: "<<shardID;
199        desc = ss.str();
200    }
201
202    bool run();
203
204    std::string getDescription() {
205        return desc;
206    }
207
208private:
209    uint16_t shardID;
210    std::string desc;
211};
212
213/**
214 * A daemon task for persisting VBucket state changes to disk periodically.
215 */
216class DaemonVBSnapshotTask : public GlobalTask {
217public:
218    DaemonVBSnapshotTask(EventuallyPersistentEngine *e,
219                         bool completeBeforeShutdown = true);
220
221    bool run();
222
223    std::string getDescription() {
224        return desc;
225    }
226
227private:
228    std::string desc;
229};
230
231/**
232 * A task for persisting a VBucket state to disk and creating a vbucket
233 * database file if necessary.
234 */
235class VBStatePersistTask : public GlobalTask {
236public:
237    VBStatePersistTask(EventuallyPersistentEngine *e, const Priority &p,
238                       uint16_t vbucket, bool completeBeforeShutdown = true) :
239        GlobalTask(e, p, 0, completeBeforeShutdown), vbid(vbucket) {
240        std::stringstream ss;
241        ss<<"Persisting a vbucket state for vbucket: "<< vbid;
242        desc = ss.str();
243    }
244
245    bool run();
246
247    std::string getDescription() {
248        return desc;
249    }
250
251private:
252    uint16_t vbid;
253    std::string desc;
254};
255
256/**
257 * A task for deleting VBucket files from disk and cleaning up any outstanding
258 * writes for that VBucket file.
 * NOTE(review): an earlier remark here said a shard ID ("sid") is passed on
 *     to GlobalTask so the task is serialized with other tasks on its shard,
 *     but this constructor takes no shard ID; confirm how serialization is done.
261 */
262class VBDeleteTask : public GlobalTask {
263public:
264    VBDeleteTask(EventuallyPersistentEngine *e, uint16_t vbid, const void* c,
265                 const Priority &p, bool completeBeforeShutdown = true) :
266        GlobalTask(e, p, 0, completeBeforeShutdown),
267        vbucketId(vbid), cookie(c) { }
268
269    bool run();
270
271    std::string getDescription() {
272        std::stringstream ss;
273        ss<<"Deleting VBucket:"<<vbucketId;
274        return ss.str();
275    }
276
277private:
278    uint16_t vbucketId;
279    const void* cookie;
280};
281
282/**
283 * A task for compacting a vbucket db file
284 */
285class CompactVBucketTask : public GlobalTask {
286public:
287    CompactVBucketTask(EventuallyPersistentEngine *e, const Priority &p,
288                uint16_t vbucket, compaction_ctx c, const void *ck,
289                bool completeBeforeShutdown = true) :
290                GlobalTask(e, p, 0, completeBeforeShutdown),
291                           vbid(vbucket), compactCtx(c), cookie(ck)
292    {
293        std::stringstream ss;
294        ss<<"Compact VBucket "<<vbid;
295        desc = ss.str();
296    }
297
298    bool run();
299
300    std::string getDescription() {
301        return desc;
302    }
303
304private:
305    uint16_t vbid;
306    compaction_ctx compactCtx;
307    const void* cookie;
308    std::string desc;
309};
310
311/**
312 * A task that periodically takes a snapshot of the stats and persists them to
313 * disk.
314 */
315class StatSnap : public GlobalTask {
316public:
317    StatSnap(EventuallyPersistentEngine *e, const Priority &p,
318             bool runOneTimeOnly = false, bool sleeptime = 0,
319             bool shutdown = false) :
320        GlobalTask(e, p, sleeptime, shutdown), runOnce(runOneTimeOnly) { }
321
322    bool run();
323
324    std::string getDescription() {
325        std::string rv("Updating stat snapshot on disk");
326        return rv;
327    }
328
329private:
330    bool runOnce;
331};
332
333/**
334 * A task for fetching items from disk.
335 */
336class BgFetcherTask : public GlobalTask {
337public:
338    BgFetcherTask(EventuallyPersistentEngine *e, BgFetcher *b,
339                  const Priority &p, bool sleeptime = 0, bool shutdown = false)
340        : GlobalTask(e, p, sleeptime, shutdown), bgfetcher(b) { }
341
342    bool run();
343
344    std::string getDescription() {
345        return std::string("Batching background fetch");
346    }
347
348private:
349    BgFetcher *bgfetcher;
350};
351
352/**
353 * A task for performing disk fetches for "stats vkey".
354 */
355class VKeyStatBGFetchTask : public GlobalTask {
356public:
357    VKeyStatBGFetchTask(EventuallyPersistentEngine *e, const std::string &k,
358                        uint16_t vbid, uint64_t s, const void *c,
359                        const Priority &p, int sleeptime = 0,
360                        bool shutdown = false) :
361        GlobalTask(e, p, sleeptime, shutdown), key(k),
362                   vbucket(vbid), bySeqNum(s), cookie(c) { }
363
364    bool run();
365
366    std::string getDescription() {
367        std::stringstream ss;
368        ss << "Fetching item from disk for vkey stat:  " << key<<" vbucket "
369           <<vbucket;
370        return ss.str();
371    }
372
373private:
374    std::string                      key;
375    uint16_t                         vbucket;
376    uint64_t                         bySeqNum;
377    const void                      *cookie;
378};
379
380/**
381 * A task that performs disk fetches for non-resident get requests.
382 */
383class BGFetchTask : public GlobalTask {
384public:
385    BGFetchTask(EventuallyPersistentEngine *e, const std::string &k,
386            uint16_t vbid, uint64_t s, const void *c, bool isMeta,
387            const Priority &p, int sleeptime = 0, bool shutdown = false) :
388        GlobalTask(e, p, sleeptime, shutdown), key(k), vbucket(vbid),
389        seqNum(s), cookie(c), metaFetch(isMeta), init(gethrtime()) { }
390
391    bool run();
392
393    std::string getDescription() {
394        std::stringstream ss;
395        ss << "Fetching item from disk:  " << key<<" vbucket "<<vbucket;
396        return ss.str();
397    }
398
399private:
400    const std::string          key;
401    uint16_t                   vbucket;
402    uint64_t                   seqNum;
403    const void                *cookie;
404    bool                       metaFetch;
405    hrtime_t                   init;
406};
407
408/**
409 * A task that monitors if a bucket is read-heavy, write-heavy, or mixed.
410 */
411class WorkLoadMonitor : public GlobalTask {
412public:
413    WorkLoadMonitor(EventuallyPersistentEngine *e,
414                    bool completeBeforeShutdown = false);
415
416    bool run();
417
418    std::string getDescription() {
419        return desc;
420    }
421
422private:
423
424    size_t getNumMutations();
425    size_t getNumGets();
426
427    size_t prevNumMutations;
428    size_t prevNumGets;
429    std::string desc;
430};
431
432/**
433 * Order tasks by their priority and taskId (try to ensure FIFO)
434 */
435class CompareByPriority {
436public:
437    bool operator()(ExTask &t1, ExTask &t2) {
438        return (t1->priority == t2->priority) ?
439               (t1->taskId   > t2->taskId)    :
440               (t1->priority < t2->priority);
441    }
442};
443
444/**
445 * Order tasks by their ready date.
446 */
447class CompareByDueDate {
448public:
449    bool operator()(ExTask &t1, ExTask &t2) {
450        return less_tv(t2->waketime, t1->waketime);
451    }
452};
453
454#endif  // SRC_TASKS_H_
455