173dac325Sabhinavdangeti/*
273dac325Sabhinavdangeti *     Copyright 2013 Couchbase, Inc.
373dac325Sabhinavdangeti *
473dac325Sabhinavdangeti *   Licensed under the Apache License, Version 2.0 (the "License");
573dac325Sabhinavdangeti *   you may not use this file except in compliance with the License.
673dac325Sabhinavdangeti *   You may obtain a copy of the License at
773dac325Sabhinavdangeti *
873dac325Sabhinavdangeti *       http://www.apache.org/licenses/LICENSE-2.0
973dac325Sabhinavdangeti *
1073dac325Sabhinavdangeti *   Unless required by applicable law or agreed to in writing, software
1173dac325Sabhinavdangeti *   distributed under the License is distributed on an "AS IS" BASIS,
1273dac325Sabhinavdangeti *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1373dac325Sabhinavdangeti *   See the License for the specific language governing permissions and
1473dac325Sabhinavdangeti *   limitations under the License.
1573dac325Sabhinavdangeti */
1673dac325Sabhinavdangeti
1711d62689SDave Rigby#pragma once
1873dac325Sabhinavdangeti
1973dac325Sabhinavdangeti#include "config.h"
2073dac325Sabhinavdangeti
2174551d34SDave Rigby#include "globaltask.h"
2211d62689SDave Rigby#include "objectregistry.h"
2311d62689SDave Rigby#include "task_type.h"
2411d62689SDave Rigby
2511d62689SDave Rigby#include <platform/ring_buffer.h>
2611d62689SDave Rigby#include <platform/processclock.h>
2711d62689SDave Rigby#include <relaxed_atomic.h>
2811d62689SDave Rigby
2966bb41adSDave Rigby#include <atomic>
30d8577c54SDaniel Owen#include <chrono>
31b926395aSMike Wiederhold#include <deque>
3273dac325Sabhinavdangeti#include <list>
3373dac325Sabhinavdangeti#include <map>
34d8577c54SDaniel Owen#include <mutex>
35b926395aSMike Wiederhold#include <queue>
3673dac325Sabhinavdangeti#include <string>
37b926395aSMike Wiederhold#include <utility>
38b926395aSMike Wiederhold#include <vector>
39b926395aSMike Wiederhold
4082430da3SSundar Sridharan#define TASK_LOG_SIZE 80
4173dac325Sabhinavdangeti
42bde80621SSundar Sridharan#define MIN_SLEEP_TIME 2.0
4373dac325Sabhinavdangeti
4473dac325Sabhinavdangeticlass ExecutorPool;
453b9a12eaSSundar Sridharanclass ExecutorThread;
46b6534b8aSSundar Sridharanclass TaskQueue;
47bde80621SSundar Sridharanclass WorkLoadPolicy;
4873dac325Sabhinavdangeti
49e2ebaad3Sabhinavdangetienum executor_state_t {
5073dac325Sabhinavdangeti    EXECUTOR_RUNNING,
51c8df8f70SJin Lim    EXECUTOR_WAITING,
52c8df8f70SJin Lim    EXECUTOR_SLEEPING,
5373dac325Sabhinavdangeti    EXECUTOR_SHUTDOWN,
5473dac325Sabhinavdangeti    EXECUTOR_DEAD
55e2ebaad3Sabhinavdangeti};
5673dac325Sabhinavdangeti
57bde80621SSundar Sridharan
5873dac325Sabhinavdangeticlass ExecutorThread {
593b9a12eaSSundar Sridharan    friend class ExecutorPool;
60b6534b8aSSundar Sridharan    friend class TaskQueue;
6173dac325Sabhinavdangetipublic:
6273dac325Sabhinavdangeti
63d8577c54SDaniel Owen    /* The AtomicProcessTime class provides an abstraction for ensuring that
64d8577c54SDaniel Owen     * changes to a ProcessClock::time_point are atomic.  This is achieved by
65d8577c54SDaniel Owen     * ensuring that all accesses are protected by a mutex.
66d8577c54SDaniel Owen     */
67d8577c54SDaniel Owen    class AtomicProcessTime {
68d8577c54SDaniel Owen    public:
69d8577c54SDaniel Owen        AtomicProcessTime() {}
70d8577c54SDaniel Owen        AtomicProcessTime(const ProcessClock::time_point& tp) : timepoint(tp) {}
71d8577c54SDaniel Owen
72d8577c54SDaniel Owen        void setTimePoint(const ProcessClock::time_point& tp) {
73d8577c54SDaniel Owen            std::lock_guard<std::mutex> lock(mutex);
74d8577c54SDaniel Owen            timepoint = tp;
75d8577c54SDaniel Owen        }
76d8577c54SDaniel Owen
77d8577c54SDaniel Owen        ProcessClock::time_point getTimePoint() const {
78d8577c54SDaniel Owen            std::lock_guard<std::mutex> lock(mutex);
79d8577c54SDaniel Owen            return timepoint;
80d8577c54SDaniel Owen        }
81d8577c54SDaniel Owen
82d8577c54SDaniel Owen    private:
83d8577c54SDaniel Owen        mutable std::mutex mutex;
84d8577c54SDaniel Owen        ProcessClock::time_point timepoint;
85d8577c54SDaniel Owen    };
86d8577c54SDaniel Owen
87e7ce1c4fSJames Harrison    ExecutorThread(ExecutorPool* m, task_type_t type, const std::string nm)
88e7ce1c4fSJames Harrison        : manager(m),
89e7ce1c4fSJames Harrison          taskType(type),
90e7ce1c4fSJames Harrison          name(nm),
91d8577c54SDaniel Owen          state(EXECUTOR_RUNNING),
92d8577c54SDaniel Owen          now(ProcessClock::now()),
93d8577c54SDaniel Owen          taskStart(),
94e7ce1c4fSJames Harrison          currentTask(NULL) {
95e7ce1c4fSJames Harrison    }
9673dac325Sabhinavdangeti
9773dac325Sabhinavdangeti    ~ExecutorThread() {
98c8df9b58SJin Lim        LOG(EXTENSION_LOG_INFO, "Executor killing %s", name.c_str());
9973dac325Sabhinavdangeti    }
10073dac325Sabhinavdangeti
1013b9a12eaSSundar Sridharan    void start(void);
10273dac325Sabhinavdangeti
1033b9a12eaSSundar Sridharan    void run(void);
10473dac325Sabhinavdangeti
1053b9a12eaSSundar Sridharan    void stop(bool wait=true);
10673dac325Sabhinavdangeti
10773dac325Sabhinavdangeti    void schedule(ExTask &task);
10873dac325Sabhinavdangeti
10973dac325Sabhinavdangeti    void reschedule(ExTask &task);
11073dac325Sabhinavdangeti
11173dac325Sabhinavdangeti    void wake(ExTask &task);
11273dac325Sabhinavdangeti
113052a2cb2SDave Rigby    // Changes this threads' current task to the specified task
114052a2cb2SDave Rigby    void setCurrentTask(ExTask newTask);
115052a2cb2SDave Rigby
116f48fae53SJim Walker    /**
117f48fae53SJim Walker     * Reset the currentTask shared_ptr so it 'owns' nothing
118f48fae53SJim Walker     */
119f48fae53SJim Walker    void resetCurrentTask();
120f48fae53SJim Walker
1213b9a12eaSSundar Sridharan    const std::string& getName() const { return name; }
12273dac325Sabhinavdangeti
12344a3fd5dSDave Rigby    std::string getTaskName();
12495215e1fSJin Lim
12511d62689SDave Rigby    const std::string getTaskableName();
126b09116f0Sabhinavdangeti
127d8577c54SDaniel Owen    ProcessClock::time_point getTaskStart() const {
128d8577c54SDaniel Owen        return taskStart.getTimePoint();
129d8577c54SDaniel Owen    }
130d8577c54SDaniel Owen
131d8577c54SDaniel Owen    void updateTaskStart(void) {
1327dfe4a57SJames Harrison        const ProcessClock::time_point& now = ProcessClock::now();
1337dfe4a57SJames Harrison        taskStart.setTimePoint(now);
1347dfe4a57SJames Harrison        currentTask->updateLastStartTime(now);
135d8577c54SDaniel Owen    }
1362ed6ef98Sabhinavdangeti
137c8df8f70SJin Lim    const std::string getStateName();
138c8df8f70SJin Lim
139d8577c54SDaniel Owen    ProcessClock::time_point getCurTime() const {
140d8577c54SDaniel Owen        return now.getTimePoint();
141d8577c54SDaniel Owen    }
142e091cfa9SSundar Sridharan
143d8577c54SDaniel Owen    void updateCurrentTime(void) {
144d8577c54SDaniel Owen        now.setTimePoint(ProcessClock::now());
145d8577c54SDaniel Owen    }
146e091cfa9SSundar Sridharan
147c9a6de87SDave Rigbyprotected:
14873dac325Sabhinavdangeti
1494db168a3STrond Norbye    cb_thread_t thread;
150cf188e70STrond Norbye    ExecutorPool *manager;
151e7ce1c4fSJames Harrison    task_type_t taskType;
15273dac325Sabhinavdangeti    const std::string name;
15366bb41adSDave Rigby    std::atomic<executor_state_t> state;
154cf188e70STrond Norbye
155d8577c54SDaniel Owen    // record of current time
156d8577c54SDaniel Owen    AtomicProcessTime now;
157d8577c54SDaniel Owen    AtomicProcessTime taskStart;
158052a2cb2SDave Rigby
15966bb41adSDave Rigby    std::mutex currentTaskMutex; // Protects currentTask
16095215e1fSJin Lim    ExTask currentTask;
16173dac325Sabhinavdangeti};
162