xref: /3.0.3-GA/ep-engine/src/executorpool.h (revision 1172eb5d)
1/*
2 *     Copyright 2014 Couchbase, Inc.
3 *
4 *   Licensed under the Apache License, Version 2.0 (the "License");
5 *   you may not use this file except in compliance with the License.
6 *   You may obtain a copy of the License at
7 *
8 *       http://www.apache.org/licenses/LICENSE-2.0
9 *
10 *   Unless required by applicable law or agreed to in writing, software
11 *   distributed under the License is distributed on an "AS IS" BASIS,
12 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 *   See the License for the specific language governing permissions and
14 *   limitations under the License.
15 */
16
17#ifndef SRC_EXECUTORPOOL_H_
18#define SRC_EXECUTORPOOL_H_ 1
19
20#include "config.h"
21
22#include <map>
23#include <queue>
24
25#include "tasks.h"
26#include "ringbuffer.h"
27#include "task_type.h"
28
29// Forward decl
30class TaskQueue;
31class ExecutorThread;
32class TaskLogEntry;
33
34typedef std::vector<ExecutorThread *> ThreadQ;
35typedef std::pair<ExTask, TaskQueue *> TaskQpair;
36typedef std::pair<RingBuffer<TaskLogEntry>*, RingBuffer<TaskLogEntry> *>
37                                                                TaskLog;
38typedef std::vector<TaskQueue *> TaskQ;
39
40class ExecutorPool {
41public:
42
43    void moreWork(void);
44
45    void lessWork(void);
46
47    size_t doneWork(task_type_t &doneTaskType);
48
49    task_type_t tryNewWork(task_type_t newTaskType);
50
51    bool trySleep(ExecutorThread &t, struct timeval &now);
52
53    TaskQueue *nextTask(ExecutorThread &t, uint8_t tick);
54
55    bool cancel(size_t taskId, bool eraseTask=false);
56
57    bool stopTaskGroup(EventuallyPersistentEngine *e, task_type_t qidx);
58
59    bool wake(size_t taskId);
60
61    void notifyOne(void);
62
63    void notifyAll(void);
64
65    bool snooze(size_t taskId, double tosleep);
66
67    void registerBucket(EventuallyPersistentEngine *engine);
68
69    void unregisterBucket(EventuallyPersistentEngine *engine);
70
71    void doWorkerStat(EventuallyPersistentEngine *engine, const void *cookie,
72                      ADD_STAT add_stat);
73
74    void doTaskQStat(EventuallyPersistentEngine *engine, const void *cookie,
75                     ADD_STAT add_stat);
76
77    size_t getNumWorkersStat(void) { return threadQ.size(); }
78
79    size_t getNumCPU(void);
80
81    size_t getNumWorkers(void);
82
83    size_t getNumReaders(void);
84
85    size_t getNumWriters(void);
86
87    size_t getNumAuxIO(void);
88
89    size_t getNumNonIO(void);
90
91    size_t schedule(ExTask task, task_type_t qidx);
92
93    static ExecutorPool *get(void);
94
95private:
96
97    ExecutorPool(size_t m, size_t nTaskSets);
98    ~ExecutorPool(void);
99
100    bool startWorkers(void);
101
102    TaskQueue* getTaskQueue(EventuallyPersistentEngine *e, task_type_t qidx);
103    size_t maxGlobalThreads;
104    size_t numTaskSets; // safe to read lock-less not altered after creation
105
106    size_t     numReadyTasks;
107    size_t     highWaterMark; // High Water Mark for num Ready Tasks
108    SyncObject mutex; // Thread management condition var + mutex
109    // sync: numReadyTasks, highWaterMark, defaultQ
110
111    //! A mapping of task ids to Task, TaskQ in the thread pool
112    std::map<size_t, TaskQpair> taskLocator;
113
114    //A list of threads
115    ThreadQ threadQ;
116
117    // Global cross bucket priority queues where tasks get scheduled into ...
118    TaskQ hpTaskQ; // a vector array of numTaskSets elements for high priority
119    bool isHiPrioQset;
120
121    TaskQ lpTaskQ; // a vector array of numTaskSets elements for low priority
122    bool isLowPrioQset;
123
124    size_t numBuckets;
125
126    SyncObject tMutex; // to serialize taskLocator, threadQ, numBuckets access
127
128    uint16_t *curWorkers; // for every TaskSet track its no. of worker threads
129    uint16_t *maxWorkers; // and limit it to the value set here
130
131    // Singleton creation
132    static Mutex initGuard;
133    static ExecutorPool *instance;
134};
135#endif  // SRC_EXECUTORPOOL_H_
136