/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
/*
 * Thread management for memcached.
 */
#include "connection.h"
#include "connections.h"
#include "cookie.h"
#include "front_end_thread.h"
#include "listening_port.h"
#include "log_macros.h"
#include "memcached.h"
#include "opentracing.h"
#include "settings.h"
#include "stats.h"
#include "tracing.h"
#include <utilities/hdrhistogram.h>

#include <memcached/openssl.h>
#include <nlohmann/json.hpp>
#include <openssl/conf.h>
#include <phosphor/phosphor.h>
#include <platform/socket.h>
#include <platform/strerror.h>

#include <fcntl.h>
#include <atomic>
#include <cerrno>
#include <condition_variable>
#include <csignal>
#include <cstdint>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#ifndef WIN32
#include <netinet/tcp.h> // For TCP_NODELAY etc
#endif
#include <memory>
#include <mutex>
#include <queue>

extern std::atomic<bool> memcached_shutdown;

/* Close any sockets still waiting in the connection queue on destruction. */
FrontEndThread::ConnectionQueue::~ConnectionQueue() {
    for (const auto& entry : connections) {
        safe_close(entry.first);
    }
}

void FrontEndThread::ConnectionQueue::push(SOCKET sock,
                                           SharedListeningPort interface) {
    std::lock_guard<std::mutex> guard(mutex);
    connections.emplace_back(sock, interface);
}

void FrontEndThread::ConnectionQueue::swap(
        std::vector<std::pair<SOCKET, SharedListeningPort>>& other) {
    std::lock_guard<std::mutex> guard(mutex);
    connections.swap(other);
}

static FrontEndThread dispatcher_thread;

void FrontEndThread::NotificationList::push(Connection* c) {
    std::lock_guard<std::mutex> lock(mutex);
    auto iter = std::find(connections.begin(), connections.end(), c);
    if (iter == connections.end()) {
        try {
            connections.push_back(c);
        } catch (const std::bad_alloc&) {
            // Just ignore and hopefully we'll be able to signal it at a later
            // time.
        }
    }
}

void FrontEndThread::NotificationList::remove(Connection* c) {
    std::lock_guard<std::mutex> lock(mutex);
    auto iter = std::find(connections.begin(), connections.end(), c);
    if (iter != connections.end()) {
        connections.erase(iter);
    }
}

void FrontEndThread::NotificationList::swap(std::vector<Connection*>& other) {
    std::lock_guard<std::mutex> lock(mutex);
    connections.swap(other);
}

/*
 * Each libevent instance has a wakeup pipe, which other threads
 * can use to signal that they've put a new connection on its queue.
 */
static std::vector<FrontEndThread> threads;
std::vector<Hdr1sfMicroSecHistogram> scheduler_info;

/*
 * Number of worker threads that have finished setting themselves up.
 */
static size_t init_count = 0;
static std::mutex init_mutex;
static std::condition_variable init_cond;

static void thread_libevent_process(evutil_socket_t, short, void*);

/*
 * Creates a worker thread.
 */
static void create_worker(void (*func)(void *), void *arg, cb_thread_t *id,
                          const char* name) {
    if (cb_create_named_thread(id, func, arg, 0, name) != 0) {
        FATAL_ERROR(EXIT_FAILURE,
                    "Can't create thread {}: {}",
                    name,
                    cb_strerror());
    }
}

/****************************** LIBEVENT THREADS *****************************/

void iterate_all_connections(std::function<void(Connection&)> callback) {
    for (auto& thr : threads) {
        TRACE_LOCKGUARD_TIMED(thr.mutex,
                              "mutex",
                              "iterate_all_connections::threadLock",
                              SlowMutexThreshold);
        iterate_thread_connections(&thr, callback);
    }
}
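
// A minimal usage sketch for iterate_all_connections(). The counting
// callback shown is hypothetical; the callback runs with the owning
// thread's mutex held, so it must not block or try to re-acquire
// per-thread locks:
//
//     size_t total = 0;
//     iterate_all_connections([&total](Connection&) { ++total; });
//     LOG_INFO("Currently {} connections", total);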

static bool create_notification_pipe(FrontEndThread& me) {
    if (cb::net::socketpair(SOCKETPAIR_AF,
                            SOCK_STREAM,
                            0,
                            reinterpret_cast<SOCKET*>(me.notify)) ==
        SOCKET_ERROR) {
        LOG_WARNING("Can't create notify pipe: {}",
                    cb_strerror(cb::net::get_socket_error()));
        return false;
    }

    for (auto sock : me.notify) {
        int flags = 1;
        const auto* flag_ptr = reinterpret_cast<const void*>(&flags);
        cb::net::setsockopt(
                sock, IPPROTO_TCP, TCP_NODELAY, flag_ptr, sizeof(flags));
        cb::net::setsockopt(
                sock, SOL_SOCKET, SO_REUSEADDR, flag_ptr, sizeof(flags));

        if (evutil_make_socket_nonblocking(sock) == -1) {
            LOG_WARNING("Failed to enable non-blocking: {}",
                        cb_strerror(cb::net::get_socket_error()));
            return false;
        }
    }
    return true;
}

static void setup_dispatcher(struct event_base *main_base,
                             void (*dispatcher_callback)(evutil_socket_t, short, void *))
{
    dispatcher_thread.base = main_base;
    dispatcher_thread.thread_id = cb_thread_self();
    if (!create_notification_pipe(dispatcher_thread)) {
        FATAL_ERROR(EXIT_FAILURE, "Unable to create notification pipe");
    }

    /* Listen for notifications from other threads */
    if ((event_assign(&dispatcher_thread.notify_event,
                      dispatcher_thread.base,
                      dispatcher_thread.notify[0],
                      EV_READ | EV_PERSIST,
                      dispatcher_callback,
                      nullptr) == -1) ||
        (event_add(&dispatcher_thread.notify_event, nullptr) == -1)) {
        FATAL_ERROR(EXIT_FAILURE, "Can't monitor libevent notify pipe");
    }
    dispatcher_thread.running = true;
}

/*
 * Set up a thread's information.
 */
static void setup_thread(FrontEndThread& me) {
    me.base = event_base_new();

    if (!me.base) {
        FATAL_ERROR(EXIT_FAILURE, "Can't allocate event base");
    }

    /* Listen for notifications from other threads */
    if ((event_assign(&me.notify_event,
                      me.base,
                      me.notify[0],
                      EV_READ | EV_PERSIST,
                      thread_libevent_process,
                      &me) == -1) ||
        (event_add(&me.notify_event, nullptr) == -1)) {
        FATAL_ERROR(EXIT_FAILURE, "Can't monitor libevent notify pipe");
    }
}

/*
 * Worker thread: main event loop
 */
static void worker_libevent(void *arg) {
    auto& me = *reinterpret_cast<FrontEndThread*>(arg);

    // Any per-thread setup can happen here; thread_init() will block until
    // all threads have finished initializing.
    {
        std::lock_guard<std::mutex> guard(init_mutex);
        me.running = true;
        init_count++;
        init_cond.notify_all();
    }

    event_base_loop(me.base, 0);
    me.running = false;
}

void drain_notification_channel(evutil_socket_t fd) {
    /* Every time we want to notify a thread, we send 1 byte to its
     * notification pipe. When the thread wakes up, it tries to drain
     * its notification channel before executing any other events.
     * Other threads (listener and other background threads) may notify
     * this thread up to 512 times since the last time we checked the
     * notification pipe, before we'll start draining it again.
     */

    ssize_t nread;
    // Using a small size for devnull will avoid blowing up the stack
    char devnull[512];

    while ((nread = cb::net::recv(fd, devnull, sizeof(devnull), 0)) ==
           static_cast<ssize_t>(sizeof(devnull))) {
        /* empty */
    }

    if (nread == -1) {
        LOG_WARNING("Can't read from libevent pipe: {}",
                    cb_strerror(cb::net::get_socket_error()));
    }
}
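
// A minimal sketch of the notification protocol described above (no new
// code path, just the existing pairing spelled out): notify_thread() below
// wakes a worker by writing a single byte to notify[1], and the worker
// swallows any buffered wakeups from notify[0] before doing work:
//
//     cb::net::send(thread.notify[1], "", 1, 0);    // producer side
//     drain_notification_channel(thread.notify[0]); // consumer side
//
// Any number of pending wakeups collapse into the bytes currently buffered,
// so one drain can absorb a burst of notifications.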

static void dispatch_new_connections(FrontEndThread& me) {
    std::vector<std::pair<SOCKET, SharedListeningPort>> connections;
    me.new_conn_queue.swap(connections);

    for (const auto& entry : connections) {
        if (conn_new(entry.first, *entry.second, me.base, me) == nullptr) {
            if (entry.second->system) {
                --stats.system_conns;
            }
            safe_close(entry.first);
        }
    }
}

/*
 * Processes an incoming "handle a new connection" item. This is called when
 * input arrives on the libevent wakeup pipe.
 */
static void thread_libevent_process(evutil_socket_t fd, short, void* arg) {
    auto& me = *reinterpret_cast<FrontEndThread*>(arg);

    // Start by draining the notification channel before doing any work.
    // By doing so we know that we'll be notified again if someone tries
    // to notify us while we're doing the work below (so we don't have to
    // worry about racing with notifications that arrive while we work).
    drain_notification_channel(fd);

    if (memcached_shutdown) {
        // Someone requested memcached to shut down. The listen thread should
        // be stopped immediately.
        if (is_listen_thread()) {
            LOG_INFO("Stopping listen thread (thread.cc)");
            event_base_loopbreak(me.base);
            return;
        }

        if (signal_idle_clients(me, false) == 0) {
            LOG_INFO("Stopping worker thread {}", me.index);
            event_base_loopbreak(me.base);
            return;
        }
    }

    dispatch_new_connections(me);

    FrontEndThread::PendingIoMap pending;
    {
        std::lock_guard<std::mutex> lock(me.pending_io.mutex);
        me.pending_io.map.swap(pending);
    }

    TRACE_LOCKGUARD_TIMED(me.mutex,
                          "mutex",
                          "thread_libevent_process::threadLock",
                          SlowMutexThreshold);

    std::vector<Connection*> notify;
    me.notification.swap(notify);

    for (const auto& io : pending) {
        auto* c = io.first;

        // Remove from the notify list if it's there as we don't
        // want to run them twice
        {
            auto iter = std::find(notify.begin(), notify.end(), c);
            if (iter != notify.end()) {
                notify.erase(iter);
            }
        }

        if (c->getSocketDescriptor() != INVALID_SOCKET &&
            !c->isRegisteredInLibevent()) {
            /* The socket may have been shut down while we're looping
             * in delayed shutdown */
            c->registerEvent();
        }

        for (const auto& pair : io.second) {
            if (pair.first) {
                pair.first->setAiostat(pair.second);
                pair.first->setEwouldblock(false);
            }
        }

        /*
         * We don't want the thread to keep on serving all of the data
         * from the context of the notification pipe, so just let it
         * run one time to set up the correct mask in libevent
         */
        c->setNumEvents(1);
        run_event_loop(c, EV_READ | EV_WRITE);
    }

    // Notify the connections we haven't notified yet
    for (auto c : notify) {
        c->setNumEvents(1);
        run_event_loop(c, EV_READ | EV_WRITE);
    }

    if (memcached_shutdown) {
        // Someone requested memcached to shut down. If we don't have
        // any connections bound to this thread we can just shut down
        time_t now = time(nullptr);
        bool log = now > me.shutdown_next_log;
        if (log) {
            me.shutdown_next_log = now + 5;
        }

        auto connected = signal_idle_clients(me, log);
        if (connected == 0) {
            LOG_INFO("Stopping worker thread {}", me.index);
            event_base_loopbreak(me.base);
        } else if (log) {
            LOG_INFO("Waiting for {} connected clients on worker thread {}",
                     connected,
                     me.index);
        }
    }
}

void notify_io_complete(gsl::not_null<const void*> void_cookie,
                        ENGINE_ERROR_CODE status) {
    auto* ccookie = reinterpret_cast<const Cookie*>(void_cookie.get());
    auto& cookie = const_cast<Cookie&>(*ccookie);

    auto& thr = cookie.getConnection().getThread();
    LOG_DEBUG("notify_io_complete: Got notify from {}, status {}",
              cookie.getConnection().getId(),
              status);

    /* kick the thread in the butt */
    if (add_conn_to_pending_io_list(&cookie.getConnection(), &cookie, status)) {
        notify_thread(thr);
    }
}
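
// Sketch of the assumed caller-side flow (the call below is illustrative,
// with a hypothetical cookie pointer): an engine operation that cannot
// complete synchronously returns ENGINE_EWOULDBLOCK to park the cookie, and
// once its background work finishes it reports the final status back here:
//
//     notify_io_complete(cookie_ptr, ENGINE_SUCCESS);
//
// That queues the cookie on the owning worker's pending_io map (see
// add_conn_to_pending_io_list below) and wakes the worker via its
// notification pipe so run_event_loop() can resume the command.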

/* Which thread we assigned a connection to most recently. */
static size_t last_thread = 0;

/*
 * Dispatches a new connection to another thread. This is only ever called
 * from the main (dispatcher) thread when a new connection arrives.
 */
void dispatch_conn_new(SOCKET sfd, SharedListeningPort& interface) {
    size_t tid = (last_thread + 1) % Settings::instance().getNumWorkerThreads();
    auto& thread = threads[tid];
    last_thread = tid;

    try {
        thread.new_conn_queue.push(sfd, interface);
    } catch (const std::bad_alloc& e) {
        LOG_WARNING("dispatch_conn_new: Failed to dispatch new connection: {}",
                    e.what());

        if (interface->system) {
            --stats.system_conns;
        }
        safe_close(sfd);
        return;
    }

    notify_thread(thread);
}
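
// A rough sketch of the expected call site (the accept handler shown is an
// assumption, not code from this file): once the dispatcher accepts a new
// socket it hands it off round-robin to a worker:
//
//     SOCKET client = cb::net::accept(listen_sock, nullptr, nullptr);
//     if (client != INVALID_SOCKET) {
//         dispatch_conn_new(client, interface);
//     }
//
// dispatch_conn_new() only queues the socket and pokes the worker; the
// Connection object itself is created on the worker thread in
// dispatch_new_connections().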

/*
 * Returns true if this is the thread that listens for new TCP connections.
 */
int is_listen_thread() {
    return dispatcher_thread.thread_id == cb_thread_self();
}

void notify_dispatcher() {
    if (dispatcher_thread.running) {
        notify_thread(dispatcher_thread);
    }
}

/******************************* GLOBAL STATS ******************************/

void threadlocal_stats_reset(std::vector<thread_stats>& thread_stats) {
    for (auto& ii : thread_stats) {
        ii.reset();
    }
}

/*
 * Initializes the thread subsystem, creating various worker threads.
 *
 * nthr      Number of worker event handler threads to spawn
 * main_base Event base for main thread
 */
void thread_init(size_t nthr,
                 struct event_base* main_base,
                 void (*dispatcher_callback)(evutil_socket_t, short, void*)) {
    scheduler_info.resize(nthr);

    try {
        threads = std::vector<FrontEndThread>(nthr);
    } catch (const std::bad_alloc&) {
        FATAL_ERROR(EXIT_FAILURE, "Can't allocate thread descriptors");
    }

    setup_dispatcher(main_base, dispatcher_callback);

    for (size_t ii = 0; ii < nthr; ii++) {
        if (!create_notification_pipe(threads[ii])) {
            FATAL_ERROR(EXIT_FAILURE, "Cannot create notification pipe");
        }
        threads[ii].index = ii;

        setup_thread(threads[ii]);
    }

    /* Create threads after we've done all the libevent setup. */
    for (auto& thread : threads) {
        const std::string name = "mc:worker_" + std::to_string(thread.index);
        create_worker(
                worker_libevent, &thread, &thread.thread_id, name.c_str());
    }

    // Wait for all the threads to set themselves up before returning.
    std::unique_lock<std::mutex> lock(init_mutex);
    init_cond.wait(lock, [&nthr] { return init_count >= nthr; });
}
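
// A rough sketch of the intended lifecycle as driven by the caller (the
// exact arguments are assumptions; memcached's startup code supplies the
// real ones):
//
//     thread_init(Settings::instance().getNumWorkerThreads(),
//                 main_base,
//                 dispatcher_callback);
//     ... serve traffic ...
//     threads_shutdown();  // break the event loops and join the workers
//     threads_cleanup();   // free each worker's event base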

void threads_shutdown() {
    // Notify all of the threads and let them shut down
    for (auto& thread : threads) {
        notify_thread(thread);
    }

    // Wait for all of them to complete
    for (auto& thread : threads) {
        // When using bufferevents we need to run a few iterations here.
        // Calling signalIfIdle won't run the event immediately, but only
        // once control returns to libevent. That means that some of the
        // connections could be "stuck" for another round in the event loop.
        while (thread.running) {
            notify_thread(thread);
            std::this_thread::sleep_for(std::chrono::microseconds(250));
        }
        cb_join_thread(thread.thread_id);
    }
}

void threads_cleanup() {
    for (auto& thread : threads) {
        event_base_free(thread.base);
    }
}

FrontEndThread::~FrontEndThread() {
    for (auto& sock : notify) {
        if (sock != INVALID_SOCKET) {
            safe_close(sock);
        }
    }
}

void notify_thread(FrontEndThread& thread) {
    if (cb::net::send(thread.notify[1], "", 1, 0) != 1 &&
        !cb::net::is_blocking(cb::net::get_socket_error())) {
        LOG_WARNING("Failed to notify thread: {}",
                    cb_strerror(cb::net::get_socket_error()));
    }
}

int add_conn_to_pending_io_list(Connection* c,
                                Cookie* cookie,
                                ENGINE_ERROR_CODE status) {
    auto& thread = c->getThread();

    std::lock_guard<std::mutex> lock(thread.pending_io.mutex);
    auto iter = thread.pending_io.map.find(c);
    if (iter == thread.pending_io.map.end()) {
        thread.pending_io.map.emplace(
                c,
                std::vector<std::pair<Cookie*, ENGINE_ERROR_CODE>>{
                        {cookie, status}});
        return 1;
    }

    for (const auto& pair : iter->second) {
        if (pair.first == cookie) {
            // We've already got a pending notification for this cookie;
            // ignore it.
            return 0;
        }
    }
    iter->second.emplace_back(cookie, status);
    return 1;
}