/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
/*
 * Thread management for memcached.
 */
#include "memcached.h"
#include <platform/cbassert.h>
#include <stdio.h>
#include <errno.h>
#include <stdlib.h>
#include <string.h>
#include "log.h"

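/*
 * CQ_ITEM structs are allocated in batches of this many (see cqi_new)
 * to reduce allocation overhead and heap fragmentation.
 */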
#define ITEMS_PER_ALLOC 64

extern struct hash_ops strhash_ops;
extern struct hash_ops skeyhash_ops;

/* An item in the connection queue. */
typedef struct conn_queue_item CQ_ITEM;
struct conn_queue_item {
    SOCKET            sfd;
    enum conn_states  init_state;
    int               event_flags;
    int               read_buffer_size;
    enum protocol     protocol;
    enum network_transport     transport;
    conn_funcs       *funcs;
    void             *extra;
    CQ_ITEM          *next;
};

/* A connection queue. */
typedef struct conn_queue CQ;
struct conn_queue {
    CQ_ITEM *head;
    CQ_ITEM *tail;
    cb_mutex_t lock;
    cb_cond_t  cond;
};

/* Lock for cache operations (item_*, assoc_*) */
cb_mutex_t cache_lock;

/* Connection lock around accepting new connections */
cb_mutex_t conn_lock;

/* Lock for global stats */
static cb_mutex_t stats_lock;

/* Free list of CQ_ITEM structs */
static CQ_ITEM *cqi_freelist;
static cb_mutex_t cqi_freelist_lock;

/*
 * Each libevent instance has a wakeup pipe, which other threads
 * can use to signal that they've put a new connection on its queue.
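 * threads[0] is the dispatch (listener) thread; threads[1..nthreads-1]
 * are the worker threads created by thread_init().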
 */
static LIBEVENT_THREAD *threads;

/*
 * Number of threads that have finished setting themselves up.
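 * Protected by init_lock; thread_init() waits on init_cond until this
 * count reaches nthreads before returning.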
 */
static int init_count = 0;
static cb_mutex_t init_lock;
static cb_cond_t init_cond;


static void thread_libevent_process(evutil_socket_t fd, short which, void *arg);

/*
 * Initializes a connection queue.
 */
static void cq_init(CQ *cq) {
    cb_mutex_initialize(&cq->lock);
    cb_cond_initialize(&cq->cond);
    cq->head = NULL;
    cq->tail = NULL;
}

/*
 * Looks for an item on a connection queue, but doesn't block if there isn't
 * one.
 * Returns the item, or NULL if no item is available.
 */
static CQ_ITEM *cq_pop(CQ *cq) {
    CQ_ITEM *cq_item;

    cb_mutex_enter(&cq->lock);
    cq_item = cq->head;
    if (NULL != cq_item) {
        cq->head = cq_item->next;
        if (NULL == cq->head)
            cq->tail = NULL;
    }
    cb_mutex_exit(&cq->lock);

    return cq_item;
}

/*
 * Adds an item to a connection queue.
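 * The queue's condition variable is signalled on each push, though nothing
 * in this file waits on it; worker threads are woken via their notify pipe.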
 */
static void cq_push(CQ *cq, CQ_ITEM *cq_item) {
    cq_item->next = NULL;

    cb_mutex_enter(&cq->lock);
    if (NULL == cq->tail)
        cq->head = cq_item;
    else
        cq->tail->next = cq_item;
    cq->tail = cq_item;
    cb_cond_signal(&cq->cond);
    cb_mutex_exit(&cq->lock);
}

/*
 * Returns a fresh connection queue item.
 */
static CQ_ITEM *cqi_new(void) {
    CQ_ITEM *cq_item = NULL;
    cb_mutex_enter(&cqi_freelist_lock);
    if (cqi_freelist) {
        cq_item = cqi_freelist;
        cqi_freelist = cq_item->next;
    }
    cb_mutex_exit(&cqi_freelist_lock);

    if (NULL == cq_item) {
        int i;

        /* Allocate a bunch of items at once to reduce fragmentation */
        cq_item = malloc(sizeof(CQ_ITEM) * ITEMS_PER_ALLOC);
        if (NULL == cq_item)
            return NULL;

        /*
         * Link together all the new items except the first one
         * (which we'll return to the caller) for placement on
         * the freelist.
         */
        for (i = 2; i < ITEMS_PER_ALLOC; i++)
            cq_item[i - 1].next = &cq_item[i];

        cb_mutex_enter(&cqi_freelist_lock);
        cq_item[ITEMS_PER_ALLOC - 1].next = cqi_freelist;
        cqi_freelist = &cq_item[1];
        cb_mutex_exit(&cqi_freelist_lock);
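
        /* Note: batches are never free()d; every CQ_ITEM is recycled
         * through cqi_freelist for the lifetime of the process. */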
    }

    return cq_item;
}


/*
 * Frees a connection queue item (adds it to the freelist).
 */
static void cqi_free(CQ_ITEM *cq_item) {
    cb_mutex_enter(&cqi_freelist_lock);
    cq_item->next = cqi_freelist;
    cqi_freelist = cq_item;
    cb_mutex_exit(&cqi_freelist_lock);
}


/*
 * Creates a worker thread.
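 * The thread handle is not retained or joined; workers are expected to
 * run for the lifetime of the process.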
 */
static void create_worker(void (*func)(void *), void *arg) {
    cb_thread_t thread;
    int ret;

    if ((ret = cb_create_thread(&thread, func, arg, 0)) != 0) {
        moxi_log_write("Can't create thread: %s\n",
                strerror(ret));
        exit(1);
    }
}

void initialize_conn_lock(void)
{
    cb_mutex_initialize(&conn_lock);
}

/*
 * Sets whether or not we accept new connections.
 */
void accept_new_conns(const bool do_accept) {
    cb_mutex_enter(&conn_lock);
    do_accept_new_conns(do_accept);
    cb_mutex_exit(&conn_lock);
}

/****************************** LIBEVENT THREADS *****************************/

/*
 * Sets up a thread's information.
 */
static void setup_thread(LIBEVENT_THREAD *me) {
    if (! me->base) {
        me->base = event_init();
        if (! me->base) {
            moxi_log_write("Can't allocate event base\n");
            exit(1);
        }
    }

    /* Listen for notifications from other threads */
    event_set(&me->notify_event, me->notify_receive_fd,
              EV_READ | EV_PERSIST, thread_libevent_process, me);
    event_base_set(me->base, &me->notify_event);

    if (event_add(&me->notify_event, 0) == -1) {
        moxi_log_write("Can't monitor libevent notify pipe\n");
        exit(1);
    }

    me->new_conn_queue = malloc(sizeof(struct conn_queue));
    if (me->new_conn_queue == NULL) {
        perror("Failed to allocate memory for connection queue");
        exit(EXIT_FAILURE);
    }
    cq_init(me->new_conn_queue);

    /* TODO: Merge new_conn_queue with work_queue. */

    me->work_queue = calloc(1, sizeof(work_queue));
    if (me->work_queue == NULL) {
        perror("Failed to allocate memory for work queue");
        exit(EXIT_FAILURE);
    }
    work_queue_init(me->work_queue, me->base);

    cb_mutex_initialize(&me->stats.mutex);
    me->suffix_cache = cache_create("suffix", SUFFIX_SIZE, sizeof(char*),
                                    NULL, NULL);
    if (me->suffix_cache == NULL) {
        moxi_log_write("Failed to create suffix cache\n");
        exit(EXIT_FAILURE);
    }

    me->conn_hash = genhash_init(512, strhash_ops);
    if (me->conn_hash == NULL) {
        moxi_log_write("Failed to create connection hash\n");
        exit(EXIT_FAILURE);
    }
}


/*
 * Worker thread: main event loop
 */
static void worker_libevent(void *arg) {
    LIBEVENT_THREAD *me = arg;

    /* Any per-thread setup can happen here; thread_init() will block until
     * all threads have finished initializing.
     */
    me->thread_id = cb_thread_self();
#ifndef WIN32
    if (settings.verbose > 1)
        moxi_log_write("worker_libevent thread_id %ld\n", (long)me->thread_id);
#endif

    cb_mutex_enter(&init_lock);
    init_count++;
    cb_cond_signal(&init_cond);
    cb_mutex_exit(&init_lock);

    event_base_loop(me->base, 0);
}


/*
 * Processes an incoming "handle a new connection" item. This is called when
 * input arrives on the libevent wakeup pipe.
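 * Each byte on the pipe corresponds to at most one queued CQ_ITEM.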
 */
static void thread_libevent_process(evutil_socket_t fd, short which, void *arg) {
    LIBEVENT_THREAD *me = arg;
    CQ_ITEM *cq_item;
    char buf[1];

    (void)which;

    if (recv(fd, buf, 1, 0) != 1) {
        if (settings.verbose > 0) {
            moxi_log_write("Can't read from libevent pipe\n");
        }
    }

    cq_item = cq_pop(me->new_conn_queue);

    if (NULL != cq_item) {
        conn *c = conn_new(cq_item->sfd, cq_item->init_state, cq_item->event_flags,
                           cq_item->read_buffer_size,
                           cq_item->transport,
                           me->base,
                           cq_item->funcs, cq_item->extra);
        if (c == NULL) {
            if (IS_UDP(cq_item->transport)) {
                moxi_log_write("Can't listen for events on UDP socket\n");
                exit(1);
            } else {
                if (settings.verbose > 0) {
                    moxi_log_write("Can't listen for events on fd %d\n",
                        cq_item->sfd);
                }
                closesocket(cq_item->sfd);
            }
        } else {
            c->protocol = cq_item->protocol;
            c->thread = me;
        }
        cqi_free(cq_item);
    }
}

/* Which thread we assigned a connection to most recently. */
static int last_thread = 0;

/*
 * Dispatches a new connection to another thread. This is only ever called
 * from the main thread, either during initialization (for UDP) or because
 * of an incoming connection.
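 * Connections are assigned round-robin across worker threads 1..N-1;
 * thread 0 (the dispatch thread) is skipped.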
 */
void dispatch_conn_new(SOCKET sfd, enum conn_states init_state, int event_flags,
                       int read_buffer_size,
                       enum protocol prot,
                       enum network_transport transport,
                       conn_funcs *funcs, void *extra) {
    int tid = last_thread % (settings.num_threads - 1);

    /* Skip the dispatch thread (0) */
    tid++;

    last_thread = tid;

    dispatch_conn_new_to_thread(tid, sfd, init_state, event_flags,
                                read_buffer_size,
                                prot,
                                transport,
                                funcs, extra);
}

void dispatch_conn_new_to_thread(int tid, SOCKET sfd, enum conn_states init_state,
                                 int event_flags, int read_buffer_size,
                                 enum protocol prot,
                                 enum network_transport transport,
                                 conn_funcs *funcs, void *extra) {
    LIBEVENT_THREAD *thread;
    CQ_ITEM *cq_item;

    cb_assert(tid > 0);
    cb_assert(tid < settings.num_threads);

    thread = threads + tid;
    cq_item = cqi_new();
    if (cq_item == NULL) {
        /* cqi_new() returns NULL if its batch malloc fails; drop the
         * connection rather than dereference NULL below. */
        closesocket(sfd);
        moxi_log_write("Failed to allocate connection queue item\n");
        return;
    }

    cq_item->sfd = sfd;
    cq_item->init_state = init_state;
    cq_item->event_flags = event_flags;
    cq_item->read_buffer_size = read_buffer_size;
    cq_item->protocol = prot;
    cq_item->transport = transport;
    cq_item->funcs = funcs;
    cq_item->extra = extra;

    cq_push(thread->new_conn_queue, cq_item);

    MEMCACHED_CONN_DISPATCH(sfd, thread->thread_id);
    if (send(thread->notify_send_fd, "", 1, 0) != 1) {
        perror("Writing to thread notify pipe");
    }
}

/*
 * Returns true if this is the thread that listens for new TCP connections.
 */
int is_listen_thread(void) {
    return cb_thread_self() == threads[0].thread_id;
}

int thread_index(cb_thread_t thread_id) {
    int i;
    for (i = 0; i < settings.num_threads; i++) {
        if (threads[i].thread_id == thread_id) {
            return i;
        }
    }
    return -1;
}

LIBEVENT_THREAD *thread_by_index(int i) {
    return &threads[i];
}

/********************************* ITEM ACCESS *******************************/

/*
 * Allocates a new item.
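 * When built with MOXI_ITEM_MALLOC, the item comes straight from malloc
 * and no cache_lock is taken.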
 */
item *item_alloc(char *key, size_t nkey, int flags, rel_time_t exptime, int nbytes) {
#ifdef MOXI_ITEM_MALLOC
    /* Skip past the lock, since we're using malloc. */
    return do_item_alloc(key, nkey, flags, exptime, nbytes);
#else
    item *it;
    cb_mutex_enter(&cache_lock);
    it = do_item_alloc(key, nkey, flags, exptime, nbytes);
    cb_mutex_exit(&cache_lock);
    return it;
#endif
}

/*
 * Returns an item if it hasn't been marked as expired,
 * lazy-expiring as needed.
 */
item *item_get(const char *key, const size_t nkey) {
    item *it;
    cb_mutex_enter(&cache_lock);
    it = do_item_get(key, nkey);
    cb_mutex_exit(&cache_lock);
    return it;
}

/*
 * Links an item into the LRU and hashtable.
 */
int item_link(item *cq_item) {
    int ret;

    cb_mutex_enter(&cache_lock);
    ret = do_item_link(cq_item);
    cb_mutex_exit(&cache_lock);
    return ret;
}

/*
 * Decrements the reference count on an item and adds it to the freelist if
 * needed.
 */
void item_remove(item *cq_item) {
#ifdef MOXI_ITEM_MALLOC
    /* Skip past the lock, since we're using malloc. */
    do_item_remove(cq_item);
#else
    cb_mutex_enter(&cache_lock);
    do_item_remove(cq_item);
    cb_mutex_exit(&cache_lock);
#endif
}

/*
 * Replaces one item with another in the hashtable.
 * Unprotected by a mutex lock since the core server does not require
 * it to be thread-safe.
 */
int item_replace(item *old_it, item *new_it) {
    return do_item_replace(old_it, new_it);
}

/*
 * Unlinks an item from the LRU and hashtable.
 */
void item_unlink(item *cq_item) {
    cb_mutex_enter(&cache_lock);
    do_item_unlink(cq_item);
    cb_mutex_exit(&cache_lock);
}

/*
 * Moves an item to the back of the LRU queue.
 */
void item_update(item *cq_item) {
    cb_mutex_enter(&cache_lock);
    do_item_update(cq_item);
    cb_mutex_exit(&cache_lock);
}

/*
 * Does arithmetic on a numeric item value.
 */
enum delta_result_type add_delta(conn *c, item *cq_item, const int incr,
                                 const int64_t delta, char *buf) {
    enum delta_result_type ret;

    cb_mutex_enter(&cache_lock);
    ret = do_add_delta(c, cq_item, incr, delta, buf);
    cb_mutex_exit(&cache_lock);
    return ret;
}

/*
 * Stores an item in the cache (high level, obeys set/add/replace semantics).
 */
enum store_item_type store_item(item *cq_item, int comm, conn* c) {
    enum store_item_type ret;

    cb_mutex_enter(&cache_lock);
    ret = do_store_item(cq_item, comm, c);
    cb_mutex_exit(&cache_lock);
    return ret;
}

/*
 * Flushes expired items after a flush_all call.
 */
void item_flush_expired(void) {
    cb_mutex_enter(&cache_lock);
    do_item_flush_expired();
    cb_mutex_exit(&cache_lock);
}

/*
 * Dumps part of the cache.
 */
char *item_cachedump(const unsigned int clsid, const unsigned int limit, unsigned int *bytes) {
    char *ret;

    cb_mutex_enter(&cache_lock);
    ret = do_item_cachedump(clsid, limit, bytes);
    cb_mutex_exit(&cache_lock);
    return ret;
}

/*
 * Dumps statistics about slab classes.
 */
void item_stats(ADD_STAT add_stats, void *c) {
    cb_mutex_enter(&cache_lock);
    do_item_stats(add_stats, c);
    cb_mutex_exit(&cache_lock);
}

/*
 * Dumps a list of objects of each size in 32-byte increments.
 */
void item_stats_sizes(ADD_STAT add_stats, void *c) {
    cb_mutex_enter(&cache_lock);
    do_item_stats_sizes(add_stats, c);
    cb_mutex_exit(&cache_lock);
}

/******************************* GLOBAL STATS ******************************/

void STATS_LOCK(void) {
    cb_mutex_enter(&stats_lock);
}

void STATS_UNLOCK(void) {
    cb_mutex_exit(&stats_lock);
}

void threadlocal_stats_reset(void) {
    int ii, sid;
    for (ii = 0; ii < settings.num_threads; ++ii) {
        cb_mutex_enter(&threads[ii].stats.mutex);

        threads[ii].stats.get_cmds = 0;
        threads[ii].stats.get_misses = 0;
        threads[ii].stats.delete_misses = 0;
        threads[ii].stats.incr_misses = 0;
        threads[ii].stats.decr_misses = 0;
        threads[ii].stats.cas_misses = 0;
        threads[ii].stats.bytes_read = 0;
        threads[ii].stats.bytes_written = 0;
        threads[ii].stats.flush_cmds = 0;
        threads[ii].stats.conn_yields = 0;

        for (sid = 0; sid < MAX_NUMBER_OF_SLAB_CLASSES; sid++) {
            threads[ii].stats.slab_stats[sid].set_cmds = 0;
            threads[ii].stats.slab_stats[sid].get_hits = 0;
            threads[ii].stats.slab_stats[sid].delete_hits = 0;
            threads[ii].stats.slab_stats[sid].incr_hits = 0;
            threads[ii].stats.slab_stats[sid].decr_hits = 0;
            threads[ii].stats.slab_stats[sid].cas_hits = 0;
            threads[ii].stats.slab_stats[sid].cas_badval = 0;
        }

        cb_mutex_exit(&threads[ii].stats.mutex);
    }
}

void threadlocal_stats_aggregate(struct thread_stats *thread_stats) {
    int ii, sid;
    /* The struct contains a mutex, so we deliberately avoid memset()ing
     * the whole thing; each counter is reset individually instead. */
    thread_stats->get_cmds = 0;
    thread_stats->get_misses = 0;
    thread_stats->delete_misses = 0;
    thread_stats->incr_misses = 0;
    thread_stats->decr_misses = 0;
    thread_stats->cas_misses = 0;
    thread_stats->bytes_written = 0;
    thread_stats->bytes_read = 0;
    thread_stats->flush_cmds = 0;
    thread_stats->conn_yields = 0;

    memset(thread_stats->slab_stats, 0,
           sizeof(struct slab_stats) * MAX_NUMBER_OF_SLAB_CLASSES);

    for (ii = 0; ii < settings.num_threads; ++ii) {
        cb_mutex_enter(&threads[ii].stats.mutex);

        thread_stats->get_cmds += threads[ii].stats.get_cmds;
        thread_stats->get_misses += threads[ii].stats.get_misses;
        thread_stats->delete_misses += threads[ii].stats.delete_misses;
        thread_stats->decr_misses += threads[ii].stats.decr_misses;
        thread_stats->incr_misses += threads[ii].stats.incr_misses;
        thread_stats->cas_misses += threads[ii].stats.cas_misses;
        thread_stats->bytes_read += threads[ii].stats.bytes_read;
        thread_stats->bytes_written += threads[ii].stats.bytes_written;
        thread_stats->flush_cmds += threads[ii].stats.flush_cmds;
        thread_stats->conn_yields += threads[ii].stats.conn_yields;

        for (sid = 0; sid < MAX_NUMBER_OF_SLAB_CLASSES; sid++) {
            thread_stats->slab_stats[sid].set_cmds +=
                threads[ii].stats.slab_stats[sid].set_cmds;
            thread_stats->slab_stats[sid].get_hits +=
                threads[ii].stats.slab_stats[sid].get_hits;
            thread_stats->slab_stats[sid].delete_hits +=
                threads[ii].stats.slab_stats[sid].delete_hits;
            thread_stats->slab_stats[sid].decr_hits +=
                threads[ii].stats.slab_stats[sid].decr_hits;
            thread_stats->slab_stats[sid].incr_hits +=
                threads[ii].stats.slab_stats[sid].incr_hits;
            thread_stats->slab_stats[sid].cas_hits +=
                threads[ii].stats.slab_stats[sid].cas_hits;
            thread_stats->slab_stats[sid].cas_badval +=
                threads[ii].stats.slab_stats[sid].cas_badval;
        }

        cb_mutex_exit(&threads[ii].stats.mutex);
    }
}

void slab_stats_aggregate(struct thread_stats *thread_stats, struct slab_stats *out) {
    int sid;

    out->set_cmds = 0;
    out->get_hits = 0;
    out->delete_hits = 0;
    out->incr_hits = 0;
    out->decr_hits = 0;
    out->cas_hits = 0;
    out->cas_badval = 0;

    for (sid = 0; sid < MAX_NUMBER_OF_SLAB_CLASSES; sid++) {
        out->set_cmds += thread_stats->slab_stats[sid].set_cmds;
        out->get_hits += thread_stats->slab_stats[sid].get_hits;
        out->delete_hits += thread_stats->slab_stats[sid].delete_hits;
        out->decr_hits += thread_stats->slab_stats[sid].decr_hits;
        out->incr_hits += thread_stats->slab_stats[sid].incr_hits;
        out->cas_hits += thread_stats->slab_stats[sid].cas_hits;
        out->cas_badval += thread_stats->slab_stats[sid].cas_badval;
    }
}

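/*
 * Creates the socketpair used to wake a thread's libevent loop. Where the
 * pair is backed by a TCP loopback socket (e.g. on Windows), TCP_NODELAY
 * keeps the single-byte wakeups from being delayed by Nagle's algorithm;
 * on Unix domain socketpairs that setsockopt simply fails, and its return
 * value is ignored.
 */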
static bool create_notification_pipe(LIBEVENT_THREAD *me) {
    int j;
    SOCKET notify[2];
    if (evutil_socketpair(SOCKETPAIR_AF, SOCK_STREAM, 0,
        (void*)notify) == SOCKET_ERROR) {
        moxi_log_write("Failed to create notification pipe");
        return false;
    }

    for (j = 0; j < 2; ++j) {
        int flags = 1;
        setsockopt(notify[j], IPPROTO_TCP,
                   TCP_NODELAY, (void *)&flags, sizeof(flags));
        setsockopt(notify[j], SOL_SOCKET,
                   SO_REUSEADDR, (void *)&flags, sizeof(flags));

        if (evutil_make_socket_nonblocking(notify[j]) == -1) {
            moxi_log_write("Failed to enable non-blocking");
            return false;
        }
    }

    me->notify_receive_fd = notify[0];
    me->notify_send_fd = notify[1];

    return true;
}

/*
 * Initializes the thread subsystem, creating various worker threads.
 *
 * nthreads  Number of event handler threads to spawn
 * main_base Event base for main thread
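 *
 * The calling thread becomes threads[0] (the dispatch thread) and reuses
 * main_base; each worker thread gets its own event base in setup_thread().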
 */
void thread_init(int nthreads, struct event_base *main_base) {
    int i;

    cb_mutex_initialize(&cache_lock);
    cb_mutex_initialize(&stats_lock);

    cb_mutex_initialize(&init_lock);
    cb_cond_initialize(&init_cond);

    cb_mutex_initialize(&cqi_freelist_lock);
    cqi_freelist = NULL;

    threads = calloc(nthreads, sizeof(LIBEVENT_THREAD));
    if (! threads) {
        perror("Can't allocate thread descriptors");
        exit(1);
    }

    threads[0].base = main_base;
    threads[0].thread_id = cb_thread_self();

    for (i = 0; i < nthreads; i++) {
        if (!create_notification_pipe(&threads[i])) {
            exit(1);
        }

        setup_thread(&threads[i]);
    }

    /* Create threads after we've done all the libevent setup. */
    for (i = 1; i < nthreads; i++) {
        create_worker(worker_libevent, &threads[i]);
    }

    /* Wait for all the threads to set themselves up before returning. */
    cb_mutex_enter(&init_lock);
    init_count++; /* main thread */
    while (init_count < nthreads) {
        cb_cond_wait(&init_cond, &init_lock);
    }
    cb_mutex_exit(&init_lock);
}