1 /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *  memcached - memory caching daemon
4  *
5  *       http://www.danga.com/memcached/
6  *
7  *  Copyright 2003 Danga Interactive, Inc.  All rights reserved.
8  *
9  *  Use and distribution licensed under the BSD license.  See
10  *  the LICENSE file for full text.
11  *
12  *  Authors:
13  *      Anatoly Vorobey <mellon@pobox.com>
14  *      Brad Fitzpatrick <brad@danga.com>
15  */
16 #include "config.h"
17 #include "memcached.h"
18 #include "memcached/extension_loggers.h"
19 #include "alloc_hooks.h"
20 #include "utilities/engine_loader.h"
21 #include "timings.h"
22 #include "cmdline.h"
23 #include "mc_time.h"
24 
25 #include <signal.h>
26 #include <fcntl.h>
27 #include <errno.h>
28 #include <stdlib.h>
29 #include <stdio.h>
30 #include <string.h>
31 #include <time.h>
32 #include <limits.h>
33 #include <ctype.h>
34 #include <stdarg.h>
35 #include <stddef.h>
36 #include <snappy-c.h>
37 #include <JSON_checker.h>
38 
/* Forward declarations for file-local helpers defined further down. */
static bool grow_dynamic_buffer(conn *c, size_t needed);
static void cookie_set_admin(const void *cookie);
static bool cookie_is_admin(const void *cookie);

/*
 * Storage big enough for an item_info followed by (IOV_MAX - 1) extra
 * struct iovec entries; access the structured view through `.info`.
 * NOTE(review): presumably item_info ends with a one-element iovec array so
 * this lets callers describe up to IOV_MAX value segments — confirm against
 * the item_info definition.
 */
typedef union {
    item_info info;
    char bytes[sizeof(item_info) + ((IOV_MAX - 1) * sizeof(struct iovec))];
} item_info_holder;
47 
/*
 * Ask the loaded engine (through its v1 interface) to store `cas` as the
 * CAS value of item `it`, on behalf of the connection identified by
 * `cookie`.
 */
static void item_set_cas(const void *cookie, item *it, uint64_t cas)
{
    settings.engine.v1->item_set_cas(settings.engine.v0,
                                     cookie,
                                     it,
                                     cas);
}
51 
/* NOTE(review): presumably the maximum length of a SASL mechanism name —
 * confirm at its use sites. */
#define MAX_SASL_MECH_LEN 32

/*
 * Stat-update building blocks. Each *_GUTS macro increments counters on an
 * already-locked `thread_stats` pointer; the `conn` argument is ignored.
 *
 * SLAB_GUTS bumps a per-slab-class counter. It indexes slab_stats with
 * `info.info.clsid`, so a variable named `info` (presumably an
 * item_info_holder for the item concerned) must be in scope wherever it is
 * expanded.
 */
#define SLAB_GUTS(conn, thread_stats, slab_op, thread_op) \
    thread_stats->slab_stats[info.info.clsid].slab_op++;

/* Bump only the per-thread counter named by thread_op. */
#define THREAD_GUTS(conn, thread_stats, slab_op, thread_op) \
    thread_stats->thread_op++;

/* Bump two per-thread counters: slab_op and thread_op. */
#define THREAD_GUTS2(conn, thread_stats, slab_op, thread_op) \
    thread_stats->slab_op++; \
    thread_stats->thread_op++;

/* Bump the per-slab counter slab_op and the per-thread counter thread_op. */
#define SLAB_THREAD_GUTS(conn, thread_stats, slab_op, thread_op) \
    SLAB_GUTS(conn, thread_stats, slab_op, thread_op) \
    THREAD_GUTS(conn, thread_stats, slab_op, thread_op)

/*
 * Run GUTS while holding the mutex of the thread_stats returned by
 * get_thread_stats(conn). `key`/`nkey` are accepted but unused.
 * Expands to a bare `{ ... }` block (not do { } while (0)), so
 * `if (x) STATS_INCR(...); else ...` does not compile — brace such uses.
 */
#define STATS_INCR1(GUTS, conn, slab_op, thread_op, key, nkey) { \
    struct thread_stats *thread_stats = get_thread_stats(conn); \
    cb_mutex_enter(&thread_stats->mutex); \
    GUTS(conn, thread_stats, slab_op, thread_op); \
    cb_mutex_exit(&thread_stats->mutex); \
}

/* Increment one per-thread counter `op`. */
#define STATS_INCR(conn, op, key, nkey) \
    STATS_INCR1(THREAD_GUTS, conn, op, op, key, nkey)

/* Increment the per-slab counter `op` (needs `info` in scope). */
#define SLAB_INCR(conn, op, key, nkey) \
    STATS_INCR1(SLAB_GUTS, conn, op, op, key, nkey)

/* Increment two per-thread counters. */
#define STATS_TWO(conn, slab_op, thread_op, key, nkey) \
    STATS_INCR1(THREAD_GUTS2, conn, slab_op, thread_op, key, nkey)

/* Increment a per-slab counter and a per-thread counter (needs `info`). */
#define SLAB_TWO(conn, slab_op, thread_op, key, nkey) \
    STATS_INCR1(SLAB_THREAD_GUTS, conn, slab_op, thread_op, key, nkey)

/* Hit: per-slab <op>_hits and per-thread cmd_<op>. */
#define STATS_HIT(conn, op, key, nkey) \
    SLAB_TWO(conn, op##_hits, cmd_##op, key, nkey)

/* Miss: per-thread <op>_misses and cmd_<op>. */
#define STATS_MISS(conn, op, key, nkey) \
    STATS_TWO(conn, op##_misses, cmd_##op, key, nkey)

/* Increment one per-thread counter under the thread_stats mutex. */
#define STATS_NOKEY(conn, op) { \
    struct thread_stats *thread_stats = \
        get_thread_stats(conn); \
    cb_mutex_enter(&thread_stats->mutex); \
    thread_stats->op++; \
    cb_mutex_exit(&thread_stats->mutex); \
}

/* Increment two per-thread counters under the thread_stats mutex. */
#define STATS_NOKEY2(conn, op1, op2) { \
    struct thread_stats *thread_stats = \
        get_thread_stats(conn); \
    cb_mutex_enter(&thread_stats->mutex); \
    thread_stats->op1++; \
    thread_stats->op2++; \
    cb_mutex_exit(&thread_stats->mutex); \
}

/* Add `amt` to one per-thread counter under the thread_stats mutex. */
#define STATS_ADD(conn, op, amt) { \
    struct thread_stats *thread_stats = \
        get_thread_stats(conn); \
    cb_mutex_enter(&thread_stats->mutex); \
    thread_stats->op += amt; \
    cb_mutex_exit(&thread_stats->mutex); \
}
118 
/* Process-wide shutdown flag. volatile sig_atomic_t is the type that may
 * safely be written from a signal handler (presumably how it is set —
 * confirm at the writers). */
volatile sig_atomic_t memcached_shutdown;

/* Lock for the global `stats` struct; taken via STATS_LOCK()/STATS_UNLOCK(). */
static cb_mutex_t stats_lock;

/**
 * Structure to save ns_server's session cas token.
 * NOTE(review): `value` presumably holds the token and `ctr` a usage
 * counter, both guarded by `mutex` — confirm at the users.
 */
static struct session_cas {
    uint64_t value;
    uint64_t ctr;
    cb_mutex_t mutex;
} session_cas;
132 
STATS_LOCKnull133 void STATS_LOCK() {
134     cb_mutex_enter(&stats_lock);
135 }
136 
STATS_UNLOCKnull137 void STATS_UNLOCK() {
138     cb_mutex_exit(&stats_lock);
139 }
140 
#ifdef WIN32
/* Winsock error-code classifiers (codes come from WSAGetLastError()). */

/* True if the operation would block. */
static int is_blocking(DWORD dw) {
    return (dw == WSAEWOULDBLOCK);
}
/* True if the process ran out of socket descriptors. */
static int is_emfile(DWORD dw) {
    return (dw == WSAEMFILE);
}
/* True if the error means the peer is gone (not connected or reset).
 * Both codes must be compared against dw: a bare `|| WSAECONNRESET`
 * is a non-zero constant and made this always true. */
static int is_closed_conn(DWORD dw) {
    return (dw == WSAENOTCONN || dw == WSAECONNRESET);
}
/* True if bind() failed because the address is already in use. */
static int is_addrinuse(DWORD dw) {
    return (dw == WSAEADDRINUSE);
}
/* Force the last socket error to "would block". */
static void set_ewouldblock(void) {
    WSASetLastError(WSAEWOULDBLOCK);
}
/* Force the last socket error to "connection reset". */
static void set_econnreset(void) {
    WSASetLastError(WSAECONNRESET);
}
#else
/* POSIX errno classifiers. */

/* True if the operation would block (EAGAIN/EWOULDBLOCK may differ). */
static int is_blocking(int dw) {
    return (dw == EAGAIN || dw == EWOULDBLOCK);
}
/* True if the process ran out of file descriptors. */
static int is_emfile(int dw) {
    return (dw == EMFILE);
}
/* True if the error means the peer is gone (not connected or reset).
 * The previous `dw != ECONNRESET` matched nearly every errno value. */
static int is_closed_conn(int dw) {
    return (dw == ENOTCONN || dw == ECONNRESET);
}
/* True if bind() failed because the address is already in use. */
static int is_addrinuse(int dw) {
    return (dw == EADDRINUSE);
}
/* Force errno to "would block". */
static void set_ewouldblock(void) {
    errno = EWOULDBLOCK;
}
/* Force errno to "connection reset". */
static void set_econnreset(void) {
    errno = ECONNRESET;
}
#endif
180 
/*
 * forward declarations
 */
static SOCKET new_socket(struct addrinfo *ai);
static int try_read_command(conn *c);
static struct thread_stats* get_independent_stats(conn *c);
static struct thread_stats* get_thread_stats(conn *c);
static void register_callback(ENGINE_HANDLE *eh,
                              ENGINE_EVENT_TYPE type,
                              EVENT_CALLBACK cb, const void *cb_data);
static SERVER_HANDLE_V1 *get_server_api(void);


/* Outcome of try_read_network(). */
enum try_read_result {
    READ_DATA_RECEIVED,
    READ_NO_DATA_RECEIVED,
    READ_ERROR,            /**< an error occurred (on the socket) (or client closed connection) */
    READ_MEMORY_ERROR      /**< failed to allocate more memory */
};

static enum try_read_result try_read_network(conn *c);

/* stats */
static void stats_init(void);
static void server_stats(ADD_STAT add_stats, conn *c, bool aggregate);
static void process_stat_settings(ADD_STAT add_stats, void *c);


/* defaults */
static void settings_init(void);

/* event handling, network IO */
static void event_handler(evutil_socket_t fd, short which, void *arg);
static void complete_nread(conn *c);
static void write_and_free(conn *c, char *buf, size_t bytes);
static int ensure_iov_space(conn *c);
static int add_iov(conn *c, const void *buf, size_t len);
static int add_msghdr(conn *c);

/** exported globals **/
/* Defined here with external linkage; presumably declared extern in
 * memcached.h — confirm. */
struct stats stats;
struct settings settings;

/** file scope variables **/
/* Head of the list of listening connections, chained through conn->next. */
static conn *listen_conn = NULL;
static struct event_base *main_base;
static struct thread_stats *default_independent_stats;

/* One singly linked list (via ->next) of engine callbacks per event type,
 * walked by perform_callbacks(). */
static struct engine_event_handler *engine_event_handlers[MAX_ENGINE_EVENT_TYPE + 1];

/* Outcome of transmit(). */
enum transmit_result {
    TRANSMIT_COMPLETE,   /**< All done writing. */
    TRANSMIT_INCOMPLETE, /**< More data remaining to write. */
    TRANSMIT_SOFT_ERROR, /**< Can't write any more right now. */
    TRANSMIT_HARD_ERROR  /**< Can't write (c->state is set to conn_closing) */
};

static enum transmit_result transmit(conn *c);
239 
240 
241 
242 /* Perform all callbacks of a given type for the given connection. */
perform_callbacks(ENGINE_EVENT_TYPE type, const void *data, const void *c)243 void perform_callbacks(ENGINE_EVENT_TYPE type,
244                        const void *data,
245                        const void *c) {
246     struct engine_event_handler *h;
247     for (h = engine_event_handlers[type]; h; h = h->next) {
248         h->cb(c, type, data, h->cb_data);
249     }
250 }
251 
252 /**
253  * Return the TCP or domain socket listening_port structure that
254  * has a given port number
255  */
get_listening_port_instance(const int port)256 static struct listening_port *get_listening_port_instance(const int port) {
257     struct listening_port *port_ins = NULL;
258     int i;
259     for (i = 0; i < settings.num_interfaces; ++i) {
260         if (stats.listening_ports[i].port == port) {
261             port_ins = &stats.listening_ports[i];
262         }
263     }
264     return port_ins;
265 }
266 
stats_init(void)267 static void stats_init(void) {
268     stats.daemon_conns = 0;
269     stats.rejected_conns = 0;
270     stats.curr_conns = stats.total_conns = 0;
271     stats.listening_ports = calloc(settings.num_interfaces, sizeof(struct listening_port));
272 
273     stats_prefix_init();
274 }
275 
stats_reset(const void *cookie)276 static void stats_reset(const void *cookie) {
277     struct conn *conn = (struct conn*)cookie;
278     STATS_LOCK();
279     stats.rejected_conns = 0;
280     stats.total_conns = 0;
281     stats_prefix_clear();
282     STATS_UNLOCK();
283     threadlocal_stats_reset(get_independent_stats(conn));
284     settings.engine.v1->reset_stats(settings.engine.v0, cookie);
285 }
286 
/*
 * Decide how many worker threads to start.
 *
 * If MEMCACHED_NUM_CPUS is set, the integer at the start of its value is
 * used. An unparsable, zero, negative or out-of-range value falls back to
 * 4. The previous atoi() version let negative counts through unchanged and
 * had undefined behaviour on overflow.
 *
 * Otherwise the number of online processors is used. Above 4 CPUs only 75%
 * of them are used, and the result is never below 4; this also covers
 * sysconf() returning -1 on error.
 */
static int get_number_of_worker_threads(void) {
    int ret;
    const char *override = getenv("MEMCACHED_NUM_CPUS");

    if (override == NULL) {
#ifdef WIN32
        SYSTEM_INFO sysinfo;
        GetSystemInfo(&sysinfo);
        ret = (int)sysinfo.dwNumberOfProcessors;
#else
        ret = (int)sysconf(_SC_NPROCESSORS_ONLN);
#endif
        if (ret > 4) {
            ret = (int)(ret * 0.75f);
        }
        if (ret < 4) {
            ret = 4;
        }
    } else {
        char *end = NULL;
        long requested;

        errno = 0;
        requested = strtol(override, &end, 10);
        if (end == override || errno == ERANGE ||
            requested <= 0 || requested > INT_MAX) {
            ret = 4;
        } else {
            ret = (int)requested;
        }
    }

    return ret;
}
313 
settings_init(void)314 static void settings_init(void) {
315     static struct interface default_interface;
316     default_interface.port = 11211;
317     default_interface.maxconn = 1000;
318     default_interface.backlog = 1024;
319 
320     settings.num_interfaces = 1;
321     settings.interfaces = &default_interface;
322     settings.daemonize = false;
323     settings.pid_file = NULL;
324     settings.bio_drain_buffer_sz = 8192;
325 
326     settings.verbose = 0;
327     settings.num_threads = get_number_of_worker_threads();
328     settings.prefix_delimiter = ':';
329     settings.detail_enabled = 0;
330     settings.allow_detailed = true;
331     settings.reqs_per_event_high_priority = 50;
332     settings.reqs_per_event_med_priority = 5;
333     settings.reqs_per_event_low_priority = 1;
334     settings.default_reqs_per_event = 20;
335     settings.require_sasl = false;
336     settings.extensions.logger = get_stderr_logger();
337     settings.tcp_nodelay = getenv("MEMCACHED_DISABLE_TCP_NODELAY") == NULL;
338     settings.engine_module = "default_engine.so";
339     settings.engine_config = NULL;
340     settings.config = NULL;
341     settings.admin = NULL;
342     settings.disable_admin = false;
343     settings.datatype = false;
344 }
345 
346 /*
347  * Adds a message header to a connection.
348  *
349  * Returns 0 on success, -1 on out-of-memory.
350  */
add_msghdr(conn *c)351 static int add_msghdr(conn *c)
352 {
353     struct msghdr *msg;
354 
355     cb_assert(c != NULL);
356 
357     if (c->msgsize == c->msgused) {
358         cb_assert(c->msgsize > 0);
359         msg = realloc(c->msglist, c->msgsize * 2 * sizeof(struct msghdr));
360         if (! msg)
361             return -1;
362         c->msglist = msg;
363         c->msgsize *= 2;
364     }
365 
366     msg = c->msglist + c->msgused;
367 
368     /* this wipes msg_iovlen, msg_control, msg_controllen, and
369        msg_flags, the last 3 of which aren't defined on solaris: */
370     memset(msg, 0, sizeof(struct msghdr));
371 
372     msg->msg_iov = &c->iov[c->iovused];
373 
374     if (c->request_addr_size > 0) {
375         msg->msg_name = &c->request_addr;
376         msg->msg_namelen = c->request_addr_size;
377     }
378 
379     c->msgbytes = 0;
380     c->msgused++;
381 
382     return 0;
383 }
384 
/*
 * State for temporarily disabling the listening sockets. The helpers
 * is_listen_disabled(), get_listen_disabled_num() and disable_listen()
 * access these fields with `mutex` held.
 *
 * NOTE(review): this object has external linkage (no `static`); check other
 * translation units before narrowing it.
 */
struct {
    cb_mutex_t mutex;
    bool disabled;          /* set to true by disable_listen() */
    ssize_t count;          /* set to 10 by disable_listen(); its consumer is elsewhere — TODO confirm meaning */
    uint64_t num_disable;   /* incremented on every disable_listen() call */
} listen_state;
391 
is_listen_disabled(void)392 static bool is_listen_disabled(void) {
393     bool ret;
394     cb_mutex_enter(&listen_state.mutex);
395     ret = listen_state.disabled;
396     cb_mutex_exit(&listen_state.mutex);
397     return ret;
398 }
399 
get_listen_disabled_num(void)400 static uint64_t get_listen_disabled_num(void) {
401     uint64_t ret;
402     cb_mutex_enter(&listen_state.mutex);
403     ret = listen_state.num_disable;
404     cb_mutex_exit(&listen_state.mutex);
405     return ret;
406 }
407 
disable_listen(void)408 static void disable_listen(void) {
409     conn *next;
410     cb_mutex_enter(&listen_state.mutex);
411     listen_state.disabled = true;
412     listen_state.count = 10;
413     ++listen_state.num_disable;
414     cb_mutex_exit(&listen_state.mutex);
415 
416     for (next = listen_conn; next; next = next->next) {
417         update_event(next, 0);
418         if (listen(next->sfd, 1) != 0) {
419             log_socket_error(EXTENSION_LOG_WARNING, NULL,
420                              "listen() failed: %s");
421         }
422     }
423 }
424 
/*
 * Close `sfd`, retrying while closesocket() fails with EINTR or EAGAIN.
 * INVALID_SOCKET is ignored. A persistent failure is logged. On success the
 * current-connection counter is decremented and, if listening is currently
 * disabled, the dispatcher is notified.
 *
 * NOTE(review): on Linux close() releases the descriptor even when it
 * returns EINTR, so retrying may close a descriptor another thread has just
 * opened. Confirm whether the retry is still wanted on that platform.
 */
void safe_close(SOCKET sfd) {
    int rval;

    if (sfd == INVALID_SOCKET) {
        return;
    }

    do {
        rval = closesocket(sfd);
    } while (rval == SOCKET_ERROR && (errno == EINTR || errno == EAGAIN));

    if (rval != SOCKET_ERROR) {
        STATS_LOCK();
        stats.curr_conns--;
        STATS_UNLOCK();

        if (is_listen_disabled()) {
            notify_dispatcher();
        }
    } else {
        char msg[80];
        snprintf(msg, sizeof(msg), "Failed to close socket %d (%%s)!!", (int)sfd);
        log_socket_error(EXTENSION_LOG_WARNING, NULL, msg);
    }
}
449 
450 /**
451  * Reset all of the dynamic buffers used by a connection back to their
452  * default sizes. The strategy for resizing the buffers is to allocate a
453  * new one of the correct size and free the old one if the allocation succeeds
454  * instead of using realloc to change the buffer size (because realloc may
455  * not shrink the buffers, and will also copy the memory). If the allocation
456  * fails the buffer will be unchanged.
457  *
458  * @param c the connection to resize the buffers for
459  * @return true if all allocations succeeded, false if one or more of the
460  *         allocations failed.
461  */
conn_reset_buffersize(conn *c)462 static bool conn_reset_buffersize(conn *c) {
463     bool ret = true;
464 
465     if (c->rsize != DATA_BUFFER_SIZE) {
466         void *ptr = malloc(DATA_BUFFER_SIZE);
467         if (ptr != NULL) {
468             free(c->rbuf);
469             c->rbuf = ptr;
470             c->rsize = DATA_BUFFER_SIZE;
471         } else {
472             ret = false;
473         }
474     }
475 
476     if (c->wsize != DATA_BUFFER_SIZE) {
477         void *ptr = malloc(DATA_BUFFER_SIZE);
478         if (ptr != NULL) {
479             free(c->wbuf);
480             c->wbuf = ptr;
481             c->wsize = DATA_BUFFER_SIZE;
482         } else {
483             ret = false;
484         }
485     }
486 
487     if (c->isize != ITEM_LIST_INITIAL) {
488         void *ptr = malloc(sizeof(item *) * ITEM_LIST_INITIAL);
489         if (ptr != NULL) {
490             free(c->ilist);
491             c->ilist = ptr;
492             c->isize = ITEM_LIST_INITIAL;
493         } else {
494             ret = false;
495         }
496     }
497 
498     if (c->temp_alloc_size != TEMP_ALLOC_LIST_INITIAL) {
499         void *ptr = malloc(sizeof(char *) * TEMP_ALLOC_LIST_INITIAL);
500         if (ptr != NULL) {
501             free(c->temp_alloc_list);
502             c->temp_alloc_list = ptr;
503             c->temp_alloc_size = TEMP_ALLOC_LIST_INITIAL;
504         } else {
505             ret = false;
506         }
507     }
508 
509     if (c->iovsize != IOV_LIST_INITIAL) {
510         void *ptr = malloc(sizeof(struct iovec) * IOV_LIST_INITIAL);
511         if (ptr != NULL) {
512             free(c->iov);
513             c->iov = ptr;
514             c->iovsize = IOV_LIST_INITIAL;
515         } else {
516             ret = false;
517         }
518     }
519 
520     if (c->msgsize != MSG_LIST_INITIAL) {
521         void *ptr = malloc(sizeof(struct msghdr) * MSG_LIST_INITIAL);
522         if (ptr != NULL) {
523             free(c->msglist);
524             c->msglist = ptr;
525             c->msgsize = MSG_LIST_INITIAL;
526         } else {
527             ret = false;
528         }
529     }
530 
531     return ret;
532 }
533 
534 /**
535  * Constructor for all memory allocations of connection objects. Initialize
536  * all members and allocate the transfer buffers.
537  *
538  * @param buffer The memory allocated by the object cache
539  * @return 0 on success, 1 if we failed to allocate memory
540  */
conn_constructor(conn *c)541 static int conn_constructor(conn *c) {
542     memset(c, 0, sizeof(*c));
543     MEMCACHED_CONN_CREATE(c);
544 
545     c->state = conn_immediate_close;
546     c->sfd = INVALID_SOCKET;
547     if (!conn_reset_buffersize(c)) {
548         free(c->rbuf);
549         free(c->wbuf);
550         free(c->ilist);
551         free(c->temp_alloc_list);
552         free(c->iov);
553         free(c->msglist);
554         settings.extensions.logger->log(EXTENSION_LOG_WARNING,
555                                         NULL,
556                                         "Failed to allocate buffers for connection\n");
557         return 1;
558     }
559 
560     STATS_LOCK();
561     stats.conn_structs++;
562     STATS_UNLOCK();
563 
564     return 0;
565 }
566 
567 /**
568  * Destructor for all connection objects. Release all allocated resources.
569  *
570  * @param buffer The memory allocated by the objec cache
571  */
conn_destructor(conn *c)572 static void conn_destructor(conn *c) {
573     free(c->rbuf);
574     free(c->wbuf);
575     free(c->ilist);
576     free(c->temp_alloc_list);
577     free(c->iov);
578     free(c->msglist);
579 
580     STATS_LOCK();
581     stats.conn_structs--;
582     STATS_UNLOCK();
583 }
584 
/*
 * Free list management for connections.
 *
 *   free  - singly linked free list, chained through conn->next
 *   all   - table (settings.maxconns slots) of every conn object recorded
 *           so far; slots are filled in index order
 *   mutex - guards `free` and `next` in allocate_connection() and
 *           release_connection()
 *   next  - number of filled slots in `all` (index of the next free slot)
 */
struct connections {
    conn* free;
    conn** all;
    cb_mutex_t mutex;
    int next;
} connections;
594 
initialize_connections(void)595 static void initialize_connections(void)
596 {
597     int preallocate;
598 
599     cb_mutex_initialize(&connections.mutex);
600     connections.all = calloc(settings.maxconns, sizeof(conn*));
601     if (connections.all == NULL) {
602         settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
603                                         "Failed to allocate memory for connections");
604         exit(EX_OSERR);
605     }
606 
607     preallocate = settings.maxconns / 2;
608     if (preallocate < 1000) {
609         preallocate = settings.maxconns;
610     } else if (preallocate > 5000) {
611         preallocate = 5000;
612     }
613 
614     for (connections.next = 0; connections.next < preallocate; ++connections.next) {
615         connections.all[connections.next] = malloc(sizeof(conn));
616         if (conn_constructor(connections.all[connections.next]) != 0) {
617             settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
618                                             "Failed to allocate memory for connections");
619             exit(EX_OSERR);
620         }
621         connections.all[connections.next]->next = connections.free;
622         connections.free = connections.all[connections.next];
623     }
624 }
625 
destroy_connections(void)626 static void destroy_connections(void)
627 {
628     int ii;
629     for (ii = 0; ii < settings.maxconns; ++ii) {
630         if (connections.all[ii]) {
631             conn *c = connections.all[ii];
632             conn_destructor(c);
633             free(c);
634         }
635     }
636 
637     free(connections.all);
638 }
639 
allocate_connection(void)640 static conn *allocate_connection(void) {
641     conn *ret;
642 
643     cb_mutex_enter(&connections.mutex);
644     ret = connections.free;
645     if (ret != NULL) {
646         connections.free = connections.free->next;
647         ret->next = NULL;
648     }
649     cb_mutex_exit(&connections.mutex);
650 
651     if (ret == NULL) {
652         ret = malloc(sizeof(conn));
653         if (ret == NULL) {
654             settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
655                                             "Failed to allocate memory for connection");
656             return NULL;
657         }
658 
659         if (conn_constructor(ret) != 0) {
660             free(ret);
661             settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
662                                             "Failed to allocate memory for connection");
663             return NULL;
664         }
665 
666         cb_mutex_enter(&connections.mutex);
667         if (connections.next == settings.maxconns) {
668             free(ret);
669             ret = NULL;
670         } else {
671             connections.all[connections.next++] = ret;
672         }
673         cb_mutex_exit(&connections.mutex);
674     }
675 
676     return ret;
677 }
678 
/*
 * Push a connection object back onto the free list. Its socket is reset to
 * INVALID_SOCKET; its buffers are kept so the object can be reused.
 */
static void release_connection(conn *c) {
    conn *old_head;

    c->sfd = INVALID_SOCKET;

    cb_mutex_enter(&connections.mutex);
    old_head = connections.free;
    connections.free = c;
    c->next = old_head;
    cb_mutex_exit(&connections.mutex);
}
686 
substate_text(enum bin_substates state)687 static const char *substate_text(enum bin_substates state) {
688     switch (state) {
689     case bin_no_state: return "bin_no_state";
690     case bin_reading_set_header: return "bin_reading_set_header";
691     case bin_reading_cas_header: return "bin_reading_cas_header";
692     case bin_read_set_value: return "bin_read_set_value";
693     case bin_reading_sasl_auth: return "bin_reading_sasl_auth";
694     case bin_reading_sasl_auth_data: return "bin_reading_sasl_auth_data";
695     case bin_reading_packet: return "bin_reading_packet";
696     default:
697         return "illegal";
698     }
699 }
700 
/*
 * Append a dump of connection `c`'s internal fields to a stats response via
 * add_stats. `d` is forwarded to append_stat(), presumably as the
 * connection the stats are sent to. A connection without a socket only
 * reports "conn" and "socket" ("disconnected").
 *
 * Fields are read without any locking, so the values may be mutually
 * inconsistent.
 */
static void add_connection_stats(ADD_STAT add_stats, conn *d, conn *c) {
    append_stat("conn", add_stats, d, "%p", c);
    if (c->sfd == INVALID_SOCKET) {
        append_stat("socket", add_stats, d, "disconnected");
    } else {
        /* Conversion specifiers must match the (cast) argument types:
         * %ld for long (was %lu), %u for unsigned int (was an int cast). */
        append_stat("socket", add_stats, d, "%ld", (long)c->sfd);
        append_stat("protocol", add_stats, d, "%s", "binary");
        append_stat("transport", add_stats, d, "TCP");
        append_stat("nevents", add_stats, d, "%u", c->nevents);
        if (c->sasl_conn != NULL) {
            append_stat("sasl_conn", add_stats, d, "%p", c->sasl_conn);
        }
        append_stat("state", add_stats, d, "%s", state_text(c->state));
        append_stat("substate", add_stats, d, "%s", substate_text(c->substate));
        append_stat("registered_in_libevent", add_stats, d, "%d",
                    (int)c->registered_in_libevent);
        append_stat("ev_flags", add_stats, d, "%x", c->ev_flags);
        append_stat("which", add_stats, d, "%x", c->which);
        append_stat("rbuf", add_stats, d, "%p", c->rbuf);
        append_stat("rcurr", add_stats, d, "%p", c->rcurr);
        append_stat("rsize", add_stats, d, "%u", c->rsize);
        append_stat("rbytes", add_stats, d, "%u", c->rbytes);
        append_stat("wbuf", add_stats, d, "%p", c->wbuf);
        append_stat("wcurr", add_stats, d, "%p", c->wcurr);
        append_stat("wsize", add_stats, d, "%u", c->wsize);
        append_stat("wbytes", add_stats, d, "%u", c->wbytes);
        append_stat("write_and_go", add_stats, d, "%p", c->write_and_go);
        append_stat("write_and_free", add_stats, d, "%p", c->write_and_free);
        append_stat("ritem", add_stats, d, "%p", c->ritem);
        append_stat("rlbytes", add_stats, d, "%u", c->rlbytes);
        append_stat("item", add_stats, d, "%p", c->item);
        append_stat("store_op", add_stats, d, "%u", c->store_op);
        append_stat("sbytes", add_stats, d, "%u", c->sbytes);
        append_stat("iov", add_stats, d, "%p", c->iov);
        append_stat("iovsize", add_stats, d, "%u", c->iovsize);
        append_stat("iovused", add_stats, d, "%u", c->iovused);
        append_stat("msglist", add_stats, d, "%p", c->msglist);
        append_stat("msgsize", add_stats, d, "%u", c->msgsize);
        append_stat("msgused", add_stats, d, "%u", c->msgused);
        append_stat("msgcurr", add_stats, d, "%u", c->msgcurr);
        append_stat("msgbytes", add_stats, d, "%u", c->msgbytes);
        append_stat("ilist", add_stats, d, "%p", c->ilist);
        append_stat("isize", add_stats, d, "%u", c->isize);
        append_stat("icurr", add_stats, d, "%p", c->icurr);
        append_stat("ileft", add_stats, d, "%u", c->ileft);
        append_stat("temp_alloc_list", add_stats, d, "%p", c->temp_alloc_list);
        append_stat("temp_alloc_size", add_stats, d, "%u", c->temp_alloc_size);
        append_stat("temp_alloc_curr", add_stats, d, "%p", c->temp_alloc_curr);
        append_stat("temp_alloc_left", add_stats, d, "%u", c->temp_alloc_left);

        append_stat("noreply", add_stats, d, "%d", c->noreply);
        append_stat("refcount", add_stats, d, "%u", (unsigned int)c->refcount);
        append_stat("dynamic_buffer.buffer", add_stats, d, "%p",
                    c->dynamic_buffer.buffer);
        append_stat("dynamic_buffer.size", add_stats, d, "%zu",
                    c->dynamic_buffer.size);
        append_stat("dynamic_buffer.offset", add_stats, d, "%zu",
                    c->dynamic_buffer.offset);
        append_stat("engine_storage", add_stats, d, "%p", c->engine_storage);
        /* @todo we should decode the binary header */
        append_stat("cas", add_stats, d, "%"PRIu64, c->cas);
        append_stat("cmd", add_stats, d, "%u", c->cmd);
        append_stat("opaque", add_stats, d, "%u", c->opaque);
        append_stat("keylen", add_stats, d, "%u", c->keylen);
        append_stat("list_state", add_stats, d, "%u", c->list_state);
        append_stat("next", add_stats, d, "%p", c->next);
        append_stat("thread", add_stats, d, "%p", c->thread);
        append_stat("aiostat", add_stats, d, "%u", c->aiostat);
        append_stat("ewouldblock", add_stats, d, "%u", c->ewouldblock);
        append_stat("tap_iterator", add_stats, d, "%p", c->tap_iterator);
    }
}
773 
774 /**
775  * Do a full stats of all of the connections.
776  * Do _NOT_ try to follow _ANY_ of the pointers in the conn structure
777  * because we read all of the values _DIRTY_. We preallocated the array
778  * of all of the connection pointers during startup, so we _KNOW_ that
779  * we can iterate through all of them. All of the conn structs will
780  * only appear in the connections.all array when we've allocated them,
781  * and we don't release them so it's safe to look at them.
782  */
connection_stats(ADD_STAT add_stats, conn *c)783 static void connection_stats(ADD_STAT add_stats, conn *c) {
784     int ii;
785     for (ii = 0; ii < settings.maxconns && connections.all[ii]; ++ii) {
786         add_connection_stats(add_stats, c, connections.all[ii]);
787     }
788 }
789 
conn_new(const SOCKET sfd, in_port_t parent_port, STATE_FUNC init_state, int event_flags, unsigned int read_buffer_size, struct event_base *base, struct timeval *timeout)790 conn *conn_new(const SOCKET sfd, in_port_t parent_port,
791                STATE_FUNC init_state, int event_flags,
792                unsigned int read_buffer_size, struct event_base *base,
793                struct timeval *timeout) {
794     conn *c = allocate_connection();
795     if (c == NULL) {
796         return NULL;
797     }
798 
799     c->admin = false;
800     cb_assert(c->thread == NULL);
801     c->max_reqs_per_event = settings.default_reqs_per_event;
802 
803     if (c->rsize < read_buffer_size) {
804         void *mem = malloc(read_buffer_size);
805         if (mem) {
806             c->rsize = read_buffer_size;
807             free(c->rbuf);
808             c->rbuf = mem;
809         } else {
810             cb_assert(c->thread == NULL);
811             release_connection(c);
812             return NULL;
813         }
814     }
815 
816     memset(&c->ssl, 0, sizeof(c->ssl));
817     if (init_state != conn_listening) {
818         int ii;
819         for (ii = 0; ii < settings.num_interfaces; ++ii) {
820             if (parent_port == settings.interfaces[ii].port) {
821                 if (settings.interfaces[ii].ssl.cert != NULL) {
822                     const char *cert = settings.interfaces[ii].ssl.cert;
823                     const char *pkey = settings.interfaces[ii].ssl.key;
824 
825                     c->ssl.ctx = SSL_CTX_new(SSLv23_server_method());
826 
827                     /* @todo don't read files, but use in-memory-copies */
828                     if (!SSL_CTX_use_certificate_chain_file(c->ssl.ctx, cert) ||
829                         !SSL_CTX_use_PrivateKey_file(c->ssl.ctx, pkey, SSL_FILETYPE_PEM)) {
830                         release_connection(c);
831                         return NULL;
832                     }
833 
834                     c->ssl.enabled = true;
835                     c->ssl.error = false;
836                     c->ssl.client = NULL;
837 
838                     c->ssl.in.buffer = malloc(settings.bio_drain_buffer_sz);
839                     c->ssl.out.buffer = malloc(settings.bio_drain_buffer_sz);
840 
841                     if (c->ssl.in.buffer == NULL || c->ssl.out.buffer == NULL) {
842                         release_connection(c);
843                         return NULL;
844                     }
845 
846                     c->ssl.in.buffsz = settings.bio_drain_buffer_sz;
847                     c->ssl.out.buffsz = settings.bio_drain_buffer_sz;
848                     BIO_new_bio_pair(&c->ssl.application,
849                                      settings.bio_drain_buffer_sz,
850                                      &c->ssl.network,
851                                      settings.bio_drain_buffer_sz);
852 
853                     c->ssl.client = SSL_new(c->ssl.ctx);
854                     SSL_set_bio(c->ssl.client,
855                                 c->ssl.application,
856                                 c->ssl.application);
857                 }
858             }
859         }
860     }
861 
862     c->request_addr_size = 0;
863 
864     if (settings.verbose > 1) {
865         if (init_state == conn_listening) {
866             settings.extensions.logger->log(EXTENSION_LOG_DEBUG, c,
867                                             "<%d server listening", sfd);
868         } else {
869             settings.extensions.logger->log(EXTENSION_LOG_DEBUG, c,
870                                             "<%d new client connection", sfd);
871         }
872     }
873 
874     c->sfd = sfd;
875     c->parent_port = parent_port;
876     c->state = init_state;
877     c->rlbytes = 0;
878     c->cmd = -1;
879     c->rbytes = c->wbytes = 0;
880     c->wcurr = c->wbuf;
881     c->rcurr = c->rbuf;
882     c->ritem = 0;
883     c->icurr = c->ilist;
884     c->temp_alloc_curr = c->temp_alloc_list;
885     c->ileft = 0;
886     c->temp_alloc_left = 0;
887     c->iovused = 0;
888     c->msgcurr = 0;
889     c->msgused = 0;
890     c->next = NULL;
891     c->list_state = 0;
892 
893     c->write_and_go = init_state;
894     c->write_and_free = 0;
895     c->item = 0;
896     c->supports_datatype = false;
897     c->noreply = false;
898 
899     event_set(&c->event, sfd, event_flags, event_handler, (void *)c);
900     event_base_set(base, &c->event);
901     c->ev_flags = event_flags;
902 
903     if (!register_event(c, timeout)) {
904         cb_assert(c->thread == NULL);
905         release_connection(c);
906         return NULL;
907     }
908 
909     STATS_LOCK();
910     stats.total_conns++;
911     STATS_UNLOCK();
912 
913     c->aiostat = ENGINE_SUCCESS;
914     c->ewouldblock = false;
915     c->refcount = 1;
916 
917     MEMCACHED_CONN_ALLOCATE(c->sfd);
918 
919     perform_callbacks(ON_CONNECT, NULL, c);
920 
921     return c;
922 }
923 
conn_cleanup(conn *c)924 static void conn_cleanup(conn *c) {
925     cb_assert(c != NULL);
926     c->admin = false;
927     if (c->item) {
928         settings.engine.v1->release(settings.engine.v0, c, c->item);
929         c->item = 0;
930     }
931 
932     if (c->ileft != 0) {
933         for (; c->ileft > 0; c->ileft--,c->icurr++) {
934             settings.engine.v1->release(settings.engine.v0, c, *(c->icurr));
935         }
936     }
937 
938     if (c->temp_alloc_left != 0) {
939         for (; c->temp_alloc_left > 0; c->temp_alloc_left--, c->temp_alloc_curr++) {
940             free(*(c->temp_alloc_curr));
941         }
942     }
943 
944     if (c->write_and_free) {
945         free(c->write_and_free);
946         c->write_and_free = 0;
947     }
948 
949     if (c->sasl_conn) {
950         cbsasl_dispose(&c->sasl_conn);
951         c->sasl_conn = NULL;
952     }
953 
954     c->engine_storage = NULL;
955     c->tap_iterator = NULL;
956     c->thread = NULL;
957     cb_assert(c->next == NULL);
958     c->sfd = INVALID_SOCKET;
959     c->dcp = 0;
960     c->start = 0;
961     if (c->ssl.enabled) {
962         BIO_free_all(c->ssl.network);
963         SSL_free(c->ssl.client);
964         c->ssl.enabled = false;
965         c->ssl.error = false;
966         free(c->ssl.in.buffer);
967         free(c->ssl.out.buffer);
968         memset(&c->ssl, 0, sizeof(c->ssl));
969     }
970 }
971 
conn_close(conn *c)972 void conn_close(conn *c) {
973     cb_assert(c != NULL);
974     cb_assert(c->sfd == INVALID_SOCKET);
975     cb_assert(c->state == conn_immediate_close);
976 
977     cb_assert(c->thread);
978     /* remove from pending-io list */
979     if (settings.verbose > 1 && list_contains(c->thread->pending_io, c)) {
980         settings.extensions.logger->log(EXTENSION_LOG_WARNING, c,
981                                         "Current connection was in the pending-io list.. Nuking it\n");
982     }
983     c->thread->pending_io = list_remove(c->thread->pending_io, c);
984 
985     conn_cleanup(c);
986 
987     /*
988      * The contract with the object cache is that we should return the
989      * object in a constructed state. Reset the buffers to the default
990      * size
991      */
992     conn_reset_buffersize(c);
993     cb_assert(c->thread == NULL);
994     release_connection(c);
995 }
996 
997 /*
998  * Shrinks a connection's buffers if they're too big.  This prevents
999  * periodic large "get" requests from permanently chewing lots of server
1000  * memory.
1001  *
1002  * This should only be called in between requests since it can wipe output
1003  * buffers!
1004  */
conn_shrink(conn *c)1005 static void conn_shrink(conn *c) {
1006     cb_assert(c != NULL);
1007 
1008     if (c->rsize > READ_BUFFER_HIGHWAT && c->rbytes < DATA_BUFFER_SIZE) {
1009         void *newbuf;
1010 
1011         if (c->rcurr != c->rbuf) {
1012             /* Pack the buffer */
1013             memmove(c->rbuf, c->rcurr, (size_t)c->rbytes);
1014         }
1015 
1016         newbuf = realloc(c->rbuf, DATA_BUFFER_SIZE);
1017 
1018         if (newbuf) {
1019             c->rbuf = newbuf;
1020             c->rsize = DATA_BUFFER_SIZE;
1021         }
1022         c->rcurr = c->rbuf;
1023     }
1024 
1025     /* isize is no longer dynamic */
1026     cb_assert(c->isize == ITEM_LIST_INITIAL);
1027 
1028     if (c->msgsize > MSG_LIST_HIGHWAT) {
1029         void *newbuf = realloc(c->msglist,
1030                                MSG_LIST_INITIAL * sizeof(c->msglist[0]));
1031         if (newbuf) {
1032             c->msglist = newbuf;
1033             c->msgsize = MSG_LIST_INITIAL;
1034         }
1035     }
1036 
1037     if (c->iovsize > IOV_LIST_HIGHWAT) {
1038         void *newbuf = realloc(c->iov, IOV_LIST_INITIAL * sizeof(c->iov[0]));
1039         if (newbuf) {
1040             c->iov = newbuf;
1041             c->iovsize = IOV_LIST_INITIAL;
1042         }
1043     }
1044 }
1045 
1046 /**
1047  * Convert a state name to a human readable form.
1048  */
state_text(STATE_FUNC state)1049 const char *state_text(STATE_FUNC state) {
1050     if (state == conn_listening) {
1051         return "conn_listening";
1052     } else if (state == conn_new_cmd) {
1053         return "conn_new_cmd";
1054     } else if (state == conn_waiting) {
1055         return "conn_waiting";
1056     } else if (state == conn_read) {
1057         return "conn_read";
1058     } else if (state == conn_parse_cmd) {
1059         return "conn_parse_cmd";
1060     } else if (state == conn_write) {
1061         return "conn_write";
1062     } else if (state == conn_nread) {
1063         return "conn_nread";
1064     } else if (state == conn_swallow) {
1065         return "conn_swallow";
1066     } else if (state == conn_closing) {
1067         return "conn_closing";
1068     } else if (state == conn_mwrite) {
1069         return "conn_mwrite";
1070     } else if (state == conn_ship_log) {
1071         return "conn_ship_log";
1072     } else if (state == conn_setup_tap_stream) {
1073         return "conn_setup_tap_stream";
1074     } else if (state == conn_pending_close) {
1075         return "conn_pending_close";
1076     } else if (state == conn_immediate_close) {
1077         return "conn_immediate_close";
1078     } else if (state == conn_refresh_cbsasl) {
1079         return "conn_refresh_cbsasl";
1080     } else if (state == conn_refresh_ssl_certs) {
1081         return "conn_refresh_ssl_cert";
1082     } else {
1083         return "Unknown";
1084     }
1085 }
1086 
1087 /*
1088  * Sets a connection's current state in the state machine. Any special
1089  * processing that needs to happen on certain state transitions can
1090  * happen here.
1091  */
conn_set_state(conn *c, STATE_FUNC state)1092 void conn_set_state(conn *c, STATE_FUNC state) {
1093     cb_assert(c != NULL);
1094 
1095     if (state != c->state) {
1096         /*
1097          * The connections in the "tap thread" behaves differently than
1098          * normal connections because they operate in a full duplex mode.
1099          * New messages may appear from both sides, so we can't block on
1100          * read from the nework / engine
1101          */
1102         if (c->tap_iterator != NULL || c->dcp) {
1103             if (state == conn_waiting) {
1104                 c->which = EV_WRITE;
1105                 state = conn_ship_log;
1106             }
1107         }
1108 
1109         if (settings.verbose > 2 || c->state == conn_closing
1110             || c->state == conn_setup_tap_stream) {
1111             settings.extensions.logger->log(EXTENSION_LOG_DETAIL, c,
1112                                             "%d: going from %s to %s\n",
1113                                             c->sfd, state_text(c->state),
1114                                             state_text(state));
1115         }
1116 
1117         if (state == conn_write || state == conn_mwrite) {
1118             if (c->start != 0) {
1119                 collect_timing(c->cmd, gethrtime() - c->start);
1120                 c->start = 0;
1121             }
1122             MEMCACHED_PROCESS_COMMAND_END(c->sfd, c->wbuf, c->wbytes);
1123         }
1124 
1125         c->state = state;
1126     }
1127 }
1128 
1129 /*
1130  * Ensures that there is room for another struct iovec in a connection's
1131  * iov list.
1132  *
1133  * Returns 0 on success, -1 on out-of-memory.
1134  */
ensure_iov_space(conn *c)1135 static int ensure_iov_space(conn *c) {
1136     cb_assert(c != NULL);
1137 
1138     if (c->iovused >= c->iovsize) {
1139         int i, iovnum;
1140         struct iovec *new_iov = (struct iovec *)realloc(c->iov,
1141                                 (c->iovsize * 2) * sizeof(struct iovec));
1142         if (! new_iov)
1143             return -1;
1144         c->iov = new_iov;
1145         c->iovsize *= 2;
1146 
1147         /* Point all the msghdr structures at the new list. */
1148         for (i = 0, iovnum = 0; i < c->msgused; i++) {
1149             c->msglist[i].msg_iov = &c->iov[iovnum];
1150             iovnum += c->msglist[i].msg_iovlen;
1151         }
1152     }
1153 
1154     return 0;
1155 }
1156 
1157 
1158 /*
1159  * Adds data to the list of pending data that will be written out to a
1160  * connection.
1161  *
1162  * Returns 0 on success, -1 on out-of-memory.
1163  */
1164 
add_iov(conn *c, const void *buf, size_t len)1165 static int add_iov(conn *c, const void *buf, size_t len) {
1166     struct msghdr *m;
1167     size_t leftover;
1168     bool limit_to_mtu;
1169 
1170     cb_assert(c != NULL);
1171 
1172     if (len == 0) {
1173         return 0;
1174     }
1175 
1176     do {
1177         m = &c->msglist[c->msgused - 1];
1178 
1179         /*
1180          * Limit the first payloads of TCP replies, to
1181          * UDP_MAX_PAYLOAD_SIZE bytes.
1182          */
1183         limit_to_mtu = (1 == c->msgused);
1184 
1185         /* We may need to start a new msghdr if this one is full. */
1186         if (m->msg_iovlen == IOV_MAX ||
1187             (limit_to_mtu && c->msgbytes >= UDP_MAX_PAYLOAD_SIZE)) {
1188             add_msghdr(c);
1189         }
1190 
1191         if (ensure_iov_space(c) != 0)
1192             return -1;
1193 
1194         /* If the fragment is too big to fit in the datagram, split it up */
1195         if (limit_to_mtu && len + c->msgbytes > UDP_MAX_PAYLOAD_SIZE) {
1196             leftover = len + c->msgbytes - UDP_MAX_PAYLOAD_SIZE;
1197             len -= leftover;
1198         } else {
1199             leftover = 0;
1200         }
1201 
1202         m = &c->msglist[c->msgused - 1];
1203         m->msg_iov[m->msg_iovlen].iov_base = (void *)buf;
1204         m->msg_iov[m->msg_iovlen].iov_len = len;
1205 
1206         c->msgbytes += (int)len;
1207         c->iovused++;
1208         m->msg_iovlen++;
1209 
1210         buf = ((char *)buf) + len;
1211         len = leftover;
1212     } while (leftover > 0);
1213 
1214     return 0;
1215 }
1216 
1217 /**
1218  * get a pointer to the start of the request struct for the current command
1219  */
binary_get_request(conn *c)1220 static void* binary_get_request(conn *c) {
1221     char *ret = c->rcurr;
1222     ret -= (sizeof(c->binary_header) + c->binary_header.request.keylen +
1223             c->binary_header.request.extlen);
1224 
1225     cb_assert(ret >= c->rbuf);
1226     return ret;
1227 }
1228 
1229 /**
1230  * get a pointer to the key in this request
1231  */
binary_get_key(conn *c)1232 static char* binary_get_key(conn *c) {
1233     return c->rcurr - (c->binary_header.request.keylen);
1234 }
1235 
1236 /**
1237  * Insert a key into a buffer, but replace all non-printable characters
1238  * with a '.'.
1239  *
1240  * @param dest where to store the output
1241  * @param destsz size of destination buffer
1242  * @param prefix string to insert before the data
1243  * @param client the client we are serving
1244  * @param from_client set to true if this data is from the client
1245  * @param key the key to add to the buffer
1246  * @param nkey the number of bytes in the key
1247  * @return number of bytes in dest if success, -1 otherwise
1248  */
key_to_printable_buffer(char *dest, size_t destsz, SOCKET client, bool from_client, const char *prefix, const char *key, size_t nkey)1249 static ssize_t key_to_printable_buffer(char *dest, size_t destsz,
1250                                        SOCKET client, bool from_client,
1251                                        const char *prefix,
1252                                        const char *key,
1253                                        size_t nkey)
1254 {
1255     char *ptr;
1256     ssize_t ii;
1257     ssize_t nw = snprintf(dest, destsz, "%c%d %s ", from_client ? '>' : '<',
1258                           (int)client, prefix);
1259     if (nw == -1) {
1260         return -1;
1261     }
1262 
1263     ptr = dest + nw;
1264     destsz -= nw;
1265     if (nkey > destsz) {
1266         nkey = destsz;
1267     }
1268 
1269     for (ii = 0; ii < nkey; ++ii, ++key, ++ptr) {
1270         if (isgraph(*key)) {
1271             *ptr = *key;
1272         } else {
1273             *ptr = '.';
1274         }
1275     }
1276 
1277     *ptr = '\0';
1278     return (ssize_t)(ptr - dest);
1279 }
1280 
1281 /**
1282  * Convert a byte array to a text string
1283  *
1284  * @param dest where to store the output
1285  * @param destsz size of destination buffer
1286  * @param prefix string to insert before the data
1287  * @param client the client we are serving
1288  * @param from_client set to true if this data is from the client
1289  * @param data the data to add to the buffer
1290  * @param size the number of bytes in data to print
1291  * @return number of bytes in dest if success, -1 otherwise
1292  */
bytes_to_output_string(char *dest, size_t destsz, SOCKET client, bool from_client, const char *prefix, const char *data, size_t size)1293 static ssize_t bytes_to_output_string(char *dest, size_t destsz,
1294                                       SOCKET client, bool from_client,
1295                                       const char *prefix,
1296                                       const char *data,
1297                                       size_t size)
1298 {
1299     ssize_t nw = snprintf(dest, destsz, "%c%d %s", from_client ? '>' : '<',
1300                           (int)client, prefix);
1301     ssize_t offset = nw;
1302     ssize_t ii;
1303 
1304     if (nw == -1) {
1305         return -1;
1306     }
1307 
1308     for (ii = 0; ii < size; ++ii) {
1309         if (ii % 4 == 0) {
1310             if ((nw = snprintf(dest + offset, destsz - offset, "\n%c%d  ",
1311                                from_client ? '>' : '<', client)) == -1) {
1312                 return  -1;
1313             }
1314             offset += nw;
1315         }
1316         if ((nw = snprintf(dest + offset, destsz - offset,
1317                            " 0x%02x", (unsigned char)data[ii])) == -1) {
1318             return -1;
1319         }
1320         offset += nw;
1321     }
1322 
1323     if ((nw = snprintf(dest + offset, destsz - offset, "\n")) == -1) {
1324         return -1;
1325     }
1326 
1327     return offset + nw;
1328 }
1329 
add_bin_header(conn *c, uint16_t err, uint8_t hdr_len, uint16_t key_len, uint32_t body_len, uint8_t datatype)1330 static int add_bin_header(conn *c,
1331                           uint16_t err,
1332                           uint8_t hdr_len,
1333                           uint16_t key_len,
1334                           uint32_t body_len,
1335                           uint8_t datatype) {
1336     protocol_binary_response_header* header;
1337 
1338     cb_assert(c);
1339 
1340     c->msgcurr = 0;
1341     c->msgused = 0;
1342     c->iovused = 0;
1343     if (add_msghdr(c) != 0) {
1344         return -1;
1345     }
1346 
1347     header = (protocol_binary_response_header *)c->wbuf;
1348 
1349     header->response.magic = (uint8_t)PROTOCOL_BINARY_RES;
1350     header->response.opcode = c->binary_header.request.opcode;
1351     header->response.keylen = (uint16_t)htons(key_len);
1352 
1353     header->response.extlen = (uint8_t)hdr_len;
1354     header->response.datatype = datatype;
1355     header->response.status = (uint16_t)htons(err);
1356 
1357     header->response.bodylen = htonl(body_len);
1358     header->response.opaque = c->opaque;
1359     header->response.cas = htonll(c->cas);
1360 
1361     if (settings.verbose > 1) {
1362         char buffer[1024];
1363         if (bytes_to_output_string(buffer, sizeof(buffer), c->sfd, false,
1364                                    "Writing bin response:",
1365                                    (const char*)header->bytes,
1366                                    sizeof(header->bytes)) != -1) {
1367             settings.extensions.logger->log(EXTENSION_LOG_DEBUG, c,
1368                                             "%s", buffer);
1369         }
1370     }
1371 
1372     return add_iov(c, c->wbuf, sizeof(header->response));
1373 }
1374 
1375 /**
1376  * Convert an error code generated from the storage engine to the corresponding
1377  * error code used by the protocol layer.
1378  * @param e the error code as used in the engine
1379  * @return the error code as used by the protocol layer
1380  */
engine_error_2_protocol_error(ENGINE_ERROR_CODE e)1381 static protocol_binary_response_status engine_error_2_protocol_error(ENGINE_ERROR_CODE e) {
1382     protocol_binary_response_status ret;
1383 
1384     switch (e) {
1385     case ENGINE_SUCCESS:
1386         return PROTOCOL_BINARY_RESPONSE_SUCCESS;
1387     case ENGINE_KEY_ENOENT:
1388         return PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
1389     case ENGINE_KEY_EEXISTS:
1390         return PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS;
1391     case ENGINE_ENOMEM:
1392         return PROTOCOL_BINARY_RESPONSE_ENOMEM;
1393     case ENGINE_TMPFAIL:
1394         return PROTOCOL_BINARY_RESPONSE_ETMPFAIL;
1395     case ENGINE_NOT_STORED:
1396         return PROTOCOL_BINARY_RESPONSE_NOT_STORED;
1397     case ENGINE_EINVAL:
1398         return PROTOCOL_BINARY_RESPONSE_EINVAL;
1399     case ENGINE_ENOTSUP:
1400         return PROTOCOL_BINARY_RESPONSE_NOT_SUPPORTED;
1401     case ENGINE_E2BIG:
1402         return PROTOCOL_BINARY_RESPONSE_E2BIG;
1403     case ENGINE_NOT_MY_VBUCKET:
1404         return PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET;
1405     case ENGINE_ERANGE:
1406         return PROTOCOL_BINARY_RESPONSE_ERANGE;
1407     case ENGINE_ROLLBACK:
1408         return PROTOCOL_BINARY_RESPONSE_ROLLBACK;
1409     default:
1410         ret = PROTOCOL_BINARY_RESPONSE_EINTERNAL;
1411     }
1412 
1413     return ret;
1414 }
1415 
get_vb_map_cb(const void *cookie, const void *map, size_t mapsize)1416 static ENGINE_ERROR_CODE get_vb_map_cb(const void *cookie,
1417                                        const void *map,
1418                                        size_t mapsize)
1419 {
1420     char *buf;
1421     conn *c = (conn*)cookie;
1422     protocol_binary_response_header header;
1423     size_t needed = mapsize+ sizeof(protocol_binary_response_header);
1424     if (!grow_dynamic_buffer(c, needed)) {
1425         if (settings.verbose > 0) {
1426             settings.extensions.logger->log(EXTENSION_LOG_INFO, c,
1427                     "<%d ERROR: Failed to allocate memory for response\n",
1428                     c->sfd);
1429         }
1430         return ENGINE_ENOMEM;
1431     }
1432 
1433     buf = c->dynamic_buffer.buffer + c->dynamic_buffer.offset;
1434     memset(&header, 0, sizeof(header));
1435 
1436     header.response.magic = (uint8_t)PROTOCOL_BINARY_RES;
1437     header.response.opcode = c->binary_header.request.opcode;
1438     header.response.status = (uint16_t)htons(PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET);
1439     header.response.bodylen = htonl((uint32_t)mapsize);
1440     header.response.opaque = c->opaque;
1441 
1442     memcpy(buf, header.bytes, sizeof(header.response));
1443     buf += sizeof(header.response);
1444     memcpy(buf, map, mapsize);
1445     c->dynamic_buffer.offset += needed;
1446 
1447     return ENGINE_SUCCESS;
1448 }
1449 
write_bin_packet(conn *c, protocol_binary_response_status err, int swallow)1450 static void write_bin_packet(conn *c, protocol_binary_response_status err, int swallow) {
1451     if (err == PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET) {
1452         ENGINE_ERROR_CODE ret;
1453         cb_assert(swallow == 0);
1454 
1455         ret = settings.engine.v1->get_engine_vb_map(settings.engine.v0, c,
1456                                                     get_vb_map_cb);
1457         if (ret == ENGINE_SUCCESS) {
1458             write_and_free(c, c->dynamic_buffer.buffer,
1459                            c->dynamic_buffer.offset);
1460             c->dynamic_buffer.buffer = NULL;
1461         } else {
1462             conn_set_state(c, conn_closing);
1463         }
1464     } else {
1465         ssize_t len = 0;
1466         const char *errtext = NULL;
1467 
1468         if (err != PROTOCOL_BINARY_RESPONSE_SUCCESS) {
1469             errtext = memcached_protocol_errcode_2_text(err);
1470             if (errtext != NULL) {
1471                 len = (ssize_t)strlen(errtext);
1472             }
1473         }
1474 
1475         if (errtext && settings.verbose > 1) {
1476             settings.extensions.logger->log(EXTENSION_LOG_DEBUG, c,
1477                                             ">%d Writing an error: %s\n", c->sfd,
1478                                             errtext);
1479         }
1480 
1481         add_bin_header(c, err, 0, 0, len, PROTOCOL_BINARY_RAW_BYTES);
1482         if (errtext) {
1483             add_iov(c, errtext, len);
1484         }
1485         conn_set_state(c, conn_mwrite);
1486         if (swallow > 0) {
1487             c->sbytes = swallow;
1488             c->write_and_go = conn_swallow;
1489         } else {
1490             c->write_and_go = conn_new_cmd;
1491         }
1492     }
1493 }
1494 
1495 /* Form and send a response to a command over the binary protocol */
write_bin_response(conn *c, const void *d, int hlen, int keylen, int dlen)1496 static void write_bin_response(conn *c, const void *d, int hlen, int keylen, int dlen) {
1497     if (!c->noreply || c->cmd == PROTOCOL_BINARY_CMD_GET ||
1498         c->cmd == PROTOCOL_BINARY_CMD_GETK) {
1499         if (add_bin_header(c, 0, hlen, keylen, dlen, PROTOCOL_BINARY_RAW_BYTES) == -1) {
1500             conn_set_state(c, conn_closing);
1501             return;
1502         }
1503         add_iov(c, d, dlen);
1504         conn_set_state(c, conn_mwrite);
1505         c->write_and_go = conn_new_cmd;
1506     } else {
1507         if (c->start != 0) {
1508             collect_timing(c->cmd, gethrtime() - c->start);
1509             c->start = 0;
1510         }
1511         conn_set_state(c, conn_new_cmd);
1512     }
1513 }
1514 
complete_update_bin(conn *c)1515 static void complete_update_bin(conn *c) {
1516     protocol_binary_response_status eno = PROTOCOL_BINARY_RESPONSE_EINVAL;
1517     ENGINE_ERROR_CODE ret;
1518     item *it;
1519     item_info_holder info;
1520 
1521     cb_assert(c != NULL);
1522     it = c->item;
1523     memset(&info, 0, sizeof(info));
1524     info.info.nvalue = 1;
1525     if (!settings.engine.v1->get_item_info(settings.engine.v0, c, it,
1526                                            (void*)&info)) {
1527         settings.engine.v1->release(settings.engine.v0, c, it);
1528         settings.extensions.logger->log(EXTENSION_LOG_WARNING, c,
1529                                         "%d: Failed to get item info",
1530                                         c->sfd);
1531         write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_EINTERNAL, 0);
1532         return;
1533     }
1534 
1535     ret = c->aiostat;
1536     c->aiostat = ENGINE_SUCCESS;
1537     if (ret == ENGINE_SUCCESS) {
1538         if (!c->supports_datatype) {
1539             if (checkUTF8JSON((void*)info.info.value[0].iov_base,
1540                               (int)info.info.value[0].iov_len)) {
1541                 info.info.datatype = PROTOCOL_BINARY_DATATYPE_JSON;
1542                 if (!settings.engine.v1->set_item_info(settings.engine.v0, c,
1543                                                        it, &info.info)) {
1544                     settings.extensions.logger->log(EXTENSION_LOG_WARNING, c,
1545                             "%d: Failed to set item info",
1546                             c->sfd);
1547                 }
1548             }
1549         }
1550         ret = settings.engine.v1->store(settings.engine.v0, c,
1551                                         it, &c->cas, c->store_op,
1552                                         c->binary_header.request.vbucket);
1553     }
1554 
1555 #ifdef ENABLE_DTRACE
1556     switch (c->cmd) {
1557     case OPERATION_ADD:
1558         MEMCACHED_COMMAND_ADD(c->sfd, info.info.key, info.info.nkey,
1559                               (ret == ENGINE_SUCCESS) ? info.info.nbytes : -1, c->cas);
1560         break;
1561     case OPERATION_REPLACE:
1562         MEMCACHED_COMMAND_REPLACE(c->sfd, info.info.key, info.info.nkey,
1563                                   (ret == ENGINE_SUCCESS) ? info.info.nbytes : -1, c->cas);
1564         break;
1565     case OPERATION_APPEND:
1566         MEMCACHED_COMMAND_APPEND(c->sfd, info.info.key, info.info.nkey,
1567                                  (ret == ENGINE_SUCCESS) ? info.info.nbytes : -1, c->cas);
1568         break;
1569     case OPERATION_PREPEND:
1570         MEMCACHED_COMMAND_PREPEND(c->sfd, info.info.key, info.info.nkey,
1571                                   (ret == ENGINE_SUCCESS) ? info.info.nbytes : -1, c->cas);
1572         break;
1573     case OPERATION_SET:
1574         MEMCACHED_COMMAND_SET(c->sfd, info.info.key, info.info.nkey,
1575                               (ret == ENGINE_SUCCESS) ? info.info.nbytes : -1, c->cas);
1576         break;
1577     }
1578 #endif
1579 
1580     switch (ret) {
1581     case ENGINE_SUCCESS:
1582         /* Stored */
1583         write_bin_response(c, NULL, 0, 0, 0);
1584         break;
1585     case ENGINE_KEY_EEXISTS:
1586         write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS, 0);
1587         break;
1588     case ENGINE_KEY_ENOENT:
1589         write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, 0);
1590         break;
1591     case ENGINE_ENOMEM:
1592         write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_ENOMEM, 0);
1593         break;
1594     case ENGINE_TMPFAIL:
1595         write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_ETMPFAIL, 0);
1596         break;
1597     case ENGINE_EWOULDBLOCK:
1598         c->ewouldblock = true;
1599         break;
1600     case ENGINE_DISCONNECT:
1601         c->state = conn_closing;
1602         break;
1603     case ENGINE_ENOTSUP:
1604         write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_NOT_SUPPORTED, 0);
1605         break;
1606     case ENGINE_NOT_MY_VBUCKET:
1607         write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET, 0);
1608         break;
1609     case ENGINE_E2BIG:
1610         write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_E2BIG, 0);
1611         break;
1612     default:
1613         if (c->store_op == OPERATION_ADD) {
1614             eno = PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS;
1615         } else if(c->store_op == OPERATION_REPLACE) {
1616             eno = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
1617         } else {
1618             eno = PROTOCOL_BINARY_RESPONSE_NOT_STORED;
1619         }
1620         write_bin_packet(c, eno, 0);
1621     }
1622 
1623     if (c->store_op == OPERATION_CAS) {
1624         switch (ret) {
1625         case ENGINE_SUCCESS:
1626             SLAB_INCR(c, cas_hits, info.info.key, info.info.nkey);
1627             break;
1628         case ENGINE_KEY_EEXISTS:
1629             SLAB_INCR(c, cas_badval, info.info.key, info.info.nkey);
1630             break;
1631         case ENGINE_KEY_ENOENT:
1632             STATS_NOKEY(c, cas_misses);
1633             break;
1634         default:
1635             ;
1636         }
1637     } else {
1638         SLAB_INCR(c, cmd_set, info.info.key, info.info.nkey);
1639     }
1640 
1641     if (!c->ewouldblock) {
1642         /* release the c->item reference */
1643         settings.engine.v1->release(settings.engine.v0, c, c->item);
1644         c->item = 0;
1645     }
1646 }
1647 
process_bin_get(conn *c)1648 static void process_bin_get(conn *c) {
1649     item *it;
1650     protocol_binary_response_get* rsp = (protocol_binary_response_get*)c->wbuf;
1651     char* key = binary_get_key(c);
1652     size_t nkey = c->binary_header.request.keylen;
1653     uint16_t keylen;
1654     uint32_t bodylen;
1655     item_info_holder info;
1656     int ii;
1657     ENGINE_ERROR_CODE ret;
1658     uint8_t datatype;
1659     bool need_inflate = false;
1660 
1661     memset(&info, 0, sizeof(info));
1662     if (settings.verbose > 1) {
1663         char buffer[1024];
1664         if (key_to_printable_buffer(buffer, sizeof(buffer), c->sfd, true,
1665                                     "GET", key, nkey) != -1) {
1666             settings.extensions.logger->log(EXTENSION_LOG_DEBUG, c, "%s\n",
1667                                             buffer);
1668         }
1669     }
1670 
1671     ret = c->aiostat;
1672     c->aiostat = ENGINE_SUCCESS;
1673     if (ret == ENGINE_SUCCESS) {
1674         ret = settings.engine.v1->get(settings.engine.v0, c, &it, key, (int)nkey,
1675                                       c->binary_header.request.vbucket);
1676     }
1677 
1678     info.info.nvalue = IOV_MAX;
1679     switch (ret) {
1680     case ENGINE_SUCCESS:
1681         STATS_HIT(c, get, key, nkey);
1682 
1683         if (!settings.engine.v1->get_item_info(settings.engine.v0, c, it,
1684                                                (void*)&info)) {
1685             settings.engine.v1->release(settings.engine.v0, c, it);
1686             settings.extensions.logger->log(EXTENSION_LOG_WARNING, c,
1687                                             "%d: Failed to get item info",
1688                                             c->sfd);
1689             write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_EINTERNAL, 0);
1690             break;
1691         }
1692 
1693         datatype = info.info.datatype;
1694         if (!c->supports_datatype) {
1695             if ((datatype & PROTOCOL_BINARY_DATATYPE_COMPRESSED) == PROTOCOL_BINARY_DATATYPE_COMPRESSED) {
1696                 need_inflate = true;
1697             } else {
1698                 datatype = PROTOCOL_BINARY_RAW_BYTES;
1699             }
1700         }
1701 
1702         keylen = 0;
1703         bodylen = sizeof(rsp->message.body) + info.info.nbytes;
1704 
1705         if (c->cmd == PROTOCOL_BINARY_CMD_GETK) {
1706             bodylen += (uint32_t)nkey;
1707             keylen = (uint16_t)nkey;
1708         }
1709 
1710         if (need_inflate) {
1711             if (info.info.nvalue != 1) {
1712                 write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_EINTERNAL, 0);
1713             } else if (binary_response_handler(key, keylen,
1714                                                &info.info.flags, 4,
1715                                                info.info.value[0].iov_base,
1716                                                (uint32_t)info.info.value[0].iov_len,
1717                                                datatype,
1718                                                PROTOCOL_BINARY_RESPONSE_SUCCESS,
1719                                                info.info.cas, c)) {
1720                 write_and_free(c, c->dynamic_buffer.buffer, c->dynamic_buffer.offset);
1721                 c->dynamic_buffer.buffer = NULL;
1722                 settings.engine.v1->release(settings.engine.v0, c, it);
1723             } else {
1724                 write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_EINTERNAL, 0);
1725             }
1726         } else {
1727             if (add_bin_header(c, 0, sizeof(rsp->message.body),
1728                                keylen, bodylen, datatype) == -1) {
1729                 conn_set_state(c, conn_closing);
1730                 return;
1731             }
1732             rsp->message.header.response.cas = htonll(info.info.cas);
1733 
1734             /* add the flags */
1735             rsp->message.body.flags = info.info.flags;
1736             add_iov(c, &rsp->message.body, sizeof(rsp->message.body));
1737 
1738             if (c->cmd == PROTOCOL_BINARY_CMD_GETK) {
1739                 add_iov(c, info.info.key, nkey);
1740             }
1741 
1742             for (ii = 0; ii < info.info.nvalue; ++ii) {
1743                 add_iov(c, info.info.value[ii].iov_base,
1744                         info.info.value[ii].iov_len);
1745             }
1746             conn_set_state(c, conn_mwrite);
1747             /* Remember this item so we can garbage collect it later */
1748             c->item = it;
1749         }
1750         break;
1751     case ENGINE_KEY_ENOENT:
1752         STATS_MISS(c, get, key, nkey);
1753 
1754         MEMCACHED_COMMAND_GET(c->sfd, key, nkey, -1, 0);
1755 
1756         if (c->noreply) {
1757             conn_set_state(c, conn_new_cmd);
1758         } else {
1759             if (c->cmd == PROTOCOL_BINARY_CMD_GETK) {
1760                 char *ofs = c->wbuf + sizeof(protocol_binary_response_header);
1761                 if (add_bin_header(c, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT,
1762                                    0, (uint16_t)nkey,
1763                                    (uint32_t)nkey, PROTOCOL_BINARY_RAW_BYTES) == -1) {
1764                     conn_set_state(c, conn_closing);
1765                     return;
1766                 }
1767                 memcpy(ofs, key, nkey);
1768                 add_iov(c, ofs, nkey);
1769                 conn_set_state(c, conn_mwrite);
1770             } else {
1771                 write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, 0);
1772             }
1773         }
1774         break;
1775     case ENGINE_EWOULDBLOCK:
1776         c->ewouldblock = true;
1777         break;
1778     case ENGINE_DISCONNECT:
1779         c->state = conn_closing;
1780         break;
1781     default:
1782         write_bin_packet(c, engine_error_2_protocol_error(ret), 0);
1783     }
1784 
1785     if (settings.detail_enabled && ret != ENGINE_EWOULDBLOCK) {
1786         stats_prefix_record_get(key, nkey, ret == ENGINE_SUCCESS);
1787     }
1788 }
1789 
append_bin_stats(const char *key, const uint16_t klen, const char *val, const uint32_t vlen, conn *c)1790 static void append_bin_stats(const char *key, const uint16_t klen,
1791                              const char *val, const uint32_t vlen,
1792                              conn *c) {
1793     char *buf = c->dynamic_buffer.buffer + c->dynamic_buffer.offset;
1794     uint32_t bodylen = klen + vlen;
1795     protocol_binary_response_header header;
1796 
1797     memset(&header, 0, sizeof(header));
1798     header.response.magic = (uint8_t)PROTOCOL_BINARY_RES;
1799     header.response.opcode = PROTOCOL_BINARY_CMD_STAT;
1800     header.response.keylen = (uint16_t)htons(klen);
1801     header.response.datatype = (uint8_t)PROTOCOL_BINARY_RAW_BYTES;
1802     header.response.bodylen = htonl(bodylen);
1803     header.response.opaque = c->opaque;
1804 
1805     memcpy(buf, header.bytes, sizeof(header.response));
1806     buf += sizeof(header.response);
1807 
1808     if (klen > 0) {
1809         cb_assert(key != NULL);
1810         memcpy(buf, key, klen);
1811         buf += klen;
1812 
1813         if (vlen > 0) {
1814             memcpy(buf, val, vlen);
1815         }
1816     }
1817 
1818     c->dynamic_buffer.offset += sizeof(header.response) + bodylen;
1819 }
1820 
grow_dynamic_buffer(conn *c, size_t needed)1821 static bool grow_dynamic_buffer(conn *c, size_t needed) {
1822     size_t nsize = c->dynamic_buffer.size;
1823     size_t available = nsize - c->dynamic_buffer.offset;
1824     bool rv = true;
1825 
1826     /* Special case: No buffer -- need to allocate fresh */
1827     if (c->dynamic_buffer.buffer == NULL) {
1828         nsize = 1024;
1829         available = c->dynamic_buffer.size = c->dynamic_buffer.offset = 0;
1830     }
1831 
1832     while (needed > available) {
1833         cb_assert(nsize > 0);
1834         nsize = nsize << 1;
1835         available = nsize - c->dynamic_buffer.offset;
1836     }
1837 
1838     if (nsize != c->dynamic_buffer.size) {
1839         char *ptr = realloc(c->dynamic_buffer.buffer, nsize);
1840         if (ptr) {
1841             c->dynamic_buffer.buffer = ptr;
1842             c->dynamic_buffer.size = nsize;
1843         } else {
1844             rv = false;
1845         }
1846     }
1847 
1848     return rv;
1849 }
1850 
append_stats(const char *key, const uint16_t klen, const char *val, const uint32_t vlen, const void *cookie)1851 static void append_stats(const char *key, const uint16_t klen,
1852                          const char *val, const uint32_t vlen,
1853                          const void *cookie)
1854 {
1855     size_t needed;
1856     conn *c = (conn*)cookie;
1857     /* value without a key is invalid */
1858     if (klen == 0 && vlen > 0) {
1859         return ;
1860     }
1861 
1862     needed = vlen + klen + sizeof(protocol_binary_response_header);
1863     if (!grow_dynamic_buffer(c, needed)) {
1864         return ;
1865     }
1866     append_bin_stats(key, klen, val, vlen, c);
1867     cb_assert(c->dynamic_buffer.offset <= c->dynamic_buffer.size);
1868 }
1869 
bin_read_chunk(conn *c, enum bin_substates next_substate, uint32_t chunk)1870 static void bin_read_chunk(conn *c,
1871                            enum bin_substates next_substate,
1872                            uint32_t chunk) {
1873     ptrdiff_t offset;
1874     cb_assert(c);
1875     c->substate = next_substate;
1876     c->rlbytes = chunk;
1877 
1878     /* Ok... do we have room for everything in our buffer? */
1879     offset = c->rcurr + sizeof(protocol_binary_request_header) - c->rbuf;
1880     if (c->rlbytes > c->rsize - offset) {
1881         size_t nsize = c->rsize;
1882         size_t size = c->rlbytes + sizeof(protocol_binary_request_header);
1883 
1884         while (size > nsize) {
1885             nsize *= 2;
1886         }
1887 
1888         if (nsize != c->rsize) {
1889             char *newm;
1890             if (settings.verbose > 1) {
1891                 settings.extensions.logger->log(EXTENSION_LOG_DEBUG, c,
1892                         "%d: Need to grow buffer from %lu to %lu\n",
1893                         c->sfd, (unsigned long)c->rsize, (unsigned long)nsize);
1894             }
1895             newm = realloc(c->rbuf, nsize);
1896             if (newm == NULL) {
1897                 if (settings.verbose) {
1898                     settings.extensions.logger->log(EXTENSION_LOG_INFO, c,
1899                             "%d: Failed to grow buffer.. closing connection\n",
1900                             c->sfd);
1901                 }
1902                 conn_set_state(c, conn_closing);
1903                 return;
1904             }
1905 
1906             c->rbuf= newm;
1907             /* rcurr should point to the same offset in the packet */
1908             c->rcurr = c->rbuf + offset - sizeof(protocol_binary_request_header);
1909             c->rsize = (int)nsize;
1910         }
1911         if (c->rbuf != c->rcurr) {
1912             memmove(c->rbuf, c->rcurr, c->rbytes);
1913             c->rcurr = c->rbuf;
1914             if (settings.verbose > 1) {
1915                 settings.extensions.logger->log(EXTENSION_LOG_DEBUG, c,
1916                                                 "%d: Repack input buffer\n",
1917                                                 c->sfd);
1918             }
1919         }
1920     }
1921 
1922     /* preserve the header in the buffer.. */
1923     c->ritem = c->rcurr + sizeof(protocol_binary_request_header);
1924     conn_set_state(c, conn_nread);
1925 }
1926 
bin_read_key(conn *c, enum bin_substates next_substate, int extra)1927 static void bin_read_key(conn *c, enum bin_substates next_substate, int extra) {
1928     bin_read_chunk(c, next_substate, c->keylen + extra);
1929 }
1930 
1931 
1932 /* Just write an error message and disconnect the client */
handle_binary_protocol_error(conn *c)1933 static void handle_binary_protocol_error(conn *c) {
1934     write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_EINVAL, 0);
1935     if (settings.verbose) {
1936         settings.extensions.logger->log(EXTENSION_LOG_INFO, c,
1937                 "%d: Protocol error (opcode %02x), close connection\n",
1938                 c->sfd, c->binary_header.request.opcode);
1939     }
1940     c->write_and_go = conn_closing;
1941 }
1942 
get_auth_data(const void *cookie, auth_data_t *data)1943 static void get_auth_data(const void *cookie, auth_data_t *data) {
1944     conn *c = (conn*)cookie;
1945     if (c->sasl_conn) {
1946         cbsasl_getprop(c->sasl_conn, CBSASL_USERNAME, (void*)&data->username);
1947         cbsasl_getprop(c->sasl_conn, CBSASL_CONFIG, (void*)&data->config);
1948     } else {
1949         data->username = NULL;
1950         data->config = NULL;
1951     }
1952 }
1953 
/*
 * Scratch buffer for a SASL AUTH/STEP request whose body is still being
 * read.  process_bin_sasl_auth() allocates it and stores it in c->item.
 * process_bin_complete_sasl_auth() consumes it and free()s it.
 *
 * Layout of `data`:
 *   - the mechanism name (the request key) occupies data[0..ksize);
 *   - the request value is read into data + ksize (vsize bytes).
 * `data[1]` is the pre-C99 "struct hack": the allocation is sized for the
 * real contents.
 */
struct sasl_tmp {
    int ksize; /* mechanism name length (request keylen) */
    int vsize; /* SASL data length (request bodylen - keylen) */
    char data[1]; /* data + ksize == value */
};
1959 
process_bin_sasl_auth(conn *c)1960 static void process_bin_sasl_auth(conn *c) {
1961     int nkey;
1962     int vlen;
1963     char *key;
1964     size_t buffer_size;
1965     struct sasl_tmp *data;
1966 
1967     cb_assert(c->binary_header.request.extlen == 0);
1968     nkey = c->binary_header.request.keylen;
1969     vlen = c->binary_header.request.bodylen - nkey;
1970 
1971     if (nkey > MAX_SASL_MECH_LEN) {
1972         write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_EINVAL, vlen);
1973         c->write_and_go = conn_swallow;
1974         return;
1975     }
1976 
1977     key = binary_get_key(c);
1978     cb_assert(key);
1979 
1980     buffer_size = sizeof(struct sasl_tmp) + nkey + vlen + 2;
1981     data = calloc(sizeof(struct sasl_tmp) + buffer_size, 1);
1982     if (!data) {
1983         write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_ENOMEM, vlen);
1984         c->write_and_go = conn_swallow;
1985         return;
1986     }
1987 
1988     data->ksize = nkey;
1989     data->vsize = vlen;
1990     memcpy(data->data, key, nkey);
1991 
1992     c->item = data;
1993     c->ritem = data->data + nkey;
1994     c->rlbytes = vlen;
1995     conn_set_state(c, conn_nread);
1996     c->substate = bin_reading_sasl_auth_data;
1997 }
1998 
process_bin_complete_sasl_auth(conn *c)1999 static void process_bin_complete_sasl_auth(conn *c) {
2000     auth_data_t data;
2001     const char *out = NULL;
2002     unsigned int outlen = 0;
2003     int nkey;
2004     int vlen;
2005     struct sasl_tmp *stmp;
2006     char mech[1024];
2007     const char *challenge;
2008     int result=-1;
2009 
2010     cb_assert(c->item);
2011 
2012     nkey = c->binary_header.request.keylen;
2013     if (nkey > 1023) {
2014         /* too big.. */
2015         settings.extensions.logger->log(EXTENSION_LOG_WARNING, c,
2016                 "%d: sasl error. key: %d > 1023", c->sfd, nkey);
2017         write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_AUTH_ERROR, 0);
2018         return;
2019     }
2020     vlen = c->binary_header.request.bodylen - nkey;
2021 
2022     stmp = c->item;
2023     memcpy(mech, stmp->data, nkey);
2024     mech[nkey] = 0x00;
2025 
2026     if (settings.verbose) {
2027         settings.extensions.logger->log(EXTENSION_LOG_DEBUG, c,
2028                 "%d: mech: ``%s'' with %d bytes of data\n", c->sfd, mech, vlen);
2029     }
2030 
2031     challenge = vlen == 0 ? NULL : (stmp->data + nkey);
2032     switch (c->cmd) {
2033     case PROTOCOL_BINARY_CMD_SASL_AUTH:
2034         result = cbsasl_server_start(&c->sasl_conn, mech,
2035                                      challenge, vlen,
2036                                      (unsigned char **)&out, &outlen);
2037         break;
2038     case PROTOCOL_BINARY_CMD_SASL_STEP:
2039         result = cbsasl_server_step(c->sasl_conn, challenge,
2040                                     vlen, &out, &outlen);
2041         break;
2042     default:
2043         cb_assert(false); /* CMD should be one of the above */
2044         /* This code is pretty much impossible, but makes the compiler
2045            happier */
2046         if (settings.verbose) {
2047             settings.extensions.logger->log(EXTENSION_LOG_WARNING, c,
2048                     "%d: Unhandled command %d with challenge %s\n",
2049                     c->sfd, c->cmd, challenge);
2050         }
2051         break;
2052     }
2053 
2054     free(c->item);
2055     c->item = NULL;
2056     c->ritem = NULL;
2057 
2058     if (settings.verbose) {
2059         settings.extensions.logger->log(EXTENSION_LOG_INFO, c,
2060                                         "%d: sasl result code:  %d\n",
2061                                         c->sfd, result);
2062     }
2063 
2064     switch(result) {
2065     case SASL_OK:
2066         write_bin_response(c, "Authenticated", 0, 0, (uint32_t)strlen("Authenticated"));
2067         get_auth_data(c, &data);
2068         if (settings.disable_admin) {
2069             /* "everyone is admins" */
2070             cookie_set_admin(c);
2071         } else if (settings.admin != NULL && data.username != NULL) {
2072             if (strcmp(settings.admin, data.username) == 0) {
2073                 cookie_set_admin(c);
2074             }
2075         }
2076         perform_callbacks(ON_AUTH, (const void*)&data, c);
2077         STATS_NOKEY(c, auth_cmds);
2078         break;
2079     case SASL_CONTINUE:
2080         if (add_bin_header(c, PROTOCOL_BINARY_RESPONSE_AUTH_CONTINUE, 0, 0,
2081                            outlen, PROTOCOL_BINARY_RAW_BYTES) == -1) {
2082             conn_set_state(c, conn_closing);
2083             return;
2084         }
2085         add_iov(c, out, outlen);
2086         conn_set_state(c, conn_mwrite);
2087         c->write_and_go = conn_new_cmd;
2088         break;
2089     case SASL_BADPARAM:
2090         settings.extensions.logger->log(EXTENSION_LOG_WARNING, c,
2091                                         "%d: Bad sasl params: %d\n",
2092                                         c->sfd, result);
2093         write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_EINVAL, 0);
2094         STATS_NOKEY2(c, auth_cmds, auth_errors);
2095         break;
2096     default:
2097         if (result == SASL_NOUSER || result == SASL_PWERR) {
2098             settings.extensions.logger->log(EXTENSION_LOG_WARNING, c,
2099                                             "%d: Invalid username/password combination",
2100                                             c->sfd);
2101         } else {
2102             settings.extensions.logger->log(EXTENSION_LOG_WARNING, c,
2103                                             "%d: Unknown sasl response: %d",
2104                                             c->sfd, result);
2105         }
2106         write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_AUTH_ERROR, 0);
2107         STATS_NOKEY2(c, auth_cmds, auth_errors);
2108     }
2109 }
2110 
authenticated(conn *c)2111 static bool authenticated(conn *c) {
2112     bool rv = false;
2113 
2114     switch (c->cmd) {
2115     case PROTOCOL_BINARY_CMD_SASL_LIST_MECHS: /* FALLTHROUGH */
2116     case PROTOCOL_BINARY_CMD_SASL_AUTH:       /* FALLTHROUGH */
2117     case PROTOCOL_BINARY_CMD_SASL_STEP:       /* FALLTHROUGH */
2118     case PROTOCOL_BINARY_CMD_VERSION:         /* FALLTHROUGH */
2119     case PROTOCOL_BINARY_CMD_HELLO:
2120         rv = true;
2121         break;
2122     default:
2123         if (c->sasl_conn) {
2124             const void *uname = NULL;
2125             cbsasl_getprop(c->sasl_conn, CBSASL_USERNAME, &uname);
2126             rv = uname != NULL;
2127         }
2128     }
2129 
2130     if (settings.verbose > 1) {
2131         settings.extensions.logger->log(EXTENSION_LOG_DEBUG, c,
2132                 "%d: authenticated() in cmd 0x%02x is %s\n",
2133                 c->sfd, c->cmd, rv ? "true" : "false");
2134     }
2135 
2136     return rv;
2137 }
2138 
binary_response_handler(const void *key, uint16_t keylen, const void *ext, uint8_t extlen, const void *body, uint32_t bodylen, uint8_t datatype, uint16_t status, uint64_t cas, const void *cookie)2139 bool binary_response_handler(const void *key, uint16_t keylen,
2140                              const void *ext, uint8_t extlen,
2141                              const void *body, uint32_t bodylen,
2142                              uint8_t datatype, uint16_t status,
2143                              uint64_t cas, const void *cookie)
2144 {
2145     protocol_binary_response_header header;
2146     char *buf;
2147     conn *c = (conn*)cookie;
2148     /* Look at append_bin_stats */
2149     size_t needed;
2150     bool need_inflate = false;
2151     size_t inflated_length;
2152 
2153     if (!c->supports_datatype) {
2154         if ((datatype & PROTOCOL_BINARY_DATATYPE_COMPRESSED) == PROTOCOL_BINARY_DATATYPE_COMPRESSED) {
2155             need_inflate = true;
2156         }
2157         /* We may silently drop the knowledge about a JSON item */
2158         datatype = PROTOCOL_BINARY_RAW_BYTES;
2159     }
2160 
2161     needed = keylen + extlen + sizeof(protocol_binary_response_header);
2162     if (need_inflate) {
2163         if (snappy_uncompressed_length(body, bodylen,
2164                                        &inflated_length) != SNAPPY_OK) {
2165             settings.extensions.logger->log(EXTENSION_LOG_INFO, c,
2166                     "<%d ERROR: Failed to determine inflated size",
2167                     c->sfd);
2168             return false;
2169         }
2170         needed += inflated_length;
2171     } else {
2172         needed += bodylen;
2173     }
2174 
2175     if (!grow_dynamic_buffer(c, needed)) {
2176         if (settings.verbose > 0) {
2177             settings.extensions.logger->log(EXTENSION_LOG_INFO, c,
2178                     "<%d ERROR: Failed to allocate memory for response",
2179                     c->sfd);
2180         }
2181         return false;
2182     }
2183 
2184     buf = c->dynamic_buffer.buffer + c->dynamic_buffer.offset;
2185     memset(&header, 0, sizeof(header));
2186     header.response.magic = (uint8_t)PROTOCOL_BINARY_RES;
2187     header.response.opcode = c->binary_header.request.opcode;
2188     header.response.keylen = (uint16_t)htons(keylen);
2189     header.response.extlen = extlen;
2190     header.response.datatype = datatype;
2191     header.response.status = (uint16_t)htons(status);
2192     if (need_inflate) {
2193         header.response.bodylen = htonl((uint32_t)(inflated_length + keylen + extlen));
2194     } else {
2195         header.response.bodylen = htonl(bodylen + keylen + extlen);
2196     }
2197     header.response.opaque = c->opaque;
2198     header.response.cas = htonll(cas);
2199 
2200     memcpy(buf, header.bytes, sizeof(header.response));
2201     buf += sizeof(header.response);
2202 
2203     if (extlen > 0) {
2204         memcpy(buf, ext, extlen);
2205         buf += extlen;
2206     }
2207 
2208     if (keylen > 0) {
2209         cb_assert(key != NULL);
2210         memcpy(buf, key, keylen);
2211         buf += keylen;
2212     }
2213 
2214     if (bodylen > 0) {
2215         if (need_inflate) {
2216             if (snappy_uncompress(body, bodylen, buf, &inflated_length) != SNAPPY_OK) {
2217                 settings.extensions.logger->log(EXTENSION_LOG_INFO, c,
2218                         "<%d ERROR: Failed to inflate item", c->sfd);
2219                 return false;
2220             }
2221         } else {
2222             memcpy(buf, body, bodylen);
2223         }
2224     }
2225 
2226     c->dynamic_buffer.offset += needed;
2227     return true;
2228 }
2229 
2230 /**
2231  * Tap stats (these are only used by the tap thread, so they don't need
2232  * to be in the threadlocal struct right now...
2233  */
2234 struct tap_cmd_stats {
2235     uint64_t connect;
2236     uint64_t mutation;
2237     uint64_t checkpoint_start;
2238     uint64_t checkpoint_end;
2239     uint64_t delete;
2240     uint64_t flush;
2241     uint64_t opaque;
2242     uint64_t vbucket_set;
2243 };
2244 
/*
 * Global TAP statistics.  This both declares the type and defines the
 * (non-static) instance `tap_stats`.  `mutex` guards every counter:
 * ship_tap_log() takes it around each increment.  NOTE(review): the mutex
 * is presumably initialised at startup elsewhere; confirm.
 */
struct tap_stats {
    cb_mutex_t mutex;
    struct tap_cmd_stats sent;     /* messages shipped to TAP consumers (see ship_tap_log) */
    struct tap_cmd_stats received; /* presumably messages received from TAP producers; not visible here */
} tap_stats;
2250 
ship_tap_log(conn *c)2251 static void ship_tap_log(conn *c) {
2252     bool more_data = true;
2253     bool send_data = false;
2254     bool disconnect = false;
2255     item *it;
2256     uint32_t bodylen;
2257     int ii = 0;
2258 
2259     c->msgcurr = 0;
2260     c->msgused = 0;
2261     c->iovused = 0;
2262     if (add_msghdr(c) != 0) {
2263         if (settings.verbose) {
2264             settings.extensions.logger->log(EXTENSION_LOG_WARNING, c,
2265                                             "%d: Failed to create output headers. Shutting down tap connection\n", c->sfd);
2266         }
2267         conn_set_state(c, conn_closing);
2268         return ;
2269     }
2270     /* @todo add check for buffer overflow of c->wbuf) */
2271     c->wbytes = 0;
2272     c->wcurr = c->wbuf;
2273     c->icurr = c->ilist;
2274     do {
2275         /* @todo fixme! */
2276         void *engine;
2277         uint16_t nengine;
2278         uint8_t ttl;
2279         uint16_t tap_flags;
2280         uint32_t seqno;
2281         uint16_t vbucket;
2282         tap_event_t event;
2283         bool inflate = false;
2284         size_t inflated_length = 0;
2285 
2286         union {
2287             protocol_binary_request_tap_mutation mutation;
2288             protocol_binary_request_tap_delete delete;
2289             protocol_binary_request_tap_flush flush;
2290             protocol_binary_request_tap_opaque opaque;
2291             protocol_binary_request_noop noop;
2292         } msg;
2293         item_info_holder info;
2294         memset(&info, 0, sizeof(info));
2295 
2296         if (ii++ == 10) {
2297             break;
2298         }
2299 
2300         event = c->tap_iterator(settings.engine.v0, c, &it,
2301                                             &engine, &nengine, &ttl,
2302                                             &tap_flags, &seqno, &vbucket);
2303         memset(&msg, 0, sizeof(msg));
2304         msg.opaque.message.header.request.magic = (uint8_t)PROTOCOL_BINARY_REQ;
2305         msg.opaque.message.header.request.opaque = htonl(seqno);
2306         msg.opaque.message.body.tap.enginespecific_length = htons(nengine);
2307         msg.opaque.message.body.tap.ttl = ttl;
2308         msg.opaque.message.body.tap.flags = htons(tap_flags);
2309         msg.opaque.message.header.request.extlen = 8;
2310         msg.opaque.message.header.request.vbucket = htons(vbucket);
2311         info.info.nvalue = IOV_MAX;
2312 
2313         switch (event) {
2314         case TAP_NOOP :
2315             send_data = true;
2316             msg.noop.message.header.request.opcode = PROTOCOL_BINARY_CMD_NOOP;
2317             msg.noop.message.header.request.extlen = 0;
2318             msg.noop.message.header.request.bodylen = htonl(0);
2319             memcpy(c->wcurr, msg.noop.bytes, sizeof(msg.noop.bytes));
2320             add_iov(c, c->wcurr, sizeof(msg.noop.bytes));
2321             c->wcurr += sizeof(msg.noop.bytes);
2322             c->wbytes += sizeof(msg.noop.bytes);
2323             break;
2324         case TAP_PAUSE :
2325             more_data = false;
2326             break;
2327         case TAP_CHECKPOINT_START:
2328         case TAP_CHECKPOINT_END:
2329         case TAP_MUTATION:
2330             if (!settings.engine.v1->get_item_info(settings.engine.v0, c, it,
2331                                                    (void*)&info)) {
2332                 settings.engine.v1->release(settings.engine.v0, c, it);
2333                 settings.extensions.logger->log(EXTENSION_LOG_WARNING, c,
2334                                                 "%d: Failed to get item info\n", c->sfd);
2335                 break;
2336             }
2337             send_data = true;
2338             c->ilist[c->ileft++] = it;
2339 
2340             if (event == TAP_CHECKPOINT_START) {
2341                 msg.mutation.message.header.request.opcode =
2342                     PROTOCOL_BINARY_CMD_TAP_CHECKPOINT_START;
2343                 cb_mutex_enter(&tap_stats.mutex);
2344                 tap_stats.sent.checkpoint_start++;
2345                 cb_mutex_exit(&tap_stats.mutex);
2346             } else if (event == TAP_CHECKPOINT_END) {
2347                 msg.mutation.message.header.request.opcode =
2348                     PROTOCOL_BINARY_CMD_TAP_CHECKPOINT_END;
2349                 cb_mutex_enter(&tap_stats.mutex);
2350                 tap_stats.sent.checkpoint_end++;
2351                 cb_mutex_exit(&tap_stats.mutex);
2352             } else if (event == TAP_MUTATION) {
2353                 msg.mutation.message.header.request.opcode = PROTOCOL_BINARY_CMD_TAP_MUTATION;
2354                 cb_mutex_enter(&tap_stats.mutex);
2355                 tap_stats.sent.mutation++;
2356                 cb_mutex_exit(&tap_stats.mutex);
2357             }
2358 
2359             msg.mutation.message.header.request.cas = htonll(info.info.cas);
2360             msg.mutation.message.header.request.keylen = htons(info.info.nkey);
2361             msg.mutation.message.header.request.extlen = 16;
2362             if (c->supports_datatype) {
2363                 msg.mutation.message.header.request.datatype = info.info.datatype;
2364             } else {
2365                 switch (info.info.datatype) {
2366                 case 0:
2367                     break;
2368                 case PROTOCOL_BINARY_DATATYPE_JSON:
2369                     break;
2370                 case PROTOCOL_BINARY_DATATYPE_COMPRESSED:
2371                 case PROTOCOL_BINARY_DATATYPE_COMPRESSED_JSON:
2372                     inflate = true;
2373                     break;
2374                 default:
2375                     settings.extensions.logger->log(EXTENSION_LOG_WARNING, c,
2376                                                     "%d: shipping data with"
2377                                                     " an invalid datatype "
2378                                                     "(stripping info)",
2379                                                     c->sfd);
2380                 }
2381                 msg.mutation.message.header.request.datatype = 0;
2382             }
2383 
2384             bodylen = 16 + info.info.nkey + nengine;
2385             if ((tap_flags & TAP_FLAG_NO_VALUE) == 0) {
2386                 if (inflate) {
2387                     if (snappy_uncompressed_length(info.info.value[0].iov_base,
2388                                                    info.info.nbytes,
2389                                                    &inflated_length) == SNAPPY_OK) {
2390                         bodylen += inflated_length;
2391                     } else {
2392                         settings.extensions.logger->log(EXTENSION_LOG_INFO, c,
2393                                                         "<%d ERROR: Failed to determine inflated size. Sending as compressed",
2394                                                         c->sfd);
2395                         inflate = false;
2396                         bodylen += info.info.nbytes;
2397                     }
2398                 } else {
2399                     bodylen += info.info.nbytes;
2400                 }
2401             }
2402             msg.mutation.message.header.request.bodylen = htonl(bodylen);
2403 
2404             if ((tap_flags & TAP_FLAG_NETWORK_BYTE_ORDER) == 0) {
2405                 msg.mutation.message.body.item.flags = htonl(info.info.flags);
2406             } else {
2407                 msg.mutation.message.body.item.flags = info.info.flags;
2408             }
2409             msg.mutation.message.body.item.expiration = htonl(info.info.exptime);
2410             msg.mutation.message.body.tap.enginespecific_length = htons(nengine);
2411             msg.mutation.message.body.tap.ttl = ttl;
2412             msg.mutation.message.body.tap.flags = htons(tap_flags);
2413             memcpy(c->wcurr, msg.mutation.bytes, sizeof(msg.mutation.bytes));
2414 
2415             add_iov(c, c->wcurr, sizeof(msg.mutation.bytes));
2416             c->wcurr += sizeof(msg.mutation.bytes);
2417             c->wbytes += sizeof(msg.mutation.bytes);
2418 
2419             if (nengine > 0) {
2420                 memcpy(c->wcurr, engine, nengine);
2421                 add_iov(c, c->wcurr, nengine);
2422                 c->wcurr += nengine;
2423                 c->wbytes += nengine;
2424             }
2425 
2426             add_iov(c, info.info.key, info.info.nkey);
2427             if ((tap_flags & TAP_FLAG_NO_VALUE) == 0) {
2428                 if (inflate) {
2429                     void *buf = malloc(inflated_length);
2430                     void *body = info.info.value[0].iov_base;
2431                     size_t bodylen = info.info.value[0].iov_len;
2432                     if (snappy_uncompress(body, bodylen,
2433                                           buf, &inflated_length) == SNAPPY_OK) {
2434                         c->temp_alloc_list[c->temp_alloc_left++] = buf;
2435 
2436                         add_iov(c, buf, inflated_length);
2437                     } else {
2438                         free(buf);
2439                         settings.extensions.logger->log(EXTENSION_LOG_WARNING, c,
2440                                                         "%d: FATAL: failed to inflate object. shutitng down connection", c->sfd);
2441                         conn_set_state(c, conn_closing);
2442                         return;
2443                     }
2444                 } else {
2445                     int xx;
2446                     for (xx = 0; xx < info.info.nvalue; ++xx) {
2447                         add_iov(c, info.info.value[xx].iov_base,
2448                                 info.info.value[xx].iov_len);
2449                     }
2450                 }
2451             }
2452 
2453             break;
2454         case TAP_DELETION:
2455             /* This is a delete */
2456             if (!settings.engine.v1->get_item_info(settings.engine.v0, c, it,
2457                                                    (void*)&info)) {
2458                 settings.engine.v1->release(settings.engine.v0, c, it);
2459                 settings.extensions.logger->log(EXTENSION_LOG_WARNING, c,
2460                                                 "%d: Failed to get item info\n", c->sfd);
2461                 break;
2462             }
2463             send_data = true;
2464             c->ilist[c->ileft++] = it;
2465             msg.delete.message.header.request.opcode = PROTOCOL_BINARY_CMD_TAP_DELETE;
2466             msg.delete.message.header.request.cas = htonll(info.info.cas);
2467             msg.delete.message.header.request.keylen = htons(info.info.nkey);
2468 
2469             bodylen = 8 + info.info.nkey + nengine;
2470             if ((tap_flags & TAP_FLAG_NO_VALUE) == 0) {
2471                 bodylen += info.info.nbytes;
2472             }
2473             msg.delete.message.header.request.bodylen = htonl(bodylen);
2474 
2475             memcpy(c->wcurr, msg.delete.bytes, sizeof(msg.delete.bytes));
2476             add_iov(c, c->wcurr, sizeof(msg.delete.bytes));
2477             c->wcurr += sizeof(msg.delete.bytes);
2478             c->wbytes += sizeof(msg.delete.bytes);
2479 
2480             if (nengine > 0) {
2481                 memcpy(c->wcurr, engine, nengine);
2482                 add_iov(c, c->wcurr, nengine);
2483                 c->wcurr += nengine;
2484                 c->wbytes += nengine;
2485             }
2486 
2487             add_iov(c, info.info.key, info.info.nkey);
2488             if ((tap_flags & TAP_FLAG_NO_VALUE) == 0) {
2489                 int xx;
2490                 for (xx = 0; xx < info.info.nvalue; ++xx) {
2491                     add_iov(c, info.info.value[xx].iov_base,
2492                             info.info.value[xx].iov_len);
2493                 }
2494             }
2495 
2496             cb_mutex_enter(&tap_stats.mutex);
2497             tap_stats.sent.delete++;
2498             cb_mutex_exit(&tap_stats.mutex);
2499             break;
2500 
2501         case TAP_DISCONNECT:
2502             disconnect = true;
2503             more_data = false;
2504             break;
2505         case TAP_VBUCKET_SET:
2506         case TAP_FLUSH:
2507         case TAP_OPAQUE:
2508             send_data = true;
2509 
2510             if (event == TAP_OPAQUE) {
2511                 msg.flush.message.header.request.opcode = PROTOCOL_BINARY_CMD_TAP_OPAQUE;
2512                 cb_mutex_enter(&tap_stats.mutex);
2513                 tap_stats.sent.opaque++;
2514                 cb_mutex_exit(&tap_stats.mutex);
2515 
2516             } else if (event == TAP_FLUSH) {
2517                 msg.flush.message.header.request.opcode = PROTOCOL_BINARY_CMD_TAP_FLUSH;
2518                 cb_mutex_enter(&tap_stats.mutex);
2519                 tap_stats.sent.flush++;
2520                 cb_mutex_exit(&tap_stats.mutex);
2521             } else if (event == TAP_VBUCKET_SET) {
2522                 msg.flush.message.header.request.opcode = PROTOCOL_BINARY_CMD_TAP_VBUCKET_SET;
2523                 msg.flush.message.body.tap.flags = htons(tap_flags);
2524                 cb_mutex_enter(&tap_stats.mutex);
2525                 tap_stats.sent.vbucket_set++;
2526                 cb_mutex_exit(&tap_stats.mutex);
2527             }
2528 
2529             msg.flush.message.header.request.bodylen = htonl(8 + nengine);
2530             memcpy(c->wcurr, msg.flush.bytes, sizeof(msg.flush.bytes));
2531             add_iov(c, c->wcurr, sizeof(msg.flush.bytes));
2532             c->wcurr += sizeof(msg.flush.bytes);
2533             c->wbytes += sizeof(msg.flush.bytes);
2534             if (nengine > 0) {
2535                 memcpy(c->wcurr, engine, nengine);
2536                 add_iov(c, c->wcurr, nengine);
2537                 c->wcurr += nengine;
2538                 c->wbytes += nengine;
2539             }
2540             break;
2541         default:
2542             abort();
2543         }
2544     } while (more_data);
2545 
2546     c->ewouldblock = false;
2547     if (send_data) {
2548         conn_set_state(c, conn_mwrite);
2549         if (disconnect) {
2550             c->write_and_go = conn_closing;
2551         } else {
2552             c->write_and_go = conn_ship_log;
2553         }
2554     } else {
2555         if (disconnect) {
2556             conn_set_state(c, conn_closing);
2557         } else {
2558             /* No more items to ship to the slave at this time.. suspend.. */
2559             if (settings.verbose > 1) {
2560                 settings.extensions.logger->log(EXTENSION_LOG_DEBUG, c,
2561                                                 "%d: No more items in tap log.. waiting\n",
2562                                                 c->sfd);
2563             }
2564             c->ewouldblock = true;
2565         }
2566     }
2567 }
2568 
default_unknown_command(EXTENSION_BINARY_PROTOCOL_DESCRIPTOR *descriptor, ENGINE_HANDLE* handle, const void* cookie, protocol_binary_request_header *request, ADD_RESPONSE response)2569 static ENGINE_ERROR_CODE default_unknown_command(EXTENSION_BINARY_PROTOCOL_DESCRIPTOR *descriptor,
2570                                                  ENGINE_HANDLE* handle,
2571                                                  const void* cookie,
2572                                                  protocol_binary_request_header *request,
2573                                                  ADD_RESPONSE response)
2574 {
2575     const conn *c = (void*)cookie;
2576 
2577     if (!c->supports_datatype && request->request.datatype != PROTOCOL_BINARY_RAW_BYTES) {
2578         if (response(NULL, 0, NULL, 0, NULL, 0, PROTOCOL_BINARY_RAW_BYTES,
2579                      PROTOCOL_BINARY_RESPONSE_EINVAL, 0, cookie)) {
2580             return ENGINE_SUCCESS;
2581         } else {
2582             return ENGINE_DISCONNECT;
2583         }
2584     } else {
2585         return settings.engine.v1->unknown_command(handle, cookie,
2586                                                    request, response);
2587     }
2588 }
2589 
2590 struct request_lookup {
2591     EXTENSION_BINARY_PROTOCOL_DESCRIPTOR *descriptor;
2592     BINARY_COMMAND_CALLBACK callback;
2593 };
2594 
2595 static struct request_lookup request_handlers[0x100];
2596 
2597 typedef void (*RESPONSE_HANDLER)(conn*);
2598 /**
2599  * A map between the response packets op-code and the function to handle
2600  * the response message.
2601  */
2602 static RESPONSE_HANDLER response_handlers[0x100];
2603 
/**
 * Register new_handler (and the descriptor it belongs to) as the handler
 * for binary request opcode cmd, replacing any previous registration.
 *
 * @param descriptor  descriptor passed back to new_handler on dispatch
 * @param cmd         the request opcode to bind
 * @param new_handler the callback to invoke for that opcode
 */
static void setup_binary_lookup_cmd(EXTENSION_BINARY_PROTOCOL_DESCRIPTOR *descriptor,
                                    uint8_t cmd,
                                    BINARY_COMMAND_CALLBACK new_handler) {
    struct request_lookup *slot = &request_handlers[cmd];

    slot->callback = new_handler;
    slot->descriptor = descriptor;
}
2610 
/**
 * Dispatch a binary request to the callback registered for its opcode in
 * request_handlers[] and act on the outcome.
 *
 * c->aiostat is consumed and reset first. The callback is only invoked
 * when that stored status is ENGINE_SUCCESS; otherwise the stored status
 * is treated as the command's result.
 *
 * Outcomes:
 *  - ENGINE_SUCCESS: if the callback filled c->dynamic_buffer it is passed
 *    to write_and_free() (the pointer is cleared here, not freed);
 *    otherwise the connection moves on to the next command.
 *  - ENGINE_EWOULDBLOCK: flag the connection as blocked.
 *  - ENGINE_DISCONNECT: close the connection.
 *  - anything else: discard the (possibly partial) dynamic buffer and
 *    send the matching protocol error.
 *
 * @param c the connection; the request packet ends at c->rcurr
 */
static void process_bin_unknown_packet(conn *c) {
    const size_t packet_size = c->binary_header.request.bodylen +
        sizeof(c->binary_header);
    void *packet = c->rcurr - packet_size;
    ENGINE_ERROR_CODE status = c->aiostat;

    c->aiostat = ENGINE_SUCCESS;
    c->ewouldblock = false;

    if (status == ENGINE_SUCCESS) {
        const struct request_lookup *entry =
            &request_handlers[c->binary_header.request.opcode];
        status = entry->callback(entry->descriptor, settings.engine.v0, c,
                                 packet, binary_response_handler);
    }

    if (status == ENGINE_SUCCESS) {
        if (c->dynamic_buffer.buffer == NULL) {
            conn_set_state(c, conn_new_cmd);
        } else {
            write_and_free(c, c->dynamic_buffer.buffer,
                           c->dynamic_buffer.offset);
            c->dynamic_buffer.buffer = NULL;
        }
    } else if (status == ENGINE_EWOULDBLOCK) {
        c->ewouldblock = true;
    } else if (status == ENGINE_DISCONNECT) {
        conn_set_state(c, conn_closing);
    } else {
        /* Release the dynamic buffer.. it may be partial.. */
        free(c->dynamic_buffer.buffer);
        c->dynamic_buffer.buffer = NULL;
        write_bin_packet(c, engine_error_2_protocol_error(status), 0);
    }
}
2646 
cbsasl_refresh_main(void *c)2647 static void cbsasl_refresh_main(void *c)
2648 {
2649     int rv = cbsasl_server_refresh();
2650     if (rv == SASL_OK) {
2651         notify_io_complete(c, ENGINE_SUCCESS);
2652     } else {
2653         notify_io_complete(c, ENGINE_EINVAL);
2654     }
2655 }
2656 
refresh_cbsasl(conn *c)2657 static ENGINE_ERROR_CODE refresh_cbsasl(conn *c)
2658 {
2659     cb_thread_t tid;
2660     int err;
2661 
2662     err = cb_create_thread(&tid, cbsasl_refresh_main, c, 1);
2663     if (err != 0) {
2664         settings.extensions.logger->log(EXTENSION_LOG_WARNING, c,
2665                                         "Failed to create cbsasl db "
2666                                         "update thread: %s",
2667                                         strerror(err));
2668         return ENGINE_DISCONNECT;
2669     }
2670 
2671     return ENGINE_EWOULDBLOCK;
2672 }
2673 
2674 #if 0
2675 static void ssl_certs_refresh_main(void *c)
2676 {
2677     /* Update the internal certificates */
2678 
2679     notify_io_complete(c, ENGINE_SUCCESS);
2680 }
2681 #endif
refresh_ssl_certs(conn *c)2682 static ENGINE_ERROR_CODE refresh_ssl_certs(conn *c)
2683 {
2684 #if 0
2685     cb_thread_t tid;
2686     int err;
2687 
2688     err = cb_create_thread(&tid, ssl_certs_refresh_main, c, 1);
2689     if (err != 0) {
2690         settings.extensions.logger->log(EXTENSION_LOG_WARNING, c,
2691                                         "Failed to create ssl_certificate "
2692                                         "update thread: %s",
2693                                         strerror(err));
2694         return ENGINE_DISCONNECT;
2695     }
2696 
2697     return ENGINE_EWOULDBLOCK;
2698 #endif
2699     return ENGINE_SUCCESS;
2700 }
2701 
/**
 * Handle a TAP_CONNECT request.
 *
 * Body layout, following the binary header:
 *   [extras: extlen bytes][name: keylen bytes][userdata: the remainder]
 * When extlen is 4 the extras hold the 32-bit connect flags. When
 * TAP_CONNECT_FLAG_BACKFILL is set among them, the userdata must be at
 * least 8 bytes or the connection is closed.
 *
 * The engine is asked for a TAP iterator for this (optionally named)
 * connection. If it returns none, NOT_SUPPORTED is sent and the
 * connection is closed after the write. Otherwise the connection switches
 * to conn_ship_log, using the high-priority requests-per-event budget.
 *
 * @param c the connection; the request packet ends at c->rcurr
 */
static void process_bin_tap_connect(conn *c) {
    TAP_ITERATOR iterator;
    char *packet = (c->rcurr - (c->binary_header.request.bodylen +
                                sizeof(c->binary_header)));
    protocol_binary_request_tap_connect *req = (void*)packet;
    /* Locate the name using the real extras length instead of assuming
     * the extras are exactly 4 or 0 bytes long (any other extlen used to
     * make us read the name and userdata from the wrong offsets). */
    const char *key = packet + sizeof(c->binary_header) +
        c->binary_header.request.extlen;
    const char *data = key + c->binary_header.request.keylen;
    uint32_t flags = 0;
    size_t ndata = c->binary_header.request.bodylen -
        c->binary_header.request.extlen -
        c->binary_header.request.keylen;

    if (c->binary_header.request.extlen == 4) {
        flags = ntohl(req->message.body.flags);

        /* the userdata has to be at least 8 bytes for a backfill! */
        if ((flags & TAP_CONNECT_FLAG_BACKFILL) && ndata < 8) {
            settings.extensions.logger->log(EXTENSION_LOG_WARNING, c,
                                            "%d: ERROR: Invalid tap connect message\n",
                                            c->sfd);
            conn_set_state(c, conn_closing);
            return ;
        }
    }

    if (settings.verbose && c->binary_header.request.keylen > 0) {
        char buffer[1024];
        /* size_t avoids a signed/unsigned comparison against sizeof */
        size_t len = c->binary_header.request.keylen;
        if (len >= sizeof(buffer)) {
            len = sizeof(buffer) - 1;
        }
        memcpy(buffer, key, len);
        buffer[len] = '\0';
        settings.extensions.logger->log(EXTENSION_LOG_DEBUG, c,
                                        "%d: Trying to connect with named tap connection: <%s>\n",
                                        c->sfd, buffer);
    }

    iterator = settings.engine.v1->get_tap_iterator(
        settings.engine.v0, c, key, c->binary_header.request.keylen,
        flags, data, ndata);

    if (iterator == NULL) {
        settings.extensions.logger->log(EXTENSION_LOG_WARNING, c,
                                        "%d: FATAL: The engine does not support tap\n",
                                        c->sfd);
        write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_NOT_SUPPORTED, 0);
        c->write_and_go = conn_closing;
    } else {
        c->max_reqs_per_event = settings.reqs_per_event_high_priority;
        c->tap_iterator = iterator;
        c->which = EV_WRITE;
        conn_set_state(c, conn_ship_log);
    }
}
2762 
/**
 * Handle a TAP event packet (mutation, deletion, flush, opaque, vbucket
 * set, checkpoint start/end) and forward it to the engine's tap_notify().
 *
 * Body layout, following the binary header:
 *   [tap extras][engine specific: nengine bytes][key: nkey bytes][value]
 * The tap extras are those of protocol_binary_request_tap_no_extras (8
 * bytes). For TAP_MUTATION, TAP_CHECKPOINT_START and TAP_CHECKPOINT_END
 * they are followed by 8 more bytes of item flags and expiration, which
 * is the protocol_binary_request_tap_mutation layout.
 *
 * Handling:
 *  - A body too short to hold the extras plus the declared engine
 *    specific section and key closes the connection.
 *  - A ttl of 0 is answered with EINVAL.
 *  - Otherwise the event is passed on with its ttl decremented.
 *  - A response is only written on error, or when TAP_FLAG_ACK is set.
 *
 * @param event the TAP event carried by this packet
 * @param c     the connection; the packet ends at c->rcurr
 */
static void process_bin_tap_packet(tap_event_t event, conn *c) {
    char *packet;
    protocol_binary_request_tap_no_extras *tap;
    uint16_t nengine;
    uint16_t tap_flags;
    uint32_t seqno;
    uint8_t ttl;
    char *engine_specific;
    char *key;
    uint16_t nkey;
    char *data;
    uint32_t flags;
    uint32_t exptime;
    uint32_t ndata;
    uint32_t bodylen;
    uint32_t nextras;
    bool has_item_extras;
    ENGINE_ERROR_CODE ret;

    cb_assert(c != NULL);
    bodylen = c->binary_header.request.bodylen;
    packet = (c->rcurr - (bodylen + sizeof(c->binary_header)));
    tap = (void*)packet;
    nengine = ntohs(tap->message.body.tap.enginespecific_length);
    tap_flags = ntohs(tap->message.body.tap.flags);
    seqno = ntohl(tap->message.header.request.opaque);
    ttl = tap->message.body.tap.ttl;
    nkey = c->binary_header.request.keylen;
    flags = 0;
    exptime = 0;

    /* Consume and reset the status of a previous EWOULDBLOCK'ed attempt
     * (as process_bin_unknown_packet does), so a stale error is not
     * applied to the next TAP packet on this connection. */
    ret = c->aiostat;
    c->aiostat = ENGINE_SUCCESS;
    c->ewouldblock = false;

    has_item_extras = (event == TAP_MUTATION ||
                       event == TAP_CHECKPOINT_START ||
                       event == TAP_CHECKPOINT_END);
    /* 8 bytes of tap extras, plus item flags (4) and expiration (4) */
    nextras = has_item_extras ? 16 : 8;

    /* nengine is read from the (untrusted) packet. Without this check
     * ndata underflows and key/data point past the end of the packet. */
    if (nextras + (uint32_t)nengine + (uint32_t)nkey > bodylen) {
        settings.extensions.logger->log(EXTENSION_LOG_WARNING, c,
                                        "%d: ERROR: Invalid tap packet: body too "
                                        "short for extras, engine specific "
                                        "data and key\n", c->sfd);
        conn_set_state(c, conn_closing);
        return;
    }

    engine_specific = packet + sizeof(tap->bytes);
    key = engine_specific + nengine;
    data = key + nkey;
    ndata = bodylen - nengine - nkey - 8;

    if (has_item_extras) {
        protocol_binary_request_tap_mutation *mutation = (void*)tap;

        /* engine_specific data in protocol_binary_request_tap_mutation is */
        /* at a different offset than protocol_binary_request_tap_no_extras */
        engine_specific = packet + sizeof(mutation->bytes);

        /* Flags are converted to host order only when the sender did NOT
         * set TAP_FLAG_NETWORK_BYTE_ORDER; otherwise they are passed on
         * untouched (NOTE(review): presumably stored as-is by the engine). */
        flags = mutation->message.body.item.flags;
        if ((tap_flags & TAP_FLAG_NETWORK_BYTE_ORDER) == 0) {
            flags = ntohl(flags);
        }

        exptime = ntohl(mutation->message.body.item.expiration);
        key += 8;
        data += 8;
        ndata -= 8;
    }

    if (ttl == 0) {
        ret = ENGINE_EINVAL;
    } else if (ret == ENGINE_SUCCESS) {
        uint8_t datatype = c->binary_header.request.datatype;
        /* A client without datatype support cannot flag JSON itself */
        if (event == TAP_MUTATION && !c->supports_datatype) {
            if (checkUTF8JSON((void*)data, ndata)) {
                datatype = PROTOCOL_BINARY_DATATYPE_JSON;
            }
        }

        ret = settings.engine.v1->tap_notify(settings.engine.v0, c,
                                             engine_specific, nengine,
                                             ttl - 1, tap_flags,
                                             event, seqno,
                                             key, nkey,
                                             flags, exptime,
                                             ntohll(tap->message.header.request.cas),
                                             datatype,
                                             data, ndata,
                                             c->binary_header.request.vbucket);
    }

    switch (ret) {
    case ENGINE_DISCONNECT:
        conn_set_state(c, conn_closing);
        break;
    case ENGINE_EWOULDBLOCK:
        c->ewouldblock = true;
        break;
    default:
        if ((tap_flags & TAP_FLAG_ACK) || (ret != ENGINE_SUCCESS)) {
            write_bin_packet(c, engine_error_2_protocol_error(ret), 0);
        } else {
            conn_set_state(c, conn_new_cmd);
        }
    }
}
2854 
/**
 * Handle a TAP ack: a response packet received while shipping TAP data.
 *
 * The response's opaque (sequence number), status and key are handed to
 * the engine's tap_notify() as a TAP_ACK event. Afterwards the connection
 * resumes conn_ship_log. It is closed instead if the engine returns
 * ENGINE_DISCONNECT, or if the engine has no tap_notify() at all.
 *
 * @param c the connection; the response packet ends at c->rcurr
 */
static void process_bin_tap_ack(conn *c) {
    char *packet;
    protocol_binary_response_no_extras *rsp;
    ENGINE_ERROR_CODE ret = ENGINE_DISCONNECT;

    cb_assert(c != NULL);
    packet = c->rcurr - (c->binary_header.request.bodylen +
                         sizeof(c->binary_header));
    rsp = (void*)packet;

    if (settings.engine.v1->tap_notify != NULL) {
        uint32_t seqno = ntohl(rsp->message.header.response.opaque);
        uint16_t status = ntohs(rsp->message.header.response.status);
        char *key = packet + sizeof(rsp->bytes);

        /* The ack status travels in the tap_flags argument slot */
        ret = settings.engine.v1->tap_notify(settings.engine.v0, c,
                                             NULL, 0,     /* engine specific */
                                             0, status,   /* ttl, tap_flags */
                                             TAP_ACK, seqno,
                                             key, c->binary_header.request.keylen,
                                             0, 0, 0,     /* flags, exptime, cas */
                                             c->binary_header.request.datatype,
                                             NULL, 0,     /* value */
                                             0);          /* vbucket */
    }

    conn_set_state(c, (ret == ENGINE_DISCONNECT) ? conn_closing : conn_ship_log);
}

/**
 * A noop response carries nothing to process; go straight back to
 * reading the next command.
 */
static void process_bin_noop_response(conn *c) {
    cb_assert(c != NULL);
    conn_set_state(c, conn_new_cmd);
}
2892 
2893 /*******************************************************************************
2894  **                             DCP MESSAGE PRODUCERS                         **
2895  ******************************************************************************/
dcp_message_get_failover_log(const void *cookie, uint32_t opaque, uint16_t vbucket)2896 static ENGINE_ERROR_CODE dcp_message_get_failover_log(const void *cookie,
2897                                                       uint32_t opaque,
2898                                                       uint16_t vbucket)
2899 {
2900     protocol_binary_request_dcp_get_failover_log packet;
2901     conn *c = (void*)cookie;
2902 
2903     if (c->wbytes + sizeof(packet.bytes) >= c->wsize) {
2904         /* We don't have room in the buffer */
2905         return ENGINE_E2BIG;
2906     }
2907 
2908     memset(packet.bytes, 0, sizeof(packet.bytes));
2909     packet.message.header.request.magic =  (uint8_t)PROTOCOL_BINARY_REQ;
2910     packet.message.header.request.opcode = (uint8_t)PROTOCOL_BINARY_CMD_DCP_GET_FAILOVER_LOG;
2911     packet.message.header.request.opaque = opaque;
2912     packet.message.header.request.vbucket = htons(vbucket);
2913 
2914     memcpy(c->wcurr, packet.bytes, sizeof(packet.bytes));
2915     add_iov(c, c->wcurr, sizeof(packet.bytes));
2916     c->wcurr += sizeof(packet.bytes);
2917     c->wbytes += sizeof(packet.bytes);
2918 
2919     return ENGINE_SUCCESS;
2920 }
2921 
dcp_message_stream_req(const void *cookie, uint32_t opaque, uint16_t vbucket, uint32_t flags, uint64_t start_seqno, uint64_t end_seqno, uint64_t vbucket_uuid, uint64_t snap_start_seqno, uint64_t snap_end_seqno)2922 static ENGINE_ERROR_CODE dcp_message_stream_req(const void *cookie,
2923                                                 uint32_t opaque,
2924                                                 uint16_t vbucket,
2925                                                 uint32_t flags,
2926                                                 uint64_t start_seqno,
2927                                                 uint64_t end_seqno,
2928                                                 uint64_t vbucket_uuid,
2929                                                 uint64_t snap_start_seqno,
2930                                                 uint64_t snap_end_seqno)
2931 {
2932     protocol_binary_request_dcp_stream_req packet;
2933     conn *c = (void*)cookie;
2934 
2935     if (c->wbytes + sizeof(packet.bytes) >= c->wsize) {
2936         /* We don't have room in the buffer */
2937         return ENGINE_E2BIG;
2938     }
2939 
2940     memset(packet.bytes, 0, sizeof(packet.bytes));
2941     packet.message.header.request.magic =  (uint8_t)PROTOCOL_BINARY_REQ;
2942     packet.message.header.request.opcode = (uint8_t)PROTOCOL_BINARY_CMD_DCP_STREAM_REQ;
2943     packet.message.header.request.extlen = 48;
2944     packet.message.header.request.bodylen = htonl(48);
2945     packet.message.header.request.opaque = opaque;
2946     packet.message.header.request.vbucket = htons(vbucket);
2947 
2948     packet.message.body.flags = ntohl(flags);
2949     packet.message.body.start_seqno = ntohll(start_seqno);
2950     packet.message.body.end_seqno = ntohll(end_seqno);
2951     packet.message.body.vbucket_uuid = ntohll(vbucket_uuid);
2952     packet.message.body.snap_start_seqno = ntohll(snap_start_seqno);
2953     packet.message.body.snap_end_seqno = ntohll(snap_end_seqno);
2954 
2955     memcpy(c->wcurr, packet.bytes, sizeof(packet.bytes));
2956     add_iov(c, c->wcurr, sizeof(packet.bytes));
2957     c->wcurr += sizeof(packet.bytes);
2958     c->wbytes += sizeof(packet.bytes);
2959 
2960     return ENGINE_SUCCESS;
2961 }
2962 
dcp_message_add_stream_response(const void *cookie, uint32_t opaque, uint32_t dialogopaque, uint8_t status)2963 static ENGINE_ERROR_CODE dcp_message_add_stream_response(const void *cookie,
2964                                                          uint32_t opaque,
2965                                                          uint32_t dialogopaque,
2966                                                          uint8_t status)
2967 {
2968     protocol_binary_response_dcp_add_stream packet;
2969     conn *c = (void*)cookie;
2970 
2971     if (c->wbytes + sizeof(packet.bytes) >= c->wsize) {
2972         /* We don't have room in the buffer */
2973         return ENGINE_E2BIG;
2974     }
2975 
2976     memset(packet.bytes, 0, sizeof(packet.