1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2013 Couchbase, Inc.
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
18#include "config.h"
19
20#include <chrono>
21#include <queue>
22
23#include "common.h"
24#include "executorpool.h"
25#include "executorthread.h"
26#include "globaltask.h"
27#include "taskqueue.h"
28
29#include <platform/timeutils.h>
30
31extern "C" {
32    static void launch_executor_thread(void *arg) {
33        ExecutorThread *executor = (ExecutorThread*) arg;
34        executor->run();
35    }
36}
37
38void ExecutorThread::start() {
39    std::string thread_name("mc:" + getName());
40    // Only permitted 15 characters of name; therefore abbreviate thread names.
41    std::string worker("_worker");
42    std::string::size_type pos = thread_name.find(worker);
43    if (pos != std::string::npos) {
44        thread_name.replace(pos, worker.size(), "");
45    }
46    thread_name.resize(15);
47    if (cb_create_named_thread(&thread, launch_executor_thread, this, 0,
48                               thread_name.c_str()) != 0) {
49        std::stringstream ss;
50        ss << name.c_str() << ": Initialization error!!!";
51        throw std::runtime_error(ss.str().c_str());
52    }
53
54    LOG(EXTENSION_LOG_INFO, "%s: Started", name.c_str());
55}
56
57void ExecutorThread::stop(bool wait) {
58    if (!wait && (state == EXECUTOR_SHUTDOWN || state == EXECUTOR_DEAD)) {
59        return;
60    }
61
62    state = EXECUTOR_SHUTDOWN;
63
64    if (!wait) {
65        LOG(EXTENSION_LOG_NOTICE, "%s: Stopping", name.c_str());
66        return;
67    }
68    cb_join_thread(thread);
69    LOG(EXTENSION_LOG_NOTICE, "%s: Stopped", name.c_str());
70}
71
// Main loop of the executor thread (entered via launch_executor_thread()).
//
// While state is EXECUTOR_RUNNING, each iteration:
//   1. releases the previous iteration's task (resetCurrentTask()),
//   2. asks the manager for the next task to run; NOTE(review): nextTask()
//      presumably also installs it as currentTask, since currentTask is used
//      right after a non-null queue is returned — confirm,
//   3. switches the thread to the task's engine (if it has one), records
//      scheduling overhead and runtime, and runs the task,
//   4. cancels the task if it is run-once or dead, otherwise reschedules it
//      into the queue it was fetched from.
// Every startWork() is paired with exactly one doneWork().
// On exit the thread is disassociated from any engine and marked
// EXECUTOR_DEAD.
void ExecutorThread::run() {
    LOG(EXTENSION_LOG_DEBUG, "Thread %s running..", getName().c_str());

    // NOTE(review): uint8_t tick wraps from 255 back to 0; presumably fine
    // for how nextTask() uses it — confirm.
    for (uint8_t tick = 1;; tick++) {
        // Drop the previous task's reference (done outside
        // currentTaskMutex). The engine last switched to is still current
        // at this point.
        resetCurrentTask();

        if (state != EXECUTOR_RUNNING) {
            break;
        }

        updateCurrentTime();
        if (TaskQueue *q = manager->nextTask(*this, tick)) {
            manager->startWork(taskType);
            EventuallyPersistentEngine *engine = currentTask->getEngine();

            // Not all tasks are associated with an engine, only switch
            // for those that do. Tasks without an engine therefore run with
            // whichever engine the thread was last switched to.
            if (engine) {
                ObjectRegistry::onSwitchThread(engine);
            }

            // Task is already dead: balance startWork() and remove it
            // without running it.
            if (currentTask->isdead()) {
                manager->doneWork(taskType);
                manager->cancel(currentTask->uid, true);
                continue;
            }

            // Measure scheduling overhead as difference between the time
            // that the task wanted to wake up and the current time
            const ProcessClock::time_point woketime =
                    currentTask->getWaketime();

            auto scheduleOverhead = getCurTime() - woketime;
            // scheduleOverhead can be negative if the task was woken up
            // earlier than expected. In that case there is no schedule
            // overhead, so clamp it to 0.
            if (scheduleOverhead.count() < 0) {
                scheduleOverhead = std::chrono::steady_clock::duration::zero();
            }

            currentTask->getTaskable().logQTime(currentTask->getTaskId(),
                                                scheduleOverhead);
            // MB-25822: It could be useful to have the exact datetime of long
            // schedule times, in the same way we have for long runtimes.
            // It is more difficult to estimate the expected schedule time than
            // the runtime for a task, because the schedule times depends on
            // things "external" to the task itself (e.g., how many tasks are
            // in queue in the same priority-group).
            // Also, the schedule time depends on the runtime of the previous
            // run. That means that for Read/Write/AuxIO tasks it is even more
            // difficult to predict because they do IO.
            // So, for now we log long schedule times only for NON_IO tasks,
            // which is the task type for the ConnManager and
            // ConnNotifierCallback tasks involved in MB-25822 and that we aim
            // to debug. We consider 1 second a sensible schedule overhead
            // limit for NON_IO tasks.
            if (GlobalTask::getTaskType(currentTask->getTaskId()) ==
                        task_type_t::NONIO_TASK_IDX &&
                scheduleOverhead > std::chrono::seconds(1)) {
                auto description = currentTask->getDescription();
                LOG(EXTENSION_LOG_WARNING,
                    "Slow scheduling for NON_IO task '%.*s' on thread %s. "
                    "Schedule overhead: %s",
                    int(description.size()),
                    description.data(),
                    getName().c_str(),
                    cb::time2text(scheduleOverhead).c_str());
            }
            // Record the start time used to compute `runtime` below.
            updateTaskStart();

            const auto curTaskDescr = currentTask->getDescription();
            LOG(EXTENSION_LOG_DEBUG,
                "%s: Run task \"%.*s\" id %" PRIu64,
                getName().c_str(),
                int(curTaskDescr.size()),
                curTaskDescr.data(),
                uint64_t(currentTask->getId()));

            // Now Run the Task ....
            currentTask->setState(TASK_RUNNING, TASK_SNOOZED);
            // `again` == true asks for the task to be rescheduled.
            bool again = currentTask->run();

            // Task done, log it ...
            const ProcessClock::duration runtime(ProcessClock::now() -
                                                 getTaskStart());
            currentTask->getTaskable().logRunTime(currentTask->getTaskId(),
                                                  runtime);
            currentTask->updateRuntime(runtime);

            // Check if exceeded expected duration; and if so log.
            // Note: the thread has not been switched to onSwitchThread(nullptr)
            // at this point (that only happens when the thread terminates),
            // so the bucket name is included in the Log message.
            // NOTE(review): assumes LOG() derives the bucket from the
            // thread's current engine — confirm.
            if (runtime > currentTask->maxExpectedDuration()) {
                auto description = currentTask->getDescription();
                LOG(EXTENSION_LOG_WARNING,
                    "Slow runtime for '%.*s' on thread %s: %s",
                    int(description.size()),
                    description.data(),
                    getName().c_str(),
                    cb::time2text(runtime).c_str());
            }

            // Check if task is run once or needs to be rescheduled..
            if (!again || currentTask->isdead()) {
                manager->cancel(currentTask->uid, true);
            } else {
                // if a task has not set snooze, update its waketime to now
                // before rescheduling for more accurate timing histograms
                currentTask->updateWaketimeIfLessThan(getCurTime());

                // reschedule this task back into the queue it was fetched from
                const ProcessClock::time_point new_waketime =
                        q->reschedule(currentTask);
                // record min waketime ...
                if (new_waketime < getWaketime()) {
                    setWaketime(new_waketime);
                }
                LOG(EXTENSION_LOG_DEBUG,
                    "%s: Reschedule a task"
                    " \"%.*s\" id %" PRIu64 "[%" PRIu64 " %" PRIu64 " |%" PRIu64
                    "]",
                    name.c_str(),
                    int(curTaskDescr.size()),
                    curTaskDescr.data(),
                    uint64_t(currentTask->getId()),
                    uint64_t(to_ns_since_epoch(new_waketime).count()),
                    uint64_t(to_ns_since_epoch(currentTask->getWaketime())
                                     .count()),
                    uint64_t(to_ns_since_epoch(getWaketime()).count()));
            }
            manager->doneWork(taskType);
        }
    }
    // Thread is about to terminate - disassociate it from any engine.
    ObjectRegistry::onSwitchThread(nullptr);

    state = EXECUTOR_DEAD;
}
210
211void ExecutorThread::setCurrentTask(ExTask newTask) {
212    LockHolder lh(currentTaskMutex);
213    currentTask = newTask;
214}
215
216// MB-24394: reset currentTask, however we will perform the actual shared_ptr
217// reset without the lock. This is because the task *can* re-enter the
218// executorthread/pool code from it's destructor path, specifically if the task
219// owns a VBucketPtr which is marked as "deferred-delete". Doing this std::move
220// and lockless reset prevents a lock inversion.
221void ExecutorThread::resetCurrentTask() {
222    ExTask resetThisObject;
223    {
224        LockHolder lh(currentTaskMutex);
225        // move currentTask so we 'steal' the pointer and ensure currentTask
226        // owns nothing.
227        resetThisObject = std::move(currentTask);
228    }
229    resetThisObject.reset();
230}
231
232std::string ExecutorThread::getTaskName() {
233    LockHolder lh(currentTaskMutex);
234    if (currentTask) {
235        return currentTask->getDescription();
236    } else {
237        return "Not currently running any task";
238    }
239}
240
241const std::string ExecutorThread::getTaskableName() {
242    LockHolder lh(currentTaskMutex);
243    if (currentTask) {
244        return currentTask->getTaskable().getName();
245    } else {
246        return std::string();
247    }
248}
249
250const std::string ExecutorThread::getStateName() {
251    switch (state.load()) {
252    case EXECUTOR_RUNNING:
253        return std::string("running");
254    case EXECUTOR_WAITING:
255        return std::string("waiting");
256    case EXECUTOR_SLEEPING:
257        return std::string("sleeping");
258    case EXECUTOR_SHUTDOWN:
259        return std::string("shutdown");
260    default:
261        return std::string("dead");
262    }
263}
264