/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
/*
 *     Copyright 2013 Couchbase, Inc.
 *
 *   Licensed under the Apache License, Version 2.0 (the "License");
 *   you may not use this file except in compliance with the License.
 *   You may obtain a copy of the License at
 *
 *       http://www.apache.org/licenses/LICENSE-2.0
 *
 *   Unless required by applicable law or agreed to in writing, software
 *   distributed under the License is distributed on an "AS IS" BASIS,
 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *   See the License for the specific language governing permissions and
 *   limitations under the License.
 */

#include "config.h"

#include "ep_engine.h"
#include "ep_time.h"
#include "executorpool.h"
#include "executorthread.h"
#include "statwriter.h"
#include "taskqueue.h"

#include <cJSON_utils.h>
#include <platform/checked_snprintf.h>
#include <platform/processclock.h>
#include <platform/string.h>
#include <platform/sysinfo.h>
#include <algorithm>
#include <chrono>
#include <queue>
#include <sstream>

std::mutex ExecutorPool::initGuard;
std::atomic<ExecutorPool*> ExecutorPool::instance;

static const size_t EP_MIN_NUM_THREADS    = 10;
static const size_t EP_MIN_READER_THREADS = 4;
static const size_t EP_MIN_WRITER_THREADS = 4;
static const size_t EP_MIN_NONIO_THREADS  = 2;


static const size_t EP_MAX_READER_THREADS = 12;
static const size_t EP_MAX_WRITER_THREADS = 8;
static const size_t EP_MAX_AUXIO_THREADS  = 8;
static const size_t EP_MAX_NONIO_THREADS  = 8;

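// Returns the desired number of NonIO worker threads: 30% of the global
// thread budget, clamped to [EP_MIN_NONIO_THREADS, EP_MAX_NONIO_THREADS],
// unless an explicit NonIO worker count was configured.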
size_t ExecutorPool::getNumNonIO(void) {
    // 1. compute: 30% of total threads
    size_t count = maxGlobalThreads * 0.3;

    // 2. adjust computed value to be within range
    count = std::min(EP_MAX_NONIO_THREADS,
                     std::max(EP_MIN_NONIO_THREADS, count));

    // 3. pick user's value if specified
    if (numWorkers[NONIO_TASK_IDX]) {
        count = numWorkers[NONIO_TASK_IDX];
    }
    return count;
}

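// Returns the desired number of AuxIO worker threads: roughly 10% of the
// global thread budget (rounded up), capped at EP_MAX_AUXIO_THREADS, unless
// an explicit AuxIO worker count was configured.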
size_t ExecutorPool::getNumAuxIO(void) {
    // 1. compute: ceil of 10% of total threads
    size_t count = maxGlobalThreads / 10;
    if (!count || maxGlobalThreads % 10) {
        count++;
    }
    // 2. adjust computed value to be within range
    if (count > EP_MAX_AUXIO_THREADS) {
        count = EP_MAX_AUXIO_THREADS;
    }
    // 3. Override with user's value if specified
    if (numWorkers[AUXIO_TASK_IDX]) {
        count = numWorkers[AUXIO_TASK_IDX];
    }
    return count;
}

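// Returns the desired number of Writer threads: half of whatever remains of
// the global thread budget after the AuxIO and NonIO threads are accounted
// for, clamped to [EP_MIN_WRITER_THREADS, EP_MAX_WRITER_THREADS], unless an
// explicit Writer worker count was configured.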
size_t ExecutorPool::getNumWriters(void) {
    size_t count = 0;
    // 1. compute: floor of half of what remains after nonIO, auxIO threads
    if (maxGlobalThreads > (getNumAuxIO() + getNumNonIO())) {
        count = maxGlobalThreads - getNumAuxIO() - getNumNonIO();
        count = count >> 1;
    }
    // 2. adjust computed value to be within range
    if (count > EP_MAX_WRITER_THREADS) {
        count = EP_MAX_WRITER_THREADS;
    } else if (count < EP_MIN_WRITER_THREADS) {
        count = EP_MIN_WRITER_THREADS;
    }
    // 3. Override with user's value if specified
    if (numWorkers[WRITER_TASK_IDX]) {
        count = numWorkers[WRITER_TASK_IDX];
    }
    return count;
}

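// Returns the desired number of Reader threads: whatever remains of the
// global thread budget after Writer, AuxIO and NonIO threads are accounted
// for, clamped to [EP_MIN_READER_THREADS, EP_MAX_READER_THREADS], unless an
// explicit Reader worker count was configured.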
size_t ExecutorPool::getNumReaders(void) {
    size_t count = 0;
    // 1. compute: what remains after writers, nonIO & auxIO threads are taken
    if (maxGlobalThreads >
            (getNumWriters() + getNumAuxIO() + getNumNonIO())) {
        count = maxGlobalThreads
              - getNumWriters() - getNumAuxIO() - getNumNonIO();
    }
    // 2. adjust computed value to be within range
    if (count > EP_MAX_READER_THREADS) {
        count = EP_MAX_READER_THREADS;
    } else if (count < EP_MIN_READER_THREADS) {
        count = EP_MIN_READER_THREADS;
    }
    // 3. Override with user's value if specified
    if (numWorkers[READER_TASK_IDX]) {
        count = numWorkers[READER_TASK_IDX];
    }
    return count;
}

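// Returns the process-wide ExecutorPool singleton, lazily constructing it
// (with double-checked locking on initGuard) from the current engine's
// configuration. The onSwitchThread(NULL, true)/onSwitchThread(epe) pair
// temporarily detaches the thread from the current engine so the pool's own
// construction is not accounted to that bucket (see the notes in
// unregisterTaskable below).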
ExecutorPool *ExecutorPool::get(void) {
    auto* tmp = instance.load();
    if (tmp == nullptr) {
        LockHolder lh(initGuard);
        tmp = instance.load();
        if (tmp == nullptr) {
            // Double-checked locking: re-check under the lock to ensure two
            // threads don't both create an instance.

            Configuration &config =
                ObjectRegistry::getCurrentEngine()->getConfiguration();
            EventuallyPersistentEngine *epe =
                                   ObjectRegistry::onSwitchThread(NULL, true);
            tmp = new ExecutorPool(config.getMaxThreads(),
                                   NUM_TASK_GROUPS,
                                   config.getNumReaderThreads(),
                                   config.getNumWriterThreads(),
                                   config.getNumAuxioThreads(),
                                   config.getNumNonioThreads());
            ObjectRegistry::onSwitchThread(epe);
            instance.store(tmp);
        }
    }
    return tmp;
}

void ExecutorPool::shutdown(void) {
    std::lock_guard<std::mutex> lock(initGuard);
    auto* tmp = instance.load();
    if (tmp != nullptr) {
        delete tmp;
        instance = nullptr;
    }
}

ExecutorPool::ExecutorPool(size_t maxThreads, size_t nTaskSets,
                           size_t maxReaders, size_t maxWriters,
                           size_t maxAuxIO,   size_t maxNonIO) :
                  numTaskSets(nTaskSets), totReadyTasks(0),
                  isHiPrioQset(false), isLowPrioQset(false), numBuckets(0),
                  numSleepers(0), curWorkers(nTaskSets), numWorkers(nTaskSets),
                  numReadyTasks(nTaskSets) {
    size_t numCPU = Couchbase::get_available_cpu_count();
    size_t numThreads = (size_t)((numCPU * 3)/4);
    numThreads = (numThreads < EP_MIN_NUM_THREADS) ?
                        EP_MIN_NUM_THREADS : numThreads;
    maxGlobalThreads = maxThreads ? maxThreads : numThreads;
    for (size_t i = 0; i < nTaskSets; i++) {
        curWorkers[i] = 0;
        numReadyTasks[i] = 0;
    }
    numWorkers[WRITER_TASK_IDX] = maxWriters;
    numWorkers[READER_TASK_IDX] = maxReaders;
    numWorkers[AUXIO_TASK_IDX] = maxAuxIO;
    numWorkers[NONIO_TASK_IDX] = maxNonIO;
}

ExecutorPool::~ExecutorPool(void) {
    _stopAndJoinThreads();

    if (isHiPrioQset) {
        for (size_t i = 0; i < numTaskSets; i++) {
            delete hpTaskQ[i];
        }
    }
    if (isLowPrioQset) {
        for (size_t i = 0; i < numTaskSets; i++) {
            delete lpTaskQ[i];
        }
    }
}

// To prevent starvation of low priority queues, we define their
// polling frequencies as follows ...
#define LOW_PRIORITY_FREQ 5 // 1 out of 5 times threads check low priority Q

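// Selects the next TaskQueue for thread 't' to fetch work from. High-priority
// queues are polled most of the time; every LOW_PRIORITY_FREQ'th tick the
// low-priority queue for the thread's task type is checked first. If neither
// queue set yields a task, the thread blocks on the "sleep" queue.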
TaskQueue *ExecutorPool::_nextTask(ExecutorThread &t, uint8_t tick) {
    if (!tick) {
        return NULL;
    }

    task_type_t myq = t.taskType;
    TaskQueue *checkQ; // which TaskQueue set should be polled first
    TaskQueue *checkNextQ; // which set of TaskQueue should be polled next
    TaskQueue *toggle = NULL;
    if (!(tick % LOW_PRIORITY_FREQ)) { // if only 1 Q set, both point to it
        checkQ = isLowPrioQset ? lpTaskQ[myq] :
                (isHiPrioQset ? hpTaskQ[myq] : NULL);
        checkNextQ = isHiPrioQset ? hpTaskQ[myq] : checkQ;
    } else {
        checkQ = isHiPrioQset ? hpTaskQ[myq] :
                (isLowPrioQset ? lpTaskQ[myq] : NULL);
        checkNextQ = isLowPrioQset ? lpTaskQ[myq] : checkQ;
    }
    while (t.state == EXECUTOR_RUNNING) {
        if (checkQ &&
            checkQ->fetchNextTask(t, false)) {
            return checkQ;
        }
        if (toggle || checkQ == checkNextQ) {
            TaskQueue *sleepQ = getSleepQ(myq);
            if (sleepQ->fetchNextTask(t, true)) {
                return sleepQ;
            } else {
                return NULL;
            }
        }
        toggle = checkQ;
        checkQ = checkNextQ;
        checkNextQ = toggle;
    }
    return NULL;
}

TaskQueue *ExecutorPool::nextTask(ExecutorThread &t, uint8_t tick) {
    EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
    TaskQueue *tq = _nextTask(t, tick);
    ObjectRegistry::onSwitchThread(epe);
    return tq;
}

void ExecutorPool::addWork(size_t newWork, task_type_t qType) {
    if (newWork) {
        totReadyTasks.fetch_add(newWork);
        numReadyTasks[qType].fetch_add(newWork);
    }
}

void ExecutorPool::lessWork(task_type_t qType) {
    if (numReadyTasks[qType].load() == 0) {
        throw std::logic_error("ExecutorPool::lessWork: number of ready "
                "tasks on qType " + std::to_string(qType) + " is zero");
    }
    numReadyTasks[qType]--;
    totReadyTasks--;
}

void ExecutorPool::startWork(task_type_t taskType) {
    if (taskType == NO_TASK_TYPE || taskType == NUM_TASK_GROUPS) {
        throw std::logic_error(
                "ExecutorPool::startWork: worker is starting task with invalid "
                "type {" +
                std::to_string(taskType) + "}");
    } else {
        ++curWorkers[taskType];
        LOG(EXTENSION_LOG_DEBUG,
            "Taking up work in task "
            "type:{%" PRIu32 "} "
            "current:{%" PRIu16 "}, max:{%" PRIu16 "}",
            taskType,
            curWorkers[taskType].load(),
            numWorkers[taskType].load());
    }
}

void ExecutorPool::doneWork(task_type_t taskType) {
    if (taskType == NO_TASK_TYPE || taskType == NUM_TASK_GROUPS) {
        throw std::logic_error(
                "ExecutorPool::doneWork: worker is finishing task with invalid "
                "type {" + std::to_string(taskType) + "}");
    } else {
        --curWorkers[taskType];
        // Record that a thread is done working on a particular queue type
        LOG(EXTENSION_LOG_DEBUG,
            "Done with task type:{%" PRIu32 "} capacity:{%" PRIu16 "}",
            taskType,
            numWorkers[taskType].load());
    }
}

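// Cancels the task with the given id. When 'eraseTask' is set (only internal
// executor threads do this) the task is also removed from taskLocator;
// otherwise the task is woken so that a worker thread can erase it safely.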
bool ExecutorPool::_cancel(size_t taskId, bool eraseTask) {
    LockHolder lh(tMutex);
    std::map<size_t, TaskQpair>::iterator itr = taskLocator.find(taskId);
    if (itr == taskLocator.end()) {
        LOG(EXTENSION_LOG_DEBUG, "Task id %" PRIu64 " not found",
            uint64_t(taskId));
        return false;
    }

    ExTask task = itr->second.first;
    LOG(EXTENSION_LOG_DEBUG,
        "Cancel task %s id %" PRIu64 " on bucket %s %s",
        task->getDescription().c_str(),
        uint64_t(task->getId()),
        task->getTaskable().getName().c_str(),
        eraseTask ? "final erase" : "!");

    task->cancel(); // must be idempotent, just set state to dead

    if (eraseTask) { // only internal threads can erase tasks
        if (!task->isdead()) {
            throw std::logic_error("ExecutorPool::_cancel: task '" +
                                   task->getDescription() +
                                   "' is not dead after calling "
                                   "cancel() on it");
        }
        taskLocator.erase(itr);
        tMutex.notify_all();
    } else { // wake up the task from the TaskQ so a thread can safely erase it
             // otherwise we may race with unregisterTaskable where an unlocated
             // task runs in spite of its bucket getting unregistered
        itr->second.second->wake(task);
    }
    return true;
}

bool ExecutorPool::cancel(size_t taskId, bool eraseTask) {
    EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
    bool rv = _cancel(taskId, eraseTask);
    ObjectRegistry::onSwitchThread(epe);
    return rv;
}

bool ExecutorPool::_wake(size_t taskId) {
    LockHolder lh(tMutex);
    std::map<size_t, TaskQpair>::iterator itr = taskLocator.find(taskId);
    if (itr != taskLocator.end()) {
        itr->second.second->wake(itr->second.first);
        return true;
    }
    return false;
}

bool ExecutorPool::wake(size_t taskId) {
    EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
    bool rv = _wake(taskId);
    ObjectRegistry::onSwitchThread(epe);
    return rv;
}

bool ExecutorPool::_snooze(size_t taskId, double toSleep) {
    LockHolder lh(tMutex);
    std::map<size_t, TaskQpair>::iterator itr = taskLocator.find(taskId);
    if (itr != taskLocator.end()) {
        itr->second.second->snooze(itr->second.first, toSleep);
        return true;
    }
    return false;
}

bool ExecutorPool::snooze(size_t taskId, double toSleep) {
    EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
    bool rv = _snooze(taskId, toSleep);
    ObjectRegistry::onSwitchThread(epe);
    return rv;
}

TaskQueue* ExecutorPool::_getTaskQueue(const Taskable& t,
                                       task_type_t qidx) {
    TaskQueue         *q             = NULL;
    size_t            curNumThreads  = 0;

    bucket_priority_t bucketPriority = t.getWorkloadPriority();

    if (qidx < 0 || static_cast<size_t>(qidx) >= numTaskSets) {
        throw std::invalid_argument("ExecutorPool::_getTaskQueue: qidx "
                "(which is " + std::to_string(qidx) + ") is outside the range [0,"
                + std::to_string(numTaskSets) + ")");
    }

    curNumThreads = threadQ.size();

    if (!bucketPriority) {
        LOG(EXTENSION_LOG_WARNING, "Trying to schedule task for unregistered "
            "bucket %s", t.getName().c_str());
        return q;
    }

    if (curNumThreads < maxGlobalThreads) {
        if (isHiPrioQset) {
            q = hpTaskQ[qidx];
        } else if (isLowPrioQset) {
            q = lpTaskQ[qidx];
        }
    } else { // Max capacity Mode scheduling ...
        switch (bucketPriority) {
        case LOW_BUCKET_PRIORITY:
            if (lpTaskQ.size() != numTaskSets) {
                throw std::logic_error("ExecutorPool::_getTaskQueue: At "
                        "maximum capacity but low-priority taskQ size "
                        "(which is " + std::to_string(lpTaskQ.size()) +
                        ") is not " + std::to_string(numTaskSets));
            }
            q = lpTaskQ[qidx];
            break;

        case HIGH_BUCKET_PRIORITY:
            if (hpTaskQ.size() != numTaskSets) {
                throw std::logic_error("ExecutorPool::_getTaskQueue: At "
                        "maximum capacity but high-priority taskQ size "
                        "(which is " + std::to_string(hpTaskQ.size()) +
                        ") is not " + std::to_string(numTaskSets));
            }
            q = hpTaskQ[qidx];
            break;

        default:
            throw std::logic_error("ExecutorPool::_getTaskQueue: Invalid "
                    "bucketPriority " + std::to_string(bucketPriority));
        }
    }
    return q;
}

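// Registers 'task' in taskLocator and places it on the appropriate TaskQueue
// for its taskable and task type. If the task id is already known the task is
// not queued again, preventing duplicate copies of a task in the queues.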
size_t ExecutorPool::_schedule(ExTask task) {
    LockHolder lh(tMutex);
    const size_t taskId = task->getId();

    TaskQueue* q = _getTaskQueue(task->getTaskable(),
                                 GlobalTask::getTaskType(task->getTaskId()));
    TaskQpair tqp(task, q);

    auto result = taskLocator.insert(std::make_pair(taskId, tqp));

    if (result.second) {
        // tqp was inserted; it was not already present. Prevents multiple
        // copies of a task being present in the task queues.
        q->schedule(task);
    }

    return taskId;
}

size_t ExecutorPool::schedule(ExTask task) {
    EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
    size_t rv = _schedule(task);
    ObjectRegistry::onSwitchThread(epe);
    return rv;
}

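// Registers a taskable (bucket) with the pool, creating the low- or
// high-priority TaskQueue set on first use according to the taskable's
// workload priority, and then (re)starting the worker threads.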
void ExecutorPool::_registerTaskable(Taskable& taskable) {
    TaskQ *taskQ;
    bool *whichQset;
    const char *queueName;
    WorkLoadPolicy &workload = taskable.getWorkLoadPolicy();
    bucket_priority_t priority = workload.getBucketPriority();

    if (priority < HIGH_BUCKET_PRIORITY) {
        taskable.setWorkloadPriority(LOW_BUCKET_PRIORITY);
        taskQ = &lpTaskQ;
        whichQset = &isLowPrioQset;
        queueName = "LowPrioQ_";
        LOG(EXTENSION_LOG_NOTICE, "Taskable %s registered with low priority",
            taskable.getName().c_str());
    } else {
        taskable.setWorkloadPriority(HIGH_BUCKET_PRIORITY);
        taskQ = &hpTaskQ;
        whichQset = &isHiPrioQset;
        queueName = "HiPrioQ_";
        LOG(EXTENSION_LOG_NOTICE, "Taskable %s registered with high priority",
            taskable.getName().c_str());
    }

    {
        LockHolder lh(tMutex);

        if (!(*whichQset)) {
            taskQ->reserve(numTaskSets);
            for (size_t i = 0; i < numTaskSets; ++i) {
                taskQ->push_back(
                        new TaskQueue(this, (task_type_t)i, queueName));
            }
            *whichQset = true;
        }

        taskOwners.insert(&taskable);
        numBuckets++;
    }

    _startWorkers();
}

void ExecutorPool::registerTaskable(Taskable& taskable) {
    EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
    _registerTaskable(taskable);
    ObjectRegistry::onSwitchThread(epe);
}

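// Grows or shrinks the number of worker threads of the given type to
// 'desiredNumItems'. New threads are created and started under tMutex;
// surplus threads are stopped under the lock but only joined and deleted
// after it is released, to avoid deadlocking with threads that need the
// lock to finish their current task. Returns the change in thread count.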
ssize_t ExecutorPool::_adjustWorkers(task_type_t type, size_t desiredNumItems) {
    std::string typeName{to_string(type)};

    // vector of threads which have been stopped
    // and should be joined after unlocking, if any.
    ThreadQ removed;

    size_t numItems;

    {
        // Lock mutex, we are modifying threadQ
        LockHolder lh(tMutex);

        // How many threads of this task type currently exist
        numItems = std::count_if(
                threadQ.begin(), threadQ.end(), [type](ExecutorThread* thread) {
                    return thread->taskType == type;
                });

        if (numItems == desiredNumItems) {
            return 0;
        }

        LOG(EXTENSION_LOG_NOTICE,
            "Adjusting threads of type:%s from:%" PRIu64 " to:%" PRIu64,
            typeName.c_str(),
            uint64_t(numItems),
            uint64_t(desiredNumItems));

        if (numItems < desiredNumItems) {
            // If we want to increase the number of threads, they must be
            // created and started
            for (size_t tidx = numItems; tidx < desiredNumItems; ++tidx) {
                threadQ.push_back(new ExecutorThread(
                        this,
                        type,
                        typeName + "_worker_" + std::to_string(tidx)));
                threadQ.back()->start();
            }
        } else if (numItems > desiredNumItems) {
            // If we want to decrease the number of threads, they must be
            // identified in the threadQ, stopped, and removed.
            size_t toRemove = numItems - desiredNumItems;

            auto itr = threadQ.rbegin();
            while (itr != threadQ.rend() && toRemove) {
                if ((*itr)->taskType == type) {
                    // stop but /don't/ join yet
                    (*itr)->stop(false);

                    // store temporarily
                    removed.push_back(*itr);

                    // remove from the threadQ
                    itr = ThreadQ::reverse_iterator(
                            threadQ.erase(std::next(itr).base()));
                    --toRemove;
                } else {
                    ++itr;
                }
            }
        }

        numWorkers[type] = desiredNumItems;
    } // release mutex

    // MB-22938 wake all threads to avoid blocking if a thread is sleeping
    // waiting for work. Without this, stopping a single thread could take
    // up to 2s (MIN_SLEEP_TIME).
    if (!removed.empty()) {
        TaskQueue* sleepQ = getSleepQ(type);
        size_t threadCount = threadQ.size();
        sleepQ->doWake(threadCount);
    }

    // We could not join the threads while holding the lock, as some operations
    // called from the threads (such as schedule) acquire the lock - we could
    // have caused deadlock by waiting for the thread to complete its task and
    // exit, while it waits to acquire the lock.
    auto itr = removed.begin();
    while (itr != removed.end()) {
        (*itr)->stop(true);
        delete (*itr);
        itr = removed.erase(itr);
    }

    return ssize_t(desiredNumItems) - ssize_t(numItems);
}

void ExecutorPool::adjustWorkers(task_type_t type, size_t newCount) {
    EventuallyPersistentEngine* epe =
            ObjectRegistry::onSwitchThread(NULL, true);
    _adjustWorkers(type, newCount);
    ObjectRegistry::onSwitchThread(epe);
}

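// Sizes each worker group (Reader, Writer, AuxIO, NonIO) to its computed or
// configured count. If no explicit writer count was configured, writers are
// limited to 4 (MB-12279).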
bool ExecutorPool::_startWorkers(void) {
    size_t numReaders = getNumReaders();
    size_t numWriters = getNumWriters();
    size_t numAuxIO = getNumAuxIO();
    size_t numNonIO = getNumNonIO();

    if (!numWorkers[WRITER_TASK_IDX]) {
        // MB-12279: Limit writers to 4 for faster bgfetches in DGM by default
        numWriters = 4;
    }

    _adjustWorkers(READER_TASK_IDX, numReaders);
    _adjustWorkers(WRITER_TASK_IDX, numWriters);
    _adjustWorkers(AUXIO_TASK_IDX, numAuxIO);
    _adjustWorkers(NONIO_TASK_IDX, numNonIO);

    return true;
}

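// Cancels and waits for all tasks belonging to the given taskable GID (and,
// optionally, a specific task type). Tasks flagged with blockShutdown are not
// cancelled unless 'force' is set; every matching task is woken and the loop
// waits until taskLocator no longer contains any of them. Returns true if at
// least one matching task was found.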
bool ExecutorPool::_stopTaskGroup(task_gid_t taskGID,
                                  task_type_t taskType,
                                  bool force) {
    bool unfinishedTask;
    bool retVal = false;
    std::map<size_t, TaskQpair>::iterator itr;

    std::unique_lock<std::mutex> lh(tMutex);
    do {
        ExTask task;
        unfinishedTask = false;
        for (itr = taskLocator.begin(); itr != taskLocator.end(); itr++) {
            task = itr->second.first;
            TaskQueue *q = itr->second.second;
            if (task->getTaskable().getGID() == taskGID &&
                (taskType == NO_TASK_TYPE || q->queueType == taskType)) {
                LOG(EXTENSION_LOG_NOTICE,
                    "Stopping Task id %" PRIu64 " %s %s",
                    uint64_t(task->getId()),
                    task->getTaskable().getName().c_str(),
                    task->getDescription().c_str());
                // If force flag is set during shutdown, cancel all tasks
                // without considering the blockShutdown status of the task.
                if (force || !task->blockShutdown) {
                    task->cancel(); // Must be idempotent
                }
                q->wake(task);
                unfinishedTask = true;
                retVal = true;
            }
        }
        if (unfinishedTask) {
            tMutex.wait_for(lh, MIN_SLEEP_TIME); // Wait till task gets cancelled
        }
    } while (unfinishedTask);

    return retVal;
}

bool ExecutorPool::stopTaskGroup(task_gid_t taskGID,
                                 task_type_t taskType,
                                 bool force) {
    // Note: Stopping a task group is special - any memory allocations /
    // deallocations made while unregistering *should* be accounted to the
    // bucket in question - hence no `onSwitchThread(NULL)` call.
    return _stopTaskGroup(taskGID, taskType, force);
}

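// Removes a taskable (bucket) from the pool after stopping all of its tasks.
// When the last taskable goes away, all worker threads are stopped and joined
// and both TaskQueue sets are torn down.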
void ExecutorPool::_unregisterTaskable(Taskable& taskable, bool force) {

    LOG(EXTENSION_LOG_NOTICE, "Unregistering %s taskable %s",
            (numBuckets == 1)? "last" : "", taskable.getName().c_str());

    _stopTaskGroup(taskable.getGID(), NO_TASK_TYPE, force);

    LockHolder lh(tMutex);
    taskOwners.erase(&taskable);
    if (!(--numBuckets)) {
        if (taskLocator.size()) {
            throw std::logic_error("ExecutorPool::_unregisterTaskable: "
                    "Attempting to unregister taskable '" +
                    taskable.getName() + "' but taskLocator is not empty");
        }
        for (size_t tidx = 0; tidx < threadQ.size(); ++tidx) {
            threadQ[tidx]->stop(false); // only set state to DEAD
        }

        for (unsigned int idx = 0; idx < numTaskSets; idx++) {
            TaskQueue *sleepQ = getSleepQ(idx);
            size_t wakeAll = threadQ.size();
            sleepQ->doWake(wakeAll);
        }

        for (size_t tidx = 0; tidx < threadQ.size(); ++tidx) {
            threadQ[tidx]->stop(/*wait for threads */);
            delete threadQ[tidx];
        }

        for (size_t i = 0; i < numTaskSets; i++) {
            curWorkers[i] = 0;
        }

        threadQ.clear();
        if (isHiPrioQset) {
            for (size_t i = 0; i < numTaskSets; i++) {
                delete hpTaskQ[i];
            }
            hpTaskQ.clear();
            isHiPrioQset = false;
        }
        if (isLowPrioQset) {
            for (size_t i = 0; i < numTaskSets; i++) {
                delete lpTaskQ[i];
            }
            lpTaskQ.clear();
            isLowPrioQset = false;
        }
    }
}

void ExecutorPool::unregisterTaskable(Taskable& taskable, bool force) {
    // Note: unregistering a bucket is special - any memory allocations /
    // deallocations made while unregistering *should* be accounted to the
    // bucket in question - hence no `onSwitchThread(NULL)` call.
    _unregisterTaskable(taskable, force);
}

void ExecutorPool::doTaskQStat(EventuallyPersistentEngine *engine,
                               const void *cookie, ADD_STAT add_stat) {
    if (engine->getEpStats().isShutdown) {
        return;
    }

    EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
    try {
        char statname[80] = {0};
        if (isHiPrioQset) {
            for (size_t i = 0; i < numTaskSets; i++) {
                checked_snprintf(statname, sizeof(statname),
                                 "ep_workload:%s:InQsize",
                                 hpTaskQ[i]->getName().c_str());
                add_casted_stat(statname, hpTaskQ[i]->getFutureQueueSize(),
                                add_stat,
                                cookie);
                checked_snprintf(statname, sizeof(statname),
                                 "ep_workload:%s:OutQsize",
                                 hpTaskQ[i]->getName().c_str());
                add_casted_stat(statname, hpTaskQ[i]->getReadyQueueSize(),
                                add_stat,
                                cookie);
                size_t pendingQsize = hpTaskQ[i]->getPendingQueueSize();
                if (pendingQsize > 0) {
                    checked_snprintf(statname, sizeof(statname),
                                     "ep_workload:%s:PendingQ",
                                     hpTaskQ[i]->getName().c_str());
                    add_casted_stat(statname, pendingQsize, add_stat, cookie);
                }
            }
        }
        if (isLowPrioQset) {
            for (size_t i = 0; i < numTaskSets; i++) {
                checked_snprintf(statname, sizeof(statname),
                                 "ep_workload:%s:InQsize",
                                 lpTaskQ[i]->getName().c_str());
                add_casted_stat(statname, lpTaskQ[i]->getFutureQueueSize(),
                                add_stat,
                                cookie);
                checked_snprintf(statname, sizeof(statname),
                                 "ep_workload:%s:OutQsize",
                                 lpTaskQ[i]->getName().c_str());
                add_casted_stat(statname, lpTaskQ[i]->getReadyQueueSize(),
                                add_stat,
                                cookie);
                size_t pendingQsize = lpTaskQ[i]->getPendingQueueSize();
                if (pendingQsize > 0) {
                    checked_snprintf(statname, sizeof(statname),
                                     "ep_workload:%s:PendingQ",
                                     lpTaskQ[i]->getName().c_str());
                    add_casted_stat(statname, pendingQsize, add_stat, cookie);
                }
            }
        }
    } catch (std::exception& error) {
        LOG(EXTENSION_LOG_WARNING,
            "ExecutorPool::doTaskQStat: Failed to build stats: %s",
            error.what());
    }
    ObjectRegistry::onSwitchThread(epe);
}

static void addWorkerStats(const char *prefix, ExecutorThread *t,
                           const void *cookie, ADD_STAT add_stat) {
    char statname[80] = {0};

    try {
        std::string bucketName = t->getTaskableName();
        if (!bucketName.empty()) {
            checked_snprintf(statname, sizeof(statname), "%s:bucket", prefix);
            add_casted_stat(statname, bucketName.c_str(), add_stat, cookie);
        }

        checked_snprintf(statname, sizeof(statname), "%s:state", prefix);
        add_casted_stat(statname, t->getStateName().c_str(), add_stat, cookie);
        checked_snprintf(statname, sizeof(statname), "%s:task", prefix);
        add_casted_stat(statname, t->getTaskName(), add_stat, cookie);

        if (strcmp(t->getStateName().c_str(), "running") == 0) {
            checked_snprintf(statname, sizeof(statname), "%s:runtime", prefix);
            const auto duration = ProcessClock::now() - t->getTaskStart();
            add_casted_stat(statname, std::chrono::duration_cast<
                            std::chrono::microseconds>(duration).count(),
                            add_stat, cookie);
        }
        checked_snprintf(statname, sizeof(statname), "%s:cur_time", prefix);
        add_casted_stat(statname, to_ns_since_epoch(t->getCurTime()).count(),
                        add_stat, cookie);
    } catch (std::exception& error) {
        LOG(EXTENSION_LOG_WARNING,
            "addWorkerStats: Failed to build stats: %s", error.what());
    }
}

void ExecutorPool::doWorkerStat(EventuallyPersistentEngine *engine,
                               const void *cookie, ADD_STAT add_stat) {
    if (engine->getEpStats().isShutdown) {
        return;
    }

    EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
    LockHolder lh(tMutex);
    //TODO: implement tracking per engine stats ..
    for (size_t tidx = 0; tidx < threadQ.size(); ++tidx) {
        addWorkerStats(threadQ[tidx]->getName().c_str(), threadQ[tidx],
                     cookie, add_stat);
    }
    ObjectRegistry::onSwitchThread(epe);
}

void ExecutorPool::doTasksStat(EventuallyPersistentEngine* engine,
                               const void* cookie,
                               ADD_STAT add_stat) {
    if (engine->getEpStats().isShutdown) {
        return;
    }

    EventuallyPersistentEngine* epe =
            ObjectRegistry::onSwitchThread(NULL, true);

    std::map<size_t, TaskQpair> taskLocatorCopy;

    {
        // Holding this lock will block scheduling new tasks and cancelling
        // tasks, but threads can still take up work other than this
        LockHolder lh(tMutex);

        // Copy taskLocator
        taskLocatorCopy = taskLocator;
    }

    char statname[80] = {0};
    char prefix[] = "ep_tasks";

    unique_cJSON_ptr list(cJSON_CreateArray());

    for (auto& pair : taskLocatorCopy) {
        size_t tid = pair.first;
        ExTask& task = pair.second.first;

        unique_cJSON_ptr obj(cJSON_CreateObject());

        cJSON_AddNumberToObject(obj.get(), "tid", tid);
        cJSON_AddStringToObject(
                obj.get(), "state", to_string(task->getState()).c_str());
        cJSON_AddStringToObject(
                obj.get(), "name", GlobalTask::getTaskName(task->getTaskId()));
        cJSON_AddStringToObject(
                obj.get(),
                "this",
                cb::to_hex(reinterpret_cast<uint64_t>(task.get())).c_str());
        cJSON_AddStringToObject(
                obj.get(), "bucket", task->getTaskable().getName().c_str());
        cJSON_AddStringToObject(
                obj.get(), "description", task->getDescription().c_str());
        cJSON_AddNumberToObject(
                obj.get(), "priority", task->getQueuePriority());
        cJSON_AddNumberToObject(obj.get(),
                                "waketime_ns",
                                task->getWaketime().time_since_epoch().count());
        cJSON_AddNumberToObject(
                obj.get(), "total_runtime_ns", task->getTotalRuntime().count());
        cJSON_AddNumberToObject(
                obj.get(),
                "last_starttime_ns",
                to_ns_since_epoch(task->getLastStartTime()).count());
        cJSON_AddNumberToObject(obj.get(),
                                "previous_runtime_ns",
                                task->getPrevRuntime().count());
        cJSON_AddNumberToObject(
                obj.get(),
                "num_runs",
                engine->getEpStats()
                        .taskRuntimeHisto[static_cast<int>(task->getTaskId())]
                        .total());
        cJSON_AddStringToObject(
                obj.get(),
                "type",
                TaskQueue::taskType2Str(
                        GlobalTask::getTaskType(task->getTaskId()))
                        .c_str());

        cJSON_AddItemToArray(list.get(), obj.release());
    }

    checked_snprintf(statname, sizeof(statname), "%s:tasks", prefix);
    add_casted_stat(statname, to_string(list, false), add_stat, cookie);

    checked_snprintf(statname, sizeof(statname), "%s:cur_time", prefix);
    add_casted_stat(statname,
                    to_ns_since_epoch(ProcessClock::now()).count(),
                    add_stat,
                    cookie);

    checked_snprintf(statname, sizeof(statname), "%s:uptime_s", prefix);
    add_casted_stat(statname, ep_current_time(), add_stat, cookie);

    ObjectRegistry::onSwitchThread(epe);
}

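// Stops every worker thread: first flag them all to stop, then wake any
// threads sleeping on the task queues, and finally join each thread.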
void ExecutorPool::_stopAndJoinThreads() {

    // Ask all threads to stop (but don't wait)
    for (auto thread : threadQ) {
        thread->stop(false);
    }

    // Go over all tasks and wake them up.
    for (auto tq : lpTaskQ) {
        size_t wakeAll = threadQ.size();
        tq->doWake(wakeAll);
    }
    for (auto tq : hpTaskQ) {
        size_t wakeAll = threadQ.size();
        tq->doWake(wakeAll);
    }

    // Now reap/join those threads.
    for (auto thread : threadQ) {
        thread->stop(true);
    }
}