xref: /3.0.2-MP2/memcached/daemon/memcached.c (revision ca4ab0ea)
1/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *  memcached - memory caching daemon
4 *
5 *       http://www.danga.com/memcached/
6 *
7 *  Copyright 2003 Danga Interactive, Inc.  All rights reserved.
8 *
9 *  Use and distribution licensed under the BSD license.  See
10 *  the LICENSE file for full text.
11 *
12 *  Authors:
13 *      Anatoly Vorobey <mellon@pobox.com>
14 *      Brad Fitzpatrick <brad@danga.com>
15 */
16#include "config.h"
17#include "memcached.h"
18#include "memcached/extension_loggers.h"
19#include "alloc_hooks.h"
20#include "utilities/engine_loader.h"
21#include "timings.h"
22#include "cmdline.h"
23
24#include <signal.h>
25#include <fcntl.h>
26#include <errno.h>
27#include <stdlib.h>
28#include <stdio.h>
29#include <string.h>
30#include <time.h>
31#include <limits.h>
32#include <ctype.h>
33#include <stdarg.h>
34#include <stddef.h>
35#include <snappy-c.h>
36#include <JSON_checker.h>
37
/* Forward declarations for static helpers defined elsewhere in this file. */
static bool grow_dynamic_buffer(conn *c, size_t needed);
static void cookie_set_admin(const void *cookie);
static bool cookie_is_admin(const void *cookie);

/*
 * Storage that can be viewed either as an item_info (.info) or as raw
 * bytes (.bytes). The byte view reserves room for IOV_MAX - 1 struct iovec
 * entries beyond sizeof(item_info).
 * NOTE(review): presumably item_info ends with a one-element iovec array
 * that this extra space extends -- confirm against item_info's definition.
 * The SLAB_GUTS stats macro reads `info.info.clsid`, i.e. it expects a
 * variable named `info` with this layout in the caller's scope.
 */
typedef union {
    item_info info;
    char bytes[sizeof(item_info) + ((IOV_MAX - 1) * sizeof(struct iovec))];
} item_info_holder;

/* Declared here for use in this file; not defined in the visible chunk. */
const char* get_server_version(void);
48
49static void item_set_cas(const void *cookie, item *it, uint64_t cas) {
50    settings.engine.v1->item_set_cas(settings.engine.v0, cookie, it, cas);
51}
52
/* Length limit used for SASL mechanism names (see its users). */
#define MAX_SASL_MECH_LEN 32

/*
 * Per-thread statistics helpers.
 *
 * The *_GUTS macros are statement fragments that bump counters through an
 * already-locked `struct thread_stats *`; they are expanded by STATS_INCR1.
 * SLAB_GUTS indexes the per-slab counters with `info.info.clsid`, so any
 * caller of SLAB_INCR / SLAB_TWO / STATS_HIT must have a variable named
 * `info` (an item_info_holder) in scope.
 *
 * The `key` / `nkey` parameters are accepted for call-site compatibility
 * but are not used by any of these macros.
 *
 * Every public macro expands to `do { ... } while (0)` so that it behaves
 * as a single statement, e.g. inside an unbraced if/else.
 */
#define SLAB_GUTS(conn, thread_stats, slab_op, thread_op) \
    thread_stats->slab_stats[info.info.clsid].slab_op++;

#define THREAD_GUTS(conn, thread_stats, slab_op, thread_op) \
    thread_stats->thread_op++;

#define THREAD_GUTS2(conn, thread_stats, slab_op, thread_op) \
    thread_stats->slab_op++; \
    thread_stats->thread_op++;

#define SLAB_THREAD_GUTS(conn, thread_stats, slab_op, thread_op) \
    SLAB_GUTS(conn, thread_stats, slab_op, thread_op) \
    THREAD_GUTS(conn, thread_stats, slab_op, thread_op)

/* Lock the connection's thread stats, apply GUTS, unlock. */
#define STATS_INCR1(GUTS, conn, slab_op, thread_op, key, nkey) do { \
    struct thread_stats *thread_stats = get_thread_stats(conn); \
    cb_mutex_enter(&thread_stats->mutex); \
    GUTS(conn, thread_stats, slab_op, thread_op); \
    cb_mutex_exit(&thread_stats->mutex); \
} while (0)

#define STATS_INCR(conn, op, key, nkey) \
    STATS_INCR1(THREAD_GUTS, conn, op, op, key, nkey)

#define SLAB_INCR(conn, op, key, nkey) \
    STATS_INCR1(SLAB_GUTS, conn, op, op, key, nkey)

#define STATS_TWO(conn, slab_op, thread_op, key, nkey) \
    STATS_INCR1(THREAD_GUTS2, conn, slab_op, thread_op, key, nkey)

#define SLAB_TWO(conn, slab_op, thread_op, key, nkey) \
    STATS_INCR1(SLAB_THREAD_GUTS, conn, slab_op, thread_op, key, nkey)

/* Bumps the per-slab <op>_hits counter and the thread's cmd_<op> counter. */
#define STATS_HIT(conn, op, key, nkey) \
    SLAB_TWO(conn, op##_hits, cmd_##op, key, nkey)

/* Bumps the thread's <op>_misses and cmd_<op> counters. */
#define STATS_MISS(conn, op, key, nkey) \
    STATS_TWO(conn, op##_misses, cmd_##op, key, nkey)

#define STATS_NOKEY(conn, op) do { \
    struct thread_stats *thread_stats = \
        get_thread_stats(conn); \
    cb_mutex_enter(&thread_stats->mutex); \
    thread_stats->op++; \
    cb_mutex_exit(&thread_stats->mutex); \
} while (0)

#define STATS_NOKEY2(conn, op1, op2) do { \
    struct thread_stats *thread_stats = \
        get_thread_stats(conn); \
    cb_mutex_enter(&thread_stats->mutex); \
    thread_stats->op1++; \
    thread_stats->op2++; \
    cb_mutex_exit(&thread_stats->mutex); \
} while (0)

#define STATS_ADD(conn, op, amt) do { \
    struct thread_stats *thread_stats = \
        get_thread_stats(conn); \
    cb_mutex_enter(&thread_stats->mutex); \
    thread_stats->op += (amt); \
    cb_mutex_exit(&thread_stats->mutex); \
} while (0)
118}
119
/*
 * Shutdown request flag. volatile sig_atomic_t is the type ISO C allows a
 * signal handler to write. NOTE(review): presumably set from a signal
 * handler elsewhere in this file -- confirm.
 */
volatile sig_atomic_t memcached_shutdown;

/* Lock for global stats; taken and released by STATS_LOCK()/STATS_UNLOCK(). */
static cb_mutex_t stats_lock;

/**
 * Structure to save ns_server's session cas token.
 * NOTE(review): the roles of `ctr` and of `mutex` (presumably guarding
 * value/ctr) are not visible in this part of the file -- confirm.
 */
static struct session_cas {
    uint64_t value;
    uint64_t ctr;
    cb_mutex_t mutex;
} session_cas;
133
134void STATS_LOCK() {
135    cb_mutex_enter(&stats_lock);
136}
137
138void STATS_UNLOCK() {
139    cb_mutex_exit(&stats_lock);
140}
141
/*
 * Portable classification of socket error codes and helpers to fake them.
 * The WIN32 variants compare against WSAE* codes and set them with
 * WSASetLastError(); the POSIX variants use errno values.
 */
#ifdef WIN32
static int is_blocking(DWORD dw) {
    return (dw == WSAEWOULDBLOCK);
}
static int is_emfile(DWORD dw) {
    return (dw == WSAEMFILE);
}
/* True for "not connected" and "connection reset" errors. Both codes must
 * be compared against dw; a bare `|| WSAECONNRESET` is always true. */
static int is_closed_conn(DWORD dw) {
    return (dw == WSAENOTCONN || dw == WSAECONNRESET);
}
static int is_addrinuse(DWORD dw) {
    return (dw == WSAEADDRINUSE);
}
static void set_ewouldblock(void) {
    WSASetLastError(WSAEWOULDBLOCK);
}
static void set_econnreset(void) {
    WSASetLastError(WSAECONNRESET);
}
#else
static int is_blocking(int dw) {
    return (dw == EAGAIN || dw == EWOULDBLOCK);
}
static int is_emfile(int dw) {
    return (dw == EMFILE);
}
/* True for "not connected" and "connection reset" errors (the previous
 * `dw != ECONNRESET` matched every error except a reset). */
static int is_closed_conn(int dw) {
    return (dw == ENOTCONN || dw == ECONNRESET);
}
static int is_addrinuse(int dw) {
    return (dw == EADDRINUSE);
}
static void set_ewouldblock(void) {
    errno = EWOULDBLOCK;
}
static void set_econnreset(void) {
    errno = ECONNRESET;
}
#endif
181
182
/*
 * We keep the current time of day in a global variable that's updated by a
 * timer event. This saves us a bunch of time() system calls (we really only
 * need to get the time once a second, whereas there can be tens of thousands
 * of requests a second) and allows us to use server-start-relative timestamps
 * rather than absolute UNIX timestamps, a space savings on systems where
 * sizeof(time_t) > sizeof(unsigned int).
 *
 * NOTE(review): `volatile` does not make cross-thread access atomic or
 * ordered; confirm readers tolerate a stale value.
 */
volatile rel_time_t current_time;

/*
 * Forward declarations for static functions defined elsewhere in this file.
 */
static SOCKET new_socket(struct addrinfo *ai);
static int try_read_command(conn *c);
static struct thread_stats* get_independent_stats(conn *c);
static struct thread_stats* get_thread_stats(conn *c);
static void register_callback(ENGINE_HANDLE *eh,
                              ENGINE_EVENT_TYPE type,
                              EVENT_CALLBACK cb, const void *cb_data);
static SERVER_HANDLE_V1 *get_server_api(void);
204
205
/** Result returned by try_read_network(). */
enum try_read_result {
    READ_DATA_RECEIVED,
    READ_NO_DATA_RECEIVED,
    READ_ERROR,            /**< an error occurred on the socket, or the client closed the connection */
    READ_MEMORY_ERROR      /**< failed to allocate more memory */
};

static enum try_read_result try_read_network(conn *c);

/* stats */
static void stats_init(void);
static void server_stats(ADD_STAT add_stats, conn *c, bool aggregate);
static void process_stat_settings(ADD_STAT add_stats, void *c);


/* defaults */
static void settings_init(void);

/* event handling, network IO */
static void event_handler(evutil_socket_t fd, short which, void *arg);
static void complete_nread(conn *c);
static void write_and_free(conn *c, char *buf, size_t bytes);
static int ensure_iov_space(conn *c);
static int add_iov(conn *c, const void *buf, size_t len);
static int add_msghdr(conn *c);


/* time handling */
static void set_current_time(void);  /* update the global variable holding
                              global 32-bit seconds-since-start time
                              (to avoid 64 bit time_t) */
237
/** exported globals **/
struct stats stats;
struct settings settings;
/* When the process was started; the epoch used by realtime()/abstime(). */
static time_t process_started;

/** file scope variables **/
/* Head of the listening connections, linked through conn->next
 * (walked by disable_listen()). */
static conn *listen_conn = NULL;
static struct event_base *main_base;
static struct thread_stats *default_independent_stats;

/* One handler list per event type, linked through ->next and walked by
 * perform_callbacks(). */
static struct engine_event_handler *engine_event_handlers[MAX_ENGINE_EVENT_TYPE + 1];

/** Result returned by transmit(). */
enum transmit_result {
    TRANSMIT_COMPLETE,   /**< All done writing. */
    TRANSMIT_INCOMPLETE, /**< More data remaining to write. */
    TRANSMIT_SOFT_ERROR, /**< Can't write any more right now. */
    TRANSMIT_HARD_ERROR  /**< Can't write (c->state is set to conn_closing) */
};

static enum transmit_result transmit(conn *c);
258
/* Largest exptime (30 days, in seconds) that realtime() treats as a delta
 * from now rather than an absolute UNIX time. Parenthesized so the macro
 * is safe inside larger expressions. */
#define REALTIME_MAXDELTA (60 * 60 * 24 * 30)
260
261/* Perform all callbacks of a given type for the given connection. */
262void perform_callbacks(ENGINE_EVENT_TYPE type,
263                       const void *data,
264                       const void *c) {
265    struct engine_event_handler *h;
266    for (h = engine_event_handlers[type]; h; h = h->next) {
267        h->cb(c, type, data, h->cb_data);
268    }
269}
270
271/*
272 * given time value that's either unix time or delta from current unix time,
273 * return unix time. Use the fact that delta can't exceed one month
274 * (and real time value can't be that low).
275 */
276static rel_time_t realtime(const time_t exptime) {
277    /* no. of seconds in 30 days - largest possible delta exptime */
278
279    if (exptime == 0) return 0; /* 0 means never expire */
280
281    if (exptime > REALTIME_MAXDELTA) {
282        /* if item expiration is at/before the server started, give it an
283           expiration time of 1 second after the server started.
284           (because 0 means don't expire).  without this, we'd
285           underflow and wrap around to some large value way in the
286           future, effectively making items expiring in the past
287           really expiring never */
288        if (exptime <= process_started)
289            return (rel_time_t)1;
290        return (rel_time_t)(exptime - process_started);
291    } else {
292        return (rel_time_t)(exptime + current_time);
293    }
294}
295
296/**
297 * Convert the relative time to an absolute time (relative to EPOC ;) )
298 */
299static time_t abstime(const rel_time_t exptime)
300{
301    return process_started + exptime;
302}
303
304/**
305 * Return the TCP or domain socket listening_port structure that
306 * has a given port number
307 */
308static struct listening_port *get_listening_port_instance(const int port) {
309    struct listening_port *port_ins = NULL;
310    int i;
311    for (i = 0; i < settings.num_interfaces; ++i) {
312        if (stats.listening_ports[i].port == port) {
313            port_ins = &stats.listening_ports[i];
314        }
315    }
316    return port_ins;
317}
318
319static void stats_init(void) {
320    stats.daemon_conns = 0;
321    stats.rejected_conns = 0;
322    stats.curr_conns = stats.total_conns = 0;
323    stats.listening_ports = calloc(settings.num_interfaces, sizeof(struct listening_port));
324
325    stats_prefix_init();
326}
327
328static void stats_reset(const void *cookie) {
329    struct conn *conn = (struct conn*)cookie;
330    STATS_LOCK();
331    stats.rejected_conns = 0;
332    stats.total_conns = 0;
333    stats_prefix_clear();
334    STATS_UNLOCK();
335    threadlocal_stats_reset(get_independent_stats(conn));
336    settings.engine.v1->reset_stats(settings.engine.v0, cookie);
337}
338
/*
 * Decide how many worker threads to run.
 *
 * If MEMCACHED_NUM_CPUS is set, its leading integer is used when it is a
 * positive int. Leading digits are accepted, as atoi() did before.
 * Anything else (no digits, zero, negative, out of range) falls back to 4.
 * Otherwise the online CPU count is used, scaled to 75% when above 4 and
 * never less than 4.
 */
static int get_number_of_worker_threads(void) {
    const int min_threads = 4;
    const char *override = getenv("MEMCACHED_NUM_CPUS");
    int ret;

    if (override == NULL) {
#ifdef WIN32
        SYSTEM_INFO sysinfo;
        GetSystemInfo(&sysinfo);
        ret = (int)sysinfo.dwNumberOfProcessors;
#else
        ret = (int)sysconf(_SC_NPROCESSORS_ONLN);
#endif
        if (ret > min_threads) {
            ret = (int)(ret * 0.75f);
        }
        if (ret < min_threads) {
            ret = min_threads;
        }
    } else {
        char *end;
        long parsed;

        errno = 0;
        parsed = strtol(override, &end, 10);
        /* atoi() previously let negative values through as a thread count. */
        if (end == override || errno == ERANGE || parsed <= 0 || parsed > INT_MAX) {
            ret = min_threads;
        } else {
            ret = (int)parsed;
        }
    }

    return ret;
}
365
366static void settings_init(void) {
367    static struct interface default_interface;
368    default_interface.port = 11211;
369    default_interface.maxconn = 1000;
370    default_interface.backlog = 1024;
371
372    settings.num_interfaces = 1;
373    settings.interfaces = &default_interface;
374    settings.daemonize = false;
375    settings.pid_file = NULL;
376    settings.bio_drain_buffer_sz = 8192;
377
378    settings.verbose = 0;
379    settings.num_threads = get_number_of_worker_threads();
380    settings.prefix_delimiter = ':';
381    settings.detail_enabled = 0;
382    settings.allow_detailed = true;
383    settings.reqs_per_event = DEFAULT_REQS_PER_EVENT;
384    settings.require_sasl = false;
385    settings.extensions.logger = get_stderr_logger();
386    settings.tcp_nodelay = getenv("MEMCACHED_DISABLE_TCP_NODELAY") == NULL;
387    settings.engine_module = "default_engine.so";
388    settings.engine_config = NULL;
389    settings.config = NULL;
390    settings.admin = NULL;
391    settings.disable_admin = false;
392    settings.datatype = false;
393}
394
395/*
396 * Adds a message header to a connection.
397 *
398 * Returns 0 on success, -1 on out-of-memory.
399 */
400static int add_msghdr(conn *c)
401{
402    struct msghdr *msg;
403
404    cb_assert(c != NULL);
405
406    if (c->msgsize == c->msgused) {
407        cb_assert(c->msgsize > 0);
408        msg = realloc(c->msglist, c->msgsize * 2 * sizeof(struct msghdr));
409        if (! msg)
410            return -1;
411        c->msglist = msg;
412        c->msgsize *= 2;
413    }
414
415    msg = c->msglist + c->msgused;
416
417    /* this wipes msg_iovlen, msg_control, msg_controllen, and
418       msg_flags, the last 3 of which aren't defined on solaris: */
419    memset(msg, 0, sizeof(struct msghdr));
420
421    msg->msg_iov = &c->iov[c->iovused];
422
423    if (c->request_addr_size > 0) {
424        msg->msg_name = &c->request_addr;
425        msg->msg_namelen = c->request_addr_size;
426    }
427
428    c->msgbytes = 0;
429    c->msgused++;
430
431    return 0;
432}
433
/*
 * Accept-throttling state. In the code visible here every access is made
 * while holding `mutex`.
 *   disabled    - set to true by disable_listen(); read by is_listen_disabled()
 *   count       - set to 10 by disable_listen(). NOTE(review): its consumer
 *                 is not visible here (presumably a countdown before
 *                 listening is re-enabled) -- confirm.
 *   num_disable - incremented by each disable_listen() call; reported by
 *                 get_listen_disabled_num()
 */
struct {
    cb_mutex_t mutex;
    bool disabled;
    ssize_t count;
    uint64_t num_disable;
} listen_state;
440
441static bool is_listen_disabled(void) {
442    bool ret;
443    cb_mutex_enter(&listen_state.mutex);
444    ret = listen_state.disabled;
445    cb_mutex_exit(&listen_state.mutex);
446    return ret;
447}
448
449static uint64_t get_listen_disabled_num(void) {
450    uint64_t ret;
451    cb_mutex_enter(&listen_state.mutex);
452    ret = listen_state.num_disable;
453    cb_mutex_exit(&listen_state.mutex);
454    return ret;
455}
456
457static void disable_listen(void) {
458    conn *next;
459    cb_mutex_enter(&listen_state.mutex);
460    listen_state.disabled = true;
461    listen_state.count = 10;
462    ++listen_state.num_disable;
463    cb_mutex_exit(&listen_state.mutex);
464
465    for (next = listen_conn; next; next = next->next) {
466        update_event(next, 0);
467        if (listen(next->sfd, 1) != 0) {
468            log_socket_error(EXTENSION_LOG_WARNING, NULL,
469                             "listen() failed: %s");
470        }
471    }
472}
473
474void safe_close(SOCKET sfd) {
475    if (sfd != INVALID_SOCKET) {
476        int rval;
477        while ((rval = closesocket(sfd)) == SOCKET_ERROR &&
478               (errno == EINTR || errno == EAGAIN)) {
479            /* go ahead and retry */
480        }
481
482        if (rval == SOCKET_ERROR) {
483            char msg[80];
484            snprintf(msg, sizeof(msg), "Failed to close socket %d (%%s)!!", (int)sfd);
485            log_socket_error(EXTENSION_LOG_WARNING, NULL,
486                             msg);
487        } else {
488            STATS_LOCK();
489            stats.curr_conns--;
490            STATS_UNLOCK();
491
492            if (is_listen_disabled()) {
493                notify_dispatcher();
494            }
495        }
496    }
497}
498
499/**
500 * Reset all of the dynamic buffers used by a connection back to their
501 * default sizes. The strategy for resizing the buffers is to allocate a
502 * new one of the correct size and free the old one if the allocation succeeds
503 * instead of using realloc to change the buffer size (because realloc may
504 * not shrink the buffers, and will also copy the memory). If the allocation
505 * fails the buffer will be unchanged.
506 *
507 * @param c the connection to resize the buffers for
508 * @return true if all allocations succeeded, false if one or more of the
509 *         allocations failed.
510 */
511static bool conn_reset_buffersize(conn *c) {
512    bool ret = true;
513
514    if (c->rsize != DATA_BUFFER_SIZE) {
515        void *ptr = malloc(DATA_BUFFER_SIZE);
516        if (ptr != NULL) {
517            free(c->rbuf);
518            c->rbuf = ptr;
519            c->rsize = DATA_BUFFER_SIZE;
520        } else {
521            ret = false;
522        }
523    }
524
525    if (c->wsize != DATA_BUFFER_SIZE) {
526        void *ptr = malloc(DATA_BUFFER_SIZE);
527        if (ptr != NULL) {
528            free(c->wbuf);
529            c->wbuf = ptr;
530            c->wsize = DATA_BUFFER_SIZE;
531        } else {
532            ret = false;
533        }
534    }
535
536    if (c->isize != ITEM_LIST_INITIAL) {
537        void *ptr = malloc(sizeof(item *) * ITEM_LIST_INITIAL);
538        if (ptr != NULL) {
539            free(c->ilist);
540            c->ilist = ptr;
541            c->isize = ITEM_LIST_INITIAL;
542        } else {
543            ret = false;
544        }
545    }
546
547    if (c->temp_alloc_size != TEMP_ALLOC_LIST_INITIAL) {
548        void *ptr = malloc(sizeof(char *) * TEMP_ALLOC_LIST_INITIAL);
549        if (ptr != NULL) {
550            free(c->temp_alloc_list);
551            c->temp_alloc_list = ptr;
552            c->temp_alloc_size = TEMP_ALLOC_LIST_INITIAL;
553        } else {
554            ret = false;
555        }
556    }
557
558    if (c->iovsize != IOV_LIST_INITIAL) {
559        void *ptr = malloc(sizeof(struct iovec) * IOV_LIST_INITIAL);
560        if (ptr != NULL) {
561            free(c->iov);
562            c->iov = ptr;
563            c->iovsize = IOV_LIST_INITIAL;
564        } else {
565            ret = false;
566        }
567    }
568
569    if (c->msgsize != MSG_LIST_INITIAL) {
570        void *ptr = malloc(sizeof(struct msghdr) * MSG_LIST_INITIAL);
571        if (ptr != NULL) {
572            free(c->msglist);
573            c->msglist = ptr;
574            c->msgsize = MSG_LIST_INITIAL;
575        } else {
576            ret = false;
577        }
578    }
579
580    return ret;
581}
582
583/**
584 * Constructor for all memory allocations of connection objects. Initialize
585 * all members and allocate the transfer buffers.
586 *
587 * @param buffer The memory allocated by the object cache
588 * @return 0 on success, 1 if we failed to allocate memory
589 */
590static int conn_constructor(conn *c) {
591    memset(c, 0, sizeof(*c));
592    MEMCACHED_CONN_CREATE(c);
593
594    c->state = conn_immediate_close;
595    c->sfd = INVALID_SOCKET;
596    if (!conn_reset_buffersize(c)) {
597        free(c->rbuf);
598        free(c->wbuf);
599        free(c->ilist);
600        free(c->temp_alloc_list);
601        free(c->iov);
602        free(c->msglist);
603        settings.extensions.logger->log(EXTENSION_LOG_WARNING,
604                                        NULL,
605                                        "Failed to allocate buffers for connection\n");
606        return 1;
607    }
608
609    STATS_LOCK();
610    stats.conn_structs++;
611    STATS_UNLOCK();
612
613    return 0;
614}
615
616/**
617 * Destructor for all connection objects. Release all allocated resources.
618 *
619 * @param buffer The memory allocated by the objec cache
620 */
621static void conn_destructor(conn *c) {
622    free(c->rbuf);
623    free(c->wbuf);
624    free(c->ilist);
625    free(c->temp_alloc_list);
626    free(c->iov);
627    free(c->msglist);
628
629    STATS_LOCK();
630    stats.conn_structs--;
631    STATS_UNLOCK();
632}
633
/*
 * Free list management for connections.
 *   free  - list of conn structs ready for reuse, linked through conn->next
 *   all   - every conn struct allocated (settings.maxconns slots); entries
 *           are never removed, so connection_stats() can walk it unlocked
 *   mutex - taken by allocate_connection()/release_connection() around
 *           updates to free, all and next
 *   next  - number of slots of `all` in use
 */
struct connections {
    conn* free;
    conn** all;
    cb_mutex_t mutex;
    int next;
} connections;
643
644static void initialize_connections(void)
645{
646    int preallocate;
647
648    cb_mutex_initialize(&connections.mutex);
649    connections.all = calloc(settings.maxconns, sizeof(conn*));
650    if (connections.all == NULL) {
651        settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
652                                        "Failed to allocate memory for connections");
653        exit(EX_OSERR);
654    }
655
656    preallocate = settings.maxconns / 2;
657    if (preallocate < 1000) {
658        preallocate = settings.maxconns;
659    } else if (preallocate > 5000) {
660        preallocate = 5000;
661    }
662
663    for (connections.next = 0; connections.next < preallocate; ++connections.next) {
664        connections.all[connections.next] = malloc(sizeof(conn));
665        if (conn_constructor(connections.all[connections.next]) != 0) {
666            settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
667                                            "Failed to allocate memory for connections");
668            exit(EX_OSERR);
669        }
670        connections.all[connections.next]->next = connections.free;
671        connections.free = connections.all[connections.next];
672    }
673}
674
675static void destroy_connections(void)
676{
677    int ii;
678    for (ii = 0; ii < settings.maxconns; ++ii) {
679        if (connections.all[ii]) {
680            conn *c = connections.all[ii];
681            conn_destructor(c);
682            free(c);
683        }
684    }
685
686    free(connections.all);
687}
688
689static conn *allocate_connection(void) {
690    conn *ret;
691
692    cb_mutex_enter(&connections.mutex);
693    ret = connections.free;
694    if (ret != NULL) {
695        connections.free = connections.free->next;
696        ret->next = NULL;
697    }
698    cb_mutex_exit(&connections.mutex);
699
700    if (ret == NULL) {
701        ret = malloc(sizeof(conn));
702        if (ret == NULL) {
703            settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
704                                            "Failed to allocate memory for connection");
705            return NULL;
706        }
707
708        if (conn_constructor(ret) != 0) {
709            free(ret);
710            settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
711                                            "Failed to allocate memory for connection");
712            return NULL;
713        }
714
715        cb_mutex_enter(&connections.mutex);
716        if (connections.next == settings.maxconns) {
717            free(ret);
718            ret = NULL;
719        } else {
720            connections.all[connections.next++] = ret;
721        }
722        cb_mutex_exit(&connections.mutex);
723    }
724
725    return ret;
726}
727
728static void release_connection(conn *c) {
729    c->sfd = INVALID_SOCKET;
730    cb_mutex_enter(&connections.mutex);
731    c->next = connections.free;
732    connections.free = c;
733    cb_mutex_exit(&connections.mutex);
734}
735
736static const char *substate_text(enum bin_substates state) {
737    switch (state) {
738    case bin_no_state: return "bin_no_state";
739    case bin_reading_set_header: return "bin_reading_set_header";
740    case bin_reading_cas_header: return "bin_reading_cas_header";
741    case bin_read_set_value: return "bin_read_set_value";
742    case bin_reading_sasl_auth: return "bin_reading_sasl_auth";
743    case bin_reading_sasl_auth_data: return "bin_reading_sasl_auth_data";
744    case bin_reading_packet: return "bin_reading_packet";
745    default:
746        return "illegal";
747    }
748}
749
750static void add_connection_stats(ADD_STAT add_stats, conn *d, conn *c) {
751    append_stat("conn", add_stats, d, "%p", c);
752    if (c->sfd == INVALID_SOCKET) {
753        append_stat("socket", add_stats, d, "disconnected");
754    } else {
755        append_stat("socket", add_stats, d, "%lu", (long)c->sfd);
756        append_stat("protocol", add_stats, d, "%s", "binary");
757        append_stat("transport", add_stats, d, "TCP");
758        append_stat("nevents", add_stats, d, "%u", c->nevents);
759        if (c->sasl_conn != NULL) {
760            append_stat("sasl_conn", add_stats, d, "%p", c->sasl_conn);
761        }
762        append_stat("state", add_stats, d, "%s", state_text(c->state));
763        append_stat("substate", add_stats, d, "%s", substate_text(c->substate));
764        append_stat("registered_in_libevent", add_stats, d, "%d",
765                    (int)c->registered_in_libevent);
766        append_stat("ev_flags", add_stats, d, "%x", c->ev_flags);
767        append_stat("which", add_stats, d, "%x", c->which);
768        append_stat("rbuf", add_stats, d, "%p", c->rbuf);
769        append_stat("rcurr", add_stats, d, "%p", c->rcurr);
770        append_stat("rsize", add_stats, d, "%u", c->rsize);
771        append_stat("rbytes", add_stats, d, "%u", c->rbytes);
772        append_stat("wbuf", add_stats, d, "%p", c->wbuf);
773        append_stat("wcurr", add_stats, d, "%p", c->wcurr);
774        append_stat("wsize", add_stats, d, "%u", c->wsize);
775        append_stat("wbytes", add_stats, d, "%u", c->wbytes);
776        append_stat("write_and_go", add_stats, d, "%p", c->write_and_go);
777        append_stat("write_and_free", add_stats, d, "%p", c->write_and_free);
778        append_stat("ritem", add_stats, d, "%p", c->ritem);
779        append_stat("rlbytes", add_stats, d, "%u", c->rlbytes);
780        append_stat("item", add_stats, d, "%p", c->item);
781        append_stat("store_op", add_stats, d, "%u", c->store_op);
782        append_stat("sbytes", add_stats, d, "%u", c->sbytes);
783        append_stat("iov", add_stats, d, "%p", c->iov);
784        append_stat("iovsize", add_stats, d, "%u", c->iovsize);
785        append_stat("iovused", add_stats, d, "%u", c->iovused);
786        append_stat("msglist", add_stats, d, "%p", c->msglist);
787        append_stat("msgsize", add_stats, d, "%u", c->msgsize);
788        append_stat("msgused", add_stats, d, "%u", c->msgused);
789        append_stat("msgcurr", add_stats, d, "%u", c->msgcurr);
790        append_stat("msgbytes", add_stats, d, "%u", c->msgbytes);
791        append_stat("ilist", add_stats, d, "%p", c->ilist);
792        append_stat("isize", add_stats, d, "%u", c->isize);
793        append_stat("icurr", add_stats, d, "%p", c->icurr);
794        append_stat("ileft", add_stats, d, "%u", c->ileft);
795        append_stat("temp_alloc_list", add_stats, d, "%p", c->temp_alloc_list);
796        append_stat("temp_alloc_size", add_stats, d, "%u", c->temp_alloc_size);
797        append_stat("temp_alloc_curr", add_stats, d, "%p", c->temp_alloc_curr);
798        append_stat("temp_alloc_left", add_stats, d, "%u", c->temp_alloc_left);
799
800        append_stat("noreply", add_stats, d, "%d", c->noreply);
801        append_stat("refcount", add_stats, d, "%u", (int)c->refcount);
802        append_stat("dynamic_buffer.buffer", add_stats, d, "%p",
803                    c->dynamic_buffer.buffer);
804        append_stat("dynamic_buffer.size", add_stats, d, "%zu",
805                    c->dynamic_buffer.size);
806        append_stat("dynamic_buffer.offset", add_stats, d, "%zu",
807                    c->dynamic_buffer.offset);
808        append_stat("engine_storage", add_stats, d, "%p", c->engine_storage);
809        /* @todo we should decode the binary header */
810        append_stat("cas", add_stats, d, "%"PRIu64, c->cas);
811        append_stat("cmd", add_stats, d, "%u", c->cmd);
812        append_stat("opaque", add_stats, d, "%u", c->opaque);
813        append_stat("keylen", add_stats, d, "%u", c->keylen);
814        append_stat("list_state", add_stats, d, "%u", c->list_state);
815        append_stat("next", add_stats, d, "%p", c->next);
816        append_stat("thread", add_stats, d, "%p", c->thread);
817        append_stat("aiostat", add_stats, d, "%u", c->aiostat);
818        append_stat("ewouldblock", add_stats, d, "%u", c->ewouldblock);
819        append_stat("tap_iterator", add_stats, d, "%p", c->tap_iterator);
820    }
821}
822
823/**
824 * Do a full stats of all of the connections.
825 * Do _NOT_ try to follow _ANY_ of the pointers in the conn structure
826 * because we read all of the values _DIRTY_. We preallocated the array
827 * of all of the connection pointers during startup, so we _KNOW_ that
828 * we can iterate through all of them. All of the conn structs will
829 * only appear in the connections.all array when we've allocated them,
830 * and we don't release them so it's safe to look at them.
831 */
832static void connection_stats(ADD_STAT add_stats, conn *c) {
833    int ii;
834    for (ii = 0; ii < settings.maxconns && connections.all[ii]; ++ii) {
835        add_connection_stats(add_stats, c, connections.all[ii]);
836    }
837}
838
839conn *conn_new(const SOCKET sfd, in_port_t parent_port,
840               STATE_FUNC init_state, int event_flags,
841               unsigned int read_buffer_size, struct event_base *base,
842               struct timeval *timeout) {
843    conn *c = allocate_connection();
844    if (c == NULL) {
845        return NULL;
846    }
847
848    c->admin = false;
849    cb_assert(c->thread == NULL);
850
851    if (c->rsize < read_buffer_size) {
852        void *mem = malloc(read_buffer_size);
853        if (mem) {
854            c->rsize = read_buffer_size;
855            free(c->rbuf);
856            c->rbuf = mem;
857        } else {
858            cb_assert(c->thread == NULL);
859            release_connection(c);
860            return NULL;
861        }
862    }
863
864    memset(&c->ssl, 0, sizeof(c->ssl));
865    if (init_state != conn_listening) {
866        int ii;
867        for (ii = 0; ii < settings.num_interfaces; ++ii) {
868            if (parent_port == settings.interfaces[ii].port) {
869                if (settings.interfaces[ii].ssl.cert != NULL) {
870                    const char *cert = settings.interfaces[ii].ssl.cert;
871                    const char *pkey = settings.interfaces[ii].ssl.key;
872
873                    c->ssl.ctx = SSL_CTX_new(SSLv23_server_method());
874
875                    /* @todo don't read files, but use in-memory-copies */
876                    if (!SSL_CTX_use_certificate_chain_file(c->ssl.ctx, cert) ||
877                        !SSL_CTX_use_PrivateKey_file(c->ssl.ctx, pkey, SSL_FILETYPE_PEM)) {
878                        release_connection(c);
879                        return NULL;
880                    }
881
882                    c->ssl.enabled = true;
883                    c->ssl.error = false;
884                    c->ssl.client = NULL;
885
886                    c->ssl.in.buffer = malloc(settings.bio_drain_buffer_sz);
887                    c->ssl.out.buffer = malloc(settings.bio_drain_buffer_sz);
888
889                    if (c->ssl.in.buffer == NULL || c->ssl.out.buffer == NULL) {
890                        release_connection(c);
891                        return NULL;
892                    }
893
894                    c->ssl.in.buffsz = settings.bio_drain_buffer_sz;
895                    c->ssl.out.buffsz = settings.bio_drain_buffer_sz;
896                    BIO_new_bio_pair(&c->ssl.application,
897                                     settings.bio_drain_buffer_sz,
898                                     &c->ssl.network,
899                                     settings.bio_drain_buffer_sz);
900
901                    c->ssl.client = SSL_new(c->ssl.ctx);
902                    SSL_set_bio(c->ssl.client,
903                                c->ssl.application,
904                                c->ssl.application);
905                }
906            }
907        }
908    }
909
910    c->request_addr_size = 0;
911
912    if (settings.verbose > 1) {
913        if (init_state == conn_listening) {
914            settings.extensions.logger->log(EXTENSION_LOG_DEBUG, c,
915                                            "<%d server listening", sfd);
916        } else {
917            settings.extensions.logger->log(EXTENSION_LOG_DEBUG, c,
918                                            "<%d new client connection", sfd);
919        }
920    }
921
922    c->sfd = sfd;
923    c->parent_port = parent_port;
924    c->state = init_state;
925    c->rlbytes = 0;
926    c->cmd = -1;
927    c->rbytes = c->wbytes = 0;
928    c->wcurr = c->wbuf;
929    c->rcurr = c->rbuf;
930    c->ritem = 0;
931    c->icurr = c->ilist;
932    c->temp_alloc_curr = c->temp_alloc_list;
933    c->ileft = 0;
934    c->temp_alloc_left = 0;
935    c->iovused = 0;
936    c->msgcurr = 0;
937    c->msgused = 0;
938    c->next = NULL;
939    c->list_state = 0;
940
941    c->write_and_go = init_state;
942    c->write_and_free = 0;
943    c->item = 0;
944    c->supports_datatype = false;
945    c->noreply = false;
946
947    event_set(&c->event, sfd, event_flags, event_handler, (void *)c);
948    event_base_set(base, &c->event);
949    c->ev_flags = event_flags;
950
951    if (!register_event(c, timeout)) {
952        cb_assert(c->thread == NULL);
953        release_connection(c);
954        return NULL;
955    }
956
957    STATS_LOCK();
958    stats.total_conns++;
959    STATS_UNLOCK();
960
961    c->aiostat = ENGINE_SUCCESS;
962    c->ewouldblock = false;
963    c->refcount = 1;
964
965    MEMCACHED_CONN_ALLOCATE(c->sfd);
966
967    perform_callbacks(ON_CONNECT, NULL, c);
968
969    return c;
970}
971
972static void conn_cleanup(conn *c) {
973    cb_assert(c != NULL);
974    c->admin = false;
975    if (c->item) {
976        settings.engine.v1->release(settings.engine.v0, c, c->item);
977        c->item = 0;
978    }
979
980    if (c->ileft != 0) {
981        for (; c->ileft > 0; c->ileft--,c->icurr++) {
982            settings.engine.v1->release(settings.engine.v0, c, *(c->icurr));
983        }
984    }
985
986    if (c->temp_alloc_left != 0) {
987        for (; c->temp_alloc_left > 0; c->temp_alloc_left--, c->temp_alloc_curr++) {
988            free(*(c->temp_alloc_curr));
989        }
990    }
991
992    if (c->write_and_free) {
993        free(c->write_and_free);
994        c->write_and_free = 0;
995    }
996
997    if (c->sasl_conn) {
998        cbsasl_dispose(&c->sasl_conn);
999        c->sasl_conn = NULL;
1000    }
1001
1002    c->engine_storage = NULL;
1003    c->tap_iterator = NULL;
1004    c->thread = NULL;
1005    cb_assert(c->next == NULL);
1006    c->sfd = INVALID_SOCKET;
1007    c->dcp = 0;
1008    c->start = 0;
1009    if (c->ssl.enabled) {
1010        BIO_free_all(c->ssl.network);
1011        SSL_free(c->ssl.client);
1012        c->ssl.enabled = false;
1013        c->ssl.error = false;
1014        free(c->ssl.in.buffer);
1015        free(c->ssl.out.buffer);
1016        memset(&c->ssl, 0, sizeof(c->ssl));
1017    }
1018}
1019
1020void conn_close(conn *c) {
1021    cb_assert(c != NULL);
1022    cb_assert(c->sfd == INVALID_SOCKET);
1023    cb_assert(c->state == conn_immediate_close);
1024
1025    cb_assert(c->thread);
1026    /* remove from pending-io list */
1027    if (settings.verbose > 1 && list_contains(c->thread->pending_io, c)) {
1028        settings.extensions.logger->log(EXTENSION_LOG_WARNING, c,
1029                                        "Current connection was in the pending-io list.. Nuking it\n");
1030    }
1031    c->thread->pending_io = list_remove(c->thread->pending_io, c);
1032
1033    conn_cleanup(c);
1034
1035    /*
1036     * The contract with the object cache is that we should return the
1037     * object in a constructed state. Reset the buffers to the default
1038     * size
1039     */
1040    conn_reset_buffersize(c);
1041    cb_assert(c->thread == NULL);
1042    release_connection(c);
1043}
1044
1045/*
1046 * Shrinks a connection's buffers if they're too big.  This prevents
1047 * periodic large "get" requests from permanently chewing lots of server
1048 * memory.
1049 *
1050 * This should only be called in between requests since it can wipe output
1051 * buffers!
1052 */
1053static void conn_shrink(conn *c) {
1054    cb_assert(c != NULL);
1055
1056    if (c->rsize > READ_BUFFER_HIGHWAT && c->rbytes < DATA_BUFFER_SIZE) {
1057        void *newbuf;
1058
1059        if (c->rcurr != c->rbuf) {
1060            /* Pack the buffer */
1061            memmove(c->rbuf, c->rcurr, (size_t)c->rbytes);
1062        }
1063
1064        newbuf = realloc(c->rbuf, DATA_BUFFER_SIZE);
1065
1066        if (newbuf) {
1067            c->rbuf = newbuf;
1068            c->rsize = DATA_BUFFER_SIZE;
1069        }
1070        c->rcurr = c->rbuf;
1071    }
1072
1073    /* isize is no longer dynamic */
1074    cb_assert(c->isize == ITEM_LIST_INITIAL);
1075
1076    if (c->msgsize > MSG_LIST_HIGHWAT) {
1077        void *newbuf = realloc(c->msglist,
1078                               MSG_LIST_INITIAL * sizeof(c->msglist[0]));
1079        if (newbuf) {
1080            c->msglist = newbuf;
1081            c->msgsize = MSG_LIST_INITIAL;
1082        }
1083    }
1084
1085    if (c->iovsize > IOV_LIST_HIGHWAT) {
1086        void *newbuf = realloc(c->iov, IOV_LIST_INITIAL * sizeof(c->iov[0]));
1087        if (newbuf) {
1088            c->iov = newbuf;
1089            c->iovsize = IOV_LIST_INITIAL;
1090        }
1091    }
1092}
1093
1094/**
1095 * Convert a state name to a human readable form.
1096 */
1097const char *state_text(STATE_FUNC state) {
1098    if (state == conn_listening) {
1099        return "conn_listening";
1100    } else if (state == conn_new_cmd) {
1101        return "conn_new_cmd";
1102    } else if (state == conn_waiting) {
1103        return "conn_waiting";
1104    } else if (state == conn_read) {
1105        return "conn_read";
1106    } else if (state == conn_parse_cmd) {
1107        return "conn_parse_cmd";
1108    } else if (state == conn_write) {
1109        return "conn_write";
1110    } else if (state == conn_nread) {
1111        return "conn_nread";
1112    } else if (state == conn_swallow) {
1113        return "conn_swallow";
1114    } else if (state == conn_closing) {
1115        return "conn_closing";
1116    } else if (state == conn_mwrite) {
1117        return "conn_mwrite";
1118    } else if (state == conn_ship_log) {
1119        return "conn_ship_log";
1120    } else if (state == conn_setup_tap_stream) {
1121        return "conn_setup_tap_stream";
1122    } else if (state == conn_pending_close) {
1123        return "conn_pending_close";
1124    } else if (state == conn_immediate_close) {
1125        return "conn_immediate_close";
1126    } else if (state == conn_refresh_cbsasl) {
1127        return "conn_refresh_cbsasl";
1128    } else if (state == conn_refresh_ssl_certs) {
1129        return "conn_refresh_ssl_cert";
1130    } else {
1131        return "Unknown";
1132    }
1133}
1134
1135/*
1136 * Sets a connection's current state in the state machine. Any special
1137 * processing that needs to happen on certain state transitions can
1138 * happen here.
1139 */
1140void conn_set_state(conn *c, STATE_FUNC state) {
1141    cb_assert(c != NULL);
1142
1143    if (state != c->state) {
1144        /*
1145         * The connections in the "tap thread" behaves differently than
1146         * normal connections because they operate in a full duplex mode.
1147         * New messages may appear from both sides, so we can't block on
1148         * read from the nework / engine
1149         */
1150        if (c->tap_iterator != NULL || c->dcp) {
1151            if (state == conn_waiting) {
1152                c->which = EV_WRITE;
1153                state = conn_ship_log;
1154            }
1155        }
1156
1157        if (settings.verbose > 2 || c->state == conn_closing
1158            || c->state == conn_setup_tap_stream) {
1159            settings.extensions.logger->log(EXTENSION_LOG_DETAIL, c,
1160                                            "%d: going from %s to %s\n",
1161                                            c->sfd, state_text(c->state),
1162                                            state_text(state));
1163        }
1164
1165        if (state == conn_write || state == conn_mwrite) {
1166            if (c->start != 0) {
1167                collect_timing(c->cmd, gethrtime() - c->start);
1168                c->start = 0;
1169            }
1170            MEMCACHED_PROCESS_COMMAND_END(c->sfd, c->wbuf, c->wbytes);
1171        }
1172
1173        c->state = state;
1174    }
1175}
1176
1177/*
1178 * Ensures that there is room for another struct iovec in a connection's
1179 * iov list.
1180 *
1181 * Returns 0 on success, -1 on out-of-memory.
1182 */
1183static int ensure_iov_space(conn *c) {
1184    cb_assert(c != NULL);
1185
1186    if (c->iovused >= c->iovsize) {
1187        int i, iovnum;
1188        struct iovec *new_iov = (struct iovec *)realloc(c->iov,
1189                                (c->iovsize * 2) * sizeof(struct iovec));
1190        if (! new_iov)
1191            return -1;
1192        c->iov = new_iov;
1193        c->iovsize *= 2;
1194
1195        /* Point all the msghdr structures at the new list. */
1196        for (i = 0, iovnum = 0; i < c->msgused; i++) {
1197            c->msglist[i].msg_iov = &c->iov[iovnum];
1198            iovnum += c->msglist[i].msg_iovlen;
1199        }
1200    }
1201
1202    return 0;
1203}
1204
1205
1206/*
1207 * Adds data to the list of pending data that will be written out to a
1208 * connection.
1209 *
1210 * Returns 0 on success, -1 on out-of-memory.
1211 */
1212
1213static int add_iov(conn *c, const void *buf, size_t len) {
1214    struct msghdr *m;
1215    size_t leftover;
1216    bool limit_to_mtu;
1217
1218    cb_assert(c != NULL);
1219
1220    if (len == 0) {
1221        return 0;
1222    }
1223
1224    do {
1225        m = &c->msglist[c->msgused - 1];
1226
1227        /*
1228         * Limit the first payloads of TCP replies, to
1229         * UDP_MAX_PAYLOAD_SIZE bytes.
1230         */
1231        limit_to_mtu = (1 == c->msgused);
1232
1233        /* We may need to start a new msghdr if this one is full. */
1234        if (m->msg_iovlen == IOV_MAX ||
1235            (limit_to_mtu && c->msgbytes >= UDP_MAX_PAYLOAD_SIZE)) {
1236            add_msghdr(c);
1237        }
1238
1239        if (ensure_iov_space(c) != 0)
1240            return -1;
1241
1242        /* If the fragment is too big to fit in the datagram, split it up */
1243        if (limit_to_mtu && len + c->msgbytes > UDP_MAX_PAYLOAD_SIZE) {
1244            leftover = len + c->msgbytes - UDP_MAX_PAYLOAD_SIZE;
1245            len -= leftover;
1246        } else {
1247            leftover = 0;
1248        }
1249
1250        m = &c->msglist[c->msgused - 1];
1251        m->msg_iov[m->msg_iovlen].iov_base = (void *)buf;
1252        m->msg_iov[m->msg_iovlen].iov_len = len;
1253
1254        c->msgbytes += (int)len;
1255        c->iovused++;
1256        m->msg_iovlen++;
1257
1258        buf = ((char *)buf) + len;
1259        len = leftover;
1260    } while (leftover > 0);
1261
1262    return 0;
1263}
1264
1265/**
1266 * get a pointer to the start of the request struct for the current command
1267 */
1268static void* binary_get_request(conn *c) {
1269    char *ret = c->rcurr;
1270    ret -= (sizeof(c->binary_header) + c->binary_header.request.keylen +
1271            c->binary_header.request.extlen);
1272
1273    cb_assert(ret >= c->rbuf);
1274    return ret;
1275}
1276
1277/**
1278 * get a pointer to the key in this request
1279 */
1280static char* binary_get_key(conn *c) {
1281    return c->rcurr - (c->binary_header.request.keylen);
1282}
1283
1284/**
1285 * Insert a key into a buffer, but replace all non-printable characters
1286 * with a '.'.
1287 *
1288 * @param dest where to store the output
1289 * @param destsz size of destination buffer
1290 * @param prefix string to insert before the data
1291 * @param client the client we are serving
1292 * @param from_client set to true if this data is from the client
1293 * @param key the key to add to the buffer
1294 * @param nkey the number of bytes in the key
1295 * @return number of bytes in dest if success, -1 otherwise
1296 */
1297static ssize_t key_to_printable_buffer(char *dest, size_t destsz,
1298                                       SOCKET client, bool from_client,
1299                                       const char *prefix,
1300                                       const char *key,
1301                                       size_t nkey)
1302{
1303    char *ptr;
1304    ssize_t ii;
1305    ssize_t nw = snprintf(dest, destsz, "%c%d %s ", from_client ? '>' : '<',
1306                          (int)client, prefix);
1307    if (nw == -1) {
1308        return -1;
1309    }
1310
1311    ptr = dest + nw;
1312    destsz -= nw;
1313    if (nkey > destsz) {
1314        nkey = destsz;
1315    }
1316
1317    for (ii = 0; ii < nkey; ++ii, ++key, ++ptr) {
1318        if (isgraph(*key)) {
1319            *ptr = *key;
1320        } else {
1321            *ptr = '.';
1322        }
1323    }
1324
1325    *ptr = '\0';
1326    return (ssize_t)(ptr - dest);
1327}
1328
1329/**
1330 * Convert a byte array to a text string
1331 *
1332 * @param dest where to store the output
1333 * @param destsz size of destination buffer
1334 * @param prefix string to insert before the data
1335 * @param client the client we are serving
1336 * @param from_client set to true if this data is from the client
1337 * @param data the data to add to the buffer
1338 * @param size the number of bytes in data to print
1339 * @return number of bytes in dest if success, -1 otherwise
1340 */
1341static ssize_t bytes_to_output_string(char *dest, size_t destsz,
1342                                      SOCKET client, bool from_client,
1343                                      const char *prefix,
1344                                      const char *data,
1345                                      size_t size)
1346{
1347    ssize_t nw = snprintf(dest, destsz, "%c%d %s", from_client ? '>' : '<',
1348                          (int)client, prefix);
1349    ssize_t offset = nw;
1350    ssize_t ii;
1351
1352    if (nw == -1) {
1353        return -1;
1354    }
1355
1356    for (ii = 0; ii < size; ++ii) {
1357        if (ii % 4 == 0) {
1358            if ((nw = snprintf(dest + offset, destsz - offset, "\n%c%d  ",
1359                               from_client ? '>' : '<', client)) == -1) {
1360                return  -1;
1361            }
1362            offset += nw;
1363        }
1364        if ((nw = snprintf(dest + offset, destsz - offset,
1365                           " 0x%02x", (unsigned char)data[ii])) == -1) {
1366            return -1;
1367        }
1368        offset += nw;
1369    }
1370
1371    if ((nw = snprintf(dest + offset, destsz - offset, "\n")) == -1) {
1372        return -1;
1373    }
1374
1375    return offset + nw;
1376}
1377
1378static int add_bin_header(conn *c,
1379                          uint16_t err,
1380                          uint8_t hdr_len,
1381                          uint16_t key_len,
1382                          uint32_t body_len,
1383                          uint8_t datatype) {
1384    protocol_binary_response_header* header;
1385
1386    cb_assert(c);
1387
1388    c->msgcurr = 0;
1389    c->msgused = 0;
1390    c->iovused = 0;
1391    if (add_msghdr(c) != 0) {
1392        return -1;
1393    }
1394
1395    header = (protocol_binary_response_header *)c->wbuf;
1396
1397    header->response.magic = (uint8_t)PROTOCOL_BINARY_RES;
1398    header->response.opcode = c->binary_header.request.opcode;
1399    header->response.keylen = (uint16_t)htons(key_len);
1400
1401    header->response.extlen = (uint8_t)hdr_len;
1402    header->response.datatype = datatype;
1403    header->response.status = (uint16_t)htons(err);
1404
1405    header->response.bodylen = htonl(body_len);
1406    header->response.opaque = c->opaque;
1407    header->response.cas = htonll(c->cas);
1408
1409    if (settings.verbose > 1) {
1410        char buffer[1024];
1411        if (bytes_to_output_string(buffer, sizeof(buffer), c->sfd, false,
1412                                   "Writing bin response:",
1413                                   (const char*)header->bytes,
1414                                   sizeof(header->bytes)) != -1) {
1415            settings.extensions.logger->log(EXTENSION_LOG_DEBUG, c,
1416                                            "%s", buffer);
1417        }
1418    }
1419
1420    return add_iov(c, c->wbuf, sizeof(header->response));
1421}
1422
1423/**
1424 * Convert an error code generated from the storage engine to the corresponding
1425 * error code used by the protocol layer.
1426 * @param e the error code as used in the engine
1427 * @return the error code as used by the protocol layer
1428 */
1429static protocol_binary_response_status engine_error_2_protocol_error(ENGINE_ERROR_CODE e) {
1430    protocol_binary_response_status ret;
1431
1432    switch (e) {
1433    case ENGINE_SUCCESS:
1434        return PROTOCOL_BINARY_RESPONSE_SUCCESS;
1435    case ENGINE_KEY_ENOENT:
1436        return PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
1437    case ENGINE_KEY_EEXISTS:
1438        return PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS;
1439    case ENGINE_ENOMEM:
1440        return PROTOCOL_BINARY_RESPONSE_ENOMEM;
1441    case ENGINE_TMPFAIL:
1442        return PROTOCOL_BINARY_RESPONSE_ETMPFAIL;
1443    case ENGINE_NOT_STORED:
1444        return PROTOCOL_BINARY_RESPONSE_NOT_STORED;
1445    case ENGINE_EINVAL:
1446        return PROTOCOL_BINARY_RESPONSE_EINVAL;
1447    case ENGINE_ENOTSUP:
1448        return PROTOCOL_BINARY_RESPONSE_NOT_SUPPORTED;
1449    case ENGINE_E2BIG:
1450        return PROTOCOL_BINARY_RESPONSE_E2BIG;
1451    case ENGINE_NOT_MY_VBUCKET:
1452        return PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET;
1453    case ENGINE_ERANGE:
1454        return PROTOCOL_BINARY_RESPONSE_ERANGE;
1455    case ENGINE_ROLLBACK:
1456        return PROTOCOL_BINARY_RESPONSE_ROLLBACK;
1457    default:
1458        ret = PROTOCOL_BINARY_RESPONSE_EINTERNAL;
1459    }
1460
1461    return ret;
1462}
1463
1464static ENGINE_ERROR_CODE get_vb_map_cb(const void *cookie,
1465                                       const void *map,
1466                                       size_t mapsize)
1467{
1468    char *buf;
1469    conn *c = (conn*)cookie;
1470    protocol_binary_response_header header;
1471    size_t needed = mapsize+ sizeof(protocol_binary_response_header);
1472    if (!grow_dynamic_buffer(c, needed)) {
1473        if (settings.verbose > 0) {
1474            settings.extensions.logger->log(EXTENSION_LOG_INFO, c,
1475                    "<%d ERROR: Failed to allocate memory for response\n",
1476                    c->sfd);
1477        }
1478        return ENGINE_ENOMEM;
1479    }
1480
1481    buf = c->dynamic_buffer.buffer + c->dynamic_buffer.offset;
1482    memset(&header, 0, sizeof(header));
1483
1484    header.response.magic = (uint8_t)PROTOCOL_BINARY_RES;
1485    header.response.opcode = c->binary_header.request.opcode;
1486    header.response.status = (uint16_t)htons(PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET);
1487    header.response.bodylen = htonl((uint32_t)mapsize);
1488    header.response.opaque = c->opaque;
1489
1490    memcpy(buf, header.bytes, sizeof(header.response));
1491    buf += sizeof(header.response);
1492    memcpy(buf, map, mapsize);
1493    c->dynamic_buffer.offset += needed;
1494
1495    return ENGINE_SUCCESS;
1496}
1497
1498static void write_bin_packet(conn *c, protocol_binary_response_status err, int swallow) {
1499    if (err == PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET) {
1500        ENGINE_ERROR_CODE ret;
1501        cb_assert(swallow == 0);
1502
1503        ret = settings.engine.v1->get_engine_vb_map(settings.engine.v0, c,
1504                                                    get_vb_map_cb);
1505        if (ret == ENGINE_SUCCESS) {
1506            write_and_free(c, c->dynamic_buffer.buffer,
1507                           c->dynamic_buffer.offset);
1508            c->dynamic_buffer.buffer = NULL;
1509        } else {
1510            conn_set_state(c, conn_closing);
1511        }
1512    } else {
1513        ssize_t len = 0;
1514        const char *errtext = NULL;
1515
1516        if (err != PROTOCOL_BINARY_RESPONSE_SUCCESS) {
1517            errtext = memcached_protocol_errcode_2_text(err);
1518            if (errtext != NULL) {
1519                len = (ssize_t)strlen(errtext);
1520            }
1521        }
1522
1523        if (errtext && settings.verbose > 1) {
1524            settings.extensions.logger->log(EXTENSION_LOG_DEBUG, c,
1525                                            ">%d Writing an error: %s\n", c->sfd,
1526                                            errtext);
1527        }
1528
1529        add_bin_header(c, err, 0, 0, len, PROTOCOL_BINARY_RAW_BYTES);
1530        if (errtext) {
1531            add_iov(c, errtext, len);
1532        }
1533        conn_set_state(c, conn_mwrite);
1534        if (swallow > 0) {
1535            c->sbytes = swallow;
1536            c->write_and_go = conn_swallow;
1537        } else {
1538            c->write_and_go = conn_new_cmd;
1539        }
1540    }
1541}
1542
1543/* Form and send a response to a command over the binary protocol */
1544static void write_bin_response(conn *c, const void *d, int hlen, int keylen, int dlen) {
1545    if (!c->noreply || c->cmd == PROTOCOL_BINARY_CMD_GET ||
1546        c->cmd == PROTOCOL_BINARY_CMD_GETK) {
1547        if (add_bin_header(c, 0, hlen, keylen, dlen, PROTOCOL_BINARY_RAW_BYTES) == -1) {
1548            conn_set_state(c, conn_closing);
1549            return;
1550        }
1551        add_iov(c, d, dlen);
1552        conn_set_state(c, conn_mwrite);
1553        c->write_and_go = conn_new_cmd;
1554    } else {
1555        if (c->start != 0) {
1556            collect_timing(c->cmd, gethrtime() - c->start);
1557            c->start = 0;
1558        }
1559        conn_set_state(c, conn_new_cmd);
1560    }
1561}
1562
1563static void complete_update_bin(conn *c) {
1564    protocol_binary_response_status eno = PROTOCOL_BINARY_RESPONSE_EINVAL;
1565    ENGINE_ERROR_CODE ret;
1566    item *it;
1567    item_info_holder info;
1568
1569    cb_assert(c != NULL);
1570    it = c->item;
1571    memset(&info, 0, sizeof(info));
1572    info.info.nvalue = 1;
1573    if (!settings.engine.v1->get_item_info(settings.engine.v0, c, it,
1574                                           (void*)&info)) {
1575        settings.engine.v1->release(settings.engine.v0, c, it);
1576        settings.extensions.logger->log(EXTENSION_LOG_WARNING, c,
1577                                        "%d: Failed to get item info",
1578                                        c->sfd);
1579        write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_EINTERNAL, 0);
1580        return;
1581    }
1582
1583    ret = c->aiostat;
1584    c->aiostat = ENGINE_SUCCESS;
1585    if (ret == ENGINE_SUCCESS) {
1586        if (!c->supports_datatype) {
1587            if (checkUTF8JSON((void*)info.info.value[0].iov_base,
1588                              (int)info.info.value[0].iov_len)) {
1589                info.info.datatype = PROTOCOL_BINARY_DATATYPE_JSON;
1590                if (!settings.engine.v1->set_item_info(settings.engine.v0, c,
1591                                                       it, &info.info)) {
1592                    settings.extensions.logger->log(EXTENSION_LOG_WARNING, c,
1593                            "%d: Failed to set item info",
1594                            c->sfd);
1595                }
1596            }
1597        }
1598        ret = settings.engine.v1->store(settings.engine.v0, c,
1599                                        it, &c->cas, c->store_op,
1600                                        c->binary_header.request.vbucket);
1601    }
1602
1603#ifdef ENABLE_DTRACE
1604    switch (c->cmd) {
1605    case OPERATION_ADD:
1606        MEMCACHED_COMMAND_ADD(c->sfd, info.info.key, info.info.nkey,
1607                              (ret == ENGINE_SUCCESS) ? info.info.nbytes : -1, c->cas);
1608        break;
1609    case OPERATION_REPLACE:
1610        MEMCACHED_COMMAND_REPLACE(c->sfd, info.info.key, info.info.nkey,
1611                                  (ret == ENGINE_SUCCESS) ? info.info.nbytes : -1, c->cas);
1612        break;
1613    case OPERATION_APPEND:
1614        MEMCACHED_COMMAND_APPEND(c->sfd, info.info.key, info.info.nkey,
1615                                 (ret == ENGINE_SUCCESS) ? info.info.nbytes : -1, c->cas);
1616        break;
1617    case OPERATION_PREPEND:
1618        MEMCACHED_COMMAND_PREPEND(c->sfd, info.info.key, info.info.nkey,
1619                                  (ret == ENGINE_SUCCESS) ? info.info.nbytes : -1, c->cas);
1620        break;
1621    case OPERATION_SET:
1622        MEMCACHED_COMMAND_SET(c->sfd, info.info.key, info.info.nkey,
1623                              (ret == ENGINE_SUCCESS) ? info.info.nbytes : -1, c->cas);
1624        break;
1625    }
1626#endif
1627
1628    switch (ret) {
1629    case ENGINE_SUCCESS:
1630        /* Stored */
1631        write_bin_response(c, NULL, 0, 0, 0);
1632        break;
1633    case ENGINE_KEY_EEXISTS:
1634        write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS, 0);
1635        break;
1636    case ENGINE_KEY_ENOENT:
1637        write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, 0);
1638        break;
1639    case ENGINE_ENOMEM:
1640        write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_ENOMEM, 0);
1641        break;
1642    case ENGINE_TMPFAIL:
1643        write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_ETMPFAIL, 0);
1644        break;
1645    case ENGINE_EWOULDBLOCK:
1646        c->ewouldblock = true;
1647        break;
1648    case ENGINE_DISCONNECT:
1649        c->state = conn_closing;
1650        break;
1651    case ENGINE_ENOTSUP:
1652        write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_NOT_SUPPORTED, 0);
1653        break;
1654    case ENGINE_NOT_MY_VBUCKET:
1655        write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET, 0);
1656        break;
1657    case ENGINE_E2BIG:
1658        write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_E2BIG, 0);
1659        break;
1660    default:
1661        if (c->store_op == OPERATION_ADD) {
1662            eno = PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS;
1663        } else if(c->store_op == OPERATION_REPLACE) {
1664            eno = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
1665        } else {
1666            eno = PROTOCOL_BINARY_RESPONSE_NOT_STORED;
1667        }
1668        write_bin_packet(c, eno, 0);
1669    }
1670
1671    if (c->store_op == OPERATION_CAS) {
1672        switch (ret) {
1673        case ENGINE_SUCCESS:
1674            SLAB_INCR(c, cas_hits, info.info.key, info.info.nkey);
1675            break;
1676        case ENGINE_KEY_EEXISTS:
1677            SLAB_INCR(c, cas_badval, info.info.key, info.info.nkey);
1678            break;
1679        case ENGINE_KEY_ENOENT:
1680            STATS_NOKEY(c, cas_misses);
1681            break;
1682        default:
1683            ;
1684        }
1685    } else {
1686        SLAB_INCR(c, cmd_set, info.info.key, info.info.nkey);
1687    }
1688
1689    if (!c->ewouldblock) {
1690        /* release the c->item reference */
1691        settings.engine.v1->release(settings.engine.v0, c, c->item);
1692        c->item = 0;
1693    }
1694}
1695
1696static void process_bin_get(conn *c) {
1697    item *it;
1698    protocol_binary_response_get* rsp = (protocol_binary_response_get*)c->wbuf;
1699    char* key = binary_get_key(c);
1700    size_t nkey = c->binary_header.request.keylen;
1701    uint16_t keylen;
1702    uint32_t bodylen;
1703    item_info_holder info;
1704    int ii;
1705    ENGINE_ERROR_CODE ret;
1706    uint8_t datatype;
1707    bool need_inflate = false;
1708
1709    memset(&info, 0, sizeof(info));
1710    if (settings.verbose > 1) {
1711        char buffer[1024];
1712        if (key_to_printable_buffer(buffer, sizeof(buffer), c->sfd, true,
1713                                    "GET", key, nkey) != -1) {
1714            settings.extensions.logger->log(EXTENSION_LOG_DEBUG, c, "%s\n",
1715                                            buffer);
1716        }
1717    }
1718
1719    ret = c->aiostat;
1720    c->aiostat = ENGINE_SUCCESS;
1721    if (ret == ENGINE_SUCCESS) {
1722        ret = settings.engine.v1->get(settings.engine.v0, c, &it, key, (int)nkey,
1723                                      c->binary_header.request.vbucket);
1724    }
1725
1726    info.info.nvalue = IOV_MAX;
1727    switch (ret) {
1728    case ENGINE_SUCCESS:
1729        STATS_HIT(c, get, key, nkey);
1730
1731        if (!settings.engine.v1->get_item_info(settings.engine.v0, c, it,
1732                                               (void*)&info)) {
1733            settings.engine.v1->release(settings.engine.v0, c, it);
1734            settings.extensions.logger->log(EXTENSION_LOG_WARNING, c,
1735                                            "%d: Failed to get item info",
1736                                            c->sfd);
1737            write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_EINTERNAL, 0);
1738            break;
1739        }
1740
1741        datatype = info.info.datatype;
1742        if (!c->supports_datatype) {
1743            if ((datatype & PROTOCOL_BINARY_DATATYPE_COMPRESSED) == PROTOCOL_BINARY_DATATYPE_COMPRESSED) {
1744                need_inflate = true;
1745            } else {
1746                datatype = PROTOCOL_BINARY_RAW_BYTES;
1747            }
1748        }
1749
1750        keylen = 0;
1751        bodylen = sizeof(rsp->message.body) + info.info.nbytes;
1752
1753        if (c->cmd == PROTOCOL_BINARY_CMD_GETK) {
1754            bodylen += (uint32_t)nkey;
1755            keylen = (uint16_t)nkey;
1756        }
1757
1758        if (need_inflate) {
1759            if (info.info.nvalue != 1) {
1760                write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_EINTERNAL, 0);
1761            } else if (binary_response_handler(key, keylen,
1762                                               &info.info.flags, 4,
1763                                               info.info.value[0].iov_base,
1764                                               (uint32_t)info.info.value[0].iov_len,
1765                                               datatype,
1766                                               PROTOCOL_BINARY_RESPONSE_SUCCESS,
1767                                               info.info.cas, c)) {
1768                write_and_free(c, c->dynamic_buffer.buffer, c->dynamic_buffer.offset);
1769                c->dynamic_buffer.buffer = NULL;
1770                settings.engine.v1->release(settings.engine.v0, c, it);
1771            } else {
1772                write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_EINTERNAL, 0);
1773            }
1774        } else {
1775            if (add_bin_header(c, 0, sizeof(rsp->message.body),
1776                               keylen, bodylen, datatype) == -1) {
1777                conn_set_state(c, conn_closing);
1778                return;
1779            }
1780            rsp->message.header.response.cas = htonll(info.info.cas);
1781
1782            /* add the flags */
1783            rsp->message.body.flags = info.info.flags;
1784            add_iov(c, &rsp->message.body, sizeof(rsp->message.body));
1785
1786            if (c->cmd == PROTOCOL_BINARY_CMD_GETK) {
1787                add_iov(c, info.info.key, nkey);
1788            }
1789
1790            for (ii = 0; ii < info.info.nvalue; ++ii) {
1791                add_iov(c, info.info.value[ii].iov_base,
1792                        info.info.value[ii].iov_len);
1793            }
1794            conn_set_state(c, conn_mwrite);
1795            /* Remember this item so we can garbage collect it later */
1796            c->item = it;
1797        }
1798        break;
1799    case ENGINE_KEY_ENOENT:
1800        STATS_MISS(c, get, key, nkey);
1801
1802        MEMCACHED_COMMAND_GET(c->sfd, key, nkey, -1, 0);
1803
1804        if (c->noreply) {
1805            conn_set_state(c, conn_new_cmd);
1806        } else {
1807            if (c->cmd == PROTOCOL_BINARY_CMD_GETK) {
1808                char *ofs = c->wbuf + sizeof(protocol_binary_response_header);
1809                if (add_bin_header(c, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT,
1810                                   0, (uint16_t)nkey,
1811                                   (uint32_t)nkey, PROTOCOL_BINARY_RAW_BYTES) == -1) {
1812                    conn_set_state(c, conn_closing);
1813                    return;
1814                }
1815                memcpy(ofs, key, nkey);
1816                add_iov(c, ofs, nkey);
1817                conn_set_state(c, conn_mwrite);
1818            } else {
1819                write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, 0);
1820            }
1821        }
1822        break;
1823    case ENGINE_EWOULDBLOCK:
1824        c->ewouldblock = true;
1825        break;
1826    case ENGINE_DISCONNECT:
1827        c->state = conn_closing;
1828        break;
1829    default:
1830        write_bin_packet(c, engine_error_2_protocol_error(ret), 0);
1831    }
1832
1833    if (settings.detail_enabled && ret != ENGINE_EWOULDBLOCK) {
1834        stats_prefix_record_get(key, nkey, ret == ENGINE_SUCCESS);
1835    }
1836}
1837
1838static void append_bin_stats(const char *key, const uint16_t klen,
1839                             const char *val, const uint32_t vlen,
1840                             conn *c) {
1841    char *buf = c->dynamic_buffer.buffer + c->dynamic_buffer.offset;
1842    uint32_t bodylen = klen + vlen;
1843    protocol_binary_response_header header;
1844
1845    memset(&header, 0, sizeof(header));
1846    header.response.magic = (uint8_t)PROTOCOL_BINARY_RES;
1847    header.response.opcode = PROTOCOL_BINARY_CMD_STAT;
1848    header.response.keylen = (uint16_t)htons(klen);
1849    header.response.datatype = (uint8_t)PROTOCOL_BINARY_RAW_BYTES;
1850    header.response.bodylen = htonl(bodylen);
1851    header.response.opaque = c->opaque;
1852
1853    memcpy(buf, header.bytes, sizeof(header.response));
1854    buf += sizeof(header.response);
1855
1856    if (klen > 0) {
1857        cb_assert(key != NULL);
1858        memcpy(buf, key, klen);
1859        buf += klen;
1860
1861        if (vlen > 0) {
1862            memcpy(buf, val, vlen);
1863        }
1864    }
1865
1866    c->dynamic_buffer.offset += sizeof(header.response) + bodylen;
1867}
1868
1869static bool grow_dynamic_buffer(conn *c, size_t needed) {
1870    size_t nsize = c->dynamic_buffer.size;
1871    size_t available = nsize - c->dynamic_buffer.offset;
1872    bool rv = true;
1873
1874    /* Special case: No buffer -- need to allocate fresh */
1875    if (c->dynamic_buffer.buffer == NULL) {
1876        nsize = 1024;
1877        available = c->dynamic_buffer.size = c->dynamic_buffer.offset = 0;
1878    }
1879
1880    while (needed > available) {
1881        cb_assert(nsize > 0);
1882        nsize = nsize << 1;
1883        available = nsize - c->dynamic_buffer.offset;
1884    }
1885
1886    if (nsize != c->dynamic_buffer.size) {
1887        char *ptr = realloc(c->dynamic_buffer.buffer, nsize);
1888        if (ptr) {
1889            c->dynamic_buffer.buffer = ptr;
1890            c->dynamic_buffer.size = nsize;
1891        } else {
1892            rv = false;
1893        }
1894    }
1895
1896    return rv;
1897}
1898
1899static void append_stats(const char *key, const uint16_t klen,
1900                         const char *val, const uint32_t vlen,
1901                         const void *cookie)
1902{
1903    size_t needed;
1904    conn *c = (conn*)cookie;
1905    /* value without a key is invalid */
1906    if (klen == 0 && vlen > 0) {
1907        return ;
1908    }
1909
1910    needed = vlen + klen + sizeof(protocol_binary_response_header);
1911    if (!grow_dynamic_buffer(c, needed)) {
1912        return ;
1913    }
1914    append_bin_stats(key, klen, val, vlen, c);
1915    cb_assert(c->dynamic_buffer.offset <= c->dynamic_buffer.size);
1916}
1917
1918static void bin_read_chunk(conn *c,
1919                           enum bin_substates next_substate,
1920                           uint32_t chunk) {
1921    ptrdiff_t offset;
1922    cb_assert(c);
1923    c->substate = next_substate;
1924    c->rlbytes = chunk;
1925
1926    /* Ok... do we have room for everything in our buffer? */
1927    offset = c->rcurr + sizeof(protocol_binary_request_header) - c->rbuf;
1928    if (c->rlbytes > c->rsize - offset) {
1929        size_t nsize = c->rsize;
1930        size_t size = c->rlbytes + sizeof(protocol_binary_request_header);
1931
1932        while (size > nsize) {
1933            nsize *= 2;
1934        }
1935
1936        if (nsize != c->rsize) {
1937            char *newm;
1938            if (settings.verbose > 1) {
1939                settings.extensions.logger->log(EXTENSION_LOG_DEBUG, c,
1940                        "%d: Need to grow buffer from %lu to %lu\n",
1941                        c->sfd, (unsigned long)c->rsize, (unsigned long)nsize);
1942            }
1943            newm = realloc(c->rbuf, nsize);
1944            if (newm == NULL) {
1945                if (settings.verbose) {
1946                    settings.extensions.logger->log(EXTENSION_LOG_INFO, c,
1947                            "%d: Failed to grow buffer.. closing connection\n",
1948                            c->sfd);
1949                }
1950                conn_set_state(c, conn_closing);
1951                return;
1952            }
1953
1954            c->rbuf= newm;
1955            /* rcurr should point to the same offset in the packet */
1956            c->rcurr = c->rbuf + offset - sizeof(protocol_binary_request_header);
1957            c->rsize = (int)nsize;
1958        }
1959        if (c->rbuf != c->rcurr) {
1960            memmove(c->rbuf, c->rcurr, c->rbytes);
1961            c->rcurr = c->rbuf;
1962            if (settings.verbose > 1) {
1963                settings.extensions.logger->log(EXTENSION_LOG_DEBUG, c,
1964                                                "%d: Repack input buffer\n",
1965                                                c->sfd);
1966            }
1967        }
1968    }
1969
1970    /* preserve the header in the buffer.. */
1971    c->ritem = c->rcurr + sizeof(protocol_binary_request_header);
1972    conn_set_state(c, conn_nread);
1973}
1974
1975static void bin_read_key(conn *c, enum bin_substates next_substate, int extra) {
1976    bin_read_chunk(c, next_substate, c->keylen + extra);
1977}
1978
1979
1980/* Just write an error message and disconnect the client */
1981static void handle_binary_protocol_error(conn *c) {
1982    write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_EINVAL, 0);
1983    if (settings.verbose) {
1984        settings.extensions.logger->log(EXTENSION_LOG_INFO, c,
1985                "%d: Protocol error (opcode %02x), close connection\n",
1986                c->sfd, c->binary_header.request.opcode);
1987    }
1988    c->write_and_go = conn_closing;
1989}
1990
1991static void get_auth_data(const void *cookie, auth_data_t *data) {
1992    conn *c = (conn*)cookie;
1993    if (c->sasl_conn) {
1994        cbsasl_getprop(c->sasl_conn, CBSASL_USERNAME, (void*)&data->username);
1995        cbsasl_getprop(c->sasl_conn, CBSASL_CONFIG, (void*)&data->config);
1996    } else {
1997        data->username = NULL;
1998        data->config = NULL;
1999    }
2000}
2001
2002struct sasl_tmp {
2003    int ksize;
2004    int vsize;
2005    char data[1]; /* data + ksize == value */
2006};
2007
2008static void process_bin_sasl_auth(conn *c) {
2009    int nkey;
2010    int vlen;
2011    char *key;
2012    size_t buffer_size;
2013    struct sasl_tmp *data;
2014
2015    cb_assert(c->binary_header.request.extlen == 0);
2016    nkey = c->binary_header.request.keylen;
2017    vlen = c->binary_header.request.bodylen - nkey;
2018
2019    if (nkey > MAX_SASL_MECH_LEN) {
2020        write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_EINVAL, vlen);
2021        c->write_and_go = conn_swallow;
2022        return;
2023    }
2024
2025    key = binary_get_key(c);
2026    cb_assert(key);
2027
2028    buffer_size = sizeof(struct sasl_tmp) + nkey + vlen + 2;
2029    data = calloc(sizeof(struct sasl_tmp) + buffer_size, 1);
2030    if (!data) {
2031        write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_ENOMEM, vlen);
2032        c->write_and_go = conn_swallow;
2033        return;
2034    }
2035
2036    data->ksize = nkey;
2037    data->vsize = vlen;
2038    memcpy(data->data, key, nkey);
2039
2040    c->item = data;
2041    c->ritem = data->data + nkey;
2042    c->rlbytes = vlen;
2043    conn_set_state(c, conn_nread);
2044    c->substate = bin_reading_sasl_auth_data;
2045}
2046
2047static void process_bin_complete_sasl_auth(conn *c) {
2048    auth_data_t data;
2049    const char *out = NULL;
2050    unsigned int outlen = 0;
2051    int nkey;
2052    int vlen;
2053    struct sasl_tmp *stmp;
2054    char mech[1024];
2055    const char *challenge;
2056    int result=-1;
2057
2058    cb_assert(c->item);
2059
2060    nkey = c->binary_header.request.keylen;
2061    if (nkey > 1023) {
2062        /* too big.. */
2063        settings.extensions.logger->log(EXTENSION_LOG_WARNING, c,
2064                "%d: sasl error. key: %d > 1023", c->sfd, nkey);
2065        write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_AUTH_ERROR, 0);
2066        return;
2067    }
2068    vlen = c->binary_header.request.bodylen - nkey;
2069
2070    stmp = c->item;
2071    memcpy(mech, stmp->data, nkey);
2072    mech[nkey] = 0x00;
2073
2074    if (settings.verbose) {
2075        settings.extensions.logger->log(EXTENSION_LOG_DEBUG, c,
2076                "%d: mech: ``%s'' with %d bytes of data\n", c->sfd, mech, vlen);
2077    }
2078
2079    challenge = vlen == 0 ? NULL : (stmp->data + nkey);
2080    switch (c->cmd) {
2081    case PROTOCOL_BINARY_CMD_SASL_AUTH:
2082        result = cbsasl_server_start(&c->sasl_conn, mech,
2083                                     challenge, vlen,
2084                                     (unsigned char **)&out, &outlen);
2085        break;
2086    case PROTOCOL_BINARY_CMD_SASL_STEP:
2087        result = cbsasl_server_step(c->sasl_conn, challenge,
2088                                    vlen, &out, &outlen);
2089        break;
2090    default:
2091        cb_assert(false); /* CMD should be one of the above */
2092        /* This code is pretty much impossible, but makes the compiler
2093           happier */
2094        if (settings.verbose) {
2095            settings.extensions.logger->log(EXTENSION_LOG_WARNING, c,
2096                    "%d: Unhandled command %d with challenge %s\n",
2097                    c->sfd, c->cmd, challenge);
2098        }
2099        break;
2100    }
2101
2102    free(c->item);
2103    c->item = NULL;
2104    c->ritem = NULL;
2105
2106    if (settings.verbose) {
2107        settings.extensions.logger->log(EXTENSION_LOG_INFO, c,
2108                                        "%d: sasl result code:  %d\n",
2109                                        c->sfd, result);
2110    }
2111
2112    switch(result) {
2113    case SASL_OK:
2114        write_bin_response(c, "Authenticated", 0, 0, (uint32_t)strlen("Authenticated"));
2115        get_auth_data(c, &data);
2116        if (settings.disable_admin) {
2117            /* "everyone is admins" */
2118            cookie_set_admin(c);
2119        } else if (settings.admin != NULL && data.username != NULL) {
2120            if (strcmp(settings.admin, data.username) == 0) {
2121                cookie_set_admin(c);
2122            }
2123        }
2124        perform_callbacks(ON_AUTH, (const void*)&data, c);
2125        STATS_NOKEY(c, auth_cmds);
2126        break;
2127    case SASL_CONTINUE:
2128        if (add_bin_header(c, PROTOCOL_BINARY_RESPONSE_AUTH_CONTINUE, 0, 0,
2129                           outlen, PROTOCOL_BINARY_RAW_BYTES) == -1) {
2130            conn_set_state(c, conn_closing);
2131            return;
2132        }
2133        add_iov(c, out, outlen);
2134        conn_set_state(c, conn_mwrite);
2135        c->write_and_go = conn_new_cmd;
2136        break;
2137    case SASL_BADPARAM:
2138        settings.extensions.logger->log(EXTENSION_LOG_WARNING, c,
2139                                        "%d: Bad sasl params: %d\n",
2140                                        c->sfd, result);
2141        write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_EINVAL, 0);
2142        STATS_NOKEY2(c, auth_cmds, auth_errors);
2143        break;
2144    default:
2145        if (result == SASL_NOUSER || result == SASL_PWERR) {
2146            settings.extensions.logger->log(EXTENSION_LOG_WARNING, c,
2147                                            "%d: Invalid username/password combination",
2148                                            c->sfd);
2149        } else {
2150            settings.extensions.logger->log(EXTENSION_LOG_WARNING, c,
2151                                            "%d: Unknown sasl response: %d",
2152                                            c->sfd, result);
2153        }
2154        write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_AUTH_ERROR, 0);
2155        STATS_NOKEY2(c, auth_cmds, auth_errors);
2156    }
2157}
2158
2159static bool authenticated(conn *c) {
2160    bool rv = false;
2161
2162    switch (c->cmd) {
2163    case PROTOCOL_BINARY_CMD_SASL_LIST_MECHS: /* FALLTHROUGH */
2164    case PROTOCOL_BINARY_CMD_SASL_AUTH:       /* FALLTHROUGH */
2165    case PROTOCOL_BINARY_CMD_SASL_STEP:       /* FALLTHROUGH */
2166    case PROTOCOL_BINARY_CMD_VERSION:         /* FALLTHROUGH */
2167    case PROTOCOL_BINARY_CMD_HELLO:
2168        rv = true;
2169        break;
2170    default:
2171        if (c->sasl_conn) {
2172            const void *uname = NULL;
2173            cbsasl_getprop(c->sasl_conn, CBSASL_USERNAME, &uname);
2174            rv = uname != NULL;
2175        }
2176    }
2177
2178    if (settings.verbose > 1) {
2179        settings.extensions.logger->log(EXTENSION_LOG_DEBUG, c,
2180                "%d: authenticated() in cmd 0x%02x is %s\n",
2181                c->sfd, c->cmd, rv ? "true" : "false");
2182    }
2183
2184    return rv;
2185}
2186
2187bool binary_response_handler(const void *key, uint16_t keylen,
2188                             const void *ext, uint8_t extlen,
2189                             const void *body, uint32_t bodylen,
2190                             uint8_t datatype, uint16_t status,
2191                             uint64_t cas, const void *cookie)
2192{
2193    protocol_binary_response_header header;
2194    char *buf;
2195    conn *c = (conn*)cookie;
2196    /* Look at append_bin_stats */
2197    size_t needed;
2198    bool need_inflate = false;
2199    size_t inflated_length;
2200
2201    if (!c->supports_datatype) {
2202        if ((datatype & PROTOCOL_BINARY_DATATYPE_COMPRESSED) == PROTOCOL_BINARY_DATATYPE_COMPRESSED) {
2203            need_inflate = true;
2204        }
2205        /* We may silently drop the knowledge about a JSON item */
2206        datatype = PROTOCOL_BINARY_RAW_BYTES;
2207    }
2208
2209    needed = keylen + extlen + sizeof(protocol_binary_response_header);
2210    if (need_inflate) {
2211        if (snappy_uncompressed_length(body, bodylen,
2212                                       &inflated_length) != SNAPPY_OK) {
2213            settings.extensions.logger->log(EXTENSION_LOG_INFO, c,
2214                    "<%d ERROR: Failed to determine inflated size",
2215                    c->sfd);
2216            return false;
2217        }
2218        needed += inflated_length;
2219    } else {
2220        needed += bodylen;
2221    }
2222
2223    if (!grow_dynamic_buffer(c, needed)) {
2224        if (settings.verbose > 0) {
2225            settings.extensions.logger->log(EXTENSION_LOG_INFO, c,
2226                    "<%d ERROR: Failed to allocate memory for response",
2227                    c->sfd);
2228        }
2229        return false;
2230    }
2231
2232    buf = c->dynamic_buffer.buffer + c->dynamic_buffer.offset;
2233    memset(&header, 0, sizeof(header));
2234    header.response.magic = (uint8_t)PROTOCOL_BINARY_RES;
2235    header.response.opcode = c->binary_header.request.opcode;
2236    header.response.keylen = (uint16_t)htons(keylen);
2237    header.response.extlen = extlen;
2238    header.response.datatype = datatype;
2239    header.response.status = (uint16_t)htons(status);
2240    if (need_inflate) {
2241        header.response.bodylen = htonl((uint32_t)(inflated_length + keylen + extlen));
2242    } else {
2243        header.response.bodylen = htonl(bodylen + keylen + extlen);
2244    }
2245    header.response.opaque = c->opaque;
2246    header.response.cas = htonll(cas);
2247
2248    memcpy(buf, header.bytes, sizeof(header.response));
2249    buf += sizeof(header.response);
2250
2251    if (extlen > 0) {
2252        memcpy(buf, ext, extlen);
2253        buf += extlen;
2254    }
2255
2256    if (keylen > 0) {
2257        cb_assert(key != NULL);
2258        memcpy(buf, key, keylen);
2259        buf += keylen;
2260    }
2261
2262    if (bodylen > 0) {
2263        if (need_inflate) {
2264            if (snappy_uncompress(body, bodylen, buf, &inflated_length) != SNAPPY_OK) {
2265                settings.extensions.logger->log(EXTENSION_LOG_INFO, c,
2266                        "<%d ERROR: Failed to inflate item", c->sfd);
2267                return false;
2268            }
2269        } else {
2270            memcpy(buf, body, bodylen);
2271        }
2272    }
2273
2274    c->dynamic_buffer.offset += needed;
2275    return true;
2276}
2277
2278/**
2279 * Tap stats (these are only used by the tap thread, so they don't need
2280 * to be in the threadlocal struct right now...
2281 */
2282struct tap_cmd_stats {
2283    uint64_t connect;
2284    uint64_t mutation;
2285    uint64_t checkpoint_start;
2286    uint64_t checkpoint_end;
2287    uint64_t delete;
2288    uint64_t flush;
2289    uint64_t opaque;
2290    uint64_t vbucket_set;
2291};
2292
2293struct tap_stats {
2294    cb_mutex_t mutex;
2295    struct tap_cmd_stats sent;
2296    struct tap_cmd_stats received;
2297} tap_stats;
2298
2299static void ship_tap_log(conn *c) {
2300    bool more_data = true;
2301    bool send_data = false;
2302    bool disconnect = false;
2303    item *it;
2304    uint32_t bodylen;
2305    int ii = 0;
2306
2307    c->msgcurr = 0;
2308    c->msgused = 0;
2309    c->iovused = 0;
2310    if (add_msghdr(c) != 0) {
2311        if (settings.verbose) {
2312            settings.extensions.logger->log(EXTENSION_LOG_WARNING, c,
2313                                            "%d: Failed to create output headers. Shutting down tap connection\n", c->sfd);
2314        }
2315        conn_set_state(c, conn_closing);
2316        return ;
2317    }
2318    /* @todo add check for buffer overflow of c->wbuf) */
2319    c->wbytes = 0;
2320    c->wcurr = c->wbuf;
2321    c->icurr = c->ilist;
2322    do {
2323        /* @todo fixme! */
2324        void *engine;
2325        uint16_t nengine;
2326        uint8_t ttl;
2327        uint16_t tap_flags;
2328        uint32_t seqno;
2329        uint16_t vbucket;
2330        tap_event_t event;
2331        bool inflate = false;
2332        size_t inflated_length = 0;
2333
2334        union {
2335            protocol_binary_request_tap_mutation mutation;
2336            protocol_binary_request_tap_delete delete;
2337            protocol_binary_request_tap_flush flush;
2338            protocol_binary_request_tap_opaque opaque;
2339            protocol_binary_request_noop noop;
2340        } msg;
2341        item_info_holder info;
2342        memset(&info, 0, sizeof(info));
2343
2344        if (ii++ == 10) {
2345            break;
2346        }
2347
2348        event = c->tap_iterator(settings.engine.v0, c, &it,
2349                                            &engine, &nengine, &ttl,
2350                                            &tap_flags, &seqno, &vbucket);
2351        memset(&msg, 0, sizeof(msg));
2352        msg.opaque.message.header.request.magic = (uint8_t)PROTOCOL_BINARY_REQ;
2353        msg.opaque.message.header.request.opaque = htonl(seqno);
2354        msg.opaque.message.body.tap.enginespecific_length = htons(nengine);
2355        msg.opaque.message.body.tap.ttl = ttl;
2356        msg.opaque.message.body.tap.flags = htons(tap_flags);
2357        msg.opaque.message.header.request.extlen = 8;
2358        msg.opaque.message.header.request.vbucket = htons(vbucket);
2359        info.info.nvalue = IOV_MAX;
2360
2361        switch (event) {
2362        case TAP_NOOP :
2363            send_data = true;
2364            msg.noop.message.header.request.opcode = PROTOCOL_BINARY_CMD_NOOP;
2365            msg.noop.message.header.request.extlen = 0;
2366            msg.noop.message.header.request.bodylen = htonl(0);
2367            memcpy(c->wcurr, msg.noop.bytes, sizeof(msg.noop.bytes));
2368            add_iov(c, c->wcurr, sizeof(msg.noop.bytes));
2369            c->wcurr += sizeof(msg.noop.bytes);
2370            c->wbytes += sizeof(msg.noop.bytes);
2371            break;
2372        case TAP_PAUSE :
2373            more_data = false;
2374            break;
2375        case TAP_CHECKPOINT_START:
2376        case TAP_CHECKPOINT_END:
2377        case TAP_MUTATION:
2378            if (!settings.engine.v1->get_item_info(settings.engine.v0, c, it,
2379                                                   (void*)&info)) {
2380                settings.engine.v1->release(settings.engine.v0, c, it);
2381                settings.extensions.logger->log(EXTENSION_LOG_WARNING, c,
2382                                                "%d: Failed to get item info\n", c->sfd);
2383                break;
2384            }
2385            send_data = true;
2386            c->ilist[c->ileft++] = it;
2387
2388            if (event == TAP_CHECKPOINT_START) {
2389                msg.mutation.message.header.request.opcode =
2390                    PROTOCOL_BINARY_CMD_TAP_CHECKPOINT_START;
2391                cb_mutex_enter(&tap_stats.mutex);
2392                tap_stats.sent.checkpoint_start++;
2393                cb_mutex_exit(&tap_stats.mutex);
2394            } else if (event == TAP_CHECKPOINT_END) {
2395                msg.mutation.message.header.request.opcode =
2396                    PROTOCOL_BINARY_CMD_TAP_CHECKPOINT_END;
2397                cb_mutex_enter(&tap_stats.mutex);
2398                tap_stats.sent.checkpoint_end++;
2399                cb_mutex_exit(&tap_stats.mutex);
2400            } else if (event == TAP_MUTATION) {
2401                msg.mutation.message.header.request.opcode = PROTOCOL_BINARY_CMD_TAP_MUTATION;
2402                cb_mutex_enter(&tap_stats.mutex);
2403                tap_stats.sent.mutation++;
2404                cb_mutex_exit(&tap_stats.mutex);
2405            }
2406
2407            msg.mutation.message.header.request.cas = htonll(info.info.cas);
2408            msg.mutation.message.header.request.keylen = htons(info.info.nkey);
2409            msg.mutation.message.header.request.extlen = 16;
2410            if (c->supports_datatype) {
2411                msg.mutation.message.header.request.datatype = info.info.datatype;
2412            } else {
2413                switch (info.info.datatype) {
2414                case 0:
2415                    break;
2416                case PROTOCOL_BINARY_DATATYPE_JSON:
2417                    break;
2418                case PROTOCOL_BINARY_DATATYPE_COMPRESSED:
2419                case PROTOCOL_BINARY_DATATYPE_COMPRESSED_JSON:
2420                    inflate = true;
2421                    break;
2422                default:
2423                    settings.extensions.logger->log(EXTENSION_LOG_WARNING, c,
2424                                                    "%d: shipping data with"
2425                                                    " an invalid datatype "
2426                                                    "(stripping info)",
2427                                                    c->sfd);
2428                }
2429                msg.mutation.message.header.request.datatype = 0;
2430            }
2431
2432            bodylen = 16 + info.info.nkey + nengine;
2433            if ((tap_flags & TAP_FLAG_NO_VALUE) == 0) {
2434                if (inflate) {
2435                    if (snappy_uncompressed_length(info.info.value[0].iov_base,
2436                                                   info.info.nbytes,
2437                                                   &inflated_length) == SNAPPY_OK) {
2438                        bodylen += inflated_length;
2439                    } else {
2440                        settings.extensions.logger->log(EXTENSION_LOG_INFO, c,
2441                                                        "<%d ERROR: Failed to determine inflated size. Sending as compressed",
2442                                                        c->sfd);
2443                        inflate = false;
2444                        bodylen += info.info.nbytes;
2445                    }
2446                } else {
2447                    bodylen += info.info.nbytes;
2448                }
2449            }
2450            msg.mutation.message.header.request.bodylen = htonl(bodylen);
2451
2452            if ((tap_flags & TAP_FLAG_NETWORK_BYTE_ORDER) == 0) {
2453                msg.mutation.message.body.item.flags = htonl(info.info.flags);
2454            } else {
2455                msg.mutation.message.body.item.flags = info.info.flags;
2456            }
2457            msg.mutation.message.body.item.expiration = htonl(info.info.exptime);
2458            msg.mutation.message.body.tap.enginespecific_length = htons(nengine);
2459            msg.mutation.message.body.tap.ttl = ttl;
2460            msg.mutation.message.body.tap.flags = htons(tap_flags);
2461            memcpy(c->wcurr, msg.mutation.bytes, sizeof(msg.mutation.bytes));
2462
2463            add_iov(c, c->wcurr, sizeof(msg.mutation.bytes));
2464            c->wcurr += sizeof(msg.mutation.bytes);
2465            c->wbytes += sizeof(msg.mutation.bytes);
2466
2467            if (nengine > 0) {
2468                memcpy(c->wcurr, engine, nengine);
2469                add_iov(c, c->wcurr, nengine);
2470                c->wcurr += nengine;
2471                c->wbytes += nengine;
2472            }
2473
2474            add_iov(c, info.info.key, info.info.nkey);
2475            if ((tap_flags & TAP_FLAG_NO_VALUE) == 0) {
2476                if (inflate) {
2477                    void *buf = malloc(inflated_length);
2478                    void *body = info.info.value[0].iov_base;
2479                    size_t bodylen = info.info.value[0].iov_len;
2480                    if (snappy_uncompress(body, bodylen,
2481                                          buf, &inflated_length) == SNAPPY_OK) {
2482                        c->temp_alloc_list[c->temp_alloc_left++] = buf;
2483
2484                        add_iov(c, buf, inflated_length);
2485                    } else {
2486                        free(buf);
2487                        settings.extensions.logger->log(EXTENSION_LOG_WARNING, c,
2488                                                        "%d: FATAL: failed to inflate object. shutitng down connection", c->sfd);
2489                        conn_set_state(c, conn_closing);
2490                        return;
2491                    }
2492                } else {
2493                    int xx;
2494                    for (xx = 0; xx < info.info.nvalue; ++xx) {
2495                        add_iov(c, info.info.value[xx].iov_base,
2496                                info.info.value[xx].iov_len);
2497                    }
2498                }
2499            }
2500
2501            break;
2502        case TAP_DELETION:
2503            /* This is a delete */
2504            if (!settings.engine.v1->get_item_info(settings.engine.v0, c, it,
2505                                                   (void*)&info)) {
2506                settings.engine.v1->release(settings.engine.v0, c, it);
2507                settings.extensions.logger->log(EXTENSION_LOG_WARNING, c,
2508                                                "%d: Failed to get item info\n", c->sfd);
2509                break;
2510            }
2511            send_data = true;
2512            c->ilist[c->ileft++] = it;
2513            msg.delete.message.header.request.opcode = PROTOCOL_BINARY_CMD_TAP_DELETE;
2514            msg.delete.message.header.request.cas = htonll(info.info.cas);
2515            msg.delete.message.header.request.keylen = htons(info.info.nkey);
2516
2517            bodylen = 8 + info.info.nkey + nengine;
2518            if ((tap_flags & TAP_FLAG_NO_VALUE) == 0) {
2519                bodylen += info.info.nbytes;
2520            }
2521            msg.delete.message.header.request.bodylen = htonl(bodylen);
2522
2523            memcpy(c->wcurr, msg.delete.bytes, sizeof(msg.delete.bytes));
2524            add_iov(c, c->wcurr, sizeof(msg.delete.bytes));
2525            c->wcurr += sizeof(msg.delete.bytes);
2526            c->wbytes += sizeof(msg.delete.bytes);
2527
2528            if (nengine > 0) {
2529                memcpy(c->wcurr, engine, nengine);
2530                add_iov(c, c->wcurr, nengine);
2531                c->wcurr += nengine;
2532                c->wbytes += nengine;
2533            }
2534
2535            add_iov(c, info.info.key, info.info.nkey);
2536            if ((tap_flags & TAP_FLAG_NO_VALUE) == 0) {
2537                int xx;
2538                for (xx = 0; xx < info.info.nvalue; ++xx) {
2539                    add_iov(c, info.info.value[xx].iov_base,
2540                            info.info.value[xx].iov_len);
2541                }
2542            }
2543
2544            cb_mutex_enter(&tap_stats.mutex);
2545            tap_stats.sent.delete++;
2546            cb_mutex_exit(&tap_stats.mutex);
2547            break;
2548
2549        case TAP_DISCONNECT:
2550            disconnect = true;
2551            more_data = false;
2552            break;
2553        case TAP_VBUCKET_SET:
2554        case TAP_FLUSH:
2555        case TAP_OPAQUE:
2556            send_data = true;
2557
2558            if (event == TAP_OPAQUE) {
2559                msg.flush.message.header.request.opcode = PROTOCOL_BINARY_CMD_TAP_OPAQUE;
2560                cb_mutex_enter(&tap_stats.mutex);
2561                tap_stats.sent.opaque++;
2562                cb_mutex_exit(&tap_stats.mutex);
2563
2564            } else if (event == TAP_FLUSH) {
2565                msg.flush.message.header.request.opcode = PROTOCOL_BINARY_CMD_TAP_FLUSH;
2566                cb_mutex_enter(&tap_stats.mutex);
2567                tap_stats.sent.flush++;
2568                cb_mutex_exit(&tap_stats.mutex);
2569            } else if (event == TAP_VBUCKET_SET) {
2570                msg.flush.message.header.request.opcode = PROTOCOL_BINARY_CMD_TAP_VBUCKET_SET;
2571                msg.flush.message.body.tap.flags = htons(tap_flags);
2572                cb_mutex_enter(&tap_stats.mutex);
2573                tap_stats.sent.vbucket_set++;
2574                cb_mutex_exit(&tap_stats.mutex);
2575            }
2576
2577            msg.flush.message.header.request.bodylen = htonl(8 + nengine);
2578            memcpy(c->wcurr, msg.flush.bytes, sizeof(msg.flush.bytes));
2579            add_iov(c, c->wcurr, sizeof(msg.flush.bytes));
2580            c->wcurr += sizeof(msg.flush.bytes);
2581            c->wbytes += sizeof(msg.flush.bytes);
2582            if (nengine > 0) {
2583                memcpy(c->wcurr, engine, nengine);
2584                add_iov(c, c->wcurr, nengine);
2585                c->wcurr += nengine;
2586                c->wbytes += nengine;
2587            }
2588            break;
2589        default:
2590            abort();
2591        }
2592    } while (more_data);
2593
2594    c->ewouldblock = false;
2595    if (send_data) {
2596        conn_set_state(c, conn_mwrite);
2597        if (disconnect) {
2598            c->write_and_go = conn_closing;
2599        } else {
2600            c->write_and_go = conn_ship_log;
2601        }
2602    } else {
2603        if (disconnect) {
2604            conn_set_state(c, conn_closing);
2605        } else {
2606            /* No more items to ship to the slave at this time.. suspend.. */
2607            if (settings.verbose > 1) {
2608                settings.extensions.logger->log(EXTENSION_LOG_DEBUG, c,
2609                                                "%d: No more items in tap log.. waiting\n",
2610                                                c->sfd);
2611            }
2612            c->ewouldblock = true;
2613        }
2614    }
2615}
2616
2617static ENGINE_ERROR_CODE default_unknown_command(EXTENSION_BINARY_PROTOCOL_DESCRIPTOR *descriptor,
2618                                                 ENGINE_HANDLE* handle,
2619                                                 const void* cookie,
2620                                                 protocol_binary_request_header *request,
2621                                                 ADD_RESPONSE response)
2622{
2623    const conn *c = (void*)cookie;
2624
2625    if (!c->supports_datatype && request->request.datatype != PROTOCOL_BINARY_RAW_BYTES) {
2626        if (response(NULL, 0, NULL, 0, NULL, 0, PROTOCOL_BINARY_RAW_BYTES,
2627                     PROTOCOL_BINARY_RESPONSE_EINVAL, 0, cookie)) {
2628            return ENGINE_SUCCESS;
2629        } else {
2630            return ENGINE_DISCONNECT;
2631        }
2632    } else {
2633        return settings.engine.v1->unknown_command(handle, cookie,
2634                                                   request, response);
2635    }
2636}
2637
/**
 * Dispatch entry for one binary-protocol request opcode: the callback that
 * handles the command and the extension descriptor it was registered with.
 * Entries are filled in by setup_binary_lookup_cmd().
 */
struct request_lookup {
    EXTENSION_BINARY_PROTOCOL_DESCRIPTOR *descriptor;
    BINARY_COMMAND_CALLBACK callback;
};

/** Request dispatch table, indexed directly by the 8-bit request opcode. */
static struct request_lookup request_handlers[0x100];

/** Signature of a handler for a response packet received on a connection. */
typedef void (*RESPONSE_HANDLER)(conn*);
/**
 * A map from a response packet's opcode to the function that handles
 * the response message.
 */
static RESPONSE_HANDLER response_handlers[0x100];
2651
2652static void setup_binary_lookup_cmd(EXTENSION_BINARY_PROTOCOL_DESCRIPTOR *descriptor,
2653                                    uint8_t cmd,
2654                                    BINARY_COMMAND_CALLBACK new_handler) {
2655    request_handlers[cmd].descriptor = descriptor;
2656    request_handlers[cmd].callback = new_handler;
2657}
2658
2659static void process_bin_unknown_packet(conn *c) {
2660    void *packet = c->rcurr - (c->binary_header.request.bodylen +
2661                               sizeof(c->binary_header));
2662    ENGINE_ERROR_CODE ret = c->aiostat;
2663    c->aiostat = ENGINE_SUCCESS;
2664    c->ewouldblock = false;
2665
2666    if (ret == ENGINE_SUCCESS) {
2667        struct request_lookup *rq = request_handlers + c->binary_header.request.opcode;
2668        ret = rq->callback(rq->descriptor, settings.engine.v0, c, packet,
2669                           binary_response_handler);
2670    }
2671
2672    switch (ret) {
2673    case ENGINE_SUCCESS:
2674        if (c->dynamic_buffer.buffer != NULL) {
2675            write_and_free(c, c->dynamic_buffer.buffer, c->dynamic_buffer.offset);
2676            c->dynamic_buffer.buffer = NULL;
2677        } else {
2678            conn_set_state(c, conn_new_cmd);
2679        }
2680        break;
2681    case ENGINE_EWOULDBLOCK:
2682        c->ewouldblock = true;
2683        break;
2684    case ENGINE_DISCONNECT:
2685        conn_set_state(c, conn_closing);
2686        break;
2687    default:
2688        /* Release the dynamic buffer.. it may be partial.. */
2689        free(c->dynamic_buffer.buffer);
2690        c->dynamic_buffer.buffer = NULL;
2691        write_bin_packet(c, engine_error_2_protocol_error(ret), 0);
2692    }
2693}
2694
2695static void cbsasl_refresh_main(void *c)
2696{
2697    int rv = cbsasl_server_refresh();
2698    if (rv == SASL_OK) {
2699        notify_io_complete(c, ENGINE_SUCCESS);
2700    } else {
2701        notify_io_complete(c, ENGINE_EINVAL);
2702    }
2703}
2704
2705static ENGINE_ERROR_CODE refresh_cbsasl(conn *c)
2706{
2707    cb_thread_t tid;
2708    int err;
2709
2710    err = cb_create_thread(&tid, cbsasl_refresh_main, c, 1);
2711    if (err != 0) {
2712        settings.extensions.logger->log(EXTENSION_LOG_WARNING, c,
2713                                        "Failed to create cbsasl db "
2714                                        "update thread: %s",
2715                                        strerror(err));
2716        return ENGINE_DISCONNECT;
2717    }
2718
2719    return ENGINE_EWOULDBLOCK;
2720}
2721
2722#if 0
2723static void ssl_certs_refresh_main(void *c)
2724{
2725    /* Update the internal certificates */
2726
2727    notify_io_complete(c, ENGINE_SUCCESS);
2728}
2729#endif
2730static ENGINE_ERROR_CODE refresh_ssl_certs(conn *c)
2731{
2732#if 0
2733    cb_thread_t tid;
2734    int err;
2735
2736    err = cb_create_thread(&tid, ssl_certs_refresh_main, c, 1);
2737    if (err != 0) {
2738        settings.extensions.logger->log(EXTENSION_LOG_WARNING, c,
2739                                        "Failed to create ssl_certificate "
2740                                        "update thread: %s",
2741                                        strerror(err));
2742        return ENGINE_DISCONNECT;
2743    }
2744
2745    return ENGINE_EWOULDBLOCK;
2746#endif
2747    return ENGINE_SUCCESS;
2748}
2749
2750static void process_bin_tap_connect(conn *c) {
2751    TAP_ITERATOR iterator;
2752    char *packet = (c->rcurr - (c->binary_header.request.bodylen +
2753                                sizeof(c->binary_header)));
2754    protocol_binary_request_tap_connect *req = (void*)packet;
2755    const char *key = packet + sizeof(req->bytes);
2756    const char *data = key + c->binary_header.request.keylen;
2757    uint32_t flags = 0;
2758    size_t ndata = c->binary_header.request.bodylen -
2759        c->binary_header.request.extlen -
2760        c->binary_header.request.keylen;
2761
2762    if (c->binary_header.request.extlen == 4) {
2763        flags = ntohl(req->message.body.flags);
2764
2765        if (flags & TAP_CONNECT_FLAG_BACKFILL) {
2766            /* the userdata has to be at least 8 bytes! */
2767            if (ndata < 8) {
2768                settings.extensions.logger->log(EXTENSION_LOG_WARNING, c,
2769                                                "%d: ERROR: Invalid tap connect message\n",
2770                                                c->sfd);
2771                conn_set_state(c, conn_closing);
2772                return ;
2773            }
2774        }
2775    } else {
2776        data -= 4;
2777        key -= 4;
2778    }
2779
2780    if (settings.verbose && c->binary_header.request.keylen > 0) {
2781        char buffer[1024];
2782        int len = c->binary_header.request.keylen;
2783        if (len >= sizeof(buffer)) {
2784            len = sizeof(buffer) - 1;
2785        }
2786        memcpy(buffer, key, len);
2787        buffer[len] = '\0';
2788        settings.extensions.logger->log(EXTENSION_LOG_DEBUG, c,
2789                                        "%d: Trying to connect with named tap connection: <%s>\n",
2790                                        c->sfd, buffer);
2791    }
2792
2793    iterator = settings.engine.v1->get_tap_iterator(
2794        settings.engine.v0, c, key, c->binary_header.request.keylen,
2795        flags, data, ndata);
2796
2797    if (iterator == NULL) {
2798        settings.extensions.logger->log(EXTENSION_LOG_WARNING, c,
2799                                        "%d: FATAL: The engine does not support tap\n",
2800                                        c->sfd);
2801        write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_NOT_SUPPORTED, 0);
2802        c->write_and_go = conn_closing;
2803    } else {
2804        c->tap_iterator = iterator;
2805        c->which = EV_WRITE;
2806        conn_set_state(c, conn_ship_log);
2807    }
2808}
2809
2810static void process_bin_tap_packet(tap_event_t event, conn *c) {
2811    char *packet;
2812    protocol_binary_request_tap_no_extras *tap;
2813    uint16_t nengine;
2814    uint16_t tap_flags;
2815    uint32_t seqno;
2816    uint8_t ttl;
2817    char *engine_specific;
2818    char *key;
2819    uint16_t nkey;
2820    char *data;
2821    uint32_t flags;
2822    uint32_t exptime;
2823    uint32_t ndata;
2824    ENGINE_ERROR_CODE ret;
2825
2826    cb_assert(c != NULL);
2827    packet = (c->rcurr - (c->binary_header.request.bodylen +
2828                                sizeof(c->binary_header)));
2829    tap = (void*)packet;
2830    nengine = ntohs(tap->message.body.tap.enginespecific_length);
2831    tap_flags = ntohs(tap->message.body.tap.flags);
2832    seqno = ntohl(tap->message.header.request.opaque);
2833    ttl = tap->message.body.tap.ttl;
2834    engine_specific = packet + sizeof(tap->bytes);
2835    key = engine_specific + nengine;
2836    nkey = c->binary_header.request.keylen;
2837    data = key + nkey;
2838    flags = 0;
2839    exptime = 0;
2840    ndata = c->binary_header.request.bodylen - nengine - nkey - 8;
2841    ret = c->aiostat;
2842
2843    if (ttl == 0) {
2844        ret = ENGINE_EINVAL;
2845    } else {
2846        if (event == TAP_MUTATION || event == TAP_CHECKPOINT_START ||
2847            event == TAP_CHECKPOINT_END) {
2848            protocol_binary_request_tap_mutation *mutation = (void*)tap;
2849
2850            /* engine_specific data in protocol_binary_request_tap_mutation is */
2851            /* at a different offset than protocol_binary_request_tap_no_extras */
2852            engine_specific = packet + sizeof(mutation->bytes);
2853
2854            flags = mutation->message.body.item.flags;
2855            if ((tap_flags & TAP_FLAG_NETWORK_BYTE_ORDER) == 0) {
2856                flags = ntohl(flags);
2857            }
2858
2859            exptime = ntohl(mutation->message.body.item.expiration);
2860            key += 8;
2861            data += 8;
2862            ndata -= 8;
2863        }
2864
2865        if (ret == ENGINE_SUCCESS) {
2866            uint8_t datatype = c->binary_header.request.datatype;
2867            if (event == TAP_MUTATION && !c->supports_datatype) {
2868                if (checkUTF8JSON((void*)data, ndata)) {
2869                    datatype = PROTOCOL_BINARY_DATATYPE_JSON;
2870                }
2871            }
2872
2873            ret = settings.engine.v1->tap_notify(settings.engine.v0, c,
2874                                                 engine_specific, nengine,
2875                                                 ttl - 1, tap_flags,
2876                                                 event, seqno,
2877                                                 key, nkey,
2878                                                 flags, exptime,
2879                                                 ntohll(tap->message.header.request.cas),
2880                                                 datatype,
2881                                                 data, ndata,
2882                                                 c->binary_header.request.vbucket);
2883        }
2884    }
2885
2886    switch (ret) {
2887    case ENGINE_DISCONNECT:
2888        conn_set_state(c, conn_closing);
2889        break;
2890    case ENGINE_EWOULDBLOCK:
2891        c->ewouldblock = true;
2892        break;
2893    default:
2894        if ((tap_flags & TAP_FLAG_ACK) || (ret != ENGINE_SUCCESS)) {
2895            write_bin_packet(c, engine_error_2_protocol_error(ret), 0);
2896        } else {
2897            conn_set_state(c, conn_new_cmd);
2898        }
2899    }
2900}
2901
2902static void process_bin_tap_ack(conn *c) {
2903    char *packet;
2904    protocol_binary_response_no_extras *rsp;
2905    uint32_t seqno;
2906    uint16_t status;
2907    char *key;
2908    ENGINE_ERROR_CODE ret = ENGINE_DISCONNECT;
2909
2910    cb_assert(c != NULL);
2911    packet = (c->rcurr - (c->binary_header.request.bodylen + sizeof(c->binary_header)));
2912    rsp = (void*)packet;
2913    seqno = ntohl(rsp->message.header.response.opaque);
2914    status = ntohs(rsp->message.header.response.status);
2915    key = packet + sizeof(rsp->bytes);
2916
2917    if (settings.engine.v1->tap_notify != NULL) {
2918        ret = settings.engine.v1->tap_notify(settings.engine.v0, c, NULL, 0, 0, status,
2919                                             TAP_ACK, seqno, key,
2920                                             c->binary_header.request.keylen, 0, 0,
2921                                             0, c->binary_header.request.datatype, NULL,
2922                                             0, 0);
2923    }
2924
2925    if (ret == ENGINE_DISCONNECT) {
2926        conn_set_state(c, conn_closing);
2927    } else {
2928        conn_set_state(c, conn_ship_log);
2929    }
2930}
2931
2932/**
2933 * We received a noop response.. just ignore it
2934 */
2935static void process_bin_noop_response(conn *c) {
2936    cb_assert(c != NULL);
2937    conn_set_state(c, conn_new_cmd);
2938}
2939
2940/*******************************************************************************
2941 **                             DCP MESSAGE PRODUCERS                         **
2942 ******************************************************************************/
2943static ENGINE_ERROR_CODE dcp_message_get_failover_log(const void *cookie,
2944                                                      uint32_t opaque,
2945                                                      uint16_t vbucket)
2946{
2947    protocol_binary_request_dcp_get_failover_log packet;
2948    conn *c = (void*)cookie;
2949
2950    if (c->wbytes + sizeof(packet.bytes) >= c->wsize) {
2951        /* We don't have room in the buffer */
2952        return ENGINE_E2BIG;
2953    }
2954
2955    memset(packet.bytes, 0, sizeof(packet.bytes));
2956    packet.message.header.request.magic =  (uint8_t)PROTOCOL_BINARY_REQ;
2957    packet.message.header.request.opcode = (uint8_t)PROTOCOL_BINARY_CMD_DCP_GET_FAILOVER_LOG;
2958    packet.message.header.request.opaque = opaque;
2959    packet.message.header.request.vbucket = htons(vbucket);
2960
2961    memcpy(c->wcurr, packet.bytes, sizeof(packet.bytes));
2962    add_iov(c, c->wcurr, sizeof(packet.bytes));
2963    c->wcurr += sizeof(packet.bytes);
2964    c->wbytes += sizeof(packet.bytes);
2965
2966    return ENGINE_SUCCESS;
2967}
2968
2969static ENGINE_ERROR_CODE dcp_message_stream_req(const void *cookie,
2970                                                uint32_t opaque,
2971                                                uint16_t vbucket,
2972                                                uint32_t flags,
2973                                                uint64_t start_seqno,
2974                                                uint64_t end_seqno,
2975                                                uint64_t vbucket_uuid,
2976                                                uint64_t snap_start_seqno,
2977                                                uint64_t snap_end_seqno)
2978{
2979    protocol_binary_request_dcp_stream_req packet;
2980    conn *c = (void*)cookie;
2981
2982    if (c->wbytes + sizeof(packet.bytes) >= c->wsize) {
2983        /* We don't have room in the buffer */
2984        return ENGINE_E2BIG;
2985    }
2986
2987    memset(packet.bytes, 0, sizeof(packet.bytes));
2988    packet.message.header.request.magic =  (uint8_t)PROTOCOL_BINARY_REQ;
2989    packet.message.header.request.opcode = (uint8_t)PROTOCOL_BINARY_CMD_DCP_STREAM_REQ;
2990    packet.message.header.request.extlen = 48;
2991    packet.message.header.request.bodylen = htonl(48);
2992    packet.message.header.request.opaque = opaque;
2993    packet.message.header.request.vbucket = htons(vbucket);
2994
2995    packet.message.body.flags = ntohl(flags);
2996    packet.message.body.start_seqno = ntohll(start_seqno);
2997    packet.message.body.end_seqno = ntohll(end_seqno);
2998    packet.message.body.vbucket_uuid = ntohll(vbucket_uuid);
2999    packet.message.body.snap_start_seqno = ntohll(snap_start_seqno);
3000    packet.message.body.snap_end_seqno = ntohll(snap_end_seqno);
3001
3002    memcpy(c->wcurr, packet.bytes, sizeof(packet.bytes));
3003    add_iov(c, c->wcurr, sizeof(packet.bytes));
3004    c->wcurr += sizeof(packet.bytes);
3005    c->wbytes += sizeof(packet.bytes);
3006
3007    return ENGINE_SUCCESS;
3008}
3009
3010static ENGINE_ERROR_CODE dcp_message_add_stream_response(const void *cookie,
3011                                                         uint32_t opaque,
3012                                                         uint32_t dialogopaque,
3013                                                         uint8_t status)
3014{
3015    protocol_binary_response_dcp_add_stream packet;
3016    conn *c = (void*)cookie;
3017
3018    if (c->wbytes + sizeof(packet.bytes) >= c->wsize) {
3019        /* We don't have room in the buffer */
3020        return ENGINE_E2BIG;
3021    }
3022
3023    memset(packet.bytes, 0, sizeof(packet.bytes));
3024    packet.message.header.response.magic =  (uint8_t)PROTOCOL_BINARY_RES;
3025    packet.message.header.response.opcode = (uint8_t)PROTOCOL_BINARY_CMD_DCP_ADD_STREAM;
3026    packet.message.header.response.extlen = 4;
3027    packet.message.header.response.status = htons(status);
3028    packet.message.header.response.bodylen = htonl(4);
3029    packet.message.header.response.opaque = opaque;
3030    packet.message.body.opaque = ntohl(dialogopaque);
3031
3032    memcpy(c->wcurr, packet.bytes, sizeof(packet.bytes));
3033    add_iov(c, c->wcurr, sizeof(packet.bytes));
3034    c->wcurr += sizeof(packet.bytes);
3035    c->wbytes += sizeof(packet.bytes);
3036
3037    return ENGINE_SUCCESS;
3038}
3039
3040static ENGINE_ERROR_CODE dcp_message_marker_response(const void *cookie,
3041                                                     uint32_t opaque,
3042                                                     uint8_t status)
3043{
3044    protocol_binary_response_dcp_snapshot_marker packet;
3045    conn *c = (void*)cookie;
3046
3047    if (c->wbytes + sizeof(packet.bytes) >= c->wsize) {
3048        /* We don't have room in the buffer */
3049        return ENGINE_E2BIG;
3050    }
3051
3052    memset(packet.bytes, 0, sizeof(packet.bytes));
3053    packet.message.header.response.magic =  (uint8_t)PROTOCOL_BINARY_RES;
3054    packet.message.header.response.opcode = (uint8_t)PROTOCOL_BINARY_CMD_DCP_SNAPSHOT_MARKER;
3055    packet.message.header.response.extlen = 0;
3056    packet.message.header.response.status = htons(status);
3057    packet.message.header.response.bodylen = 0;
3058    packet.message.header.response.opaque = opaque;
3059
3060    memcpy(c->wcurr, packet.bytes, sizeof(packet.bytes));
3061    add_iov(c, c->wcurr, sizeof(packet.bytes));
3062    c->wcurr += sizeof(packet.bytes);
3063    c->wbytes += sizeof(packet.bytes);
3064
3065    return ENGINE_SUCCESS;
3066}
3067
3068static ENGINE_ERROR_CODE dcp_message_set_vbucket_state_response(const void *cookie,
3069                                                                uint32_t opaque,
3070                                                                uint8_t status)
3071{
3072    protocol_binary_response_dcp_set_vbucket_state packet;
3073    conn *c = (void*)cookie;
3074
3075    if (c->wbytes + sizeof(packet.bytes) >= c->wsize) {
3076        /* We don't have room in the buffer */
3077        return ENGINE_E2BIG;
3078    }
3079
3080    memset(packet.bytes, 0, sizeof(packet.bytes));
3081    packet.message.header.response.magic =  (uint8_t)PROTOCOL_BINARY_RES;
3082    packet.message.header.response.opcode = (uint8_t)PROTOCOL_BINARY_CMD_DCP_SET_VBUCKET_STATE;
3083    packet.message.header.response.extlen = 0;
3084    packet.message.header.response.status = htons(status);
3085    packet.message.header.response.bodylen = 0;
3086    packet.message.header.response.opaque = opaque;
3087
3088    memcpy(c->wcurr, packet.bytes, sizeof(packet.bytes));
3089    add_iov(c, c->wcurr, sizeof(packet.bytes));
3090    c->wcurr += sizeof(packet.bytes);
3091    c->wbytes += sizeof(packet.bytes);
3092
3093    return ENGINE_SUCCESS;
3094}
3095
3096static ENGINE_ERROR_CODE dcp_message_stream_end(const void *cookie,
3097                                                uint32_t opaque,
3098                                                uint16_t vbucket,
3099                                                uint32_t flags)
3100{
3101    protocol_binary_request_dcp_stream_end packet;
3102    conn *c = (void*)cookie;
3103
3104    if (c->wbytes + sizeof(packet.bytes) >= c->wsize) {
3105        /* We don't have room in the buffer */
3106        return ENGINE_E2BIG;
3107    }
3108
3109    memset(packet.bytes, 0, sizeof(packet.bytes));
3110    packet.message.header.request.magic =  (uint8_t)PROTOCOL_BINARY_REQ;
3111    packet.message.header.request.opcode = (uint8_t)PROTOCOL_BINARY_CMD_DCP_STREAM_END;
3112    packet.message.header.request.extlen = 4;
3113    packet.message.header.request.bodylen = htonl(4);
3114    packet.message.header.request.opaque = opaque;
3115    packet.message.header.request.vbucket = htons(vbucket);
3116    packet.message.body.flags = ntohl(flags);
3117
3118    memcpy(c->wcurr, packet.bytes, sizeof(packet.bytes));
3119    add_iov(c, c->wcurr, sizeof(packet.bytes));
3120    c->wcurr += sizeof(packet.bytes);
3121    c->wbytes += sizeof(packet.bytes);
3122
3123    return ENGINE_SUCCESS;
3124}
3125
3126static ENGINE_ERROR_CODE dcp_message_marker(const void *cookie,
3127                                            uint32_t opaque,
3128                                            uint16_t vbucket,
3129                                            uint64_t start_seqno,
3130                                            uint64_t end_seqno,
3131                                            uint32_t flags)
3132{
3133    protocol_binary_request_dcp_snapshot_marker packet;
3134    conn *c = (void*)cookie;
3135
3136    if (c->wbytes + sizeof(packet.bytes) >= c->wsize) {
3137        /* We don't have room in the buffer */
3138        return ENGINE_E2BIG;
3139    }
3140
3141    memset(packet.bytes, 0, sizeof(packet.bytes));
3142    packet.message.header.request.magic =  (uint8_t)PROTOCOL_BINARY_REQ;
3143    packet.message.header.request.opcode = (uint8_t)PROTOCOL_BINARY_CMD_DCP_SNAPSHOT_MARKER;
3144    packet.message.header.request.opaque = opaque;
3145    packet.message.header.request.vbucket = htons(vbucket);
3146    packet.message.header.request.extlen = 20;
3147    packet.message.header.request.bodylen = htonl(20);
3148    packet.message.body.start_seqno = htonll(start_seqno);
3149    packet.message.body.end_seqno = htonll(end_seqno);
3150    packet.message.body.flags = htonl(flags);
3151
3152    memcpy(c->wcurr, packet.bytes, sizeof(packet.bytes));
3153    add_iov(c, c->wcurr, sizeof(packet.bytes));
3154    c->wcurr += sizeof(packet.bytes);
3155    c->wbytes += sizeof(packet.bytes);
3156
3157    return ENGINE_SUCCESS;
3158}
3159
3160static ENGINE_ERROR_CODE dcp_message_mutation(const void* cookie,
3161                                              uint32_t opaque,
3162                                              item *it,
3163                                              uint16_t vbucket,
3164                                              uint64_t by_seqno,
3165                                              uint64_t rev_seqno,
3166                                              uint32_t lock_time,
3167                                              const void *meta,
3168                                              uint16_t nmeta,
3169                                              uint8_t nru)
3170{
3171    conn *c = (void*)cookie;
3172    item_info_holder info;
3173    protocol_binary_request_dcp_mutation packet;
3174    int xx;
3175
3176    memset(&info, 0, sizeof(info));
3177    info.info.nvalue = IOV_MAX;
3178
3179    if (!settings.engine.v1->get_item_info(settings.engine.v0, c, it,
3180                                           (void*)&info)) {
3181        settings.engine.v1->release(settings.engine.v0, c, it);
3182        settings.extensions.logger->log(EXTENSION_LOG_WARNING, c,
3183                                        "%d: Failed to get item info\n", c->sfd);
3184        return ENGINE_FAILED;
3185    }
3186
3187    memset(packet.bytes, 0, sizeof(packet));
3188    packet.message.header.request.magic =  (uint8_t)PROTOCOL_BINARY_REQ;
3189    packet.message.header.request.opcode = (uint8_t)PROTOCOL_BINARY_CMD_DCP_MUTATION;
3190    packet.message.header.request.opaque = opaque;
3191    packet.message.header.request.vbucket = htons(vbucket);
3192    packet.message.header.request.cas = htonll(info.info.cas);
3193    packet.message.header.request.keylen = htons(info.info.nkey);
3194    packet.message.header.request.extlen = 31;
3195    packet.message.header.request.bodylen = ntohl(31 + info.info.nkey + info.info.nbytes + nmeta);
3196    packet.message.header.request.datatype = info.info.datatype;
3197    packet.message.body.by_seqno = htonll(by_seqno);
3198    packet.message.body.rev_seqno = htonll(rev_seqno);
3199    packet.message.body.lock_time = htonl(lock_time);
3200    packet.message.body.flags = info.info.flags;
3201    packet.message.body.expiration = htonl(info.info.exptime);
3202    packet.message.body.nmeta = htons(nmeta);
3203    packet.message.body.nru = nru;
3204
3205    c->ilist[c->ileft++] = it;
3206
3207    memcpy(c->wcurr, packet.bytes, sizeof(packet.bytes));
3208    add_iov(c, c->wcurr, sizeof(packet.bytes));
3209    c->wcurr += sizeof(packet.bytes);
3210    c->wbytes += sizeof(packet.bytes);
3211    add_iov(c, info.info.key, info.info.nkey);
3212    for (xx = 0; xx < info.info.nvalue; ++xx) {
3213        add_iov(c, info.info.value[xx].iov_base, info.info.value[xx].iov_len);
3214    }
3215
3216    memcpy(c->wcurr, meta, nmeta);
3217    add_iov(c, c->wcurr, nmeta);
3218    c->wcurr += nmeta;
3219    c->wbytes += nmeta;
3220
3221    return ENGINE_SUCCESS;
3222}
3223
3224static ENGINE_ERROR_CODE dcp_message_deletion(const void* cookie,
3225                                              uint32_t opaque,
3226                                              const void *key,
3227                                              uint16_t nkey,
3228                                              uint64_t cas,
3229                                              uint16_t vbucket,
3230                                              uint64_t by_seqno,
3231                                              uint64_t rev_seqno,
3232                                              const void *meta,
3233