1 /*
2  *     Copyright 2013 Couchbase, Inc.
3  *
4  *   Licensed under the Apache License, Version 2.0 (the "License");
5  *   you may not use this file except in compliance with the License.
6  *   You may obtain a copy of the License at
7  *
8  *       http://www.apache.org/licenses/LICENSE-2.0
9  *
10  *   Unless required by applicable law or agreed to in writing, software
11  *   distributed under the License is distributed on an "AS IS" BASIS,
12  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  *   See the License for the specific language governing permissions and
14  *   limitations under the License.
15  */
16 
17 #pragma once
18 
19 #include "config.h"
20 
21 #include "globaltask.h"
22 #include "objectregistry.h"
23 #include "task_type.h"
24 
25 #include <platform/ring_buffer.h>
26 #include <platform/processclock.h>
27 #include <relaxed_atomic.h>
28 
29 #include <atomic>
30 #include <chrono>
31 #include <deque>
32 #include <list>
33 #include <map>
34 #include <mutex>
35 #include <queue>
36 #include <string>
37 #include <utility>
38 #include <vector>
39 
40 #define TASK_LOG_SIZE 80
41 
42 #define MIN_SLEEP_TIME 2.0
43 
44 class ExecutorPool;
45 class ExecutorThread;
46 class TaskQueue;
47 class WorkLoadPolicy;
48 
49 enum executor_state_t {
50     EXECUTOR_RUNNING,
51     EXECUTOR_WAITING,
52     EXECUTOR_SLEEPING,
53     EXECUTOR_SHUTDOWN,
54     EXECUTOR_DEAD
55 };
56 
57 
58 class ExecutorThread {
59     friend class ExecutorPool;
60     friend class TaskQueue;
61 public:
62 
63     /* The AtomicProcessTime class provides an abstraction for ensuring that
64      * changes to a ProcessClock::time_point are atomic.  This is achieved by
65      * ensuring that all accesses are protected by a mutex.
66      */
67     class AtomicProcessTime {
68     public:
AtomicProcessTime()69         AtomicProcessTime() {}
AtomicProcessTime(const ProcessClock::time_point& tp)70         AtomicProcessTime(const ProcessClock::time_point& tp) : timepoint(tp) {}
71 
setTimePoint(const ProcessClock::time_point& tp)72         void setTimePoint(const ProcessClock::time_point& tp) {
73             std::lock_guard<std::mutex> lock(mutex);
74             timepoint = tp;
75         }
76 
getTimePoint() const77         ProcessClock::time_point getTimePoint() const {
78             std::lock_guard<std::mutex> lock(mutex);
79             return timepoint;
80         }
81 
82     private:
83         mutable std::mutex mutex;
84         ProcessClock::time_point timepoint;
85     };
86 
ExecutorThread(ExecutorPool* m, task_type_t type, const std::string nm)87     ExecutorThread(ExecutorPool* m, task_type_t type, const std::string nm)
88         : manager(m),
89           taskType(type),
90           name(nm),
91           state(EXECUTOR_RUNNING),
92           now(ProcessClock::now()),
93           taskStart(),
94           currentTask(NULL) {
95     }
96 
~ExecutorThread()97     ~ExecutorThread() {
98         LOG(EXTENSION_LOG_INFO, "Executor killing %s", name.c_str());
99     }
100 
101     void start(void);
102 
103     void run(void);
104 
105     void stop(bool wait=true);
106 
107     void schedule(ExTask &task);
108 
109     void reschedule(ExTask &task);
110 
111     void wake(ExTask &task);
112 
113     // Changes this threads' current task to the specified task
114     void setCurrentTask(ExTask newTask);
115 
116     /**
117      * Reset the currentTask shared_ptr so it 'owns' nothing
118      */
119     void resetCurrentTask();
120 
getName() const121     const std::string& getName() const { return name; }
122 
123     std::string getTaskName();
124 
125     const std::string getTaskableName();
126 
getTaskStart() const127     ProcessClock::time_point getTaskStart() const {
128         return taskStart.getTimePoint();
129     }
130 
updateTaskStart(void)131     void updateTaskStart(void) {
132         const ProcessClock::time_point& now = ProcessClock::now();
133         taskStart.setTimePoint(now);
134         currentTask->updateLastStartTime(now);
135     }
136 
137     const std::string getStateName();
138 
getCurTime() const139     ProcessClock::time_point getCurTime() const {
140         return now.getTimePoint();
141     }
142 
updateCurrentTime(void)143     void updateCurrentTime(void) {
144         now.setTimePoint(ProcessClock::now());
145     }
146 
147 protected:
148 
149     cb_thread_t thread;
150     ExecutorPool *manager;
151     task_type_t taskType;
152     const std::string name;
153     std::atomic<executor_state_t> state;
154 
155     // record of current time
156     AtomicProcessTime now;
157     AtomicProcessTime taskStart;
158 
159     std::mutex currentTaskMutex; // Protects currentTask
160     ExTask currentTask;
161 };
162