xref: /4.6.4/ep-engine/src/executorthread.cc (revision e6286bdc)
1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2013 Couchbase, Inc.
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
18#include "config.h"
19
20#include <chrono>
21#include <queue>
22
23#include "common.h"
24#include "executorpool.h"
25#include "executorthread.h"
26#include "taskqueue.h"
27#include "ep_engine.h"
28
29AtomicValue<size_t> GlobalTask::task_id_counter(1);
30
31extern "C" {
32    static void launch_executor_thread(void *arg) {
33        ExecutorThread *executor = (ExecutorThread*) arg;
34        executor->run();
35    }
36}
37
38void ExecutorThread::start() {
39    std::string thread_name("mc:" + getName());
40    // Only permitted 15 characters of name; therefore abbreviate thread names.
41    std::string worker("_worker");
42    std::string::size_type pos = thread_name.find(worker);
43    if (pos != std::string::npos) {
44        thread_name.replace(pos, worker.size(), "");
45    }
46    thread_name.resize(15);
47    if (cb_create_named_thread(&thread, launch_executor_thread, this, 0,
48                               thread_name.c_str()) != 0) {
49        std::stringstream ss;
50        ss << name.c_str() << ": Initialization error!!!";
51        throw std::runtime_error(ss.str().c_str());
52    }
53}
54
55void ExecutorThread::stop(bool wait) {
56    if (!wait && (state == EXECUTOR_SHUTDOWN || state == EXECUTOR_DEAD)) {
57        return;
58    }
59
60    state = EXECUTOR_SHUTDOWN;
61
62    if (!wait) {
63        LOG(EXTENSION_LOG_NOTICE, "%s: Stopping", name.c_str());
64        return;
65    }
66    cb_join_thread(thread);
67    LOG(EXTENSION_LOG_NOTICE, "%s: Stopped", name.c_str());
68}
69
70void ExecutorThread::run() {
71    LOG(EXTENSION_LOG_DEBUG, "Thread %s running..", getName().c_str());
72
73    for (uint8_t tick = 1;; tick++) {
74        {
75            LockHolder lh(currentTaskMutex);
76            currentTask.reset();
77        }
78        if (state != EXECUTOR_RUNNING) {
79            break;
80        }
81
82        updateCurrentTime();
83        if (TaskQueue *q = manager->nextTask(*this, tick)) {
84            EventuallyPersistentEngine *engine = currentTask->getEngine();
85
86            // Not all tasks are associated with an engine, only switch
87            // for those that do.
88            if (engine) {
89                ObjectRegistry::onSwitchThread(engine);
90            }
91
92            if (currentTask->isdead()) {
93                // release capacity back to TaskQueue
94                manager->doneWork(curTaskType);
95                manager->cancel(currentTask->uid, true);
96                continue;
97            }
98
99            // Measure scheduling overhead as difference between the time
100            // that the task wanted to wake up and the current time
101            const ProcessClock::time_point woketime =
102                    currentTask->getWaketime();
103            currentTask->
104            getTaskable().logQTime(currentTask->getTypeId(),
105                                   getCurTime() > woketime ?
106                                           getCurTime() - woketime :
107                                           ProcessClock::duration::zero());
108            updateTaskStart();
109            rel_time_t startReltime = ep_current_time();
110
111            LOG(EXTENSION_LOG_DEBUG,
112                "%s: Run task \"%s\" id %" PRIu64,
113                getName().c_str(), currentTask->getDescription().c_str(),
114                uint64_t(currentTask->getId()));
115
116            // Now Run the Task ....
117            currentTask->setState(TASK_RUNNING, TASK_SNOOZED);
118            bool again = currentTask->run();
119
120            // Task done, log it ...
121            const ProcessClock::duration runtime(ProcessClock::now() -
122                                                 getTaskStart());
123            currentTask->getTaskable().logRunTime(currentTask->getTypeId(),
124                                                  runtime);
125            if (engine) {
126                ObjectRegistry::onSwitchThread(NULL);
127            }
128
129            addLogEntry(currentTask->getTaskable().getName() +
130                        currentTask->getDescription(),
131                       q->getQueueType(), runtime, startReltime,
132                       (runtime > currentTask->maxExpectedDuration()));
133
134            if (engine) {
135                ObjectRegistry::onSwitchThread(engine);
136            }
137
138            // Check if task is run once or needs to be rescheduled..
139            if (!again || currentTask->isdead()) {
140                // release capacity back to TaskQueue
141                manager->doneWork(curTaskType);
142                manager->cancel(currentTask->uid, true);
143            } else {
144                // if a task has not set snooze, update its waketime to now
145                // before rescheduling for more accurate timing histograms
146                currentTask->updateWaketimeIfLessThan(getCurTime());
147
148                // release capacity back to TaskQueue ..
149                manager->doneWork(curTaskType);
150                const ProcessClock::time_point new_waketime =
151                        q->reschedule(currentTask, curTaskType);
152                // record min waketime ...
153                if (new_waketime < getWaketime()) {
154                    setWaketime(new_waketime);
155                }
156                LOG(EXTENSION_LOG_DEBUG, "%s: Reschedule a task"
157                        " \"%s\" id %" PRIu64 "[%" PRIu64 " %" PRIu64 " |%" PRIu64 "]",
158                        name.c_str(),
159                        currentTask->getDescription().c_str(),
160                        uint64_t(currentTask->getId()),
161                        uint64_t(to_ns_since_epoch(new_waketime).count()),
162                        uint64_t(to_ns_since_epoch(currentTask->getWaketime()).
163                                 count()),
164                        uint64_t(to_ns_since_epoch(getWaketime()).count()));
165            }
166        }
167    }
168    // Thread is about to terminate - disassociate it from any engine.
169    ObjectRegistry::onSwitchThread(nullptr);
170
171    state = EXECUTOR_DEAD;
172}
173
174void ExecutorThread::setCurrentTask(ExTask newTask) {
175    LockHolder lh(currentTaskMutex);
176    currentTask = newTask;
177}
178
179void ExecutorThread::addLogEntry(const std::string &desc,
180                                 const task_type_t taskType,
181                                 const ProcessClock::duration runtime,
182                                 rel_time_t t, bool isSlowJob) {
183    LockHolder lh(logMutex);
184    TaskLogEntry tle(desc, taskType, runtime, t);
185    if (isSlowJob) {
186        slowjobs.add(tle);
187    } else {
188        tasklog.add(tle);
189    }
190}
191
192const std::string ExecutorThread::getStateName() {
193    switch (state.load()) {
194    case EXECUTOR_RUNNING:
195        return std::string("running");
196    case EXECUTOR_WAITING:
197        return std::string("waiting");
198    case EXECUTOR_SLEEPING:
199        return std::string("sleeping");
200    case EXECUTOR_SHUTDOWN:
201        return std::string("shutdown");
202    default:
203        return std::string("dead");
204    }
205}
206