/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
/*
 *     Copyright 2013 Couchbase, Inc.
 *
 *   Licensed under the Apache License, Version 2.0 (the "License");
 *   you may not use this file except in compliance with the License.
 *   You may obtain a copy of the License at
 *
 *       http://www.apache.org/licenses/LICENSE-2.0
 *
 *   Unless required by applicable law or agreed to in writing, software
 *   distributed under the License is distributed on an "AS IS" BASIS,
 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *   See the License for the specific language governing permissions and
 *   limitations under the License.
 */

#include "config.h"

#include <map>     // std::map is used directly below for taskLocator lookups
#include <queue>
#include <sstream> // std::stringstream is used to build worker thread names

#include "statwriter.h"
#include "taskqueue.h"
#include "executorpool.h"
#include "executorthread.h"

Mutex ExecutorPool::initGuard;
ExecutorPool *ExecutorPool::instance = NULL;

static const size_t EP_MIN_NUM_IO_THREADS = 4;

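// Detect the number of online CPUs. Counts of 256 or more are treated as
// bogus and mapped to 0, so that the constructor below falls back to
// EP_MIN_NUM_IO_THREADS rather than spawning an absurd number of threads.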
size_t ExecutorPool::getNumCPU(void) {
    size_t numCPU;
#ifdef WIN32
    SYSTEM_INFO sysinfo;
    GetSystemInfo(&sysinfo);
    numCPU = (size_t)sysinfo.dwNumberOfProcessors;
#else
    numCPU = (size_t)sysconf(_SC_NPROCESSORS_ONLN);
#endif

    return (numCPU < 256) ? numCPU : 0;
}

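// The NonIO and AuxIO groups each get ceil(10%) of the global thread budget,
// with a floor of one thread.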
size_t ExecutorPool::getNumNonIO(void) {
    // ceil of 10% of total threads
    size_t count = maxGlobalThreads / 10;
    return (!count || maxGlobalThreads % 10) ? count + 1 : count;
}

size_t ExecutorPool::getNumAuxIO(void) {
    // ceil of 10% of total threads
    size_t count = maxGlobalThreads / 10;
    return (!count || maxGlobalThreads % 10) ? count + 1 : count;
}

size_t ExecutorPool::getNumWriters(void) {
    // floor of half of what remains after nonIO and auxIO threads are taken
    size_t count = maxGlobalThreads - getNumAuxIO() - getNumNonIO();
    count = count >> 1;
    return count ? count : 1;
}

size_t ExecutorPool::getNumReaders(void) {
    // what remains after writers, nonIO and auxIO threads are taken
    return (maxGlobalThreads - getNumWriters() - getNumAuxIO() - getNumNonIO());
}

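// Double-checked locking: the unsynchronized fast-path read of 'instance'
// avoids taking initGuard once the singleton exists, while the second check
// under the lock prevents double construction. Note the unlocked read assumes
// aligned pointer loads/stores are atomic on the target platform.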
ExecutorPool *ExecutorPool::get(void) {
    if (!instance) {
        LockHolder lh(initGuard);
        if (!instance) {
            Configuration &config =
                ObjectRegistry::getCurrentEngine()->getConfiguration();
            instance = new ExecutorPool(config.getMaxThreads(),
                                        NUM_TASK_GROUPS);
        }
    }
    return instance;
}

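// The global thread budget defaults to 3/4 of the online CPUs, floored at
// EP_MIN_NUM_IO_THREADS, unless an explicit maxThreads value was configured.
// Every task set initially allows up to maxGlobalThreads concurrent workers;
// startWorkers() later caps the AuxIO and NonIO sets.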
ExecutorPool::ExecutorPool(size_t maxThreads, size_t nTaskSets) :
                  numTaskSets(nTaskSets), numReadyTasks(0), highWaterMark(0),
                  isHiPrioQset(false), isLowPrioQset(false), numBuckets(0) {
    size_t numCPU = getNumCPU();
    size_t numThreads = (size_t)((numCPU * 3) / 4);
    numThreads = (numThreads < EP_MIN_NUM_IO_THREADS) ?
                        EP_MIN_NUM_IO_THREADS : numThreads;
    maxGlobalThreads = maxThreads ? maxThreads : numThreads;
    curWorkers = (uint16_t *)calloc(nTaskSets, sizeof(uint16_t));
    maxWorkers = (uint16_t *)malloc(nTaskSets * sizeof(uint16_t));
    for (size_t i = 0; i < nTaskSets; i++) {
        maxWorkers[i] = maxGlobalThreads;
    }
}

ExecutorPool::~ExecutorPool(void) {
    free(curWorkers);
    free(maxWorkers);
    if (isHiPrioQset) {
        for (size_t i = 0; i < numTaskSets; i++) {
            delete hpTaskQ[i];
        }
    }
    if (isLowPrioQset) {
        for (size_t i = 0; i < numTaskSets; i++) {
            delete lpTaskQ[i];
        }
    }
}

// To prevent starvation of the low priority queues, we define their polling
// frequency as follows ...
#define LOW_PRIORITY_FREQ 5 // on 1 of every 5 ticks, low priority Qs are polled first

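// Pick the next task for thread 't'. A tick of 0 means the thread is shutting
// down. On every LOW_PRIORITY_FREQ-th tick the low priority queues are polled
// ahead of the high priority ones; otherwise high priority wins. Queues are
// scanned round-robin across the task sets starting at t.startIndex, and the
// thread is put to sleep whenever a full pass finds no runnable task.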
TaskQueue *ExecutorPool::nextTask(ExecutorThread &t, uint8_t tick) {
    if (!tick) {
        return NULL;
    }

    struct timeval now;
    gettimeofday(&now, NULL);
    size_t idx = t.startIndex;

    for (; !(tick % LOW_PRIORITY_FREQ); idx = (idx + 1) % numTaskSets) {
        if (isLowPrioQset &&
             lpTaskQ[idx]->fetchNextTask(t.currentTask, t.waketime,
                                         t.curTaskType, now)) {
            return lpTaskQ[idx];
        } else if (isHiPrioQset &&
             hpTaskQ[idx]->fetchNextTask(t.currentTask, t.waketime,
                                         t.curTaskType, now)) {
            return hpTaskQ[idx];
        } else if ((idx + 1) % numTaskSets == t.startIndex) {
            if (!trySleep(t, now)) { // as all queues checked & got no task
                return NULL; // executor is shutting down..
            }
        }
    }

    for (;; idx = (idx + 1) % numTaskSets) {
        if (isHiPrioQset &&
             hpTaskQ[idx]->fetchNextTask(t.currentTask, t.waketime,
                                         t.curTaskType, now)) {
            return hpTaskQ[idx];
        } else if (isLowPrioQset &&
             lpTaskQ[idx]->fetchNextTask(t.currentTask, t.waketime,
                                         t.curTaskType, now)) {
            return lpTaskQ[idx];
        } else if ((idx + 1) % numTaskSets == t.startIndex) {
            if (!trySleep(t, now)) { // as all queues checked & got no task
                return NULL; // executor is shutting down..
            }
        }
    }
    return NULL;
}

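// Put thread 't' to sleep if there is no ready work and its waketime is still
// in the future. Sleeps with no deadline are capped at MIN_SLEEP_TIME so a
// missed notification cannot stall the thread forever. Returns false if the
// thread was asked to shut down before or while it slept.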
bool ExecutorPool::trySleep(ExecutorThread &t, struct timeval &now) {
    LockHolder lh(mutex);
    if (!numReadyTasks && less_tv(now, t.waketime)) {
        if (t.state == EXECUTOR_RUNNING) {
            t.state = EXECUTOR_SLEEPING;
        } else {
            LOG(EXTENSION_LOG_DEBUG, "%s: shutting down %zu tasks ready",
                    t.getName().c_str(), numReadyTasks);
            return false;
        }

        LOG(EXTENSION_LOG_DEBUG, "%s: to sleep for %ld s", t.getName().c_str(),
                (long)(t.waketime.tv_sec - now.tv_sec));
        // zzz ....
        if (is_max_tv(t.waketime)) { // in absence of reliable posting
            advance_tv(now, MIN_SLEEP_TIME); // don't miss posts,
            mutex.wait(now); // timed sleeps are the safe way to go
        } else {
            mutex.wait(t.waketime);
        }

        // got up ..
        if (t.state == EXECUTOR_SLEEPING) {
            t.state = EXECUTOR_RUNNING;
        } else {
            LOG(EXTENSION_LOG_DEBUG, "%s: shutting down %zu tasks ready",
                    t.getName().c_str(), numReadyTasks);
            return false;
        }

        gettimeofday(&now, NULL);
        LOG(EXTENSION_LOG_DEBUG, "%s: woke up %zu tasks ready",
                t.getName().c_str(), numReadyTasks);
    }
    set_max_tv(t.waketime);
    return true;
}

void ExecutorPool::notifyOne(void) {
    LockHolder lh(mutex);
    mutex.notifyOne();
}

void ExecutorPool::notifyAll(void) {
    LockHolder lh(mutex);
    mutex.notify();
}

void ExecutorPool::moreWork(void) {
    LockHolder lh(mutex);
    numReadyTasks++;
    highWaterMark = (numReadyTasks > highWaterMark) ?
                     numReadyTasks : highWaterMark;

    mutex.notifyOne();
}

void ExecutorPool::lessWork(void) {
    LockHolder lh(mutex);
    cb_assert(numReadyTasks); // read the shared count only under the mutex
    numReadyTasks--;
}

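// Worker threads bracket each task with tryNewWork()/doneWork() so the pool
// can enforce a per-task-type cap (maxWorkers) on concurrent workers.
// doneWork() returns the remaining capacity for the type just finished.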
size_t ExecutorPool::doneWork(task_type_t &curTaskType) {
    size_t newCapacity = 0;
    if (curTaskType != NO_TASK_TYPE) {
        // Record that a thread is done working on a particular queue type
        LockHolder lh(mutex);
        LOG(EXTENSION_LOG_DEBUG, "Done with Task Type %d capacity = %d",
                curTaskType, curWorkers[curTaskType]);
        curWorkers[curTaskType]--;
        newCapacity = maxWorkers[curTaskType] - curWorkers[curTaskType];
        curTaskType = NO_TASK_TYPE;
    }
    return newCapacity;
}

task_type_t ExecutorPool::tryNewWork(task_type_t newTaskType) {
    LockHolder lh(mutex);
    // Test if a thread can take up a task from the target queue type
    if (curWorkers[newTaskType] + 1 <= maxWorkers[newTaskType]) {
        curWorkers[newTaskType]++;
        LOG(EXTENSION_LOG_DEBUG,
                "Taking up work in task type %d capacity = %d",
                newTaskType, curWorkers[newTaskType]);
        return newTaskType;
    }

    LOG(EXTENSION_LOG_DEBUG, "Limiting from taking up work in task "
            "type %d capacity = %d", newTaskType, maxWorkers[newTaskType]);
    return NO_TASK_TYPE;
}

bool ExecutorPool::cancel(size_t taskId, bool eraseTask) {
    LockHolder lh(tMutex);
    std::map<size_t, TaskQpair>::iterator itr = taskLocator.find(taskId);
    if (itr == taskLocator.end()) {
        LOG(EXTENSION_LOG_DEBUG, "Task id %zu not found", taskId);
        return false;
    }

    ExTask task = itr->second.first;
    LOG(EXTENSION_LOG_DEBUG, "Cancel task %s id %zu on bucket %s %s",
            task->getDescription().c_str(), task->getId(),
            task->getEngine()->getName(), eraseTask ? "final erase" : "!");

    task->cancel(); // must be idempotent, just set state to dead

    if (eraseTask) { // only internal threads can erase tasks
        cb_assert(task->isdead());
        taskLocator.erase(itr);
        tMutex.notify();
    } else { // wake up the task from the TaskQ so a thread can safely erase it
             // otherwise we may race with unregisterBucket, where an unlocated
             // task runs in spite of its bucket getting unregistered
        itr->second.second->wake(task);
    }
    return true;
}

bool ExecutorPool::wake(size_t taskId) {
    LockHolder lh(tMutex);
    std::map<size_t, TaskQpair>::iterator itr = taskLocator.find(taskId);
    if (itr != taskLocator.end()) {
        itr->second.second->wake(itr->second.first);
        return true;
    }
    return false;
}

bool ExecutorPool::snooze(size_t taskId, double tosleep) {
    LockHolder lh(tMutex);
    std::map<size_t, TaskQpair>::iterator itr = taskLocator.find(taskId);
    if (itr != taskLocator.end()) {
        itr->second.first->snooze(tosleep);
        return true;
    }
    return false;
}

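// Map a (bucket, task type) pair to a queue. While fewer than maxGlobalThreads
// worker threads exist (e.g. before startWorkers() has run), any existing
// queue set is used, high priority preferred; once at full capacity, tasks are
// routed strictly by the bucket's workload priority.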
TaskQueue* ExecutorPool::getTaskQueue(EventuallyPersistentEngine *e,
                                      task_type_t qidx) {
    TaskQueue         *q             = NULL;
    size_t            curNumThreads  = 0;
    bucket_priority_t bucketPriority = e->getWorkloadPriority();

    cb_assert(0 <= (int)qidx && (size_t)qidx < numTaskSets);

    curNumThreads = threadQ.size();

    if (!bucketPriority) {
        LOG(EXTENSION_LOG_WARNING, "Trying to schedule task for unregistered "
            "bucket %s", e->getName());
        return q;
    }

    if (curNumThreads < maxGlobalThreads) {
        if (isHiPrioQset) {
            q = hpTaskQ[qidx];
        } else if (isLowPrioQset) {
            q = lpTaskQ[qidx];
        }
    } else { // Max capacity Mode scheduling ...
        if (bucketPriority == LOW_BUCKET_PRIORITY) {
            cb_assert(lpTaskQ.size() == numTaskSets);
            q = lpTaskQ[qidx];
        } else {
            cb_assert(hpTaskQ.size() == numTaskSets);
            q = hpTaskQ[qidx];
        }
    }
    return q;
}

size_t ExecutorPool::schedule(ExTask task, task_type_t qidx) {
    LockHolder lh(tMutex);
    TaskQueue *q = getTaskQueue(task->getEngine(), qidx);
    if (!q) { // getTaskQueue() returns NULL for an unregistered bucket; bail
        return 0; // out rather than dereference it (0 serves as "no task id")
    }
    TaskQpair tqp(task, q);
    taskLocator[task->getId()] = tqp;

    q->schedule(task);

    return task->getId();
}

void ExecutorPool::registerBucket(EventuallyPersistentEngine *engine) {
    TaskQ *taskQ;
    bool *whichQset;
    const char *queueName;
    WorkLoadPolicy &workload = engine->getWorkLoadPolicy();
    bucket_priority_t priority = workload.getBucketPriority();

    if (priority < HIGH_BUCKET_PRIORITY) {
        engine->setWorkloadPriority(LOW_BUCKET_PRIORITY);
        taskQ = &lpTaskQ;
        whichQset = &isLowPrioQset;
        queueName = "LowPrioQ_";
        LOG(EXTENSION_LOG_WARNING, "Bucket %s registered with low priority",
            engine->getName());
    } else {
        engine->setWorkloadPriority(HIGH_BUCKET_PRIORITY);
        taskQ = &hpTaskQ;
        whichQset = &isHiPrioQset;
        queueName = "HiPrioQ_";
        LOG(EXTENSION_LOG_WARNING, "Bucket %s registered with high priority",
            engine->getName());
    }

    LockHolder lh(tMutex);

    if (!(*whichQset)) {
        taskQ->reserve(numTaskSets);
        for (size_t i = 0; i < numTaskSets; i++) {
            taskQ->push_back(new TaskQueue(this, (task_type_t)i, queueName));
        }
        *whichQset = true;
    }

    numBuckets++;

    startWorkers();
}

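// Spawn the worker threads on first bucket registration. The global budget is
// split into reader/writer/auxIO/nonIO groups (see the getNum*() helpers
// above) and thread names carry a pool-wide index. Only the AuxIO and NonIO
// task sets get their maxWorkers capped here; the Reader and Writer sets keep
// the full global limit set in the constructor.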
bool ExecutorPool::startWorkers(void) {
    if (threadQ.size()) {
        return false;
    }

    size_t numReaders = getNumReaders();
    size_t numWriters = getNumWriters();
    size_t numAuxIO   = getNumAuxIO();
    size_t numNonIO   = getNumNonIO();

    LOG(EXTENSION_LOG_WARNING,
            "Spawning %zu readers, %zu writers, %zu auxIO, %zu nonIO threads",
            numReaders, numWriters, numAuxIO, numNonIO);

    for (size_t tidx = 0; tidx < numReaders; ++tidx) {
        std::stringstream ss;
        ss << "reader_worker_" << tidx;

        threadQ.push_back(new ExecutorThread(this, READER_TASK_IDX, ss.str()));
        threadQ.back()->start();
    }
    for (size_t tidx = 0; tidx < numWriters; ++tidx) {
        std::stringstream ss;
        ss << "writer_worker_" << numReaders + tidx;

        threadQ.push_back(new ExecutorThread(this, WRITER_TASK_IDX, ss.str()));
        threadQ.back()->start();
    }
    for (size_t tidx = 0; tidx < numAuxIO; ++tidx) {
        std::stringstream ss;
        ss << "auxio_worker_" << numReaders + numWriters + tidx;

        threadQ.push_back(new ExecutorThread(this, AUXIO_TASK_IDX, ss.str()));
        threadQ.back()->start();
    }
    for (size_t tidx = 0; tidx < numNonIO; ++tidx) {
        std::stringstream ss;
        ss << "nonio_worker_" << numReaders + numWriters + numAuxIO + tidx;

        threadQ.push_back(new ExecutorThread(this, NONIO_TASK_IDX, ss.str()));
        threadQ.back()->start();
    }

    LockHolder lh(mutex);
    maxWorkers[AUXIO_TASK_IDX]  = numAuxIO;
    maxWorkers[NONIO_TASK_IDX]  = numNonIO;

    return true;
}

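// Cancel and wake every task belonging to engine 'e' (optionally restricted
// to one task type), then block until worker threads have erased them all.
// Tasks flagged blockShutdown are woken but allowed to finish before erase.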
bool ExecutorPool::stopTaskGroup(EventuallyPersistentEngine *e,
                                 task_type_t taskType) {
    bool unfinishedTask;
    bool retVal = false;
    std::map<size_t, TaskQpair>::iterator itr;

    LockHolder lh(tMutex);
    LOG(EXTENSION_LOG_DEBUG, "Stopping %d type tasks in bucket %s", taskType,
            e->getName());
    do {
        ExTask task;
        unfinishedTask = false;
        for (itr = taskLocator.begin(); itr != taskLocator.end(); itr++) {
            task = itr->second.first;
            TaskQueue *q = itr->second.second;
            if (task->getEngine() == e &&
                (taskType == NO_TASK_TYPE || q->queueType == taskType)) {
                LOG(EXTENSION_LOG_DEBUG, "Stopping Task id %zu %s",
                        task->getId(), task->getDescription().c_str());
                if (!task->blockShutdown) {
                    task->cancel(); // Must be idempotent
                }
                q->wake(task);
                unfinishedTask = true;
                retVal = true;
            }
        }
        if (unfinishedTask) {
            struct timeval waketime;
            gettimeofday(&waketime, NULL);
            advance_tv(waketime, MIN_SLEEP_TIME);
            tMutex.wait(waketime); // Wait till task gets cancelled
        }
    } while (unfinishedTask);

    return retVal;
}

void ExecutorPool::unregisterBucket(EventuallyPersistentEngine *engine) {

    LOG(EXTENSION_LOG_DEBUG, "Unregistering bucket %s", engine->getName());

    stopTaskGroup(engine, NO_TASK_TYPE);

    LockHolder lh(tMutex);

    if (!(--numBuckets)) {
        cb_assert(!taskLocator.size());
        LockHolder lm(mutex);
        for (size_t tidx = 0; tidx < threadQ.size(); ++tidx) {
            threadQ[tidx]->stop(false); // only set state to DEAD
        }

        mutex.notify();
        lm.unlock();

        for (size_t tidx = 0; tidx < threadQ.size(); ++tidx) {
            threadQ[tidx]->stop(/*wait for threads */);
            delete threadQ[tidx];
        }

        for (size_t i = 0; i < numTaskSets; i++) {
            curWorkers[i] = 0;
        }

        threadQ.clear();
        LOG(EXTENSION_LOG_DEBUG, "Last bucket has unregistered");
    }
}

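// Emit per-queue depth stats: InQsize (futureQueue), OutQsize (readyQueue)
// and, when non-empty, PendingQ for every high and low priority task queue.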
void ExecutorPool::doTaskQStat(EventuallyPersistentEngine *engine,
                               const void *cookie, ADD_STAT add_stat) {
    if (engine->getEpStats().isShutdown) {
        return;
    }

    char statname[80] = {0};
    if (isHiPrioQset) {
        for (size_t i = 0; i < numTaskSets; i++) {
            snprintf(statname, sizeof(statname), "ep_workload:%s:InQsize",
                     hpTaskQ[i]->getName().c_str());
            add_casted_stat(statname, hpTaskQ[i]->futureQueue.size(), add_stat,
                            cookie);
            snprintf(statname, sizeof(statname), "ep_workload:%s:OutQsize",
                     hpTaskQ[i]->getName().c_str());
            add_casted_stat(statname, hpTaskQ[i]->readyQueue.size(), add_stat,
                            cookie);
            if (!hpTaskQ[i]->pendingQueue.empty()) {
                snprintf(statname, sizeof(statname), "ep_workload:%s:PendingQ",
                        hpTaskQ[i]->getName().c_str());
                add_casted_stat(statname, hpTaskQ[i]->pendingQueue.size(),
                                add_stat, cookie);
            }
        }
    }
    if (isLowPrioQset) {
        for (size_t i = 0; i < numTaskSets; i++) {
            snprintf(statname, sizeof(statname), "ep_workload:%s:InQsize",
                     lpTaskQ[i]->getName().c_str());
            add_casted_stat(statname, lpTaskQ[i]->futureQueue.size(), add_stat,
                            cookie);
            snprintf(statname, sizeof(statname), "ep_workload:%s:OutQsize",
                     lpTaskQ[i]->getName().c_str());
            add_casted_stat(statname, lpTaskQ[i]->readyQueue.size(), add_stat,
                            cookie);
            if (!lpTaskQ[i]->pendingQueue.empty()) {
                snprintf(statname, sizeof(statname), "ep_workload:%s:PendingQ",
                        lpTaskQ[i]->getName().c_str());
                add_casted_stat(statname, lpTaskQ[i]->pendingQueue.size(),
                                add_stat, cookie);
            }
        }
    }
}

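// Dump one task log as <prefix>:<logname>:<i>:{task,starttime,runtime} stats.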
static void showJobLog(const char *logname, const char *prefix,
                       const std::vector<TaskLogEntry> &log,
                       const void *cookie, ADD_STAT add_stat) {
    char statname[80] = {0};
    for (size_t i = 0; i < log.size(); ++i) {
        snprintf(statname, sizeof(statname), "%s:%s:%d:task", prefix,
                logname, static_cast<int>(i));
        add_casted_stat(statname, log[i].getName().c_str(), add_stat,
                        cookie);
        snprintf(statname, sizeof(statname), "%s:%s:%d:starttime",
                prefix, logname, static_cast<int>(i));
        add_casted_stat(statname, log[i].getTimestamp(), add_stat,
                        cookie);
        snprintf(statname, sizeof(statname), "%s:%s:%d:runtime",
                prefix, logname, static_cast<int>(i));
        add_casted_stat(statname, log[i].getDuration(), add_stat,
                        cookie);
    }
}

static void addWorkerStats(const char *prefix, ExecutorThread *t,
                           const void *cookie, ADD_STAT add_stat) {
    char statname[80] = {0};
    snprintf(statname, sizeof(statname), "%s:state", prefix);
    add_casted_stat(statname, t->getStateName().c_str(), add_stat, cookie);
    snprintf(statname, sizeof(statname), "%s:task", prefix);
    add_casted_stat(statname, t->getTaskName().c_str(), add_stat, cookie);

    if (strcmp(t->getStateName().c_str(), "running") == 0) {
        snprintf(statname, sizeof(statname), "%s:runtime", prefix);
        add_casted_stat(statname,
                (gethrtime() - t->getTaskStart()) / 1000, add_stat, cookie);
    }
}

void ExecutorPool::doWorkerStat(EventuallyPersistentEngine *engine,
                                const void *cookie, ADD_STAT add_stat) {
    if (engine->getEpStats().isShutdown) {
        return;
    }

    //TODO: implement tracking per engine stats ..
    for (size_t tidx = 0; tidx < threadQ.size(); ++tidx) {
        addWorkerStats(threadQ[tidx]->getName().c_str(), threadQ[tidx],
                       cookie, add_stat);
    }
    if (isHiPrioQset) {
        for (size_t i = 0; i < numTaskSets; i++) {
            showJobLog("log", hpTaskQ[i]->getName().c_str(),
                       hpTaskQ[i]->getLog(), cookie, add_stat);
            showJobLog("slow", hpTaskQ[i]->getName().c_str(),
                       hpTaskQ[i]->getSlowLog(), cookie, add_stat);
        }
    }
    if (isLowPrioQset) {
        for (size_t i = 0; i < numTaskSets; i++) {
            showJobLog("log", lpTaskQ[i]->getName().c_str(),
                       lpTaskQ[i]->getLog(), cookie, add_stat);
            showJobLog("slow", lpTaskQ[i]->getName().c_str(),
                       lpTaskQ[i]->getSlowLog(), cookie, add_stat);
        }
    }
}