1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2015 Couchbase, Inc.
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17 
18 #include "executorpool.h"
19 #include "executor.h"
20 #include "task.h"
21 
22 #include <platform/processclock.h>
23 #include <cstdlib>
24 #include <iostream>
25 #include <string>
26 
ExecutorPool(size_t sz)27 cb::ExecutorPool::ExecutorPool(size_t sz)
28     : ExecutorPool(sz, cb::defaultProcessClockSource()) {
29 }
30 
ExecutorPool(size_t sz, cb::ProcessClockSource& clock)31 cb::ExecutorPool::ExecutorPool(size_t sz, cb::ProcessClockSource& clock) {
32     roundRobin.store(0);
33     executors.reserve(sz);
34     for (size_t ii = 0; ii < sz; ++ii) {
35         executors.emplace_back(createWorker(clock));
36     }
37 }
38 
schedule(std::shared_ptr<Task>& task, bool runnable)39 void cb::ExecutorPool::schedule(std::shared_ptr<Task>& task, bool runnable) {
40     if (task->getMutex().try_lock()) {
41         task->getMutex().unlock();
42         throw std::logic_error(
43             "The mutex should be held when trying to schedule a event");
44     }
45 
46     executors[++roundRobin % executors.size()]->schedule(task, runnable);
47 }
48 
clockTick()49 void cb::ExecutorPool::clockTick() {
50     for (const auto& executor : executors) {
51         executor->clockTick();
52     }
53 }
54 
waitqSize() const55 size_t cb::ExecutorPool::waitqSize() const {
56     size_t count = 0;
57     for (const auto& executor : executors) {
58         count += executor->waitqSize();
59     }
60     return count;
61 }
62 
runqSize() const63 size_t cb::ExecutorPool::runqSize() const {
64     size_t count = 0;
65     for (const auto& executor : executors) {
66         count += executor->runqSize();
67     }
68     return count;
69 }
70 
futureqSize() const71 size_t cb::ExecutorPool::futureqSize() const {
72     size_t count = 0;
73     for (const auto& executor : executors) {
74         count += executor->futureqSize();
75     }
76     return count;
77 }
78