/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
/*
 *     Copyright 2013 Couchbase, Inc.
 *
 *   Licensed under the Apache License, Version 2.0 (the "License");
 *   you may not use this file except in compliance with the License.
 *   You may obtain a copy of the License at
 *
 *       http://www.apache.org/licenses/LICENSE-2.0
 *
 *   Unless required by applicable law or agreed to in writing, software
 *   distributed under the License is distributed on an "AS IS" BASIS,
 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *   See the License for the specific language governing permissions and
 *   limitations under the License.
 */

#include "config.h"

#include "ep_engine.h"
#include "ep_time.h"
#include "executorpool.h"
#include "executorthread.h"
#include "statwriter.h"
#include "taskqueue.h"

#include <cJSON_utils.h>
#include <platform/checked_snprintf.h>
#include <platform/processclock.h>
#include <platform/string.h>
#include <platform/sysinfo.h>
#include <algorithm>
#include <chrono>
#include <queue>
#include <sstream>

std::mutex ExecutorPool::initGuard;
std::atomic<ExecutorPool*> ExecutorPool::instance;

static const size_t EP_MIN_NUM_THREADS    = 10;
static const size_t EP_MIN_READER_THREADS = 4;
static const size_t EP_MIN_WRITER_THREADS = 4;
static const size_t EP_MIN_NONIO_THREADS  = 2;

static const size_t EP_MAX_READER_THREADS = 12;
static const size_t EP_MAX_WRITER_THREADS = 8;
static const size_t EP_MAX_AUXIO_THREADS  = 8;
static const size_t EP_MAX_NONIO_THREADS  = 8;

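// Default thread-pool sizing: NonIO gets ~30% of maxGlobalThreads, AuxIO
// gets ~10% (rounded up), writers take half of whatever remains, and
// readers take the rest; each count is then clamped to its [MIN, MAX]
// range and may be overridden by an explicit user setting.
// Worked example, maxGlobalThreads == 20 with no overrides:
//   NonIO  = 20 * 0.3           = 6  (clamped to [2, 8]  -> 6)
//   AuxIO  = ceil(20 / 10)      = 2  (capped at 8        -> 2)
//   Writer = (20 - 6 - 2) / 2   = 6  (clamped to [4, 8]  -> 6)
//   Reader = 20 - 6 - 2 - 6     = 6  (clamped to [4, 12] -> 6)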
size_t ExecutorPool::getNumNonIO(void) {
    // 1. compute: 30% of total threads
    size_t count = maxGlobalThreads * 0.3;

    // 2. adjust computed value to be within range
    count = std::min(EP_MAX_NONIO_THREADS,
                     std::max(EP_MIN_NONIO_THREADS, count));

    // 3. Override with user's value if specified
    if (numWorkers[NONIO_TASK_IDX]) {
        count = numWorkers[NONIO_TASK_IDX];
    }
    return count;
}

size_t ExecutorPool::getNumAuxIO(void) {
    // 1. compute: ceil of 10% of total threads
    size_t count = maxGlobalThreads / 10;
    if (!count || maxGlobalThreads % 10) {
        count++;
    }
    // 2. adjust computed value to be within range
    if (count > EP_MAX_AUXIO_THREADS) {
        count = EP_MAX_AUXIO_THREADS;
    }
    // 3. Override with user's value if specified
    if (numWorkers[AUXIO_TASK_IDX]) {
        count = numWorkers[AUXIO_TASK_IDX];
    }
    return count;
}

size_t ExecutorPool::getNumWriters(void) {
    size_t count = 0;
    // 1. compute: floor of half of what remains after nonIO, auxIO threads
    if (maxGlobalThreads > (getNumAuxIO() + getNumNonIO())) {
        count = maxGlobalThreads - getNumAuxIO() - getNumNonIO();
        count = count >> 1;
    }
    // 2. adjust computed value to be within range
    if (count > EP_MAX_WRITER_THREADS) {
        count = EP_MAX_WRITER_THREADS;
    } else if (count < EP_MIN_WRITER_THREADS) {
        count = EP_MIN_WRITER_THREADS;
    }
    // 3. Override with user's value if specified
    if (numWorkers[WRITER_TASK_IDX]) {
        count = numWorkers[WRITER_TASK_IDX];
    }
    return count;
}

size_t ExecutorPool::getNumReaders(void) {
    size_t count = 0;
    // 1. compute: what remains after writers, nonIO & auxIO threads are taken
    if (maxGlobalThreads >
            (getNumWriters() + getNumAuxIO() + getNumNonIO())) {
        count = maxGlobalThreads
              - getNumWriters() - getNumAuxIO() - getNumNonIO();
    }
    // 2. adjust computed value to be within range
    if (count > EP_MAX_READER_THREADS) {
        count = EP_MAX_READER_THREADS;
    } else if (count < EP_MIN_READER_THREADS) {
        count = EP_MIN_READER_THREADS;
    }
    // 3. Override with user's value if specified
    if (numWorkers[READER_TASK_IDX]) {
        count = numWorkers[READER_TASK_IDX];
    }
    return count;
}

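// Returns the process-wide ExecutorPool singleton, lazily constructing it
// from the current engine's configuration on first use. Illustrative call
// site: ExecutorPool::get()->schedule(task).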
ExecutorPool *ExecutorPool::get(void) {
    auto* tmp = instance.load();
    if (tmp == nullptr) {
        LockHolder lh(initGuard);
        // Double-checked locking: re-check under the lock to ensure two
        // threads don't both create an instance.
        tmp = instance.load();
        if (tmp == nullptr) {
            Configuration &config =
                ObjectRegistry::getCurrentEngine()->getConfiguration();
            EventuallyPersistentEngine *epe =
                                   ObjectRegistry::onSwitchThread(NULL, true);
            tmp = new ExecutorPool(config.getMaxThreads(),
                                   NUM_TASK_GROUPS,
                                   config.getNumReaderThreads(),
                                   config.getNumWriterThreads(),
                                   config.getNumAuxioThreads(),
                                   config.getNumNonioThreads());
            ObjectRegistry::onSwitchThread(epe);
            instance.store(tmp);
        }
    }
    return tmp;
}

void ExecutorPool::shutdown(void) {
    std::lock_guard<std::mutex> lock(initGuard);
    auto* tmp = instance.load();
    if (tmp != nullptr) {
        delete tmp;
        instance = nullptr;
    }
}

ExecutorPool::ExecutorPool(size_t maxThreads, size_t nTaskSets,
                           size_t maxReaders, size_t maxWriters,
                           size_t maxAuxIO,   size_t maxNonIO) :
                  numTaskSets(nTaskSets), totReadyTasks(0),
                  isHiPrioQset(false), isLowPrioQset(false), numBuckets(0),
                  numSleepers(0), curWorkers(nTaskSets), numWorkers(nTaskSets),
                  numReadyTasks(nTaskSets) {
    // Default to 75% of the available CPU cores, with a floor of
    // EP_MIN_NUM_THREADS, unless an explicit maxThreads was requested.
    size_t numCPU = Couchbase::get_available_cpu_count();
    size_t numThreads = (size_t)((numCPU * 3)/4);
    numThreads = (numThreads < EP_MIN_NUM_THREADS) ?
                        EP_MIN_NUM_THREADS : numThreads;
    maxGlobalThreads = maxThreads ? maxThreads : numThreads;
    for (size_t i = 0; i < nTaskSets; i++) {
        curWorkers[i] = 0;
        numReadyTasks[i] = 0;
    }
    numWorkers[WRITER_TASK_IDX] = maxWriters;
    numWorkers[READER_TASK_IDX] = maxReaders;
    numWorkers[AUXIO_TASK_IDX] = maxAuxIO;
    numWorkers[NONIO_TASK_IDX] = maxNonIO;
}

ExecutorPool::~ExecutorPool(void) {
    _stopAndJoinThreads();

    if (isHiPrioQset) {
        for (size_t i = 0; i < numTaskSets; i++) {
            delete hpTaskQ[i];
        }
    }
    if (isLowPrioQset) {
        for (size_t i = 0; i < numTaskSets; i++) {
            delete lpTaskQ[i];
        }
    }
}

// To prevent starvation of low-priority queues, we define their polling
// frequency as follows:
#define LOW_PRIORITY_FREQ 5 // 1 out of 5 times threads check low priority Q
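// Example: on ticks that are multiples of LOW_PRIORITY_FREQ (5, 10, 15, ...)
// _nextTask polls the low-priority queue set first; on every other tick the
// high-priority set gets first chance and low priority is the fallback.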

TaskQueue *ExecutorPool::_nextTask(ExecutorThread &t, uint8_t tick) {
    if (!tick) {
        return NULL;
    }

    task_type_t myq = t.taskType;
    TaskQueue *checkQ; // which TaskQueue set should be polled first
    TaskQueue *checkNextQ; // which set of TaskQueue should be polled next
    TaskQueue *toggle = NULL;
    if ( !(tick % LOW_PRIORITY_FREQ)) { // if only 1 Q set, both point to it
        checkQ = isLowPrioQset ? lpTaskQ[myq] :
                (isHiPrioQset ? hpTaskQ[myq] : NULL);
        checkNextQ = isHiPrioQset ? hpTaskQ[myq] : checkQ;
    } else {
        checkQ = isHiPrioQset ? hpTaskQ[myq] :
                (isLowPrioQset ? lpTaskQ[myq] : NULL);
        checkNextQ = isLowPrioQset ? lpTaskQ[myq] : checkQ;
    }
    while (t.state == EXECUTOR_RUNNING) {
        if (checkQ &&
            checkQ->fetchNextTask(t, false)) {
            return checkQ;
        }
        if (toggle || checkQ == checkNextQ) {
            TaskQueue *sleepQ = getSleepQ(myq);
            if (sleepQ->fetchNextTask(t, true)) {
                return sleepQ;
            } else {
                return NULL;
            }
        }
        // Swap to the other queue set; a non-NULL 'toggle' records that both
        // sets have now been tried, so the next miss falls through to the
        // sleep queue above.
        toggle = checkQ;
        checkQ = checkNextQ;
        checkNextQ = toggle;
    }
    return NULL;
}

TaskQueue *ExecutorPool::nextTask(ExecutorThread &t, uint8_t tick) {
    EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
    TaskQueue *tq = _nextTask(t, tick);
    ObjectRegistry::onSwitchThread(epe);
    return tq;
}

void ExecutorPool::addWork(size_t newWork, task_type_t qType) {
    if (newWork) {
        totReadyTasks.fetch_add(newWork);
        numReadyTasks[qType].fetch_add(newWork);
    }
}

void ExecutorPool::lessWork(task_type_t qType) {
    if (numReadyTasks[qType].load() == 0) {
        throw std::logic_error("ExecutorPool::lessWork: number of ready "
                "tasks on qType " + std::to_string(qType) + " is zero");
    }
    numReadyTasks[qType]--;
    totReadyTasks--;
}

void ExecutorPool::startWork(task_type_t taskType) {
    if (taskType == NO_TASK_TYPE || taskType == NUM_TASK_GROUPS) {
        throw std::logic_error(
                "ExecutorPool::startWork: worker is starting task with invalid "
                "type {" +
                std::to_string(taskType) + "}");
    } else {
        ++curWorkers[taskType];
        LOG(EXTENSION_LOG_DEBUG,
            "Taking up work in task "
            "type:{%" PRIu32 "} "
            "current:{%" PRIu16 "}, max:{%" PRIu16 "}",
            taskType,
            curWorkers[taskType].load(),
            numWorkers[taskType].load());
    }
}

void ExecutorPool::doneWork(task_type_t taskType) {
    if (taskType == NO_TASK_TYPE || taskType == NUM_TASK_GROUPS) {
        throw std::logic_error(
                "ExecutorPool::doneWork: worker is finishing task with invalid "
                "type {" + std::to_string(taskType) + "}");
    } else {
        --curWorkers[taskType];
        // Record that a thread is done working on a particular queue type
        LOG(EXTENSION_LOG_DEBUG,
            "Done with task type:{%" PRIu32 "} capacity:{%" PRIu16 "}",
            taskType,
            numWorkers[taskType].load());
    }
}

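// Cancels the task with the given id. 'eraseTask' is only set by internal
// executor threads, which also remove the (dead) task from taskLocator.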
bool ExecutorPool::_cancel(size_t taskId, bool eraseTask) {
    LockHolder lh(tMutex);
    std::map<size_t, TaskQpair>::iterator itr = taskLocator.find(taskId);
    if (itr == taskLocator.end()) {
        LOG(EXTENSION_LOG_DEBUG, "Task id %" PRIu64 " not found",
            uint64_t(taskId));
        return false;
    }

    ExTask task = itr->second.first;
    LOG(EXTENSION_LOG_DEBUG,
        "Cancel task %s id %" PRIu64 " on bucket %s %s",
        task->getDescription().c_str(),
        uint64_t(task->getId()),
        task->getTaskable().getName().c_str(),
        eraseTask ? "final erase" : "!");

    task->cancel(); // must be idempotent, just set state to dead

    if (eraseTask) { // only internal threads can erase tasks
        if (!task->isdead()) {
            throw std::logic_error("ExecutorPool::_cancel: task '" +
                                   task->getDescription() +
                                   "' is not dead after calling "
                                   "cancel() on it");
        }
        taskLocator.erase(itr);
        tMutex.notify_all();
    } else { // wake up the task from the TaskQ so a thread can safely erase it
             // otherwise we may race with unregisterTaskable where an unlocated
             // task runs in spite of its bucket getting unregistered
        itr->second.second->wake(task);
    }
    return true;
}

bool ExecutorPool::cancel(size_t taskId, bool eraseTask) {
    EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
    bool rv = _cancel(taskId, eraseTask);
    ObjectRegistry::onSwitchThread(epe);
    return rv;
}

bool ExecutorPool::_wake(size_t taskId) {
    LockHolder lh(tMutex);
    std::map<size_t, TaskQpair>::iterator itr = taskLocator.find(taskId);
    if (itr != taskLocator.end()) {
        itr->second.second->wake(itr->second.first);
        return true;
    }
    return false;
}

bool ExecutorPool::wake(size_t taskId) {
    EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
    bool rv = _wake(taskId);
    ObjectRegistry::onSwitchThread(epe);
    return rv;
}

bool ExecutorPool::_snooze(size_t taskId, double toSleep) {
    LockHolder lh(tMutex);
    std::map<size_t, TaskQpair>::iterator itr = taskLocator.find(taskId);
    if (itr != taskLocator.end()) {
        itr->second.second->snooze(itr->second.first, toSleep);
        return true;
    }
    return false;
}

bool ExecutorPool::snooze(size_t taskId, double toSleep) {
    EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
    bool rv = _snooze(taskId, toSleep);
    ObjectRegistry::onSwitchThread(epe);
    return rv;
}

TaskQueue* ExecutorPool::_getTaskQueue(const Taskable& t,
                                       task_type_t qidx) {
    TaskQueue         *q             = NULL;
    size_t            curNumThreads  = 0;

    bucket_priority_t bucketPriority = t.getWorkloadPriority();

    if (qidx < 0 || static_cast<size_t>(qidx) >= numTaskSets) {
        throw std::invalid_argument("ExecutorPool::_getTaskQueue: qidx "
                "(which is " + std::to_string(qidx) + ") is outside the range [0,"
                + std::to_string(numTaskSets) + ")");
    }

    curNumThreads = threadQ.size();

    if (!bucketPriority) {
        LOG(EXTENSION_LOG_WARNING, "Trying to schedule task for unregistered "
            "bucket %s", t.getName().c_str());
        return q;
    }

    if (curNumThreads < maxGlobalThreads) {
        if (isHiPrioQset) {
            q = hpTaskQ[qidx];
        } else if (isLowPrioQset) {
            q = lpTaskQ[qidx];
        }
    } else { // Max-capacity mode scheduling ...
        switch (bucketPriority) {
        case LOW_BUCKET_PRIORITY:
            if (lpTaskQ.size() != numTaskSets) {
                throw std::logic_error("ExecutorPool::_getTaskQueue: At "
                        "maximum capacity but low-priority taskQ size "
                        "(which is " + std::to_string(lpTaskQ.size()) +
                        ") is not " + std::to_string(numTaskSets));
            }
            q = lpTaskQ[qidx];
            break;

        case HIGH_BUCKET_PRIORITY:
            if (hpTaskQ.size() != numTaskSets) {
                throw std::logic_error("ExecutorPool::_getTaskQueue: At "
                        "maximum capacity but high-priority taskQ size "
                        "(which is " + std::to_string(hpTaskQ.size()) +
                        ") is not " + std::to_string(numTaskSets));
            }
            q = hpTaskQ[qidx];
            break;

        default:
            throw std::logic_error("ExecutorPool::_getTaskQueue: Invalid "
                    "bucketPriority " + std::to_string(bucketPriority));
        }
    }
    return q;
}

size_t ExecutorPool::_schedule(ExTask task) {
    LockHolder lh(tMutex);
    const size_t taskId = task->getId();

    TaskQueue* q = _getTaskQueue(task->getTaskable(),
                                 GlobalTask::getTaskType(task->getTaskId()));
    TaskQpair tqp(task, q);

    auto result = taskLocator.insert(std::make_pair(taskId, tqp));

    if (result.second) {
        // tqp was inserted; it was not already present. Prevents multiple
        // copies of a task being present in the task queues.
        q->schedule(task);
    }

    return taskId;
}

size_t ExecutorPool::schedule(ExTask task) {
    EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
    size_t rv = _schedule(task);
    ObjectRegistry::onSwitchThread(epe);
    return rv;
}

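// Registers a taskable (bucket) with the pool, lazily creating the priority
// queue set it maps to and (re)starting the worker threads.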
void ExecutorPool::_registerTaskable(Taskable& taskable) {
    TaskQ *taskQ;
    bool *whichQset;
    const char *queueName;
    WorkLoadPolicy &workload = taskable.getWorkLoadPolicy();
    bucket_priority_t priority = workload.getBucketPriority();

    if (priority < HIGH_BUCKET_PRIORITY) {
        taskable.setWorkloadPriority(LOW_BUCKET_PRIORITY);
        taskQ = &lpTaskQ;
        whichQset = &isLowPrioQset;
        queueName = "LowPrioQ_";
        LOG(EXTENSION_LOG_NOTICE, "Taskable %s registered with low priority",
            taskable.getName().c_str());
    } else {
        taskable.setWorkloadPriority(HIGH_BUCKET_PRIORITY);
        taskQ = &hpTaskQ;
        whichQset = &isHiPrioQset;
        queueName = "HiPrioQ_";
        LOG(EXTENSION_LOG_NOTICE, "Taskable %s registered with high priority",
            taskable.getName().c_str());
    }

    {
        LockHolder lh(tMutex);

        if (!(*whichQset)) {
            taskQ->reserve(numTaskSets);
            for (size_t i = 0; i < numTaskSets; ++i) {
                taskQ->push_back(
                        new TaskQueue(this, (task_type_t)i, queueName));
            }
            *whichQset = true;
        }

        taskOwners.insert(&taskable);
        numBuckets++;
    }

    _startWorkers();
}

void ExecutorPool::registerTaskable(Taskable& taskable) {
    EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
    _registerTaskable(taskable);
    ObjectRegistry::onSwitchThread(epe);
}

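// Grows or shrinks the number of worker threads of 'type' to
// 'desiredNumItems', returning the applied delta (positive when threads
// were added, negative when threads were stopped and joined).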
ssize_t ExecutorPool::_adjustWorkers(task_type_t type, size_t desiredNumItems) {
    std::string typeName{to_string(type)};

    // vector of threads which have been stopped
    // and should be joined after unlocking, if any.
    ThreadQ removed;

    size_t numItems;

    {
        // Lock mutex, we are modifying threadQ
        LockHolder lh(tMutex);

        // How many threads performing this task type there are currently
        numItems = std::count_if(
                threadQ.begin(), threadQ.end(), [type](ExecutorThread* thread) {
                    return thread->taskType == type;
                });

        if (numItems == desiredNumItems) {
            return 0;
        }

        LOG(EXTENSION_LOG_NOTICE,
            "Adjusting threads of type:%s from:%" PRIu64 " to:%" PRIu64,
            typeName.c_str(),
            uint64_t(numItems),
            uint64_t(desiredNumItems));

        if (numItems < desiredNumItems) {
            // If we want to increase the number of threads, they must be
            // created and started
            for (size_t tidx = numItems; tidx < desiredNumItems; ++tidx) {
                threadQ.push_back(new ExecutorThread(
                        this,
                        type,
                        typeName + "_worker_" + std::to_string(tidx)));
                threadQ.back()->start();
            }
        } else if (numItems > desiredNumItems) {
            // If we want to decrease the number of threads, they must be
            // identified in the threadQ, stopped, and removed.
            size_t toRemove = numItems - desiredNumItems;

            auto itr = threadQ.rbegin();
            while (itr != threadQ.rend() && toRemove) {
                if ((*itr)->taskType == type) {
                    // stop but /don't/ join yet
                    (*itr)->stop(false);

                    // store temporarily
                    removed.push_back(*itr);

                    // remove from the threadQ. std::next(itr).base() is the
                    // forward iterator for the element 'itr' refers to;
                    // erase() returns the next forward iterator, which is
                    // converted back to a reverse iterator to continue the
                    // scan towards the front.
                    itr = ThreadQ::reverse_iterator(
                            threadQ.erase(std::next(itr).base()));
                    --toRemove;
                } else {
                    ++itr;
                }
            }
        }

        numWorkers[type] = desiredNumItems;
    } // release mutex

    // MB-22938 wake all threads to avoid blocking if a thread is sleeping
    // waiting for work. Without this, stopping a single thread could take
    // up to 2s (MIN_SLEEP_TIME).
    if (!removed.empty()) {
        TaskQueue* sleepQ = getSleepQ(type);
        size_t threadCount = threadQ.size();
        sleepQ->doWake(threadCount);
    }

    // We could not join the threads while holding the lock, as some operations
    // called from the threads (such as schedule) acquire the lock - we could
    // have caused deadlock by waiting for the thread to complete its task and
    // exit, while it waits to acquire the lock.
    auto itr = removed.begin();
    while (itr != removed.end()) {
        (*itr)->stop(true);
        delete (*itr);
        itr = removed.erase(itr);
    }

    return ssize_t(desiredNumItems) - ssize_t(numItems);
}

void ExecutorPool::adjustWorkers(task_type_t type, size_t newCount) {
    EventuallyPersistentEngine* epe =
            ObjectRegistry::onSwitchThread(NULL, true);
    _adjustWorkers(type, newCount);
    ObjectRegistry::onSwitchThread(epe);
}

bool ExecutorPool::_startWorkers(void) {
    size_t numReaders = getNumReaders();
    size_t numWriters = getNumWriters();
    size_t numAuxIO = getNumAuxIO();
    size_t numNonIO = getNumNonIO();

    if (!numWorkers[WRITER_TASK_IDX]) {
        // MB-12279: Limit writers to 4 for faster bgfetches in DGM by default
        numWriters = 4;
    }

    _adjustWorkers(READER_TASK_IDX, numReaders);
    _adjustWorkers(WRITER_TASK_IDX, numWriters);
    _adjustWorkers(AUXIO_TASK_IDX, numAuxIO);
    _adjustWorkers(NONIO_TASK_IDX, numNonIO);

    return true;
}

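// Cancels every task belonging to taskGID (optionally restricted to
// taskType) and waits until they have all been reaped; returns true if any
// matching task was found.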
bool ExecutorPool::_stopTaskGroup(task_gid_t taskGID,
                                  task_type_t taskType,
                                  bool force) {
    bool unfinishedTask;
    bool retVal = false;
    std::map<size_t, TaskQpair>::iterator itr;

    std::unique_lock<std::mutex> lh(tMutex);
    do {
        ExTask task;
        unfinishedTask = false;
        for (itr = taskLocator.begin(); itr != taskLocator.end(); itr++) {
            task = itr->second.first;
            TaskQueue *q = itr->second.second;
            if (task->getTaskable().getGID() == taskGID &&
                (taskType == NO_TASK_TYPE || q->queueType == taskType)) {
                LOG(EXTENSION_LOG_NOTICE,
                    "Stopping Task id %" PRIu64 " %s %s",
                    uint64_t(task->getId()),
                    task->getTaskable().getName().c_str(),
                    task->getDescription().c_str());
                // If the force flag is set during shutdown, cancel all tasks
                // without considering the blockShutdown status of the task.
                if (force || !task->blockShutdown) {
                    task->cancel(); // Must be idempotent
                }
                q->wake(task);
                unfinishedTask = true;
                retVal = true;
            }
        }
        if (unfinishedTask) {
            tMutex.wait_for(lh, MIN_SLEEP_TIME); // Wait till task gets cancelled
        }
    } while (unfinishedTask);

    return retVal;
}

bool ExecutorPool::stopTaskGroup(task_gid_t taskGID,
                                 task_type_t taskType,
                                 bool force) {
    // Note: Stopping a task group is special - any memory allocations /
    // deallocations made while unregistering *should* be accounted to the
    // bucket in question - hence no `onSwitchThread(NULL)` call.
    return _stopTaskGroup(taskGID, taskType, force);
}

void ExecutorPool::_unregisterTaskable(Taskable& taskable, bool force) {
    LOG(EXTENSION_LOG_NOTICE, "Unregistering %staskable %s",
            (numBuckets == 1) ? "last " : "", taskable.getName().c_str());

    _stopTaskGroup(taskable.getGID(), NO_TASK_TYPE, force);

    LockHolder lh(tMutex);
    taskOwners.erase(&taskable);
    if (!(--numBuckets)) {
        if (taskLocator.size()) {
            throw std::logic_error("ExecutorPool::_unregisterTaskable: "
                    "Attempting to unregister taskable '" +
                    taskable.getName() + "' but taskLocator is not empty");
        }
        for (size_t tidx = 0; tidx < threadQ.size(); ++tidx) {
            threadQ[tidx]->stop(false); // only set state to DEAD
        }

        // Wake all sleeping threads so they can observe the DEAD state.
        for (unsigned int idx = 0; idx < numTaskSets; idx++) {
            TaskQueue *sleepQ = getSleepQ(idx);
            size_t wakeAll = threadQ.size();
            sleepQ->doWake(wakeAll);
        }

        for (size_t tidx = 0; tidx < threadQ.size(); ++tidx) {
            threadQ[tidx]->stop(/*wait for threads*/);
            delete threadQ[tidx];
        }

        for (size_t i = 0; i < numTaskSets; i++) {
            curWorkers[i] = 0;
        }

        threadQ.clear();
        if (isHiPrioQset) {
            for (size_t i = 0; i < numTaskSets; i++) {
                delete hpTaskQ[i];
            }
            hpTaskQ.clear();
            isHiPrioQset = false;
        }
        if (isLowPrioQset) {
            for (size_t i = 0; i < numTaskSets; i++) {
                delete lpTaskQ[i];
            }
            lpTaskQ.clear();
            isLowPrioQset = false;
        }
    }
}

void ExecutorPool::unregisterTaskable(Taskable& taskable, bool force) {
    // Note: unregistering a bucket is special - any memory allocations /
    // deallocations made while unregistering *should* be accounted to the
    // bucket in question - hence no `onSwitchThread(NULL)` call.
    _unregisterTaskable(taskable, force);
}

void ExecutorPool::doTaskQStat(EventuallyPersistentEngine *engine,
                               const void *cookie, ADD_STAT add_stat) {
    if (engine->getEpStats().isShutdown) {
        return;
    }

    EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
    try {
        char statname[80] = {0};
        if (isHiPrioQset) {
            for (size_t i = 0; i < numTaskSets; i++) {
                checked_snprintf(statname, sizeof(statname),
                                 "ep_workload:%s:InQsize",
                                 hpTaskQ[i]->getName().c_str());
                add_casted_stat(statname, hpTaskQ[i]->getFutureQueueSize(),
                                add_stat,
                                cookie);
                checked_snprintf(statname, sizeof(statname),
                                 "ep_workload:%s:OutQsize",
                                 hpTaskQ[i]->getName().c_str());
                add_casted_stat(statname, hpTaskQ[i]->getReadyQueueSize(),
                                add_stat,
                                cookie);
                size_t pendingQsize = hpTaskQ[i]->getPendingQueueSize();
                if (pendingQsize > 0) {
                    checked_snprintf(statname, sizeof(statname),
                                     "ep_workload:%s:PendingQ",
                                     hpTaskQ[i]->getName().c_str());
                    add_casted_stat(statname, pendingQsize, add_stat, cookie);
                }
            }
        }
        if (isLowPrioQset) {
            for (size_t i = 0; i < numTaskSets; i++) {
                checked_snprintf(statname, sizeof(statname),
                                 "ep_workload:%s:InQsize",
                                 lpTaskQ[i]->getName().c_str());
                add_casted_stat(statname, lpTaskQ[i]->getFutureQueueSize(),
                                add_stat,
                                cookie);
                checked_snprintf(statname, sizeof(statname),
                                 "ep_workload:%s:OutQsize",
                                 lpTaskQ[i]->getName().c_str());
                add_casted_stat(statname, lpTaskQ[i]->getReadyQueueSize(),
                                add_stat,
                                cookie);
                size_t pendingQsize = lpTaskQ[i]->getPendingQueueSize();
                if (pendingQsize > 0) {
                    checked_snprintf(statname, sizeof(statname),
                                     "ep_workload:%s:PendingQ",
                                     lpTaskQ[i]->getName().c_str());
                    add_casted_stat(statname, pendingQsize, add_stat, cookie);
                }
            }
        }
    } catch (std::exception& error) {
        LOG(EXTENSION_LOG_WARNING,
            "ExecutorPool::doTaskQStat: Failed to build stats: %s",
            error.what());
    }
    ObjectRegistry::onSwitchThread(epe);
}

static void addWorkerStats(const char *prefix, ExecutorThread *t,
                           const void *cookie, ADD_STAT add_stat) {
    char statname[80] = {0};

    try {
        std::string bucketName = t->getTaskableName();
        if (!bucketName.empty()) {
            checked_snprintf(statname, sizeof(statname), "%s:bucket", prefix);
            add_casted_stat(statname, bucketName.c_str(), add_stat, cookie);
        }

        checked_snprintf(statname, sizeof(statname), "%s:state", prefix);
        add_casted_stat(statname, t->getStateName().c_str(), add_stat, cookie);
        checked_snprintf(statname, sizeof(statname), "%s:task", prefix);
        add_casted_stat(statname, t->getTaskName(), add_stat, cookie);

        if (strcmp(t->getStateName().c_str(), "running") == 0) {
            checked_snprintf(statname, sizeof(statname), "%s:runtime", prefix);
            const auto duration = ProcessClock::now() - t->getTaskStart();
            add_casted_stat(statname, std::chrono::duration_cast<
                            std::chrono::microseconds>(duration).count(),
                            add_stat, cookie);
        }
        checked_snprintf(statname, sizeof(statname), "%s:cur_time", prefix);
        add_casted_stat(statname, to_ns_since_epoch(t->getCurTime()).count(),
                        add_stat, cookie);
    } catch (std::exception& error) {
        LOG(EXTENSION_LOG_WARNING,
            "addWorkerStats: Failed to build stats: %s", error.what());
    }
}

void ExecutorPool::doWorkerStat(EventuallyPersistentEngine *engine,
                                const void *cookie, ADD_STAT add_stat) {
    if (engine->getEpStats().isShutdown) {
        return;
    }

    EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
    LockHolder lh(tMutex);
    // TODO: implement tracking per-engine stats
    for (size_t tidx = 0; tidx < threadQ.size(); ++tidx) {
        addWorkerStats(threadQ[tidx]->getName().c_str(), threadQ[tidx],
                       cookie, add_stat);
    }
    ObjectRegistry::onSwitchThread(epe);
}

void ExecutorPool::doTasksStat(EventuallyPersistentEngine* engine,
                               const void* cookie,
                               ADD_STAT add_stat) {
    if (engine->getEpStats().isShutdown) {
        return;
    }

    EventuallyPersistentEngine* epe =
            ObjectRegistry::onSwitchThread(NULL, true);

    std::map<size_t, TaskQpair> taskLocatorCopy;

    {
        // Holding this lock will block scheduling new tasks and cancelling
        // tasks, but threads can still take up work other than this
        LockHolder lh(tMutex);

        // Copy taskLocator
        taskLocatorCopy = taskLocator;
    }

    char statname[80] = {0};
    char prefix[] = "ep_tasks";

    unique_cJSON_ptr list(cJSON_CreateArray());

    for (auto& pair : taskLocatorCopy) {
        size_t tid = pair.first;
        ExTask& task = pair.second.first;

        unique_cJSON_ptr obj(cJSON_CreateObject());

        cJSON_AddNumberToObject(obj.get(), "tid", tid);
        cJSON_AddStringToObject(
                obj.get(), "state", to_string(task->getState()).c_str());
        cJSON_AddStringToObject(
                obj.get(), "name", GlobalTask::getTaskName(task->getTaskId()));
        cJSON_AddStringToObject(
                obj.get(),
                "this",
                cb::to_hex(reinterpret_cast<uint64_t>(task.get())).c_str());
        cJSON_AddStringToObject(
                obj.get(), "bucket", task->getTaskable().getName().c_str());
        cJSON_AddStringToObject(
                obj.get(), "description", task->getDescription().c_str());
        cJSON_AddNumberToObject(
                obj.get(), "priority", task->getQueuePriority());
        cJSON_AddNumberToObject(obj.get(),
                                "waketime_ns",
                                task->getWaketime().time_since_epoch().count());
        cJSON_AddNumberToObject(
                obj.get(), "total_runtime_ns", task->getTotalRuntime().count());
        cJSON_AddNumberToObject(
                obj.get(),
                "last_starttime_ns",
                to_ns_since_epoch(task->getLastStartTime()).count());
        cJSON_AddNumberToObject(obj.get(),
                                "previous_runtime_ns",
                                task->getPrevRuntime().count());
        cJSON_AddNumberToObject(
                obj.get(),
                "num_runs",
                engine->getEpStats()
                        .taskRuntimeHisto[static_cast<int>(task->getTaskId())]
                        .total());
        cJSON_AddStringToObject(
                obj.get(),
                "type",
                TaskQueue::taskType2Str(
                        GlobalTask::getTaskType(task->getTaskId()))
                        .c_str());

        cJSON_AddItemToArray(list.get(), obj.release());
    }

    checked_snprintf(statname, sizeof(statname), "%s:tasks", prefix);
    add_casted_stat(statname, to_string(list, false), add_stat, cookie);

    checked_snprintf(statname, sizeof(statname), "%s:cur_time", prefix);
    add_casted_stat(statname,
                    to_ns_since_epoch(ProcessClock::now()).count(),
                    add_stat,
                    cookie);

    checked_snprintf(statname, sizeof(statname), "%s:uptime_s", prefix);
    add_casted_stat(statname, ep_current_time(), add_stat, cookie);

    ObjectRegistry::onSwitchThread(epe);
}

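// Shutdown helper: ask every thread to stop, wake any sleeping threads so
// they observe the stop request, then join them all.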
void ExecutorPool::_stopAndJoinThreads() {
    // Ask all threads to stop (but don't wait)
    for (auto thread : threadQ) {
        thread->stop(false);
    }

    // Go over all tasks and wake them up.
    for (auto tq : lpTaskQ) {
        size_t wakeAll = threadQ.size();
        tq->doWake(wakeAll);
    }
    for (auto tq : hpTaskQ) {
        size_t wakeAll = threadQ.size();
        tq->doWake(wakeAll);
    }

    // Now reap/join those threads.
    for (auto thread : threadQ) {
        thread->stop(true);
    }
}