1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2015 Couchbase, Inc.
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17 
#include "executorpool.h"
#include "task.h"

#include <cstdlib>
#include <iostream>
#include <stdexcept>
#include <string>
24 
ExecutorPool(size_t sz)25 ExecutorPool::ExecutorPool(size_t sz)
26     : ExecutorPool(sz, cb::defaultProcessClockSource()) {
27 }
28 
ExecutorPool(size_t sz, cb::ProcessClockSource& clock)29 ExecutorPool::ExecutorPool(size_t sz, cb::ProcessClockSource& clock) {
30     roundRobin.store(0);
31     executors.reserve(sz);
32     for (size_t ii = 0; ii < sz; ++ii) {
33         executors.emplace_back(createWorker(clock));
34     }
35 }
36 
schedule(std::shared_ptr<Task>& task, bool runnable)37 void ExecutorPool::schedule(std::shared_ptr<Task>& task, bool runnable) {
38     if (task->getMutex().try_lock()) {
39         task->getMutex().unlock();
40         throw std::logic_error(
41             "The mutex should be held when trying to schedule a event");
42     }
43 
44     executors[++roundRobin % executors.size()]->schedule(task, runnable);
45 }
46 
clockTick()47 void ExecutorPool::clockTick() {
48     for (const auto& executor : executors) {
49         executor->clockTick();
50     }
51 }
52 
waitqSize() const53 size_t ExecutorPool::waitqSize() const {
54     size_t count = 0;
55     for (const auto& executor : executors) {
56         count += executor->waitqSize();
57     }
58     return count;
59 }
60 
runqSize() const61 size_t ExecutorPool::runqSize() const {
62     size_t count = 0;
63     for (const auto& executor : executors) {
64         count += executor->runqSize();
65     }
66     return count;
67 }
68 
futureqSize() const69 size_t ExecutorPool::futureqSize() const {
70     size_t count = 0;
71     for (const auto& executor : executors) {
72         count += executor->futureqSize();
73     }
74     return count;
75 }
76