xref: /4.6.4/ep-engine/src/tasks.cc (revision 7b2c0f63)
1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2013 Couchbase, Inc.
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17#include "config.h"
18
19#include "bgfetcher.h"
20#include "ep_engine.h"
21#include "flusher.h"
22#include "tasks.h"
23#include "vbucket.h"
24#include "warmup.h"
25
26#include <climits>
27
28#include <type_traits>
29
// Sleep period, in seconds, between successive WorkLoadMonitor runs.
static const double WORKLOAD_MONITOR_FREQ = 5.0;
31
32GlobalTask::GlobalTask(Taskable& t,
33                       TaskId taskId,
34                       double sleeptime,
35                       bool completeBeforeShutdown)
36      : RCValue(),
37        blockShutdown(completeBeforeShutdown),
38        state(TASK_RUNNING),
39        uid(nextTaskId()),
40        typeId(taskId),
41        engine(NULL),
42        taskable(t) {
43    priority = getTaskPriority(taskId);
44    snooze(sleeptime);
45}
46
47GlobalTask::GlobalTask(EventuallyPersistentEngine *e,
48                       TaskId taskId,
49                       double sleeptime,
50                       bool completeBeforeShutdown)
51      : GlobalTask(e->getTaskable(),
52                   taskId,
53                   sleeptime,
54                   completeBeforeShutdown) {
55    engine = e;
56}
57
58void GlobalTask::snooze(const double secs) {
59    if (secs == INT_MAX) {
60        setState(TASK_SNOOZED, TASK_RUNNING);
61        updateWaketime(ProcessClock::time_point::max());
62        return;
63    }
64
65    const auto curTime = ProcessClock::now();
66    if (secs) {
67        setState(TASK_SNOOZED, TASK_RUNNING);
68        updateWaketime(curTime + std::chrono::seconds((int)round(secs)));
69    } else {
70        updateWaketime(curTime);
71    }
72}
73
74// These static_asserts previously were in priority_test.cc
75static_assert(TaskPriority::MultiBGFetcherTask < TaskPriority::BGFetchCallback,
76              "MultiBGFetcherTask not less than BGFetchCallback");
77
78static_assert(TaskPriority::BGFetchCallback == TaskPriority::VBDeleteTask,
79              "BGFetchCallback not equal VBDeleteTask");
80
81static_assert(TaskPriority::VKeyStatBGFetchTask < TaskPriority::FlusherTask,
82              "VKeyStatBGFetchTask not less than FlusherTask");
83
84static_assert(TaskPriority::FlusherTask < TaskPriority::ItemPager,
85              "FlusherTask not less than ItemPager");
86
87static_assert(TaskPriority::ItemPager < TaskPriority::BackfillManagerTask,
88              "ItemPager not less than BackfillManagerTask");
89
90/*
91 * Generate a switch statement from tasks.def.h that maps TaskId to a
92 * stringified value of the task's name.
93 */
94const char* GlobalTask::getTaskName(TaskId id) {
95    switch(id) {
96#define TASK(name, prio) case TaskId::name: {return #name;}
97#include "tasks.def.h"
98#undef TASK
99        case TaskId::TASK_COUNT: {
100            throw std::invalid_argument("GlobalTask::getTaskName(TaskId::TASK_COUNT) called.");
101        }
102    }
103    throw std::logic_error("GlobalTask::getTaskName() unknown id " +
104                          std::to_string(static_cast<int>(id)));
105    return nullptr;
106}
107
108/*
109 * Generate a switch statement from tasks.def.h that maps TaskId to priority
110 */
111TaskPriority GlobalTask::getTaskPriority(TaskId id) {
112   switch(id) {
113#define TASK(name, prio) case TaskId::name: {return TaskPriority::name;}
114#include "tasks.def.h"
115#undef TASK
116        case TaskId::TASK_COUNT: {
117            throw std::invalid_argument("GlobalTask::getTaskPriority(TaskId::TASK_COUNT) called.");
118        }
119    }
120    throw std::logic_error("GlobalTask::getTaskPriority() unknown id " +
121                           std::to_string(static_cast<int>(id)));
122    return TaskPriority::PRIORITY_COUNT;
123}
124
125std::array<TaskId, static_cast<int>(TaskId::TASK_COUNT)> GlobalTask::allTaskIds = {{
126#define TASK(name, prio) TaskId::name,
127#include "tasks.def.h"
128#undef TASK
129}};
130
131
132bool FlusherTask::run() {
133    return flusher->step(this);
134}
135
136bool VBDeleteTask::run() {
137    return !engine->getEpStore()->completeVBucketDeletion(vbucketId, cookie);
138}
139
140bool CompactTask::run() {
141    return engine->getEpStore()->doCompact(&compactCtx, cookie);
142}
143
144bool StatSnap::run() {
145    engine->getEpStore()->snapshotStats();
146    if (runOnce) {
147        return false;
148    }
149    ExecutorPool::get()->snooze(uid, 60);
150    return true;
151}
152
153bool MultiBGFetcherTask::run() {
154    return bgfetcher->run(this);
155}
156
157bool FlushAllTask::run() {
158    engine->getEpStore()->reset();
159    return false;
160}
161
162bool VKeyStatBGFetchTask::run() {
163    engine->getEpStore()->completeStatsVKey(cookie, key, vbucket, bySeqNum);
164    return false;
165}
166
167
168bool SingleBGFetcherTask::run() {
169    engine->getEpStore()->completeBGFetch(key, vbucket, cookie, init,
170                                          metaFetch);
171    return false;
172}
173
174VBucketMemoryDeletionTask::VBucketMemoryDeletionTask(
175        EventuallyPersistentEngine& eng, RCPtr<VBucket>& vb, double delay)
176    : GlobalTask(&eng, TaskId::VBucketMemoryDeletionTask, delay, true),
177      e(eng),
178      vbucket(vb) {
179    desc = "Removing (dead) vb:" + std::to_string(vbucket->getId()) +
180           " from memory";
181}
182
183bool VBucketMemoryDeletionTask::run() {
184    vbucket->notifyAllPendingConnsFailed(e);
185    vbucket->ht.clear();
186    vbucket.reset();
187    return false;
188}
189
190WorkLoadMonitor::WorkLoadMonitor(EventuallyPersistentEngine *e,
191                                 bool completeBeforeShutdown) :
192    GlobalTask(e, TaskId::WorkLoadMonitor, WORKLOAD_MONITOR_FREQ,
193               completeBeforeShutdown) {
194    prevNumMutations = getNumMutations();
195    prevNumGets = getNumGets();
196    desc = "Monitoring a workload pattern";
197}
198
199size_t WorkLoadMonitor::getNumMutations() {
200    return engine->getEpStats().numOpsStore +
201           engine->getEpStats().numOpsDelete +
202           engine->getEpStats().numOpsSetMeta +
203           engine->getEpStats().numOpsDelMeta +
204           engine->getEpStats().numOpsSetRetMeta +
205           engine->getEpStats().numOpsDelRetMeta;
206}
207
208size_t WorkLoadMonitor::getNumGets() {
209    return engine->getEpStats().numOpsGet +
210           engine->getEpStats().numOpsGetMeta;
211}
212
213bool WorkLoadMonitor::run() {
214    size_t curr_num_mutations = getNumMutations();
215    size_t curr_num_gets = getNumGets();
216    double delta_mutations = static_cast<double>(curr_num_mutations -
217                                                 prevNumMutations);
218    double delta_gets = static_cast<double>(curr_num_gets - prevNumGets);
219    double total_delta_ops = delta_gets + delta_mutations;
220    double read_ratio = 0;
221
222    if (total_delta_ops) {
223        read_ratio = delta_gets / total_delta_ops;
224        if (read_ratio < 0.4) {
225            engine->getWorkLoadPolicy().setWorkLoadPattern(WRITE_HEAVY);
226        } else if (read_ratio >= 0.4 && read_ratio <= 0.6) {
227            engine->getWorkLoadPolicy().setWorkLoadPattern(MIXED);
228        } else {
229            engine->getWorkLoadPolicy().setWorkLoadPattern(READ_HEAVY);
230        }
231    }
232    prevNumMutations = curr_num_mutations;
233    prevNumGets = curr_num_gets;
234
235    snooze(WORKLOAD_MONITOR_FREQ);
236    if (engine->getEpStats().isShutdown) {
237        return false;
238    }
239    return true;
240}
241