/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
/*
 * Thread management for memcached.
 */
#include "config.h"
#include "memcached.h"
#include <stdio.h>
#include <errno.h>
#include <stdlib.h>
#include <string.h>
#include <stdint.h>
#include <signal.h>
#include <fcntl.h>
#include <platform/platform.h>

#define ITEMS_PER_ALLOC 64

static char devnull[8192];
extern volatile sig_atomic_t memcached_shutdown;

/* An item in the connection queue. */
typedef struct conn_queue_item CQ_ITEM;
struct conn_queue_item {
    SOCKET            sfd;
    int               parent_port;
    STATE_FUNC        init_state;
    int               event_flags;
    int               read_buffer_size;
    CQ_ITEM          *next;
};

/* A connection queue. */
typedef struct conn_queue CQ;
struct conn_queue {
    CQ_ITEM *head;
    CQ_ITEM *tail;
    cb_mutex_t lock;
    cb_cond_t  cond;
};

/* Connection lock around accepting new connections */
cb_mutex_t conn_lock;

/* Free list of CQ_ITEM structs */
static CQ_ITEM *cqi_freelist;
static cb_mutex_t cqi_freelist_lock;

static LIBEVENT_THREAD dispatcher_thread;

/*
 * Each libevent instance has a wakeup pipe, which other threads
 * can use to signal that they've put a new connection on its queue.
 */
static int nthreads;
static LIBEVENT_THREAD *threads;
static cb_thread_t *thread_ids;

/*
 * Number of worker threads that have finished setting themselves up.
 */
static int init_count = 0;
static cb_mutex_t init_lock;
static cb_cond_t init_cond;

static void thread_libevent_process(evutil_socket_t fd, short which, void *arg);

/*
 * Initializes a connection queue.
 */
static void cq_init(CQ *cq) {
    cb_mutex_initialize(&cq->lock);
    cb_cond_initialize(&cq->cond);
    cq->head = NULL;
    cq->tail = NULL;
}

/*
 * Looks for an item on a connection queue, but doesn't block if there isn't
 * one.
 * Returns the item, or NULL if no item is available.
 */
static CQ_ITEM *cq_pop(CQ *cq) {
    CQ_ITEM *item;

    cb_mutex_enter(&cq->lock);
    item = cq->head;
    if (NULL != item) {
        cq->head = item->next;
        if (NULL == cq->head)
            cq->tail = NULL;
    }
    cb_mutex_exit(&cq->lock);

    return item;
}

/*
 * Adds an item to a connection queue.
 */
static void cq_push(CQ *cq, CQ_ITEM *item) {
    item->next = NULL;

    cb_mutex_enter(&cq->lock);
    if (NULL == cq->tail)
        cq->head = item;
    else
        cq->tail->next = item;
    cq->tail = item;
    cb_cond_signal(&cq->cond);
    cb_mutex_exit(&cq->lock);
}
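
/*
 * A minimal sketch of how the two halves of the queue are used in this
 * file: the dispatcher produces items and pokes the worker's wakeup pipe,
 * and the worker consumes them from its event loop (see dispatch_conn_new()
 * and thread_libevent_process() below). Illustrative only; `worker` is a
 * hypothetical LIBEVENT_THREAD pointer.
 *
 *     CQ_ITEM *item = cqi_new();
 *     item->sfd = sfd;                        // describe the new connection
 *     cq_push(worker->new_conn_queue, item);
 *     notify_thread(worker);                  // one byte down the wakeup pipe
 *
 *     // ...later, on the worker thread:
 *     while ((item = cq_pop(worker->new_conn_queue)) != NULL) {
 *         // turn the item into a conn and register it with libevent
 *         cqi_free(item);
 *     }
 */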

/*
 * Returns a fresh connection queue item.
 */
static CQ_ITEM *cqi_new(void) {
    CQ_ITEM *item = NULL;
    cb_mutex_enter(&cqi_freelist_lock);
    if (cqi_freelist) {
        item = cqi_freelist;
        cqi_freelist = item->next;
    }
    cb_mutex_exit(&cqi_freelist_lock);

    if (NULL == item) {
        int i;

        /* Allocate a bunch of items at once to reduce fragmentation */
        item = malloc(sizeof(CQ_ITEM) * ITEMS_PER_ALLOC);
        if (NULL == item)
            return NULL;

        /*
         * Link together all the new items except the first one
         * (which we'll return to the caller) for placement on
         * the freelist.
         */
        for (i = 2; i < ITEMS_PER_ALLOC; i++)
            item[i - 1].next = &item[i];

        cb_mutex_enter(&cqi_freelist_lock);
        item[ITEMS_PER_ALLOC - 1].next = cqi_freelist;
        cqi_freelist = &item[1];
        cb_mutex_exit(&cqi_freelist_lock);
    }

    return item;
}
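
/*
 * After one batch allocation above, the memory looks like this
 * (ITEMS_PER_ALLOC == 64):
 *
 *     item[0]            -> returned to the caller
 *     item[1] -> item[2] -> ... -> item[63] -> old cqi_freelist head
 *
 * so a single malloc() seeds the freelist with 63 spare items.
 */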

/*
 * Frees a connection queue item (adds it to the freelist.)
 */
static void cqi_free(CQ_ITEM *item) {
    cb_mutex_enter(&cqi_freelist_lock);
    item->next = cqi_freelist;
    cqi_freelist = item;
    cb_mutex_exit(&cqi_freelist_lock);
}


/*
 * Creates a worker thread.
 */
static void create_worker(void (*func)(void *), void *arg, cb_thread_t *id) {
    int ret;

    if ((ret = cb_create_thread(id, func, arg, 0)) != 0) {
        log_system_error(EXTENSION_LOG_WARNING, NULL,
                         "Can't create thread: %s");
        exit(1);
    }
}

/****************************** LIBEVENT THREADS *****************************/

bool create_notification_pipe(LIBEVENT_THREAD *me)
{
    int j;
    if (evutil_socketpair(SOCKETPAIR_AF, SOCK_STREAM, 0,
                          (void*)me->notify) == SOCKET_ERROR) {
        log_socket_error(EXTENSION_LOG_WARNING, NULL,
                         "Can't create notify pipe: %s");
        return false;
    }

    for (j = 0; j < 2; ++j) {
        int flags = 1;
        setsockopt(me->notify[j], IPPROTO_TCP,
                   TCP_NODELAY, (void *)&flags, sizeof(flags));
        setsockopt(me->notify[j], SOL_SOCKET,
                   SO_REUSEADDR, (void *)&flags, sizeof(flags));

        if (evutil_make_socket_nonblocking(me->notify[j]) == -1) {
            log_socket_error(EXTENSION_LOG_WARNING, NULL,
                             "Failed to enable non-blocking: %s");
            return false;
        }
    }
    return true;
}
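
/*
 * The notification "pipe" is really a socketpair: notify[1] is the write
 * end and notify[0] the read end. notify_thread() (below) writes a single
 * byte to wake the owning thread, and thread_libevent_process() drains
 * whatever has accumulated into the devnull scratch buffer. A sketch of
 * the wire protocol, nothing more:
 *
 *     send(thread->notify[1], "", 1, 0);        // wake up!
 *     recv(fd, devnull, sizeof(devnull), 0);    // drain all pending wakeups
 */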

static void setup_dispatcher(struct event_base *main_base,
                             void (*dispatcher_callback)(evutil_socket_t, short, void *))
{
    memset(&dispatcher_thread, 0, sizeof(dispatcher_thread));
    dispatcher_thread.type = DISPATCHER;
    dispatcher_thread.base = main_base;
    dispatcher_thread.thread_id = cb_thread_self();
    if (!create_notification_pipe(&dispatcher_thread)) {
        exit(1);
    }
    /* Listen for notifications from other threads */
    event_set(&dispatcher_thread.notify_event, dispatcher_thread.notify[0],
              EV_READ | EV_PERSIST, dispatcher_callback, &dispatcher_callback);
    event_base_set(dispatcher_thread.base, &dispatcher_thread.notify_event);

    if (event_add(&dispatcher_thread.notify_event, 0) == -1) {
        settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
                                        "Can't monitor libevent notify pipe\n");
        exit(1);
    }
}

/*
 * Set up a thread's information.
 */
static void setup_thread(LIBEVENT_THREAD *me) {
    me->type = GENERAL;
    me->base = event_base_new();
    if (! me->base) {
        settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
                                        "Can't allocate event base\n");
        exit(1);
    }

    /* Listen for notifications from other threads */
    event_set(&me->notify_event, me->notify[0],
              EV_READ | EV_PERSIST,
              thread_libevent_process, me);
    event_base_set(me->base, &me->notify_event);

    if (event_add(&me->notify_event, 0) == -1) {
        settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
                                        "Can't monitor libevent notify pipe\n");
        exit(1);
    }

    me->new_conn_queue = malloc(sizeof(struct conn_queue));
    if (me->new_conn_queue == NULL) {
        settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
                                        "Failed to allocate memory for connection queue");
        exit(EXIT_FAILURE);
    }
    cq_init(me->new_conn_queue);

    cb_mutex_initialize(&me->mutex);
}

/*
 * Worker thread: main event loop
 */
static void worker_libevent(void *arg) {
    LIBEVENT_THREAD *me = arg;

    /* Any per-thread setup can happen here; thread_init() will block until
     * all threads have finished initializing.
     */

    cb_mutex_enter(&init_lock);
    init_count++;
    cb_cond_signal(&init_cond);
    cb_mutex_exit(&init_lock);

    event_base_loop(me->base, 0);
}

/*
 * Counts how many times connection c appears on the given list.
 */
int number_of_pending(conn *c, conn *list) {
    int rv = 0;
    for (; list; list = list->next) {
        if (list == c) {
            rv++;
        }
    }
    return rv;
}

/*
 * Processes an incoming "handle a new connection" item. This is called when
 * input arrives on the libevent wakeup pipe.
 */
static void thread_libevent_process(evutil_socket_t fd, short which, void *arg) {
    LIBEVENT_THREAD *me = arg;
    CQ_ITEM *item;
    conn* pending;

    cb_assert(me->type == GENERAL);

    if (recv(fd, devnull, sizeof(devnull), 0) == -1) {
        log_socket_error(EXTENSION_LOG_WARNING, NULL,
                         "Can't read from libevent pipe: %s");
    }

    if (memcached_shutdown) {
        event_base_loopbreak(me->base);
        return;
    }

    while ((item = cq_pop(me->new_conn_queue)) != NULL) {
        conn *c = conn_new(item->sfd, item->parent_port, item->init_state,
                           item->event_flags, item->read_buffer_size,
                           me->base, NULL);
        if (c == NULL) {
            if (settings.verbose > 0) {
                settings.extensions.logger->log(EXTENSION_LOG_INFO, NULL,
                                                "Can't listen for events on fd %d\n",
                                                item->sfd);
            }
            closesocket(item->sfd);
        } else {
            cb_assert(c->thread == NULL);
            c->thread = me;
        }
        cqi_free(item);
    }

    LOCK_THREAD(me);
    pending = me->pending_io;
    me->pending_io = NULL;
    while (pending != NULL) {
        conn *c = pending;
        cb_assert(me == c->thread);
        pending = pending->next;
        c->next = NULL;

        if (c->sfd != INVALID_SOCKET && !c->registered_in_libevent) {
            /* The socket may have been shut down while we were looping
             * in delayed shutdown */
            register_event(c, 0);
        }
        /*
         * We don't want the thread to keep on serving all of the data
         * from the context of the notification pipe, so just let it
         * run one time to set up the correct mask in libevent
         */
        c->nevents = 1;
        do {
            if (settings.verbose) {
                settings.extensions.logger->log(EXTENSION_LOG_DEBUG, c,
                                                "%d - Running task: (%s)\n",
                                                c->sfd, state_text(c->state));
            }
        } while (c->state(c));
    }
    UNLOCK_THREAD(me);
}

extern volatile rel_time_t current_time;

/*
 * Floyd-style (tortoise and hare) cycle detection: slowNode advances one
 * link per iteration while the fast pointers advance two, so any cycle in
 * the list is eventually caught.
 */
bool has_cycle(conn *c) {
    conn *slowNode, *fastNode1, *fastNode2;

    if (!c) {
        return false;
    }

    slowNode = fastNode1 = fastNode2 = c;
    while (slowNode && (fastNode1 = fastNode2->next) && (fastNode2 = fastNode1->next)) {
        if (slowNode == fastNode1 || slowNode == fastNode2) {
            return true;
        }
        slowNode = slowNode->next;
    }
    return false;
}

bool list_contains(conn *haystack, conn *needle) {
    for (; haystack; haystack = haystack->next) {
        if (needle == haystack) {
            return true;
        }
    }
    return false;
}

/*
 * Removes needle from the list rooted at haystack (if present) and
 * returns the new list head.
 */
conn* list_remove(conn *haystack, conn *needle) {
    if (!haystack) {
        return NULL;
    }

    if (haystack == needle) {
        conn *rv = needle->next;
        needle->next = NULL;
        return rv;
    }

    haystack->next = list_remove(haystack->next, needle);

    return haystack;
}

/*
 * Drains up to max_items - 1 connections from *l into dest, detaching
 * each one and marking it LIST_STATE_PROCESSING. Returns the number of
 * connections copied.
 */
size_t list_to_array(conn **dest, size_t max_items, conn **l) {
    size_t n_items = 0;
    for (; *l && n_items < max_items - 1; ++n_items) {
        dest[n_items] = *l;
        *l = dest[n_items]->next;
        dest[n_items]->next = NULL;
        dest[n_items]->list_state |= LIST_STATE_PROCESSING;
    }
    return n_items;
}

void enlist_conn(conn *c, conn **list) {
    LIBEVENT_THREAD *thr = c->thread;
    cb_assert(list == &thr->pending_io);
    if ((c->list_state & LIST_STATE_PROCESSING) == 0) {
        cb_assert(!list_contains(thr->pending_io, c));
        cb_assert(c->next == NULL);
        c->next = *list;
        *list = c;
        cb_assert(list_contains(*list, c));
        cb_assert(!has_cycle(*list));
    } else {
        c->list_state |= LIST_STATE_REQ_PENDING_IO;
    }
}

void finalize_list(conn **list, size_t items) {
    size_t i;
    for (i = 0; i < items; i++) {
        if (list[i] != NULL) {
            list[i]->list_state &= ~LIST_STATE_PROCESSING;
            if (list[i]->sfd != INVALID_SOCKET) {
                if (list[i]->list_state & LIST_STATE_REQ_PENDING_IO) {
                    enlist_conn(list[i], &list[i]->thread->pending_io);
                }
            }
            list[i]->list_state = 0;
        }
    }
}
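
/*
 * How the three helpers above compose (a sketch, not code that appears
 * verbatim in this file): a caller snapshots the pending_io list, works
 * on the detached connections, and then re-enlists any connection that
 * requested pending io while it was being processed.
 *
 *     conn *batch[BATCH_SIZE];                    // BATCH_SIZE: hypothetical
 *     size_t n = list_to_array(batch, BATCH_SIZE, &thr->pending_io);
 *     // ... process batch[0..n-1]; enlist_conn() on a PROCESSING conn
 *     // only sets LIST_STATE_REQ_PENDING_IO instead of relinking ...
 *     finalize_list(batch, n);                    // re-enlist the flagged ones
 */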

void notify_io_complete(const void *cookie, ENGINE_ERROR_CODE status)
{
    struct conn *conn = (struct conn *)cookie;
    LIBEVENT_THREAD *thr;
    int notify;

    cb_assert(conn);
    thr = conn->thread;
    cb_assert(thr);

    settings.extensions.logger->log(EXTENSION_LOG_DEBUG, NULL,
                                    "Got notify from %d, status %x\n",
                                    conn->sfd, status);

    LOCK_THREAD(thr);
    conn->aiostat = status;
    notify = add_conn_to_pending_io_list(conn);
    UNLOCK_THREAD(thr);

    /* kick the thread in the butt */
    if (notify) {
        notify_thread(thr);
    }
}

/* Which thread we assigned a connection to most recently. */
static int last_thread = -1;

/*
 * Dispatches a new connection to another thread. This is only ever called
 * from the main thread, in response to an incoming connection.
 */
void dispatch_conn_new(SOCKET sfd, int parent_port,
                       STATE_FUNC init_state, int event_flags,
                       int read_buffer_size) {
    CQ_ITEM *item = cqi_new();
    int tid = (last_thread + 1) % settings.num_threads;

    LIBEVENT_THREAD *thread = threads + tid;

    if (item == NULL) {
        /* Without a queue item we can't hand the socket over; drop it. */
        closesocket(sfd);
        settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
                                        "Failed to allocate memory for connection queue item\n");
        return;
    }

    last_thread = tid;

    item->sfd = sfd;
    item->parent_port = parent_port;
    item->init_state = init_state;
    item->event_flags = event_flags;
    item->read_buffer_size = read_buffer_size;

    cq_push(thread->new_conn_queue, item);

    MEMCACHED_CONN_DISPATCH(sfd, (uintptr_t)thread->thread_id);
    notify_thread(thread);
}
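
/*
 * Worker selection is plain round robin over settings.num_threads: with
 * four workers, successive connections land on threads 0, 1, 2, 3, 0, ...
 * Since dispatch_conn_new() runs only on the dispatch thread (see the
 * comment above it), last_thread needs no lock.
 */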

/*
 * Returns true if this is the thread that listens for new TCP connections.
 */
int is_listen_thread() {
    return dispatcher_thread.thread_id == cb_thread_self();
}

void notify_dispatcher(void) {
    notify_thread(&dispatcher_thread);
}

/******************************* GLOBAL STATS ******************************/

void threadlocal_stats_clear(struct thread_stats *stats) {
    stats->cmd_get = 0;
    stats->get_misses = 0;
    stats->delete_misses = 0;
    stats->incr_misses = 0;
    stats->decr_misses = 0;
    stats->incr_hits = 0;
    stats->decr_hits = 0;
    stats->cas_misses = 0;
    stats->bytes_written = 0;
    stats->bytes_read = 0;
    stats->cmd_flush = 0;
    stats->conn_yields = 0;
    stats->auth_cmds = 0;
    stats->auth_errors = 0;

    memset(stats->slab_stats, 0,
           sizeof(struct slab_stats) * MAX_NUMBER_OF_SLAB_CLASSES);
}

void threadlocal_stats_reset(struct thread_stats *thread_stats) {
    int ii;
    for (ii = 0; ii < settings.num_threads; ++ii) {
        cb_mutex_enter(&thread_stats[ii].mutex);
        threadlocal_stats_clear(&thread_stats[ii]);
        cb_mutex_exit(&thread_stats[ii].mutex);
    }
}

void threadlocal_stats_aggregate(struct thread_stats *thread_stats, struct thread_stats *stats) {
    int ii, sid;
    for (ii = 0; ii < settings.num_threads; ++ii) {
        cb_mutex_enter(&thread_stats[ii].mutex);

        stats->cmd_get += thread_stats[ii].cmd_get;
        stats->get_misses += thread_stats[ii].get_misses;
        stats->delete_misses += thread_stats[ii].delete_misses;
        stats->decr_misses += thread_stats[ii].decr_misses;
        stats->incr_misses += thread_stats[ii].incr_misses;
        stats->decr_hits += thread_stats[ii].decr_hits;
        stats->incr_hits += thread_stats[ii].incr_hits;
        stats->cas_misses += thread_stats[ii].cas_misses;
        stats->bytes_read += thread_stats[ii].bytes_read;
        stats->bytes_written += thread_stats[ii].bytes_written;
        stats->cmd_flush += thread_stats[ii].cmd_flush;
        stats->conn_yields += thread_stats[ii].conn_yields;
        stats->auth_cmds += thread_stats[ii].auth_cmds;
        stats->auth_errors += thread_stats[ii].auth_errors;

        for (sid = 0; sid < MAX_NUMBER_OF_SLAB_CLASSES; sid++) {
            stats->slab_stats[sid].cmd_set +=
                thread_stats[ii].slab_stats[sid].cmd_set;
            stats->slab_stats[sid].get_hits +=
                thread_stats[ii].slab_stats[sid].get_hits;
            stats->slab_stats[sid].delete_hits +=
                thread_stats[ii].slab_stats[sid].delete_hits;
            stats->slab_stats[sid].cas_hits +=
                thread_stats[ii].slab_stats[sid].cas_hits;
            stats->slab_stats[sid].cas_badval +=
                thread_stats[ii].slab_stats[sid].cas_badval;
        }

        cb_mutex_exit(&thread_stats[ii].mutex);
    }
}

void slab_stats_aggregate(struct thread_stats *stats, struct slab_stats *out) {
    int sid;

    out->cmd_set = 0;
    out->get_hits = 0;
    out->delete_hits = 0;
    out->cas_hits = 0;
    out->cas_badval = 0;

    for (sid = 0; sid < MAX_NUMBER_OF_SLAB_CLASSES; sid++) {
        out->cmd_set += stats->slab_stats[sid].cmd_set;
        out->get_hits += stats->slab_stats[sid].get_hits;
        out->delete_hits += stats->slab_stats[sid].delete_hits;
        out->cas_hits += stats->slab_stats[sid].cas_hits;
        out->cas_badval += stats->slab_stats[sid].cas_badval;
    }
}
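
/*
 * threadlocal_stats_aggregate() only ever adds into its output, so callers
 * must zero the destination first (slab_stats_aggregate() clears its own
 * output). A usage sketch, assuming `tls` is the per-thread stats array:
 *
 *     struct thread_stats totals;
 *     struct slab_stats slab_totals;
 *     memset(&totals, 0, sizeof(totals));
 *     threadlocal_stats_aggregate(tls, &totals);   // sum per-thread counters
 *     slab_stats_aggregate(&totals, &slab_totals); // collapse slab classes
 */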

/*
 * Initializes the thread subsystem, creating various worker threads.
 *
 * nthreads  Number of worker event handler threads to spawn
 * main_base Event base for main thread
 */
void thread_init(int nthr, struct event_base *main_base,
                 void (*dispatcher_callback)(evutil_socket_t, short, void *)) {
    int i;
    nthreads = nthr + 1; /* one extra thread beyond the requested worker count */

    cqi_freelist = NULL;

    cb_mutex_initialize(&conn_lock);
    cb_mutex_initialize(&cqi_freelist_lock);
    cb_mutex_initialize(&init_lock);
    cb_cond_initialize(&init_cond);

    threads = calloc(nthreads, sizeof(LIBEVENT_THREAD));
    if (! threads) {
        settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
                                        "Can't allocate thread descriptors: %s",
                                        strerror(errno));
        exit(1);
    }
    thread_ids = calloc(nthreads, sizeof(cb_thread_t));
    if (! thread_ids) {
        settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
                                        "Can't allocate thread ids: %s",
                                        strerror(errno));
        exit(1);
    }

    setup_dispatcher(main_base, dispatcher_callback);

    for (i = 0; i < nthreads; i++) {
        if (!create_notification_pipe(&threads[i])) {
            exit(1);
        }
        threads[i].index = i;

        setup_thread(&threads[i]);
    }

    /* Create threads after we've done all the libevent setup. */
    for (i = 0; i < nthreads; i++) {
        create_worker(worker_libevent, &threads[i], &thread_ids[i]);
        threads[i].thread_id = thread_ids[i];
    }

    /* Wait for all the threads to set themselves up before returning. */
    cb_mutex_enter(&init_lock);
    while (init_count < nthreads) {
        cb_cond_wait(&init_cond, &init_lock);
    }
    cb_mutex_exit(&init_lock);
}

void threads_shutdown(void)
{
    int ii;
    for (ii = 0; ii < nthreads; ++ii) {
        notify_thread(&threads[ii]);
        cb_join_thread(thread_ids[ii]);
    }
}

void threads_cleanup(void)
{
    int ii;
    for (ii = 0; ii < nthreads; ++ii) {
        CQ_ITEM *it;

        safe_close(threads[ii].notify[0]);
        safe_close(threads[ii].notify[1]);
        event_base_free(threads[ii].base);

        while ((it = cq_pop(threads[ii].new_conn_queue)) != NULL) {
            cqi_free(it);
        }
        free(threads[ii].new_conn_queue);
    }

    free(thread_ids);
    free(threads);
}

void notify_thread(LIBEVENT_THREAD *thread) {
    if (send(thread->notify[1], "", 1, 0) != 1) {
        log_socket_error(EXTENSION_LOG_WARNING, NULL,
                         "Failed to notify thread: %s");
    }
}

/*
 * Puts c on its thread's pending_io list unless it is already there.
 * Returns 1 only when the list transitions from empty to non-empty, so
 * the caller knows to wake the thread once per batch.
 */
int add_conn_to_pending_io_list(conn *c) {
    int notify = 0;
    if (number_of_pending(c, c->thread->pending_io) == 0) {
        if (c->thread->pending_io == NULL) {
            notify = 1;
        }
        enlist_conn(c, &c->thread->pending_io);
    }

    return notify;
}