1dec1f898STrond Norbye/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
25b304997SSteven Grimm/*
35b304997SSteven Grimm * Thread management for memcached.
45b304997SSteven Grimm */
59738c807STrond Norbye#include "connection.h"
677191f6aSDave Rigby#include "connections.h"
79738c807STrond Norbye#include "cookie.h"
87a2703f7STrond Norbye#include "front_end_thread.h"
926d24b39STrond Norbye#include "listening_port.h"
109f29a7f6SDave Rigby#include "log_macros.h"
119738c807STrond Norbye#include "memcached.h"
1250429d8cSDave Rigby#include "opentracing.h"
13c4b6868aSDave Rigby#include "settings.h"
140185d9feSDave Rigby#include "stats.h"
15deba2aacSDave Rigby#include "tracing.h"
1663b2de8aSRichard de Mellow#include <utilities/hdrhistogram.h>
1777191f6aSDave Rigby
1850429d8cSDave Rigby#include <memcached/openssl.h>
19291a7539STim Bradgate#include <nlohmann/json.hpp>
2050429d8cSDave Rigby#include <openssl/conf.h>
21deba2aacSDave Rigby#include <phosphor/phosphor.h>
2250429d8cSDave Rigby#include <platform/socket.h>
233cd4e78dSDaniel Owen#include <platform/strerror.h>
249c34d9c3SDave Rigby
2550429d8cSDave Rigby#include <fcntl.h>
2625358980STim Bradgate#include <atomic>
2750429d8cSDave Rigby#include <cerrno>
28ffb7c145STrond Norbye#include <condition_variable>
2950429d8cSDave Rigby#include <csignal>
3050429d8cSDave Rigby#include <cstdint>
3150429d8cSDave Rigby#include <cstdio>
329738c807STrond Norbye#include <cstdlib>
339738c807STrond Norbye#include <cstring>
349c34d9c3SDave Rigby#ifndef WIN32
359c34d9c3SDave Rigby#include <netinet/tcp.h> // For TCP_NODELAY etc
369c34d9c3SDave Rigby#endif
37394e5a17STrond Norbye#include <memory>
3850429d8cSDave Rigby#include <mutex>
3950429d8cSDave Rigby#include <queue>
405b304997SSteven Grimm
4195b33470SDave Rigbyextern std::atomic<bool> memcached_shutdown;
426d860fb3STrond Norbye
435b304997SSteven Grimm/* An item in the connection queue. */
44c6780a7cSTrond NorbyeFrontEndThread::ConnectionQueue::~ConnectionQueue() {
45c6780a7cSTrond Norbye    for (const auto& entry : connections) {
46c6780a7cSTrond Norbye        safe_close(entry.first);
47394e5a17STrond Norbye    }
48152808d1STrond Norbye}
49394e5a17STrond Norbye
50edd2d65bSTrond Norbyevoid FrontEndThread::ConnectionQueue::push(SOCKET sock,
51edd2d65bSTrond Norbye                                           SharedListeningPort interface) {
52152808d1STrond Norbye    std::lock_guard<std::mutex> guard(mutex);
53edd2d65bSTrond Norbye    connections.emplace_back(sock, interface);
54152808d1STrond Norbye}
55394e5a17STrond Norbye
56c6780a7cSTrond Norbyevoid FrontEndThread::ConnectionQueue::swap(
57edd2d65bSTrond Norbye        std::vector<std::pair<SOCKET, SharedListeningPort>>& other) {
58152808d1STrond Norbye    std::lock_guard<std::mutex> guard(mutex);
59c6780a7cSTrond Norbye    connections.swap(other);
60152808d1STrond Norbye}
615b304997SSteven Grimm
627a2703f7STrond Norbyestatic FrontEndThread dispatcher_thread;
632fe44f1cSDmitry Isaykin
64cad3ea13STrond Norbyevoid FrontEndThread::NotificationList::push(Connection* c) {
65cad3ea13STrond Norbye    std::lock_guard<std::mutex> lock(mutex);
66cad3ea13STrond Norbye    auto iter = std::find(connections.begin(), connections.end(), c);
67cad3ea13STrond Norbye    if (iter == connections.end()) {
68cad3ea13STrond Norbye        try {
69cad3ea13STrond Norbye            connections.push_back(c);
70cad3ea13STrond Norbye        } catch (const std::bad_alloc&) {
71cad3ea13STrond Norbye            // Just ignore and hopefully we'll be able to signal it at a later
72cad3ea13STrond Norbye            // time.
73cad3ea13STrond Norbye        }
74cad3ea13STrond Norbye    }
75cad3ea13STrond Norbye}
76cad3ea13STrond Norbye
77cad3ea13STrond Norbyevoid FrontEndThread::NotificationList::remove(Connection* c) {
78cad3ea13STrond Norbye    std::lock_guard<std::mutex> lock(mutex);
79cad3ea13STrond Norbye    auto iter = std::find(connections.begin(), connections.end(), c);
80cad3ea13STrond Norbye    if (iter != connections.end()) {
81cad3ea13STrond Norbye        connections.erase(iter);
82cad3ea13STrond Norbye    }
83cad3ea13STrond Norbye}
84cad3ea13STrond Norbye
85cad3ea13STrond Norbyevoid FrontEndThread::NotificationList::swap(std::vector<Connection*>& other) {
86cad3ea13STrond Norbye    std::lock_guard<std::mutex> lock(mutex);
87cad3ea13STrond Norbye    connections.swap(other);
88cad3ea13STrond Norbye}
89cad3ea13STrond Norbye
905b304997SSteven Grimm/*
915b304997SSteven Grimm * Each libevent instance has a wakeup pipe, which other threads
925b304997SSteven Grimm * can use to signal that they've put a new connection on its queue.
935b304997SSteven Grimm */
947a2703f7STrond Norbyestatic std::vector<FrontEndThread> threads;
950f1a35e9SRichard de Mellowstd::vector<Hdr1sfMicroSecHistogram> scheduler_info;
965b304997SSteven Grimm
975b304997SSteven Grimm/*
982fe44f1cSDmitry Isaykin * Number of worker threads that have finished setting themselves up.
995b304997SSteven Grimm */
100fa526109STim Bradgatestatic size_t init_count = 0;
101ffb7c145STrond Norbyestatic std::mutex init_mutex;
102ffb7c145STrond Norbyestatic std::condition_variable init_cond;
1035b304997SSteven Grimm
104b4a4f862STrond Norbyestatic void thread_libevent_process(evutil_socket_t, short, void*);
1055b304997SSteven Grimm
1065b304997SSteven Grimm/*
1075b304997SSteven Grimm * Creates a worker thread.
1085b304997SSteven Grimm */
10970599dd8SDave Rigbystatic void create_worker(void (*func)(void *), void *arg, cb_thread_t *id,
11070599dd8SDave Rigby                          const char* name) {
111b4a4f862STrond Norbye    if (cb_create_named_thread(id, func, arg, 0, name) != 0) {
1124dd9b0f4STrond Norbye        FATAL_ERROR(EXIT_FAILURE,
1134dd9b0f4STrond Norbye                    "Can't create thread {}: {}",
1144dd9b0f4STrond Norbye                    name,
11541f62c77STrond Norbye                    cb_strerror());
1165b304997SSteven Grimm    }
1175b304997SSteven Grimm}
1185b304997SSteven Grimm
1195b304997SSteven Grimm/****************************** LIBEVENT THREADS *****************************/
1205b304997SSteven Grimm
121bf3926e0STrond Norbyevoid iterate_all_connections(std::function<void(Connection&)> callback) {
122ff3bb5d2STrond Norbye    for (auto& thr : threads) {
123deba2aacSDave Rigby        TRACE_LOCKGUARD_TIMED(thr.mutex,
124deba2aacSDave Rigby                              "mutex",
125deba2aacSDave Rigby                              "iterate_all_connections::threadLock",
126deba2aacSDave Rigby                              SlowMutexThreshold);
127ff3bb5d2STrond Norbye        iterate_thread_connections(&thr, callback);
128bf3926e0STrond Norbye    }
129bf3926e0STrond Norbye}
130bf3926e0STrond Norbye
13150429d8cSDave Rigbystatic bool create_notification_pipe(FrontEndThread& me) {
13250429d8cSDave Rigby    if (cb::net::socketpair(SOCKETPAIR_AF,
13350429d8cSDave Rigby                            SOCK_STREAM,
13450429d8cSDave Rigby                            0,
13550429d8cSDave Rigby                            reinterpret_cast<SOCKET*>(me.notify)) ==
13650429d8cSDave Rigby        SOCKET_ERROR) {
13750429d8cSDave Rigby        LOG_WARNING("Can't create notify pipe: {}",
13850429d8cSDave Rigby                    cb_strerror(cb::net::get_socket_error()));
13950429d8cSDave Rigby        return false;
14050429d8cSDave Rigby    }
14150429d8cSDave Rigby
14250429d8cSDave Rigby    for (auto sock : me.notify) {
14350429d8cSDave Rigby        int flags = 1;
14450429d8cSDave Rigby        const auto* flag_ptr = reinterpret_cast<const void*>(&flags);
14550429d8cSDave Rigby        cb::net::setsockopt(
14650429d8cSDave Rigby                sock, IPPROTO_TCP, TCP_NODELAY, flag_ptr, sizeof(flags));
14750429d8cSDave Rigby        cb::net::setsockopt(
14850429d8cSDave Rigby                sock, SOL_SOCKET, SO_REUSEADDR, flag_ptr, sizeof(flags));
14950429d8cSDave Rigby
15050429d8cSDave Rigby        if (evutil_make_socket_nonblocking(sock) == -1) {
15150429d8cSDave Rigby            LOG_WARNING("Failed to enable non-blocking: {}",
15250429d8cSDave Rigby                        cb_strerror(cb::net::get_socket_error()));
15350429d8cSDave Rigby            return false;
15450429d8cSDave Rigby        }
15550429d8cSDave Rigby    }
15650429d8cSDave Rigby    return true;
15750429d8cSDave Rigby}
15850429d8cSDave Rigby
15926470267STrond Norbyestatic void setup_dispatcher(struct event_base *main_base,
160ce9a18d0STrond Norbye                             void (*dispatcher_callback)(evutil_socket_t, short, void *))
16126470267STrond Norbye{
16226470267STrond Norbye    dispatcher_thread.base = main_base;
163f603fdb6STrond Norbye	dispatcher_thread.thread_id = cb_thread_self();
16450429d8cSDave Rigby        if (!create_notification_pipe(dispatcher_thread)) {
16550429d8cSDave Rigby            FATAL_ERROR(EXIT_FAILURE, "Unable to create notification pipe");
16650429d8cSDave Rigby    }
16726470267STrond Norbye
168b1ddea27STrond Norbye    /* Listen for notifications from other threads */
16950429d8cSDave Rigby    if ((event_assign(&dispatcher_thread.notify_event,
17050429d8cSDave Rigby                      dispatcher_thread.base,
17150429d8cSDave Rigby                      dispatcher_thread.notify[0],
17250429d8cSDave Rigby                      EV_READ | EV_PERSIST,
17350429d8cSDave Rigby                      dispatcher_callback,
17450429d8cSDave Rigby                      nullptr) == -1) ||
17550429d8cSDave Rigby        (event_add(&dispatcher_thread.notify_event, nullptr) == -1)) {
17650429d8cSDave Rigby        FATAL_ERROR(EXIT_FAILURE, "Can't monitor libevent notify pipe");
17750429d8cSDave Rigby    }
178edd2d65bSTrond Norbye    dispatcher_thread.running = true;
17926470267STrond Norbye}
18026470267STrond Norbye
1815b304997SSteven Grimm/*
1825b304997SSteven Grimm * Set up a thread's information.
1835b304997SSteven Grimm */
1847a2703f7STrond Norbyestatic void setup_thread(FrontEndThread& me) {
185c74c69a1STrond Norbye    me.base = event_base_new();
1864ecec1b3SJim Walker
187c74c69a1STrond Norbye    if (!me.base) {
1883cd4e78dSDaniel Owen        FATAL_ERROR(EXIT_FAILURE, "Can't allocate event base");
1895b304997SSteven Grimm    }
1905b304997SSteven Grimm
1915b304997SSteven Grimm    /* Listen for notifications from other threads */
192c74c69a1STrond Norbye    if ((event_assign(&me.notify_event,
193c74c69a1STrond Norbye                      me.base,
19450429d8cSDave Rigby                      me.notify[0],
195b1ddea27STrond Norbye                      EV_READ | EV_PERSIST,
196b1ddea27STrond Norbye                      thread_libevent_process,
197c74c69a1STrond Norbye                      &me) == -1) ||
198b4a4f862STrond Norbye        (event_add(&me.notify_event, nullptr) == -1)) {
1993cd4e78dSDaniel Owen        FATAL_ERROR(EXIT_FAILURE, "Can't monitor libevent notify pipe");
2005b304997SSteven Grimm    }
2015b304997SSteven Grimm}
2025b304997SSteven Grimm
2035b304997SSteven Grimm/*
2045b304997SSteven Grimm * Worker thread: main event loop
2055b304997SSteven Grimm */
206f603fdb6STrond Norbyestatic void worker_libevent(void *arg) {
207b4a4f862STrond Norbye    auto& me = *reinterpret_cast<FrontEndThread*>(arg);
2085b304997SSteven Grimm
209b4a4f862STrond Norbye    // Any per-thread setup can happen here; thread_init() will block until
210b4a4f862STrond Norbye    // all threads have finished initializing.
211ffb7c145STrond Norbye    {
212ffb7c145STrond Norbye        std::lock_guard<std::mutex> guard(init_mutex);
213ffb7c145STrond Norbye        me.running = true;
214ffb7c145STrond Norbye        init_count++;
215ffb7c145STrond Norbye        init_cond.notify_all();
216ffb7c145STrond Norbye    }
2175b304997SSteven Grimm
218b4a4f862STrond Norbye    event_base_loop(me.base, 0);
219b4a4f862STrond Norbye    me.running = false;
2205b304997SSteven Grimm}
2215b304997SSteven Grimm
22250429d8cSDave Rigbyvoid drain_notification_channel(evutil_socket_t fd) {
22350429d8cSDave Rigby    /* Every time we want to notify a thread, we send 1 byte to its
22450429d8cSDave Rigby     * notification pipe. When the thread wakes up, it tries to drain
22550429d8cSDave Rigby     * it's notification channel before executing any other events.
22650429d8cSDave Rigby     * Other threads (listener and other background threads) may notify
22750429d8cSDave Rigby     * this thread up to 512 times since the last time we checked the
22850429d8cSDave Rigby     * notification pipe, before we'll start draining the it again.
22950429d8cSDave Rigby     */
23050429d8cSDave Rigby
23150429d8cSDave Rigby    ssize_t nread;
23250429d8cSDave Rigby    // Using a small size for devnull will avoid blowing up the stack
23350429d8cSDave Rigby    char devnull[512];
23450429d8cSDave Rigby
23550429d8cSDave Rigby    while ((nread = cb::net::recv(fd, devnull, sizeof(devnull), 0)) ==
23650429d8cSDave Rigby           (int)sizeof(devnull)) {
23750429d8cSDave Rigby        /* empty */
23850429d8cSDave Rigby    }
23950429d8cSDave Rigby
24050429d8cSDave Rigby    if (nread == -1) {
24150429d8cSDave Rigby        LOG_WARNING("Can't read from libevent pipe: {}",
24250429d8cSDave Rigby                    cb_strerror(cb::net::get_socket_error()));
24350429d8cSDave Rigby    }
24450429d8cSDave Rigby}
24550429d8cSDave Rigby
2467a2703f7STrond Norbyestatic void dispatch_new_connections(FrontEndThread& me) {
247edd2d65bSTrond Norbye    std::vector<std::pair<SOCKET, SharedListeningPort>> connections;
248c6780a7cSTrond Norbye    me.new_conn_queue.swap(connections);
249c6780a7cSTrond Norbye
250c6780a7cSTrond Norbye    for (const auto& entry : connections) {
2519e8f5bb7STrond Norbye        if (conn_new(entry.first, *entry.second, me.base, me) == nullptr) {
25226d24b39STrond Norbye            if (entry.second->system) {
25326d24b39STrond Norbye                --stats.system_conns;
25426d24b39STrond Norbye            }
255c6780a7cSTrond Norbye            safe_close(entry.first);
256394e5a17STrond Norbye        }
257394e5a17STrond Norbye    }
258394e5a17STrond Norbye}
259394e5a17STrond Norbye
2605b304997SSteven Grimm/*
2615b304997SSteven Grimm * Processes an incoming "handle a new connection" item. This is called when
2625b304997SSteven Grimm * input arrives on the libevent wakeup pipe.
2635b304997SSteven Grimm */
264b4a4f862STrond Norbyestatic void thread_libevent_process(evutil_socket_t fd, short, void* arg) {
2657a2703f7STrond Norbye    auto& me = *reinterpret_cast<FrontEndThread*>(arg);
266f603fdb6STrond Norbye
26750429d8cSDave Rigby    // Start by draining the notification channel before doing any work.
26850429d8cSDave Rigby    // By doing so we know that we'll be notified again if someone
26950429d8cSDave Rigby    // tries to notify us while we're doing the work below (so we don't have
27050429d8cSDave Rigby    // to care about race conditions for stuff people try to notify us
27150429d8cSDave Rigby    // about.
27250429d8cSDave Rigby    drain_notification_channel(fd);
27350429d8cSDave Rigby
274ed7351efSTrond Norbye    if (memcached_shutdown) {
275cbb8b19eSTrond Norbye        // Someone requested memcached to shut down. The listen thread should
276cbb8b19eSTrond Norbye        // be stopped immediately.
277cbb8b19eSTrond Norbye        if (is_listen_thread()) {
2784dd9b0f4STrond Norbye            LOG_INFO("Stopping listen thread (thread.cc)");
279c74c69a1STrond Norbye            event_base_loopbreak(me.base);
280cbb8b19eSTrond Norbye            return;
281cbb8b19eSTrond Norbye        }
282cbb8b19eSTrond Norbye
28334440eedSTrond Norbye        if (signal_idle_clients(me, false) == 0) {
284d166bc8cSTrond Norbye            LOG_INFO("Stopping worker thread {}", me.index);
285c74c69a1STrond Norbye            event_base_loopbreak(me.base);
286cbb8b19eSTrond Norbye            return;
287cbb8b19eSTrond Norbye        }
288ed7351efSTrond Norbye    }
289d9b97d80SPaul Lindner
290394e5a17STrond Norbye    dispatch_new_connections(me);
291f05b01c4STrond Norbye
2927a2703f7STrond Norbye    FrontEndThread::PendingIoMap pending;
2930800742bSDave Rigby    {
2940800742bSDave Rigby        std::lock_guard<std::mutex> lock(me.pending_io.mutex);
2950800742bSDave Rigby        me.pending_io.map.swap(pending);
2960800742bSDave Rigby    }
2970800742bSDave Rigby
298deba2aacSDave Rigby    TRACE_LOCKGUARD_TIMED(me.mutex,
299deba2aacSDave Rigby                          "mutex",
300deba2aacSDave Rigby                          "thread_libevent_process::threadLock",
301deba2aacSDave Rigby                          SlowMutexThreshold);
302c74c69a1STrond Norbye
303cad3ea13STrond Norbye    std::vector<Connection*> notify;
304cad3ea13STrond Norbye    me.notification.swap(notify);
305cad3ea13STrond Norbye
306b4a4f862STrond Norbye    for (const auto& io : pending) {
3070800742bSDave Rigby        auto* c = io.first;
308cad3ea13STrond Norbye
309cad3ea13STrond Norbye        // Remove from the notify list if it's there as we don't
310cad3ea13STrond Norbye        // want to run them twice
311cad3ea13STrond Norbye        {
312cad3ea13STrond Norbye            auto iter = std::find(notify.begin(), notify.end(), c);
313cad3ea13STrond Norbye            if (iter != notify.end()) {
314cad3ea13STrond Norbye                notify.erase(iter);
315cad3ea13STrond Norbye            }
316cad3ea13STrond Norbye        }
317cad3ea13STrond Norbye
318bf74734dSTrond Norbye        if (c->getSocketDescriptor() != INVALID_SOCKET &&
319bf74734dSTrond Norbye            !c->isRegisteredInLibevent()) {
320bf74734dSTrond Norbye            /* The socket may have been shut down while we're looping */
321bf74734dSTrond Norbye            /* in delayed shutdown */
322bf74734dSTrond Norbye            c->registerEvent();
3230bb3f178STrond Norbye        }
324bf74734dSTrond Norbye
325a02cd553STrond Norbye        for (const auto& pair : io.second) {
326a02cd553STrond Norbye            if (pair.first) {
327a02cd553STrond Norbye                pair.first->setAiostat(pair.second);
328a02cd553STrond Norbye                pair.first->setEwouldblock(false);
329a02cd553STrond Norbye            }
330a02cd553STrond Norbye        }
331a02cd553STrond Norbye
332bf74734dSTrond Norbye        /*
333bf74734dSTrond Norbye         * We don't want the thread to keep on serving all of the data
334bf74734dSTrond Norbye         * from the context of the notification pipe, so just let it
335bf74734dSTrond Norbye         * run one time to set up the correct mask in libevent
336bf74734dSTrond Norbye         */
337bf74734dSTrond Norbye        c->setNumEvents(1);
338bf74734dSTrond Norbye        run_event_loop(c, EV_READ | EV_WRITE);
339f05b01c4STrond Norbye    }
3405461ca70STrond Norbye
341cad3ea13STrond Norbye    // Notify the connections we haven't notified yet
342cad3ea13STrond Norbye    for (auto c : notify) {
343cad3ea13STrond Norbye        c->setNumEvents(1);
344cad3ea13STrond Norbye        run_event_loop(c, EV_READ | EV_WRITE);
345cad3ea13STrond Norbye    }
346cad3ea13STrond Norbye
347cbb8b19eSTrond Norbye    if (memcached_shutdown) {
348cbb8b19eSTrond Norbye        // Someone requested memcached to shut down. If we don't have
349cbb8b19eSTrond Norbye        // any connections bound to this thread we can just shut down
35034440eedSTrond Norbye        time_t now = time(nullptr);
35134440eedSTrond Norbye        bool log = now > me.shutdown_next_log;
35234440eedSTrond Norbye        if (log) {
35334440eedSTrond Norbye            me.shutdown_next_log = now + 5;
35434440eedSTrond Norbye        }
35534440eedSTrond Norbye
35634440eedSTrond Norbye        auto connected = signal_idle_clients(me, log);
357cbb8b19eSTrond Norbye        if (connected == 0) {
3584dd9b0f4STrond Norbye            LOG_INFO("Stopping worker thread {}", me.index);
359c74c69a1STrond Norbye            event_base_loopbreak(me.base);
36034440eedSTrond Norbye        } else if (log) {
3614dd9b0f4STrond Norbye            LOG_INFO("Waiting for {} connected clients on worker thread {}",
3624dd9b0f4STrond Norbye                     connected,
3634dd9b0f4STrond Norbye                     me.index);
364cbb8b19eSTrond Norbye        }
365cbb8b19eSTrond Norbye    }
3665b304997SSteven Grimm}
3675b304997SSteven Grimm
368929e6fbdSDave Rigbyvoid notify_io_complete(gsl::not_null<const void*> void_cookie,
369929e6fbdSDave Rigby                        ENGINE_ERROR_CODE status) {
370e4ea6dd5STrond Norbye    auto* ccookie = reinterpret_cast<const Cookie*>(void_cookie.get());
371e4ea6dd5STrond Norbye    auto& cookie = const_cast<Cookie&>(*ccookie);
37226b37982SDustin Sallings
3739e8f5bb7STrond Norbye    auto& thr = cookie.getConnection().getThread();
3744dd9b0f4STrond Norbye    LOG_DEBUG("notify_io_complete: Got notify from {}, status {}",
375e4ea6dd5STrond Norbye              cookie.getConnection().getId(),
376a9477ebfSTrond Norbye              status);
377c3e2b4f4STrond Norbye
37845a77d91STrond Norbye    /* kick the thread in the butt */
379a02cd553STrond Norbye    if (add_conn_to_pending_io_list(&cookie.getConnection(), &cookie, status)) {
3809e8f5bb7STrond Norbye        notify_thread(thr);
381f05b01c4STrond Norbye    }
38272a52b14STrond Norbye}
38372a52b14STrond Norbye
3845b304997SSteven Grimm/* Which thread we assigned a connection to most recently. */
385c6780a7cSTrond Norbyestatic size_t last_thread = 0;
3865b304997SSteven Grimm
3875b304997SSteven Grimm/*
3885b304997SSteven Grimm * Dispatches a new connection to another thread. This is only ever called
389f603fdb6STrond Norbye * from the main thread, or because of an incoming connection.
3905b304997SSteven Grimm */
391edd2d65bSTrond Norbyevoid dispatch_conn_new(SOCKET sfd, SharedListeningPort& interface) {
39219748396SDave Rigby    size_t tid = (last_thread + 1) % Settings::instance().getNumWorkerThreads();
393ff3bb5d2STrond Norbye    auto& thread = threads[tid];
394c6780a7cSTrond Norbye    last_thread = tid;
395b8cf33a0SDave Rigby
396394e5a17STrond Norbye    try {
397edd2d65bSTrond Norbye        thread.new_conn_queue.push(sfd, interface);
39819c4c3ecSTrond Norbye    } catch (const std::bad_alloc& e) {
3994dd9b0f4STrond Norbye        LOG_WARNING("dispatch_conn_new: Failed to dispatch new connection: {}",
400a9541408STrond Norbye                    e.what());
40126d24b39STrond Norbye
40226d24b39STrond Norbye        if (interface->system) {
40326d24b39STrond Norbye            --stats.system_conns;
40426d24b39STrond Norbye        }
405394e5a17STrond Norbye        safe_close(sfd);
406394e5a17STrond Norbye        return ;
407b8cf33a0SDave Rigby    }
408b8cf33a0SDave Rigby
409c74c69a1STrond Norbye    notify_thread(thread);
4105b304997SSteven Grimm}
4115b304997SSteven Grimm
4125b304997SSteven Grimm/*
4135b304997SSteven Grimm * Returns true if this is the thread that listens for new TCP connections.
4145b304997SSteven Grimm */
415a9dcd9acSToru Maesakaint is_listen_thread() {
416f603fdb6STrond Norbye    return dispatcher_thread.thread_id == cb_thread_self();
4175b304997SSteven Grimm}
4185b304997SSteven Grimm
419c74c69a1STrond Norbyevoid notify_dispatcher() {
420edd2d65bSTrond Norbye    if (dispatcher_thread.running) {
421edd2d65bSTrond Norbye        notify_thread(dispatcher_thread);
422edd2d65bSTrond Norbye    }
42326470267STrond Norbye}
4246f60aac4SSteven Grimm
4255b304997SSteven Grimm/******************************* GLOBAL STATS ******************************/
4265b304997SSteven Grimm
427bbe213d4SEugen-Alexandru Virtanvoid threadlocal_stats_reset(std::vector<thread_stats>& thread_stats) {
428bbe213d4SEugen-Alexandru Virtan    for (auto& ii : thread_stats) {
429bbe213d4SEugen-Alexandru Virtan        ii.reset();
430c3fd96d9SSean Lynch    }
431c3fd96d9SSean Lynch}
432c3fd96d9SSean Lynch
4335b304997SSteven Grimm/*
4345b304997SSteven Grimm * Initializes the thread subsystem, creating various worker threads.
4355b304997SSteven Grimm *
4362fe44f1cSDmitry Isaykin * nthreads  Number of worker event handler threads to spawn
4375b304997SSteven Grimm * main_base Event base for main thread
4385b304997SSteven Grimm */
439fa526109STim Bradgatevoid thread_init(size_t nthr,
440fa526109STim Bradgate                 struct event_base* main_base,
441fa526109STim Bradgate                 void (*dispatcher_callback)(evutil_socket_t, short, void*)) {
442e371f62dSTrond Norbye    scheduler_info.resize(nthr);
443a5711b44STrond Norbye
444a5711b44STrond Norbye    try {
445e371f62dSTrond Norbye        threads = std::vector<FrontEndThread>(nthr);
446a5711b44STrond Norbye    } catch (const std::bad_alloc&) {
4473cd4e78dSDaniel Owen        FATAL_ERROR(EXIT_FAILURE, "Can't allocate thread descriptors");
4485b304997SSteven Grimm    }
4495b304997SSteven Grimm
45026470267STrond Norbye    setup_dispatcher(main_base, dispatcher_callback);
4515b304997SSteven Grimm
452e371f62dSTrond Norbye    for (size_t ii = 0; ii < nthr; ii++) {
45350429d8cSDave Rigby        if (!create_notification_pipe(threads[ii])) {
45450429d8cSDave Rigby            FATAL_ERROR(EXIT_FAILURE, "Cannot create notification pipe");
45550429d8cSDave Rigby        }
456c74c69a1STrond Norbye        threads[ii].index = ii;
45750429d8cSDave Rigby
458c74c69a1STrond Norbye        setup_thread(threads[ii]);
4595b304997SSteven Grimm    }
4605b304997SSteven Grimm
4615b304997SSteven Grimm    /* Create threads after we've done all the libevent setup. */
462c74c69a1STrond Norbye    for (auto& thread : threads) {
463c74c69a1STrond Norbye        const std::string name = "mc:worker_" + std::to_string(thread.index);
464c74c69a1STrond Norbye        create_worker(
465c74c69a1STrond Norbye                worker_libevent, &thread, &thread.thread_id, name.c_str());
4665b304997SSteven Grimm    }
4675b304997SSteven Grimm
468ffb7c145STrond Norbye    // Wait for all the threads to set themselves up before returning.
469ffb7c145STrond Norbye    std::unique_lock<std::mutex> lock(init_mutex);
470ffb7c145STrond Norbye    init_cond.wait(lock, [&nthr] { return !(init_count < nthr); });
4715b304997SSteven Grimm}
4725b304997SSteven Grimm
473c74c69a1STrond Norbyevoid threads_shutdown() {
474cc43592eSTrond Norbye    // Notify all of the threads and let them shut down
475c74c69a1STrond Norbye    for (auto& thread : threads) {
476c74c69a1STrond Norbye        notify_thread(thread);
477cc43592eSTrond Norbye    }
478cc43592eSTrond Norbye
479cc43592eSTrond Norbye    // Wait for all of them to complete
480cc43592eSTrond Norbye    for (auto& thread : threads) {
481cc43592eSTrond Norbye        // When using bufferevents we need to run a few iterations here.
482cc43592eSTrond Norbye        // Calling signalIfIdle won't run the event immediately, but when
483cc43592eSTrond Norbye        // the control goes back to libevent. That means that some of the
484cc43592eSTrond Norbye        // connections could be "stuck" for another round in the event loop.
485cc43592eSTrond Norbye        while (thread.running) {
486cc43592eSTrond Norbye            notify_thread(thread);
487cc43592eSTrond Norbye            std::this_thread::sleep_for(std::chrono::microseconds(250));
488cc43592eSTrond Norbye        }
489c74c69a1STrond Norbye        cb_join_thread(thread.thread_id);
4906d860fb3STrond Norbye    }
491225cf312STrond Norbye}
492225cf312STrond Norbye
493a5711b44STrond Norbyevoid threads_cleanup() {
494c74c69a1STrond Norbye    for (auto& thread : threads) {