11c55c549STrond Norbye/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
21c55c549STrond Norbye/*
31c55c549STrond Norbye *     Copyright 2013 Couchbase, Inc.
41c55c549STrond Norbye *
51c55c549STrond Norbye *   Licensed under the Apache License, Version 2.0 (the "License");
61c55c549STrond Norbye *   you may not use this file except in compliance with the License.
71c55c549STrond Norbye *   You may obtain a copy of the License at
81c55c549STrond Norbye *
91c55c549STrond Norbye *       http://www.apache.org/licenses/LICENSE-2.0
101c55c549STrond Norbye *
111c55c549STrond Norbye *   Unless required by applicable law or agreed to in writing, software
121c55c549STrond Norbye *   distributed under the License is distributed on an "AS IS" BASIS,
131c55c549STrond Norbye *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
141c55c549STrond Norbye *   See the License for the specific language governing permissions and
151c55c549STrond Norbye *   limitations under the License.
161c55c549STrond Norbye */
171c55c549STrond Norbye
181c55c549STrond Norbye#include "config.h"
191c55c549STrond Norbye
20e7754280SDave Rigby#include "ep_engine.h"
2145a7a9feSJames Harrison#include "ep_time.h"
221c55c549STrond Norbye#include "executorpool.h"
2315aed944SMike Wiederhold#include "executorthread.h"
2445a7a9feSJames Harrison#include "statwriter.h"
2545a7a9feSJames Harrison#include "taskqueue.h"
261c55c549STrond Norbye
277dfe4a57SJames Harrison#include <cJSON_utils.h>
287dfe4a57SJames Harrison#include <platform/checked_snprintf.h>
297dfe4a57SJames Harrison#include <platform/processclock.h>
30927920a8SDave Rigby#include <platform/string.h>
317dfe4a57SJames Harrison#include <platform/sysinfo.h>
327dfe4a57SJames Harrison#include <algorithm>
337dfe4a57SJames Harrison#include <chrono>
347dfe4a57SJames Harrison#include <queue>
357dfe4a57SJames Harrison#include <sstream>
367dfe4a57SJames Harrison
3766bb41adSDave Rigbystd::mutex ExecutorPool::initGuard;
38bd1c6438SDave Rigbystd::atomic<ExecutorPool*> ExecutorPool::instance;
391c55c549STrond Norbye
// Lower bounds used when sizing the pool: the overall default thread count
// and the computed reader / writer / nonIO counts never go below these.
static constexpr size_t EP_MIN_NUM_THREADS = 10;
static constexpr size_t EP_MIN_READER_THREADS = 4;
static constexpr size_t EP_MIN_WRITER_THREADS = 4;
static constexpr size_t EP_MIN_NONIO_THREADS = 2;

// Upper bounds on the computed per-type thread counts. A user-specified
// count (non-zero numWorkers entry) is not subject to these.
static constexpr size_t EP_MAX_READER_THREADS = 12;
static constexpr size_t EP_MAX_WRITER_THREADS = 8;
static constexpr size_t EP_MAX_AUXIO_THREADS = 8;
static constexpr size_t EP_MAX_NONIO_THREADS = 8;
50afa8f9afSSundar Sridharan
511c55c549STrond Norbyesize_t ExecutorPool::getNumNonIO(void) {
52890f7994SJim Walker    // 1. compute: 30% of total threads
53890f7994SJim Walker    size_t count = maxGlobalThreads * 0.3;
54890f7994SJim Walker
553aaaaa4fSSundar Sridharan    // 2. adjust computed value to be within range
56890f7994SJim Walker    count = std::min(EP_MAX_NONIO_THREADS,
57890f7994SJim Walker                     std::max(EP_MIN_NONIO_THREADS, count));
58890f7994SJim Walker
593aaaaa4fSSundar Sridharan    // 3. pick user's value if specified
60e7ce1c4fSJames Harrison    if (numWorkers[NONIO_TASK_IDX]) {
61e7ce1c4fSJames Harrison        count = numWorkers[NONIO_TASK_IDX];
623aaaaa4fSSundar Sridharan    }
633aaaaa4fSSundar Sridharan    return count;
64c29bbd99SSundar Sridharan}
65c29bbd99SSundar Sridharan
661c55c549STrond Norbyesize_t ExecutorPool::getNumAuxIO(void) {
673aaaaa4fSSundar Sridharan    // 1. compute: ceil of 10% of total threads
681c55c549STrond Norbye    size_t count = maxGlobalThreads / 10;
693aaaaa4fSSundar Sridharan    if (!count || maxGlobalThreads % 10) {
703aaaaa4fSSundar Sridharan        count++;
713aaaaa4fSSundar Sridharan    }
723aaaaa4fSSundar Sridharan    // 2. adjust computed value to be within range
733aaaaa4fSSundar Sridharan    if (count > EP_MAX_AUXIO_THREADS) {
743aaaaa4fSSundar Sridharan        count = EP_MAX_AUXIO_THREADS;
753aaaaa4fSSundar Sridharan    }
763aaaaa4fSSundar Sridharan    // 3. Override with user's value if specified
77e7ce1c4fSJames Harrison    if (numWorkers[AUXIO_TASK_IDX]) {
78e7ce1c4fSJames Harrison        count = numWorkers[AUXIO_TASK_IDX];
793aaaaa4fSSundar Sridharan    }
803aaaaa4fSSundar Sridharan    return count;
81c29bbd99SSundar Sridharan}
82c29bbd99SSundar Sridharan
831c55c549STrond Norbyesize_t ExecutorPool::getNumWriters(void) {
843aaaaa4fSSundar Sridharan    size_t count = 0;
853aaaaa4fSSundar Sridharan    // 1. compute: floor of Half of what remains after nonIO, auxIO threads
863aaaaa4fSSundar Sridharan    if (maxGlobalThreads > (getNumAuxIO() + getNumNonIO())) {
873aaaaa4fSSundar Sridharan        count = maxGlobalThreads - getNumAuxIO() - getNumNonIO();
883aaaaa4fSSundar Sridharan        count = count >> 1;
893aaaaa4fSSundar Sridharan    }
903aaaaa4fSSundar Sridharan    // 2. adjust computed value to be within range
913aaaaa4fSSundar Sridharan    if (count > EP_MAX_WRITER_THREADS) {
923aaaaa4fSSundar Sridharan        count = EP_MAX_WRITER_THREADS;
933aaaaa4fSSundar Sridharan    } else if (count < EP_MIN_WRITER_THREADS) {
943aaaaa4fSSundar Sridharan        count = EP_MIN_WRITER_THREADS;
953aaaaa4fSSundar Sridharan    }
963aaaaa4fSSundar Sridharan    // 3. Override with user's value if specified
97e7ce1c4fSJames Harrison    if (numWorkers[WRITER_TASK_IDX]) {
98e7ce1c4fSJames Harrison        count = numWorkers[WRITER_TASK_IDX];
993aaaaa4fSSundar Sridharan    }
1003aaaaa4fSSundar Sridharan    return count;
1011c55c549STrond Norbye}
1021c55c549STrond Norbye
1031c55c549STrond Norbyesize_t ExecutorPool::getNumReaders(void) {
1043aaaaa4fSSundar Sridharan    size_t count = 0;
1053aaaaa4fSSundar Sridharan    // 1. compute: what remains after writers, nonIO & auxIO threads are taken
1063aaaaa4fSSundar Sridharan    if (maxGlobalThreads >
1073aaaaa4fSSundar Sridharan            (getNumWriters() + getNumAuxIO() + getNumNonIO())) {
1083aaaaa4fSSundar Sridharan        count = maxGlobalThreads
1093aaaaa4fSSundar Sridharan              - getNumWriters() - getNumAuxIO() - getNumNonIO();
1103aaaaa4fSSundar Sridharan    }
1113aaaaa4fSSundar Sridharan    // 2. adjust computed value to be within range
1123aaaaa4fSSundar Sridharan    if (count > EP_MAX_READER_THREADS) {
1133aaaaa4fSSundar Sridharan        count = EP_MAX_READER_THREADS;
1143aaaaa4fSSundar Sridharan    } else if (count < EP_MIN_READER_THREADS) {
1153aaaaa4fSSundar Sridharan        count = EP_MIN_READER_THREADS;
1163aaaaa4fSSundar Sridharan    }
1173aaaaa4fSSundar Sridharan    // 3. Override with user's value if specified
118e7ce1c4fSJames Harrison    if (numWorkers[READER_TASK_IDX]) {
119e7ce1c4fSJames Harrison        count = numWorkers[READER_TASK_IDX];
1203aaaaa4fSSundar Sridharan    }
1213aaaaa4fSSundar Sridharan    return count;
1221c55c549STrond Norbye}
1231c55c549STrond Norbye
1241c55c549STrond NorbyeExecutorPool *ExecutorPool::get(void) {
125bd1c6438SDave Rigby    auto* tmp = instance.load();
126bd1c6438SDave Rigby    if (tmp == nullptr) {
1271c55c549STrond Norbye        LockHolder lh(initGuard);
128bd1c6438SDave Rigby        tmp = instance.load();
129bd1c6438SDave Rigby        if (tmp == nullptr) {
130bd1c6438SDave Rigby            // Double-checked locking if instance is null - ensure two threads
131bd1c6438SDave Rigby            // don't both create an instance.
132bd1c6438SDave Rigby
1331c55c549STrond Norbye            Configuration &config =
1341c55c549STrond Norbye                ObjectRegistry::getCurrentEngine()->getConfiguration();
135c29bbd99SSundar Sridharan            EventuallyPersistentEngine *epe =
136c29bbd99SSundar Sridharan                                   ObjectRegistry::onSwitchThread(NULL, true);
137bd1c6438SDave Rigby            tmp = new ExecutorPool(config.getMaxThreads(),
138b20b9600SJames Harrison                                   NUM_TASK_GROUPS,
139b20b9600SJames Harrison                                   config.getNumReaderThreads(),
140b20b9600SJames Harrison                                   config.getNumWriterThreads(),
141b20b9600SJames Harrison                                   config.getNumAuxioThreads(),
142b20b9600SJames Harrison                                   config.getNumNonioThreads());
1432900340fSChiyoung Seo            ObjectRegistry::onSwitchThread(epe);
144bd1c6438SDave Rigby            instance.store(tmp);
1451c55c549STrond Norbye        }
1461c55c549STrond Norbye    }
147bd1c6438SDave Rigby    return tmp;
1481c55c549STrond Norbye}
1491c55c549STrond Norbye
150a2285797SDave Rigbyvoid ExecutorPool::shutdown(void) {
151bd1c6438SDave Rigby    std::lock_guard<std::mutex> lock(initGuard);
152bd1c6438SDave Rigby    auto* tmp = instance.load();
153bd1c6438SDave Rigby    if (tmp != nullptr) {
154bd1c6438SDave Rigby        delete tmp;
155a2285797SDave Rigby        instance = nullptr;
156a2285797SDave Rigby    }
157a2285797SDave Rigby}
158a2285797SDave Rigby
159c29bbd99SSundar SridharanExecutorPool::ExecutorPool(size_t maxThreads, size_t nTaskSets,
160c29bbd99SSundar Sridharan                           size_t maxReaders, size_t maxWriters,
161c29bbd99SSundar Sridharan                           size_t maxAuxIO,   size_t maxNonIO) :
1623aaaaa4fSSundar Sridharan                  numTaskSets(nTaskSets), totReadyTasks(0),
163160e8b18SMike Wiederhold                  isHiPrioQset(false), isLowPrioQset(false), numBuckets(0),
164d36551bcSEugen-Alexandru Virtan                  numSleepers(0), curWorkers(nTaskSets), numWorkers(nTaskSets),
165d36551bcSEugen-Alexandru Virtan                  numReadyTasks(nTaskSets) {
166455fe779SDave Rigby    size_t numCPU = Couchbase::get_available_cpu_count();
167afa8f9afSSundar Sridharan    size_t numThreads = (size_t)((numCPU * 3)/4);
1683aaaaa4fSSundar Sridharan    numThreads = (numThreads < EP_MIN_NUM_THREADS) ?
1693aaaaa4fSSundar Sridharan                        EP_MIN_NUM_THREADS : numThreads;
170afa8f9afSSundar Sridharan    maxGlobalThreads = maxThreads ? maxThreads : numThreads;
1711c55c549STrond Norbye    for (size_t i = 0; i < nTaskSets; i++) {
172b6534b8aSSundar Sridharan        curWorkers[i] = 0;
17380656224SSundar Sridharan        numReadyTasks[i] = 0;
1741c55c549STrond Norbye    }
175e7ce1c4fSJames Harrison    numWorkers[WRITER_TASK_IDX] = maxWriters;
176e7ce1c4fSJames Harrison    numWorkers[READER_TASK_IDX] = maxReaders;
177e7ce1c4fSJames Harrison    numWorkers[AUXIO_TASK_IDX] = maxAuxIO;
178e7ce1c4fSJames Harrison    numWorkers[NONIO_TASK_IDX] = maxNonIO;
1791c55c549STrond Norbye}
1801c55c549STrond Norbye
1811c55c549STrond NorbyeExecutorPool::~ExecutorPool(void) {
1822db3059dSDave Rigby    _stopAndJoinThreads();
1832db3059dSDave Rigby
1841c55c549STrond Norbye    if (isHiPrioQset) {
1851c55c549STrond Norbye        for (size_t i = 0; i < numTaskSets; i++) {
1861c55c549STrond Norbye            delete hpTaskQ[i];
1871c55c549STrond Norbye        }
1881c55c549STrond Norbye    }
1891c55c549STrond Norbye    if (isLowPrioQset) {
1901c55c549STrond Norbye        for (size_t i = 0; i < numTaskSets; i++) {
1911c55c549STrond Norbye            delete lpTaskQ[i];
1921c55c549STrond Norbye        }
1931c55c549STrond Norbye    }
1941c55c549STrond Norbye}
1951c55c549STrond Norbye
// Starvation guard for the low priority queues: on every tick that is a
// multiple of this value a thread polls the low priority queue first,
// i.e. 1 out of 5 times.
#define LOW_PRIORITY_FREQ 5
1991c55c549STrond Norbye
2002900340fSChiyoung SeoTaskQueue *ExecutorPool::_nextTask(ExecutorThread &t, uint8_t tick) {
2011c55c549STrond Norbye    if (!tick) {
2021c55c549STrond Norbye        return NULL;
2031c55c549STrond Norbye    }
2041c55c549STrond Norbye
205e7ce1c4fSJames Harrison    task_type_t myq = t.taskType;
2064b0aa9eaSSundar Sridharan    TaskQueue *checkQ; // which TaskQueue set should be polled first
2074b0aa9eaSSundar Sridharan    TaskQueue *checkNextQ; // which set of TaskQueue should be polled next
2084b0aa9eaSSundar Sridharan    TaskQueue *toggle = NULL;
209b6534b8aSSundar Sridharan    if ( !(tick % LOW_PRIORITY_FREQ)) { // if only 1 Q set, both point to it
2104b0aa9eaSSundar Sridharan        checkQ = isLowPrioQset ? lpTaskQ[myq] :
2114b0aa9eaSSundar Sridharan                (isHiPrioQset ? hpTaskQ[myq] : NULL);
2124b0aa9eaSSundar Sridharan        checkNextQ = isHiPrioQset ? hpTaskQ[myq] : checkQ;
213b6534b8aSSundar Sridharan    } else {
2144b0aa9eaSSundar Sridharan        checkQ = isHiPrioQset ? hpTaskQ[myq] :
2154b0aa9eaSSundar Sridharan                (isLowPrioQset ? lpTaskQ[myq] : NULL);
2164b0aa9eaSSundar Sridharan        checkNextQ = isLowPrioQset ? lpTaskQ[myq] : checkQ;
217b6534b8aSSundar Sridharan    }
21880656224SSundar Sridharan    while (t.state == EXECUTOR_RUNNING) {
219b6534b8aSSundar Sridharan        if (checkQ &&
2204b0aa9eaSSundar Sridharan            checkQ->fetchNextTask(t, false)) {
2214b0aa9eaSSundar Sridharan            return checkQ;
2221c55c549STrond Norbye        }
22380656224SSundar Sridharan        if (toggle || checkQ == checkNextQ) {
2244b0aa9eaSSundar Sridharan            TaskQueue *sleepQ = getSleepQ(myq);
22580656224SSundar Sridharan            if (sleepQ->fetchNextTask(t, true)) {
22680656224SSundar Sridharan                return sleepQ;
227fab04982SSundar Sridharan            } else {
228fab04982SSundar Sridharan                return NULL;
2294b0aa9eaSSundar Sridharan            }
2301c55c549STrond Norbye        }
23180656224SSundar Sridharan        toggle = checkQ;
23280656224SSundar Sridharan        checkQ = checkNextQ;
23380656224SSundar Sridharan        checkNextQ = toggle;
2341c55c549STrond Norbye    }
2351c55c549STrond Norbye    return NULL;
2361c55c549STrond Norbye}
2371c55c549STrond Norbye
2382900340fSChiyoung SeoTaskQueue *ExecutorPool::nextTask(ExecutorThread &t, uint8_t tick) {
2392900340fSChiyoung Seo    EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
2402900340fSChiyoung Seo    TaskQueue *tq = _nextTask(t, tick);
2412900340fSChiyoung Seo    ObjectRegistry::onSwitchThread(epe);
2422900340fSChiyoung Seo    return tq;
2432900340fSChiyoung Seo}
2442900340fSChiyoung Seo
24580656224SSundar Sridharanvoid ExecutorPool::addWork(size_t newWork, task_type_t qType) {
246b6534b8aSSundar Sridharan    if (newWork) {
24780656224SSundar Sridharan        totReadyTasks.fetch_add(newWork);
24880656224SSundar Sridharan        numReadyTasks[qType].fetch_add(newWork);
249d197ec73SSundar Sridharan    }
250d197ec73SSundar Sridharan}
2511c55c549STrond Norbye
25280656224SSundar Sridharanvoid ExecutorPool::lessWork(task_type_t qType) {
253b008e0cfSDave Rigby    if (numReadyTasks[qType].load() == 0) {
254b008e0cfSDave Rigby        throw std::logic_error("ExecutorPool::lessWork: number of ready "
255b008e0cfSDave Rigby                "tasks on qType " + std::to_string(qType) + " is zero");
256b008e0cfSDave Rigby    }
25780656224SSundar Sridharan    numReadyTasks[qType]--;
25880656224SSundar Sridharan    totReadyTasks--;
259b6534b8aSSundar Sridharan}
260b6534b8aSSundar Sridharan
261e7ce1c4fSJames Harrisonvoid ExecutorPool::startWork(task_type_t taskType) {
262e7ce1c4fSJames Harrison    if (taskType == NO_TASK_TYPE || taskType == NUM_TASK_GROUPS) {
263e7ce1c4fSJames Harrison        throw std::logic_error(
264e7ce1c4fSJames Harrison                "ExecutorPool::startWork: worker is starting task with invalid "
265e7ce1c4fSJames Harrison                "type {" +
266e7ce1c4fSJames Harrison                std::to_string(taskType) + "}");
267e7ce1c4fSJames Harrison    } else {
268e7ce1c4fSJames Harrison        ++curWorkers[taskType];
269e7ce1c4fSJames Harrison        LOG(EXTENSION_LOG_DEBUG,
270e7ce1c4fSJames Harrison            "Taking up work in task "
271e7ce1c4fSJames Harrison            "type:{%" PRIu32 "} "
272e7ce1c4fSJames Harrison            "current:{%" PRIu16 "}, max:{%" PRIu16 "}",
273e7ce1c4fSJames Harrison            taskType,
274e7ce1c4fSJames Harrison            curWorkers[taskType].load(),
275e7ce1c4fSJames Harrison            numWorkers[taskType].load());
2761c55c549STrond Norbye    }
2771c55c549STrond Norbye}
2781c55c549STrond Norbye
279e7ce1c4fSJames Harrisonvoid ExecutorPool::doneWork(task_type_t taskType) {
280e7ce1c4fSJames Harrison    if (taskType == NO_TASK_TYPE || taskType == NUM_TASK_GROUPS) {
281e7ce1c4fSJames Harrison        throw std::logic_error(
282e7ce1c4fSJames Harrison                "ExecutorPool::doneWork: worker is finishing task with invalid "
283e7ce1c4fSJames Harrison                "type {" + std::to_string(taskType) + "}");
284c29bbd99SSundar Sridharan    } else {
285e7ce1c4fSJames Harrison        --curWorkers[taskType];
286e7ce1c4fSJames Harrison        // Record that a thread is done working on a particular queue type
287e7ce1c4fSJames Harrison        LOG(EXTENSION_LOG_DEBUG,
288e7ce1c4fSJames Harrison            "Done with task type:{%" PRIu32 "} capacity:{%" PRIu16 "}",
289e7ce1c4fSJames Harrison            taskType,
290e7ce1c4fSJames Harrison            numWorkers[taskType].load());
2911c55c549STrond Norbye    }
2921c55c549STrond Norbye}
2931c55c549STrond Norbye
2942900340fSChiyoung Seobool ExecutorPool::_cancel(size_t taskId, bool eraseTask) {
2951c55c549STrond Norbye    LockHolder lh(tMutex);
2961c55c549STrond Norbye    std::map<size_t, TaskQpair>::iterator itr = taskLocator.find(taskId);
2971c55c549STrond Norbye    if (itr == taskLocator.end()) {
29855384e2bSDave Rigby        LOG(EXTENSION_LOG_DEBUG, "Task id %" PRIu64 " not found",
29955384e2bSDave Rigby            uint64_t(taskId));
3001c55c549STrond Norbye        return false;
3011c55c549STrond Norbye    }
3021c55c549STrond Norbye
3031c55c549STrond Norbye    ExTask task = itr->second.first;
304d26ea7d1SDave Rigby    LOG(EXTENSION_LOG_DEBUG,
30544a3fd5dSDave Rigby        "Cancel task %s id %" PRIu64 " on bucket %s %s",
30644a3fd5dSDave Rigby        task->getDescription().c_str(),
307d26ea7d1SDave Rigby        uint64_t(task->getId()),
308d26ea7d1SDave Rigby        task->getTaskable().getName().c_str(),
309d26ea7d1SDave Rigby        eraseTask ? "final erase" : "!");
3101c55c549STrond Norbye
3111c55c549STrond Norbye    task->cancel(); // must be idempotent, just set state to dead
3121c55c549STrond Norbye
3131c55c549STrond Norbye    if (eraseTask) { // only internal threads can erase tasks
314b008e0cfSDave Rigby        if (!task->isdead()) {
315d26ea7d1SDave Rigby            throw std::logic_error("ExecutorPool::_cancel: task '" +
31644a3fd5dSDave Rigby                                   task->getDescription() +
317d26ea7d1SDave Rigby                                   "' is not dead after calling "
318d26ea7d1SDave Rigby                                   "cancel() on it");
319b008e0cfSDave Rigby        }
3201c55c549STrond Norbye        taskLocator.erase(itr);
321df3730beSDave Rigby        tMutex.notify_all();
3221c55c549STrond Norbye    } else { // wake up the task from the TaskQ so a thread can safely erase it
3237e6316b6Sjim             // otherwise we may race with unregisterTaskable where a unlocated
3241c55c549STrond Norbye             // task runs in spite of its bucket getting unregistered
3251c55c549STrond Norbye        itr->second.second->wake(task);
3261c55c549STrond Norbye    }
3271c55c549STrond Norbye    return true;
3281c55c549STrond Norbye}
3291c55c549STrond Norbye
3302900340fSChiyoung Seobool ExecutorPool::cancel(size_t taskId, bool eraseTask) {
3312900340fSChiyoung Seo    EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
3322900340fSChiyoung Seo    bool rv = _cancel(taskId, eraseTask);
3332900340fSChiyoung Seo    ObjectRegistry::onSwitchThread(epe);
3342900340fSChiyoung Seo    return rv;
3352900340fSChiyoung Seo}
3362900340fSChiyoung Seo
3372900340fSChiyoung Seobool ExecutorPool::_wake(size_t taskId) {
3381c55c549STrond Norbye    LockHolder lh(tMutex);
3391c55c549STrond Norbye    std::map<size_t, TaskQpair>::iterator itr = taskLocator.find(taskId);
3401c55c549STrond Norbye    if (itr != taskLocator.end()) {
3411c55c549STrond Norbye        itr->second.second->wake(itr->second.first);
3421c55c549STrond Norbye        return true;
3431c55c549STrond Norbye    }
3441c55c549STrond Norbye    return false;
3451c55c549STrond Norbye}
3461c55c549STrond Norbye
3472900340fSChiyoung Seobool ExecutorPool::wake(size_t taskId) {
3482900340fSChiyoung Seo    EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
3492900340fSChiyoung Seo    bool rv = _wake(taskId);
3502900340fSChiyoung Seo    ObjectRegistry::onSwitchThread(epe);
3512900340fSChiyoung Seo    return rv;
3522900340fSChiyoung Seo}
3532900340fSChiyoung Seo
35424c408e6SJim Walkerbool ExecutorPool::_snooze(size_t taskId, double toSleep) {
3551c55c549STrond Norbye    LockHolder lh(tMutex);
3561c55c549STrond Norbye    std::map<size_t, TaskQpair>::iterator itr = taskLocator.find(taskId);
3571c55c549STrond Norbye    if (itr != taskLocator.end()) {
35824c408e6SJim Walker        itr->second.second->snooze(itr->second.first, toSleep);
3591c55c549STrond Norbye        return true;
3601c55c549STrond Norbye    }
3611c55c549STrond Norbye    return false;
3621c55c549STrond Norbye}
3631c55c549STrond Norbye
36424c408e6SJim Walkerbool ExecutorPool::snooze(size_t taskId, double toSleep) {
3652900340fSChiyoung Seo    EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
36624c408e6SJim Walker    bool rv = _snooze(taskId, toSleep);
3672900340fSChiyoung Seo    ObjectRegistry::onSwitchThread(epe);
3682900340fSChiyoung Seo    return rv;
3692900340fSChiyoung Seo}
3702900340fSChiyoung Seo
371e06c9f8aSDave RigbyTaskQueue* ExecutorPool::_getTaskQueue(const Taskable& t,
3722900340fSChiyoung Seo                                       task_type_t qidx) {
3731c55c549STrond Norbye    TaskQueue         *q             = NULL;
3741c55c549STrond Norbye    size_t            curNumThreads  = 0;
376e06c9f8aSDave Rigby    bucket_priority_t bucketPriority = t.getWorkloadPriority();
3771c55c549STrond Norbye
378a2657ecaSSriram Ganesan    if (qidx < 0 || static_cast<size_t>(qidx) >= numTaskSets) {
379b008e0cfSDave Rigby        throw std::invalid_argument("ExecutorPool::_getTaskQueue: qidx "
380b008e0cfSDave Rigby                "(which is " + std::to_string(qidx) + ") is outside the range [0,"
381b008e0cfSDave Rigby                + std::to_string(numTaskSets) + ")");
382b008e0cfSDave Rigby    }
3831c55c549STrond Norbye
3841c55c549STrond Norbye    curNumThreads = threadQ.size();
3851c55c549STrond Norbye
3861c55c549STrond Norbye    if (!bucketPriority) {
3871c55c549STrond Norbye        LOG(EXTENSION_LOG_WARNING, "Trying to schedule task for unregistered "
388e06c9f8aSDave Rigby            "bucket %s", t.getName().c_str());
3891c55c549STrond Norbye        return q;
3901c55c549STrond Norbye    }
3911c55c549STrond Norbye
3921c55c549STrond Norbye    if (curNumThreads < maxGlobalThreads) {
3931c55c549STrond Norbye        if (isHiPrioQset) {
3941c55c549STrond Norbye            q = hpTaskQ[qidx];
3951c55c549STrond Norbye        } else if (isLowPrioQset) {
3961c55c549STrond Norbye            q = lpTaskQ[qidx];
3971c55c549STrond Norbye        }
3981c55c549STrond Norbye    } else { // Max capacity Mode scheduling ...
399b008e0cfSDave Rigby        switch (bucketPriority) {
400b008e0cfSDave Rigby        case LOW_BUCKET_PRIORITY:
401b008e0cfSDave Rigby            if (lpTaskQ.size() != numTaskSets) {
402b008e0cfSDave Rigby                throw std::logic_error("ExecutorPool::_getTaskQueue: At "
403b008e0cfSDave Rigby                        "maximum capacity but low-priority taskQ size "
404b008e0cfSDave Rigby                        "(which is " + std::to_string(lpTaskQ.size()) +
405b008e0cfSDave Rigby                        ") is not " + std::to_string(numTaskSets));
406b008e0cfSDave Rigby            }
4071c55c549STrond Norbye            q = lpTaskQ[qidx];
408b008e0cfSDave Rigby            break;
409b008e0cfSDave Rigby
410b008e0cfSDave Rigby        case HIGH_BUCKET_PRIORITY:
411b008e0cfSDave Rigby            if (hpTaskQ.size() != numTaskSets) {
412b008e0cfSDave Rigby                throw std::logic_error("ExecutorPool::_getTaskQueue: At "
413b008e0cfSDave Rigby                        "maximum capacity but high-priority taskQ size "
414b008e0cfSDave Rigby                        "(which is " + std::to_string(lpTaskQ.size()) +
415b008e0cfSDave Rigby                        ") is not " + std::to_string(numTaskSets));
416b008e0cfSDave Rigby            }
4171c55c549STrond Norbye            q = hpTaskQ[qidx];
418b008e0cfSDave Rigby            break;
419b008e0cfSDave Rigby
420b008e0cfSDave Rigby        default:
421b008e0cfSDave Rigby            throw std::logic_error("ExecutorPool::_getTaskQueue: Invalid "
422b008e0cfSDave Rigby                    "bucketPriority " + std::to_string(bucketPriority));
4231c55c549STrond Norbye        }
4241c55c549STrond Norbye    }
4251c55c549STrond Norbye    return q;
4261c55c549STrond Norbye}
4271c55c549STrond Norbye
4283ca9fd41SJames Harrisonsize_t ExecutorPool::_schedule(ExTask task) {
4291c55c549STrond Norbye    LockHolder lh(tMutex);
4303f816b09SJames Harrison    const size_t taskId = task->getId();
4313f816b09SJames Harrison
4323ca9fd41SJames Harrison    TaskQueue* q = _getTaskQueue(task->getTaskable(),
433482fe784SPaolo Cocchi                                 GlobalTask::getTaskType(task->getTaskId()));
4341c55c549STrond Norbye    TaskQpair tqp(task, q);
4351c55c549STrond Norbye
4363f816b09SJames Harrison    auto result = taskLocator.insert(std::make_pair(taskId, tqp));
4373f816b09SJames Harrison
4383f816b09SJames Harrison    if (result.second) {
4393f816b09SJames Harrison        // tqp was inserted; it was not already present. Prevents multiple
4403f816b09SJames Harrison        // copies of a task being present in the task queues.
4413f816b09SJames Harrison        q->schedule(task);
4423f816b09SJames Harrison    }
4431c55c549STrond Norbye
4443f816b09SJames Harrison    return taskId;
4451c55c549STrond Norbye}
4461c55c549STrond Norbye
4473ca9fd41SJames Harrisonsize_t ExecutorPool::schedule(ExTask task) {
4482900340fSChiyoung Seo    EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
4493ca9fd41SJames Harrison    size_t rv = _schedule(task);
4502900340fSChiyoung Seo    ObjectRegistry::onSwitchThread(epe);
4512900340fSChiyoung Seo    return rv;
4522900340fSChiyoung Seo}
4532900340fSChiyoung Seo
454e06c9f8aSDave Rigbyvoid ExecutorPool::_registerTaskable(Taskable& taskable) {
4551c55c549STrond Norbye    TaskQ *taskQ;
4561c55c549STrond Norbye    bool *whichQset;
4571c55c549STrond Norbye    const char *queueName;
458e06c9f8aSDave Rigby    WorkLoadPolicy &workload = taskable.getWorkLoadPolicy();
4591c55c549STrond Norbye    bucket_priority_t priority = workload.getBucketPriority();
4601c55c549STrond Norbye
4611c55c549STrond Norbye    if (priority < HIGH_BUCKET_PRIORITY) {
462e06c9f8aSDave Rigby        taskable.setWorkloadPriority(LOW_BUCKET_PRIORITY);
4631c55c549STrond Norbye        taskQ = &lpTaskQ;
4641c55c549STrond Norbye        whichQset = &isLowPrioQset;
465ddb60c3dSSundar Sridharan        queueName = "LowPrioQ_";
4667e6316b6Sjim        LOG(EXTENSION_LOG_NOTICE, "Taskable %s registered with low priority",
467e06c9f8aSDave Rigby            taskable.getName().c_str());
4681c55c549STrond Norbye    } else {
469e06c9f8aSDave Rigby        taskable.setWorkloadPriority(HIGH_BUCKET_PRIORITY);
4701c55c549STrond Norbye        taskQ = &hpTaskQ;
4711c55c549STrond Norbye        whichQset = &isHiPrioQset;
472ddb60c3dSSundar Sridharan        queueName = "HiPrioQ_";
4737e6316b6Sjim        LOG(EXTENSION_LOG_NOTICE, "Taskable %s registered with high priority",
474e06c9f8aSDave Rigby            taskable.getName().c_str());
4751c55c549STrond Norbye    }
4761c55c549STrond Norbye
477e7ce1c4fSJames Harrison    {
478e7ce1c4fSJames Harrison        LockHolder lh(tMutex);
4791c55c549STrond Norbye
480e7ce1c4fSJames Harrison        if (!(*whichQset)) {
481e7ce1c4fSJames Harrison            taskQ->reserve(numTaskSets);
482e7ce1c4fSJames Harrison            for (size_t i = 0; i < numTaskSets; ++i) {
483e7ce1c4fSJames Harrison                taskQ->push_back(
484e7ce1c4fSJames Harrison                        new TaskQueue(this, (task_type_t)i, queueName));
485e7ce1c4fSJames Harrison            }
486e7ce1c4fSJames Harrison            *whichQset = true;
4871c55c549STrond Norbye        }
4881c55c549STrond Norbye
489e7ce1c4fSJames Harrison        taskOwners.insert(&taskable);
490e7ce1c4fSJames Harrison        numBuckets++;
491e7ce1c4fSJames Harrison    }
4921c55c549STrond Norbye
4932900340fSChiyoung Seo    _startWorkers();
4941c55c549STrond Norbye}
4951c55c549STrond Norbye
496e06c9f8aSDave Rigbyvoid ExecutorPool::registerTaskable(Taskable& taskable) {
4972900340fSChiyoung Seo    EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(<