1 /*
2  *     Copyright 2014 Couchbase, Inc.
3  *
4  *   Licensed under the Apache License, Version 2.0 (the "License");
5  *   you may not use this file except in compliance with the License.
6  *   You may obtain a copy of the License at
7  *
8  *       http://www.apache.org/licenses/LICENSE-2.0
9  *
10  *   Unless required by applicable law or agreed to in writing, software
11  *   distributed under the License is distributed on an "AS IS" BASIS,
12  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  *   See the License for the specific language governing permissions and
14  *   limitations under the License.
15  */
16 
17 #ifndef SRC_TASKQUEUE_H_
18 #define SRC_TASKQUEUE_H_ 1
19 
20 #include "config.h"
21 
22 #include "futurequeue.h"
23 #include "syncobject.h"
24 #include "task_type.h"
25 
26 #include <platform/processclock.h>
27 
28 #include <list>
29 #include <queue>
30 
31 class ExecutorPool;
32 class ExecutorThread;
33 
34 class TaskQueue {
35     friend class ExecutorPool;
36 public:
37     TaskQueue(ExecutorPool *m, task_type_t t, const char *nm);
38     ~TaskQueue();
39 
40     void schedule(ExTask &task);
41 
42     ProcessClock::time_point reschedule(ExTask &task);
43 
44     void checkPendingQueue(void);
45 
46     void doWake(size_t &numToWake);
47 
48     bool fetchNextTask(ExecutorThread &thread, bool toSleep);
49 
50     void wake(ExTask &task);
51 
52     static const std::string taskType2Str(task_type_t type);
53 
54     const std::string getName() const;
55 
getQueueType() const56     const task_type_t getQueueType() const { return queueType; }
57 
58     size_t getReadyQueueSize();
59 
60     size_t getFutureQueueSize();
61 
62     size_t getPendingQueueSize();
63 
snooze(ExTask& task, const double secs)64     void snooze(ExTask& task, const double secs) {
65         futureQueue.snooze(task, secs);
66     }
67 
68 private:
69     void _schedule(ExTask &task);
70     ProcessClock::time_point _reschedule(ExTask &task);
71     void _checkPendingQueue(void);
72     bool _fetchNextTask(ExecutorThread &thread, bool toSleep);
73     void _wake(ExTask &task);
74     bool _doSleep(ExecutorThread &thread, std::unique_lock<std::mutex>& lock);
75     void _doWake_UNLOCKED(size_t &numToWake);
76     size_t _moveReadyTasks(const ProcessClock::time_point tv);
77     ExTask _popReadyTask(void);
78 
79     SyncObject mutex;
80     const std::string name;
81     task_type_t queueType;
82     ExecutorPool *manager;
83     size_t sleepers; // number of threads sleeping in this taskQueue
84 
85     // sorted by task priority.
86     std::priority_queue<ExTask, std::deque<ExTask>,
87                         CompareByPriority> readyQueue;
88 
89     // sorted by waketime. Guarded by `mutex`.
90     FutureQueue<> futureQueue;
91 
92     std::list<ExTask> pendingQueue;
93 };
94 
95 #endif  // SRC_TASKQUEUE_H_
96