1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */ 2/* 3 * Copyright 2013 Couchbase, Inc. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 */ 17 18#ifndef SRC_TASKS_H_ 19#define SRC_TASKS_H_ 1 20 21#include "config.h" 22 23#include <list> 24#include <string> 25#include <utility> 26 27#include "atomic.h" 28#include "priority.h" 29 30typedef enum { 31 TASK_RUNNING, 32 TASK_SNOOZED, 33 TASK_DEAD 34} task_state_t; 35 36class BgFetcher; 37class CompareTasksByDueDate; 38class CompareTasksByPriority; 39class EventuallyPersistentEngine; 40class Flusher; 41class Warmup; 42 43/** 44 * Compaction context to perform compaction 45 */ 46 47typedef struct { 48 uint64_t revSeqno; 49 std::string keyStr; 50} expiredItemCtx; 51 52typedef struct { 53 uint64_t purge_before_ts; 54 uint64_t purge_before_seq; 55 uint8_t drop_deletes; 56 uint64_t max_purged_seq; 57 uint32_t curr_time; 58 std::list<expiredItemCtx> expiredItems; 59} compaction_ctx; 60 61class GlobalTask : public RCValue { 62friend class CompareByDueDate; 63friend class CompareByPriority; 64friend class ExecutorPool; 65friend class ExecutorThread; 66friend class TaskQueue; 67public: 68 GlobalTask(EventuallyPersistentEngine *e, const Priority &p, 69 double sleeptime = 0, bool completeBeforeShutdown = true) : 70 RCValue(), priority(p), 71 blockShutdown(completeBeforeShutdown), 72 state(TASK_RUNNING), taskId(nextTaskId()), engine(e) { 73 snooze(sleeptime); 74 } 75 76 /* destructor */ 77 virtual ~GlobalTask(void) { 78 } 79 80 /** 81 * The invoked function when the task is executed. 82 * 83 * @return Whether or not this task should be rescheduled 84 */ 85 virtual bool run(void) = 0; 86 87 /** 88 * Gives a description of this task. 89 * 90 * @return A description of this task 91 */ 92 virtual std::string getDescription(void) = 0; 93 94 virtual int maxExpectedDuration(void) { 95 return 3600; 96 } 97 98 /** 99 * test if a task is dead 100 */ 101 bool isdead(void) { 102 return (state == TASK_DEAD); 103 } 104 105 106 /** 107 * Cancels this task by marking it dead. 108 */ 109 void cancel(void) { 110 state = TASK_DEAD; 111 } 112 113 /** 114 * Puts the task to sleep for a given duration. 115 */ 116 void snooze(const double secs); 117 118 /** 119 * Returns the id of this task. 120 * 121 * @return A unique task id number. 122 */ 123 size_t getId() { return taskId; } 124 125 /** 126 * Returns the type id of this task. 127 * 128 * @return A type id of the task. 129 */ 130 type_id_t getTypeId() { return priority.getTypeId(); } 131 132 /** 133 * Gets the engine that this task was scheduled from 134 * 135 * @returns A handle to the engine 136 */ 137 EventuallyPersistentEngine* getEngine() { return engine; } 138 139 task_state_t getState(void) { 140 return state.load(); 141 } 142 143 void setState(task_state_t tstate, task_state_t expected) { 144 state.compare_exchange_strong(expected, tstate); 145 } 146 147protected: 148 149 const Priority &priority; 150 bool blockShutdown; 151 AtomicValue<task_state_t> state; 152 const size_t taskId; 153 struct timeval waketime; 154 EventuallyPersistentEngine *engine; 155 156 static AtomicValue<size_t> task_id_counter; 157 static size_t nextTaskId() { return task_id_counter.fetch_add(1); } 158}; 159 160typedef SingleThreadedRCPtr<GlobalTask> ExTask; 161 162/** 163 * A task for persisting items to disk. 164 */ 165class FlusherTask : public GlobalTask { 166public: 167 FlusherTask(EventuallyPersistentEngine *e, Flusher* f, const Priority &p, 168 uint16_t shardid, bool completeBeforeShutdown = true) : 169 GlobalTask(e, p, 0, completeBeforeShutdown), flusher(f) { 170 std::stringstream ss; 171 ss<<"Running a flusher loop: shard "<<shardid; 172 desc = ss.str(); 173 } 174 175 bool run(); 176 177 std::string getDescription() { 178 return desc; 179 } 180 181private: 182 Flusher* flusher; 183 std::string desc; 184}; 185 186/** 187 * A task for persisting VBucket state changes to disk and creating new 188 * VBucket database files. 189 * sid (shard ID) passed on to GlobalTask indicates that task needs to be 190 * serialized with other tasks that require serialization on its shard 191 */ 192class VBSnapshotTask : public GlobalTask { 193public: 194 VBSnapshotTask(EventuallyPersistentEngine *e, const Priority &p, 195 uint16_t sID = 0, bool completeBeforeShutdown = true) : 196 GlobalTask(e, p, 0, completeBeforeShutdown), shardID(sID) { 197 std::stringstream ss; 198 ss<<"Snapshotting vbucket states for the shard: "<<shardID; 199 desc = ss.str(); 200 } 201 202 bool run(); 203 204 std::string getDescription() { 205 return desc; 206 } 207 208private: 209 uint16_t shardID; 210 std::string desc; 211}; 212 213/** 214 * A daemon task for persisting VBucket state changes to disk periodically. 215 */ 216class DaemonVBSnapshotTask : public GlobalTask { 217public: 218 DaemonVBSnapshotTask(EventuallyPersistentEngine *e, 219 bool completeBeforeShutdown = true); 220 221 bool run(); 222 223 std::string getDescription() { 224 return desc; 225 } 226 227private: 228 std::string desc; 229}; 230 231/** 232 * A task for persisting a VBucket state to disk and creating a vbucket 233 * database file if necessary. 234 */ 235class VBStatePersistTask : public GlobalTask { 236public: 237 VBStatePersistTask(EventuallyPersistentEngine *e, const Priority &p, 238 uint16_t vbucket, bool completeBeforeShutdown = true) : 239 GlobalTask(e, p, 0, completeBeforeShutdown), vbid(vbucket) { 240 std::stringstream ss; 241 ss<<"Persisting a vbucket state for vbucket: "<< vbid; 242 desc = ss.str(); 243 } 244 245 bool run(); 246 247 std::string getDescription() { 248 return desc; 249 } 250 251private: 252 uint16_t vbid; 253 std::string desc; 254}; 255 256/** 257 * A task for deleting VBucket files from disk and cleaning up any outstanding 258 * writes for that VBucket file. 259 * sid (shard ID) passed on to GlobalTask indicates that task needs to be 260 * serialized with other tasks that require serialization on its shard 261 */ 262class VBDeleteTask : public GlobalTask { 263public: 264 VBDeleteTask(EventuallyPersistentEngine *e, uint16_t vbid, const void* c, 265 const Priority &p, bool completeBeforeShutdown = true) : 266 GlobalTask(e, p, 0, completeBeforeShutdown), 267 vbucketId(vbid), cookie(c) { } 268 269 bool run(); 270 271 std::string getDescription() { 272 std::stringstream ss; 273 ss<<"Deleting VBucket:"<<vbucketId; 274 return ss.str(); 275 } 276 277private: 278 uint16_t vbucketId; 279 const void* cookie; 280}; 281 282/** 283 * A task for compacting a vbucket db file 284 */ 285class CompactVBucketTask : public GlobalTask { 286public: 287 CompactVBucketTask(EventuallyPersistentEngine *e, const Priority &p, 288 uint16_t vbucket, compaction_ctx c, const void *ck, 289 bool completeBeforeShutdown = true) : 290 GlobalTask(e, p, 0, completeBeforeShutdown), 291 vbid(vbucket), compactCtx(c), cookie(ck) 292 { 293 std::stringstream ss; 294 ss<<"Compact VBucket "<<vbid; 295 desc = ss.str(); 296 } 297 298 bool run(); 299 300 std::string getDescription() { 301 return desc; 302 } 303 304private: 305 uint16_t vbid; 306 compaction_ctx compactCtx; 307 const void* cookie; 308 std::string desc; 309}; 310 311/** 312 * A task that periodically takes a snapshot of the stats and persists them to 313 * disk. 314 */ 315class StatSnap : public GlobalTask { 316public: 317 StatSnap(EventuallyPersistentEngine *e, const Priority &p, 318 bool runOneTimeOnly = false, bool sleeptime = 0, 319 bool shutdown = false) : 320 GlobalTask(e, p, sleeptime, shutdown), runOnce(runOneTimeOnly) { } 321 322 bool run(); 323 324 std::string getDescription() { 325 std::string rv("Updating stat snapshot on disk"); 326 return rv; 327 } 328 329private: 330 bool runOnce; 331}; 332 333/** 334 * A task for fetching items from disk. 335 */ 336class BgFetcherTask : public GlobalTask { 337public: 338 BgFetcherTask(EventuallyPersistentEngine *e, BgFetcher *b, 339 const Priority &p, bool sleeptime = 0, bool shutdown = false) 340 : GlobalTask(e, p, sleeptime, shutdown), bgfetcher(b) { } 341 342 bool run(); 343 344 std::string getDescription() { 345 return std::string("Batching background fetch"); 346 } 347 348private: 349 BgFetcher *bgfetcher; 350}; 351 352/** 353 * A task for performing disk fetches for "stats vkey". 354 */ 355class VKeyStatBGFetchTask : public GlobalTask { 356public: 357 VKeyStatBGFetchTask(EventuallyPersistentEngine *e, const std::string &k, 358 uint16_t vbid, uint64_t s, const void *c, 359 const Priority &p, int sleeptime = 0, 360 bool shutdown = false) : 361 GlobalTask(e, p, sleeptime, shutdown), key(k), 362 vbucket(vbid), bySeqNum(s), cookie(c) { } 363 364 bool run(); 365 366 std::string getDescription() { 367 std::stringstream ss; 368 ss << "Fetching item from disk for vkey stat: " << key<<" vbucket " 369 <<vbucket; 370 return ss.str(); 371 } 372 373private: 374 std::string key; 375 uint16_t vbucket; 376 uint64_t bySeqNum; 377 const void *cookie; 378}; 379 380/** 381 * A task that performs disk fetches for non-resident get requests. 382 */ 383class BGFetchTask : public GlobalTask { 384public: 385 BGFetchTask(EventuallyPersistentEngine *e, const std::string &k, 386 uint16_t vbid, uint64_t s, const void *c, bool isMeta, 387 const Priority &p, int sleeptime = 0, bool shutdown = false) : 388 GlobalTask(e, p, sleeptime, shutdown), key(k), vbucket(vbid), 389 seqNum(s), cookie(c), metaFetch(isMeta), init(gethrtime()) { } 390 391 bool run(); 392 393 std::string getDescription() { 394 std::stringstream ss; 395 ss << "Fetching item from disk: " << key<<" vbucket "<<vbucket; 396 return ss.str(); 397 } 398 399private: 400 const std::string key; 401 uint16_t vbucket; 402 uint64_t seqNum; 403 const void *cookie; 404 bool metaFetch; 405 hrtime_t init; 406}; 407 408/** 409 * A task that monitors if a bucket is read-heavy, write-heavy, or mixed. 410 */ 411class WorkLoadMonitor : public GlobalTask { 412public: 413 WorkLoadMonitor(EventuallyPersistentEngine *e, 414 bool completeBeforeShutdown = false); 415 416 bool run(); 417 418 std::string getDescription() { 419 return desc; 420 } 421 422private: 423 424 size_t getNumMutations(); 425 size_t getNumGets(); 426 427 size_t prevNumMutations; 428 size_t prevNumGets; 429 std::string desc; 430}; 431 432/** 433 * Order tasks by their priority and taskId (try to ensure FIFO) 434 */ 435class CompareByPriority { 436public: 437 bool operator()(ExTask &t1, ExTask &t2) { 438 return (t1->priority == t2->priority) ? 439 (t1->taskId > t2->taskId) : 440 (t1->priority < t2->priority); 441 } 442}; 443 444/** 445 * Order tasks by their ready date. 446 */ 447class CompareByDueDate { 448public: 449 bool operator()(ExTask &t1, ExTask &t2) { 450 return less_tv(t2->waketime, t1->waketime); 451 } 452}; 453 454#endif // SRC_TASKS_H_ 455