1/*
2 *     Copyright 2013 Couchbase, Inc.
3 *
4 *   Licensed under the Apache License, Version 2.0 (the "License");
5 *   you may not use this file except in compliance with the License.
6 *   You may obtain a copy of the License at
7 *
8 *       http://www.apache.org/licenses/LICENSE-2.0
9 *
10 *   Unless required by applicable law or agreed to in writing, software
11 *   distributed under the License is distributed on an "AS IS" BASIS,
12 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 *   See the License for the specific language governing permissions and
14 *   limitations under the License.
15 */
16
17#pragma once
18
19#include "config.h"
20
21#include "globaltask.h"
22#include "objectregistry.h"
23#include "task_type.h"
24
25#include <platform/ring_buffer.h>
26#include <platform/processclock.h>
27#include <relaxed_atomic.h>
28
29#include <atomic>
30#include <chrono>
31#include <deque>
32#include <list>
33#include <map>
34#include <mutex>
35#include <queue>
36#include <string>
37#include <utility>
38#include <vector>
39
40#define TASK_LOG_SIZE 80
41
42#define MIN_SLEEP_TIME 2.0
43
44class ExecutorPool;
45class ExecutorThread;
46class TaskQueue;
47class WorkLoadPolicy;
48
49enum executor_state_t {
50    EXECUTOR_RUNNING,
51    EXECUTOR_WAITING,
52    EXECUTOR_SLEEPING,
53    EXECUTOR_SHUTDOWN,
54    EXECUTOR_DEAD
55};
56
57
58class ExecutorThread {
59    friend class ExecutorPool;
60    friend class TaskQueue;
61public:
62
63    /* The AtomicProcessTime class provides an abstraction for ensuring that
64     * changes to a ProcessClock::time_point are atomic.  This is achieved by
65     * ensuring that all accesses are protected by a mutex.
66     */
67    class AtomicProcessTime {
68    public:
69        AtomicProcessTime() {}
70        AtomicProcessTime(const ProcessClock::time_point& tp) : timepoint(tp) {}
71
72        void setTimePoint(const ProcessClock::time_point& tp) {
73            std::lock_guard<std::mutex> lock(mutex);
74            timepoint = tp;
75        }
76
77        ProcessClock::time_point getTimePoint() const {
78            std::lock_guard<std::mutex> lock(mutex);
79            return timepoint;
80        }
81
82    private:
83        mutable std::mutex mutex;
84        ProcessClock::time_point timepoint;
85    };
86
87    ExecutorThread(ExecutorPool* m, task_type_t type, const std::string nm)
88        : manager(m),
89          taskType(type),
90          name(nm),
91          state(EXECUTOR_RUNNING),
92          now(ProcessClock::now()),
93          taskStart(),
94          currentTask(NULL) {
95    }
96
97    ~ExecutorThread() {
98        LOG(EXTENSION_LOG_INFO, "Executor killing %s", name.c_str());
99    }
100
101    void start(void);
102
103    void run(void);
104
105    void stop(bool wait=true);
106
107    void schedule(ExTask &task);
108
109    void reschedule(ExTask &task);
110
111    void wake(ExTask &task);
112
113    // Changes this threads' current task to the specified task
114    void setCurrentTask(ExTask newTask);
115
116    /**
117     * Reset the currentTask shared_ptr so it 'owns' nothing
118     */
119    void resetCurrentTask();
120
121    const std::string& getName() const { return name; }
122
123    std::string getTaskName();
124
125    const std::string getTaskableName();
126
127    ProcessClock::time_point getTaskStart() const {
128        return taskStart.getTimePoint();
129    }
130
131    void updateTaskStart(void) {
132        const ProcessClock::time_point& now = ProcessClock::now();
133        taskStart.setTimePoint(now);
134        currentTask->updateLastStartTime(now);
135    }
136
137    const std::string getStateName();
138
139    ProcessClock::time_point getCurTime() const {
140        return now.getTimePoint();
141    }
142
143    void updateCurrentTime(void) {
144        now.setTimePoint(ProcessClock::now());
145    }
146
147protected:
148
149    cb_thread_t thread;
150    ExecutorPool *manager;
151    task_type_t taskType;
152    const std::string name;
153    std::atomic<executor_state_t> state;
154
155    // record of current time
156    AtomicProcessTime now;
157    AtomicProcessTime taskStart;
158
159    std::mutex currentTaskMutex; // Protects currentTask
160    ExTask currentTask;
161};
162