xref: /3.0.3-GA/ep-engine/src/taskqueue.cc (revision a5111634)
1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2014 Couchbase, Inc.
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17#include "config.h"
18
19#include "taskqueue.h"
20#include "executorpool.h"
21#include "ep_engine.h"
22
23TaskQueue::TaskQueue(ExecutorPool *m, task_type_t t, const char *nm) :
24    isLock(false), name(nm), queueType(t), manager(m),
25    tasklog(TASK_LOG_SIZE), slowjobs(TASK_LOG_SIZE)
26{
27    // EMPTY
28}
29
30TaskQueue::~TaskQueue() {
31    LOG(EXTENSION_LOG_INFO, "Task Queue killing %s", name.c_str());
32}
33
34const std::string TaskQueue::getName() const {
35    return (name+taskType2Str(queueType));
36}
37
38bool TaskQueue::empty(void) {
39    return readyQueue.empty() && futureQueue.empty();
40}
41
42void TaskQueue::pushReadyTask(ExTask &tid) {
43    readyQueue.push(tid);
44    manager->moreWork();
45}
46
47ExTask TaskQueue::popReadyTask(void) {
48    ExTask t = readyQueue.top();
49    readyQueue.pop();
50    manager->lessWork();
51    return t;
52}
53
54bool TaskQueue::checkOutShard(ExTask &task) {
55    uint16_t shard = task->serialShard;
56    if (shard != NO_SHARD_ID) {
57        EventuallyPersistentStore *e = task->getEngine()->getEpStore();
58        return e->tryLockShard(shard, task);
59    }
60    return true;
61}
62
63bool TaskQueue::fetchNextTask(ExTask &task, struct timeval &waketime,
64                              task_type_t &taskType, struct timeval now) {
65    bool inverse = false;
66    if (!isLock.compare_exchange_strong(inverse, true)) {
67        return false;
68    }
69
70    inverse = true;
71    LockHolder lh(mutex);
72
73    if (empty()) {
74        isLock.compare_exchange_strong(inverse, false);
75        return false;
76    }
77
78    moveReadyTasks(now);
79
80    if (!futureQueue.empty() &&
81        less_tv(futureQueue.top()->waketime, waketime)) {
82        waketime = futureQueue.top()->waketime; // record earliest waketime
83    }
84
85    if (!readyQueue.empty()) {
86        if (readyQueue.top()->isdead()) {
87            task = popReadyTask();
88            isLock.compare_exchange_strong(inverse, false);
89            return true;
90        }
91        ExTask tid = readyQueue.top();
92        popReadyTask();
93
94        if (checkOutShard(tid)) { // shardLock obtained...
95            taskType = manager->tryNewWork(queueType);
96            if (taskType != NO_TASK_TYPE) {
97                task = tid; // return the dequeued task to thread
98                isLock.compare_exchange_strong(inverse, false);
99                return true;
100            } else { // limit on number of threads that can work on queue hit
101                pendingQueue.push_back(tid);
102                // In the future if we wish to limit tasks of other types
103                // please remove the assert below
104                cb_assert(queueType == AUXIO_TASK_IDX ||
105                          queueType == NONIO_TASK_IDX);
106            }
107        }
108    }
109
110    isLock.compare_exchange_strong(inverse, false);
111    return false;
112}
113
114void TaskQueue::moveReadyTasks(struct timeval tv) {
115    if (!readyQueue.empty()) {
116        return;
117    }
118
119    while (!futureQueue.empty()) {
120        ExTask tid = futureQueue.top();
121        if (less_eq_tv(tid->waketime, tv)) {
122            pushReadyTask(tid);
123        } else {
124            return;
125        }
126        futureQueue.pop();
127    }
128}
129
130void TaskQueue::schedule(ExTask &task) {
131    LockHolder lh(mutex);
132
133    futureQueue.push(task);
134    manager->notifyAll();
135
136    LOG(EXTENSION_LOG_DEBUG, "%s: Schedule a task \"%s\" id %d",
137            name.c_str(), task->getDescription().c_str(), task->getId());
138}
139
140void TaskQueue::doneTask_UNLOCKED(ExTask &task, task_type_t &curTaskType) {
141    uint16_t shard = task->serialShard;
142    ExTask runnableTask;
143    if (shard != NO_SHARD_ID) {
144        EventuallyPersistentStore *e = task->getEngine()->getEpStore();
145        runnableTask = e->unlockShard(shard);
146        if (runnableTask.get() != NULL) {
147            pushReadyTask(runnableTask);
148        }
149    }
150
151    if (manager->doneWork(curTaskType) && !pendingQueue.empty()) {
152        runnableTask = pendingQueue.front();
153        pushReadyTask(runnableTask);
154        pendingQueue.pop_front();
155    }
156}
157
158void TaskQueue::doneTask(ExTask &task, task_type_t &curTaskType) {
159    LockHolder lh(mutex);
160    doneTask_UNLOCKED(task, curTaskType);
161}
162
163struct timeval TaskQueue::reschedule(ExTask &task, task_type_t &curTaskType) {
164    LockHolder lh(mutex);
165    doneTask_UNLOCKED(task, curTaskType);
166    futureQueue.push(task);
167    return futureQueue.top()->waketime;
168}
169
170void TaskQueue::wake(ExTask &task) {
171    struct  timeval    now;
172    gettimeofday(&now, NULL);
173
174    LockHolder lh(mutex);
175    LOG(EXTENSION_LOG_DEBUG, "%s: Wake a task \"%s\" id %d", name.c_str(),
176            task->getDescription().c_str(), task->getId());
177
178    // MB-9986: Re-sort futureQueue for now. TODO: avoid this O(N) overhead
179    std::queue<ExTask> notReady;
180    while (!futureQueue.empty()) {
181        ExTask tid = futureQueue.top();
182        notReady.push(tid);
183        futureQueue.pop();
184    }
185
186    task->waketime = now;
187
188    while (!notReady.empty()) {
189        ExTask tid = notReady.front();
190        if (less_eq_tv(tid->waketime, now) || tid->isdead()) {
191            pushReadyTask(tid);
192        } else {
193            futureQueue.push(tid);
194        }
195        notReady.pop();
196    }
197}
198
199void TaskQueue::addLogEntry(const std::string &desc, const hrtime_t runtime,
200                            rel_time_t t, bool isSlowJob) {
201    TaskLogEntry tle(desc, runtime, t);
202    LockHolder lh(mutex);
203    tasklog.add(tle);
204    if (isSlowJob) {
205        slowjobs.add(tle);
206    }
207}
208
209const std::string TaskQueue::taskType2Str(task_type_t type) {
210    switch (type) {
211    case WRITER_TASK_IDX:
212        return std::string("Writer");
213    case READER_TASK_IDX:
214        return std::string("Reader");
215    case AUXIO_TASK_IDX:
216        return std::string("AuxIO");
217    case NONIO_TASK_IDX:
218        return std::string("NonIO");
219    default:
220        return std::string("None");
221    }
222}
223