xref: /3.0.2-MP2/memcached/daemon/memcached.c (revision 9658c497)
1/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *  memcached - memory caching daemon
4 *
5 *       http://www.danga.com/memcached/
6 *
7 *  Copyright 2003 Danga Interactive, Inc.  All rights reserved.
8 *
9 *  Use and distribution licensed under the BSD license.  See
10 *  the LICENSE file for full text.
11 *
12 *  Authors:
13 *      Anatoly Vorobey <mellon@pobox.com>
14 *      Brad Fitzpatrick <brad@danga.com>
15 */
16#include "config.h"
17#include "memcached.h"
18#include "memcached/extension_loggers.h"
19#include "alloc_hooks.h"
20#include "utilities/engine_loader.h"
21#include "timings.h"
22#include "cmdline.h"
23#include "mc_time.h"
24
25#include <signal.h>
26#include <fcntl.h>
27#include <errno.h>
28#include <stdlib.h>
29#include <stdio.h>
30#include <string.h>
31#include <time.h>
32#include <limits.h>
33#include <ctype.h>
34#include <stdarg.h>
35#include <stddef.h>
36#include <snappy-c.h>
37#include <JSON_checker.h>
38
39static bool grow_dynamic_buffer(conn *c, size_t needed);
40static void cookie_set_admin(const void *cookie);
41static bool cookie_is_admin(const void *cookie);
42
/*
 * Buffer large enough to hold an item_info followed by IOV_MAX - 1
 * additional struct iovec entries. Access it through .info; .bytes only
 * exists to reserve the extra space.
 * NOTE(review): presumably the extra room backs a trailing iovec array
 * inside item_info - confirm against its declaration.
 */
typedef union {
    item_info info;
    char bytes[sizeof(item_info) + ((IOV_MAX - 1) * sizeof(struct iovec))];
} item_info_holder;
47
/*
 * Forward a CAS update for "it" to the configured engine's item_set_cas
 * entry point.
 *
 * @param cookie passed through to the engine unchanged
 * @param it     the item whose CAS should be set
 * @param cas    the new CAS value
 */
static void item_set_cas(const void *cookie, item *it, uint64_t cas) {
    settings.engine.v1->item_set_cas(settings.engine.v0, cookie, it, cas);
}
51
52#define MAX_SASL_MECH_LEN 32
53
/*
 * Per-thread statistics helpers.
 *
 * The *_GUTS macros are the statements STATS_INCR1 runs while it holds
 * the thread_stats mutex. SLAB_GUTS indexes slab_stats with
 * info.info.clsid, so every expansion that reaches it (SLAB_INCR,
 * SLAB_TWO, STATS_HIT) needs a variable named "info" with an
 * .info.clsid member in scope at the call site.
 */
#define SLAB_GUTS(conn, thread_stats, slab_op, thread_op) \
    thread_stats->slab_stats[info.info.clsid].slab_op++;

/* Bump a single per-thread counter. */
#define THREAD_GUTS(conn, thread_stats, slab_op, thread_op) \
    thread_stats->thread_op++;

/* Bump two per-thread counters (slab_op here is a thread-level field). */
#define THREAD_GUTS2(conn, thread_stats, slab_op, thread_op) \
    thread_stats->slab_op++; \
    thread_stats->thread_op++;

/* Bump one per-slab-class counter and one per-thread counter. */
#define SLAB_THREAD_GUTS(conn, thread_stats, slab_op, thread_op) \
    SLAB_GUTS(conn, thread_stats, slab_op, thread_op) \
    THREAD_GUTS(conn, thread_stats, slab_op, thread_op)

/*
 * Look up the thread_stats for conn, take its mutex, run GUTS and release
 * the mutex again. The key and nkey arguments are accepted but not used
 * by the expansion.
 */
#define STATS_INCR1(GUTS, conn, slab_op, thread_op, key, nkey) { \
    struct thread_stats *thread_stats = get_thread_stats(conn); \
    cb_mutex_enter(&thread_stats->mutex); \
    GUTS(conn, thread_stats, slab_op, thread_op); \
    cb_mutex_exit(&thread_stats->mutex); \
}

/* Increment the per-thread counter "op". */
#define STATS_INCR(conn, op, key, nkey) \
    STATS_INCR1(THREAD_GUTS, conn, op, op, key, nkey)

/* Increment the per-slab-class counter "op". */
#define SLAB_INCR(conn, op, key, nkey) \
    STATS_INCR1(SLAB_GUTS, conn, op, op, key, nkey)

/* Increment two per-thread counters under a single lock acquisition. */
#define STATS_TWO(conn, slab_op, thread_op, key, nkey) \
    STATS_INCR1(THREAD_GUTS2, conn, slab_op, thread_op, key, nkey)

/* Increment one per-slab-class and one per-thread counter together. */
#define SLAB_TWO(conn, slab_op, thread_op, key, nkey) \
    STATS_INCR1(SLAB_THREAD_GUTS, conn, slab_op, thread_op, key, nkey)

/* Record a hit: per-slab <op>_hits plus per-thread cmd_<op>. */
#define STATS_HIT(conn, op, key, nkey) \
    SLAB_TWO(conn, op##_hits, cmd_##op, key, nkey)

/* Record a miss: per-thread <op>_misses plus per-thread cmd_<op>. */
#define STATS_MISS(conn, op, key, nkey) \
    STATS_TWO(conn, op##_misses, cmd_##op, key, nkey)

/* Increment the per-thread counter "op" for an operation without a key. */
#define STATS_NOKEY(conn, op) { \
    struct thread_stats *thread_stats = \
        get_thread_stats(conn); \
    cb_mutex_enter(&thread_stats->mutex); \
    thread_stats->op++; \
    cb_mutex_exit(&thread_stats->mutex); \
}

/* Increment the per-thread counters "op1" and "op2" under one lock. */
#define STATS_NOKEY2(conn, op1, op2) { \
    struct thread_stats *thread_stats = \
        get_thread_stats(conn); \
    cb_mutex_enter(&thread_stats->mutex); \
    thread_stats->op1++; \
    thread_stats->op2++; \
    cb_mutex_exit(&thread_stats->mutex); \
}

/* Add "amt" to the per-thread counter "op". */
#define STATS_ADD(conn, op, amt) { \
    struct thread_stats *thread_stats = \
        get_thread_stats(conn); \
    cb_mutex_enter(&thread_stats->mutex); \
    thread_stats->op += amt; \
    cb_mutex_exit(&thread_stats->mutex); \
}
118
119volatile sig_atomic_t memcached_shutdown;
120
121/* Lock for global stats */
122static cb_mutex_t stats_lock;
123
124/**
125 * Structure to save ns_server's session cas token.
126 */
127static struct session_cas {
128    uint64_t value;
129    uint64_t ctr;
130    cb_mutex_t mutex;
131} session_cas;
132
/* Acquire the file-scope stats_lock mutex. */
void STATS_LOCK() {
    cb_mutex_enter(&stats_lock);
}
136
/* Release the file-scope stats_lock mutex taken by STATS_LOCK(). */
void STATS_UNLOCK() {
    cb_mutex_exit(&stats_lock);
}
140
/*
 * Portable helpers for classifying and setting the "last socket error".
 *
 * The WIN32 variants work on Winsock error codes (WSA*), the others on
 * errno values. Each is_*() predicate returns non-zero when the supplied
 * code belongs to the named category; the set_*() helpers store the
 * corresponding code as the current thread's last error.
 */
#ifdef WIN32
/* True when the operation would have blocked. */
static int is_blocking(DWORD dw) {
    return (dw == WSAEWOULDBLOCK);
}
/* True when the process ran out of socket descriptors. */
static int is_emfile(DWORD dw) {
    return (dw == WSAEMFILE);
}
/* True when the peer is gone: socket not connected or connection reset. */
static int is_closed_conn(DWORD dw) {
    return (dw == WSAENOTCONN || dw == WSAECONNRESET);
}
/* True when the address is already in use. */
static int is_addrinuse(DWORD dw) {
    return (dw == WSAEADDRINUSE);
}
static void set_ewouldblock(void) {
    WSASetLastError(WSAEWOULDBLOCK);
}
static void set_econnreset(void) {
    WSASetLastError(WSAECONNRESET);
}
#else
/* True when the operation would have blocked. */
static int is_blocking(int dw) {
    return (dw == EAGAIN || dw == EWOULDBLOCK);
}
/* True when the process ran out of file descriptors. */
static int is_emfile(int dw) {
    return (dw == EMFILE);
}
/* True when the peer is gone: socket not connected or connection reset. */
static int is_closed_conn(int dw) {
    return (dw == ENOTCONN || dw == ECONNRESET);
}
/* True when the address is already in use. */
static int is_addrinuse(int dw) {
    return (dw == EADDRINUSE);
}
static void set_ewouldblock(void) {
    errno = EWOULDBLOCK;
}
static void set_econnreset(void) {
    errno = ECONNRESET;
}
#endif
180
181/*
182 * forward declarations
183 */
184static SOCKET new_socket(struct addrinfo *ai);
185static int try_read_command(conn *c);
186static struct thread_stats* get_independent_stats(conn *c);
187static struct thread_stats* get_thread_stats(conn *c);
188static void register_callback(ENGINE_HANDLE *eh,
189                              ENGINE_EVENT_TYPE type,
190                              EVENT_CALLBACK cb, const void *cb_data);
191static SERVER_HANDLE_V1 *get_server_api(void);
192
193
/* Outcome of one attempt to read from a connection's socket. */
enum try_read_result {
    READ_DATA_RECEIVED,
    READ_NO_DATA_RECEIVED,
    READ_ERROR,            /** an error occurred on the socket (or the client closed the connection) */
    READ_MEMORY_ERROR      /** failed to allocate more memory */
};
200
201static enum try_read_result try_read_network(conn *c);
202
203/* stats */
204static void stats_init(void);
205static void server_stats(ADD_STAT add_stats, conn *c, bool aggregate);
206static void process_stat_settings(ADD_STAT add_stats, void *c);
207
208
209/* defaults */
210static void settings_init(void);
211
212/* event handling, network IO */
213static void event_handler(evutil_socket_t fd, short which, void *arg);
214static void complete_nread(conn *c);
215static void write_and_free(conn *c, char *buf, size_t bytes);
216static int ensure_iov_space(conn *c);
217static int add_iov(conn *c, const void *buf, size_t len);
218static int add_msghdr(conn *c);
219
220/** exported globals **/
221struct stats stats;
222struct settings settings;
223
224/** file scope variables **/
225static conn *listen_conn = NULL;
226static struct event_base *main_base;
227static struct thread_stats *default_independent_stats;
228
229static struct engine_event_handler *engine_event_handlers[MAX_ENGINE_EVENT_TYPE + 1];
230
/* Result codes returned by transmit(). */
enum transmit_result {
    TRANSMIT_COMPLETE,   /** All done writing. */
    TRANSMIT_INCOMPLETE, /** More data remaining to write. */
    TRANSMIT_SOFT_ERROR, /** Can't write any more right now. */
    TRANSMIT_HARD_ERROR  /** Can't write (c->state is set to conn_closing) */
};
237
238static enum transmit_result transmit(conn *c);
239
240
241
242/* Perform all callbacks of a given type for the given connection. */
243void perform_callbacks(ENGINE_EVENT_TYPE type,
244                       const void *data,
245                       const void *c) {
246    struct engine_event_handler *h;
247    for (h = engine_event_handlers[type]; h; h = h->next) {
248        h->cb(c, type, data, h->cb_data);
249    }
250}
251
252/**
253 * Return the TCP or domain socket listening_port structure that
254 * has a given port number
255 */
256static struct listening_port *get_listening_port_instance(const int port) {
257    struct listening_port *port_ins = NULL;
258    int i;
259    for (i = 0; i < settings.num_interfaces; ++i) {
260        if (stats.listening_ports[i].port == port) {
261            port_ins = &stats.listening_ports[i];
262        }
263    }
264    return port_ins;
265}
266
267static void stats_init(void) {
268    stats.daemon_conns = 0;
269    stats.rejected_conns = 0;
270    stats.curr_conns = stats.total_conns = 0;
271    stats.listening_ports = calloc(settings.num_interfaces, sizeof(struct listening_port));
272
273    stats_prefix_init();
274}
275
276static void stats_reset(const void *cookie) {
277    struct conn *conn = (struct conn*)cookie;
278    STATS_LOCK();
279    stats.rejected_conns = 0;
280    stats.total_conns = 0;
281    stats_prefix_clear();
282    STATS_UNLOCK();
283    threadlocal_stats_reset(get_independent_stats(conn));
284    settings.engine.v1->reset_stats(settings.engine.v0, cookie);
285}
286
/*
 * Decide how many worker threads to start.
 *
 * If the environment variable MEMCACHED_NUM_CPUS is set, its integer
 * value is used; a value that parses to zero or to a negative number
 * falls back to 4. Otherwise the count is derived from the number of
 * online processors: 75% of them when there are more than four, and
 * never fewer than 4.
 *
 * @return the number of worker threads (always positive)
 */
static int get_number_of_worker_threads(void) {
    int ret;
    char *override = getenv("MEMCACHED_NUM_CPUS");
    if (override == NULL) {
#ifdef WIN32
        SYSTEM_INFO sysinfo;
        GetSystemInfo(&sysinfo);
        ret = (int)sysinfo.dwNumberOfProcessors;
#else
        ret = (int)sysconf(_SC_NPROCESSORS_ONLN);
#endif
        if (ret > 4) {
            ret = (int)(ret * 0.75f);
        }
        /* Also covers a failed sysconf(), which reports -1. */
        if (ret < 4) {
            ret = 4;
        }
    } else {
        ret = atoi(override);
        /* A thread count must be positive; reject 0, junk and negatives. */
        if (ret <= 0) {
            ret = 4;
        }
    }

    return ret;
}
313
314static void settings_init(void) {
315    static struct interface default_interface;
316    default_interface.port = 11211;
317    default_interface.maxconn = 1000;
318    default_interface.backlog = 1024;
319
320    settings.num_interfaces = 1;
321    settings.interfaces = &default_interface;
322    settings.daemonize = false;
323    settings.pid_file = NULL;
324    settings.bio_drain_buffer_sz = 8192;
325
326    settings.verbose = 0;
327    settings.num_threads = get_number_of_worker_threads();
328    settings.prefix_delimiter = ':';
329    settings.detail_enabled = 0;
330    settings.allow_detailed = true;
331    settings.reqs_per_event = DEFAULT_REQS_PER_EVENT;
332    settings.require_sasl = false;
333    settings.extensions.logger = get_stderr_logger();
334    settings.tcp_nodelay = getenv("MEMCACHED_DISABLE_TCP_NODELAY") == NULL;
335    settings.engine_module = "default_engine.so";
336    settings.engine_config = NULL;
337    settings.config = NULL;
338    settings.admin = NULL;
339    settings.disable_admin = false;
340    settings.datatype = false;
341}
342
343/*
344 * Adds a message header to a connection.
345 *
346 * Returns 0 on success, -1 on out-of-memory.
347 */
348static int add_msghdr(conn *c)
349{
350    struct msghdr *msg;
351
352    cb_assert(c != NULL);
353
354    if (c->msgsize == c->msgused) {
355        cb_assert(c->msgsize > 0);
356        msg = realloc(c->msglist, c->msgsize * 2 * sizeof(struct msghdr));
357        if (! msg)
358            return -1;
359        c->msglist = msg;
360        c->msgsize *= 2;
361    }
362
363    msg = c->msglist + c->msgused;
364
365    /* this wipes msg_iovlen, msg_control, msg_controllen, and
366       msg_flags, the last 3 of which aren't defined on solaris: */
367    memset(msg, 0, sizeof(struct msghdr));
368
369    msg->msg_iov = &c->iov[c->iovused];
370
371    if (c->request_addr_size > 0) {
372        msg->msg_name = &c->request_addr;
373        msg->msg_namelen = c->request_addr_size;
374    }
375
376    c->msgbytes = 0;
377    c->msgused++;
378
379    return 0;
380}
381
/*
 * State describing whether accepting new connections is currently
 * disabled. All fields are read and written under "mutex".
 */
struct {
    cb_mutex_t mutex;
    bool disabled;        /* true while listening is disabled */
    ssize_t count;        /* set to 10 by disable_listen() */
    uint64_t num_disable; /* how many times disable_listen() has run */
} listen_state;
388
389static bool is_listen_disabled(void) {
390    bool ret;
391    cb_mutex_enter(&listen_state.mutex);
392    ret = listen_state.disabled;
393    cb_mutex_exit(&listen_state.mutex);
394    return ret;
395}
396
397static uint64_t get_listen_disabled_num(void) {
398    uint64_t ret;
399    cb_mutex_enter(&listen_state.mutex);
400    ret = listen_state.num_disable;
401    cb_mutex_exit(&listen_state.mutex);
402    return ret;
403}
404
405static void disable_listen(void) {
406    conn *next;
407    cb_mutex_enter(&listen_state.mutex);
408    listen_state.disabled = true;
409    listen_state.count = 10;
410    ++listen_state.num_disable;
411    cb_mutex_exit(&listen_state.mutex);
412
413    for (next = listen_conn; next; next = next->next) {
414        update_event(next, 0);
415        if (listen(next->sfd, 1) != 0) {
416            log_socket_error(EXTENSION_LOG_WARNING, NULL,
417                             "listen() failed: %s");
418        }
419    }
420}
421
422void safe_close(SOCKET sfd) {
423    if (sfd != INVALID_SOCKET) {
424        int rval;
425        while ((rval = closesocket(sfd)) == SOCKET_ERROR &&
426               (errno == EINTR || errno == EAGAIN)) {
427            /* go ahead and retry */
428        }
429
430        if (rval == SOCKET_ERROR) {
431            char msg[80];
432            snprintf(msg, sizeof(msg), "Failed to close socket %d (%%s)!!", (int)sfd);
433            log_socket_error(EXTENSION_LOG_WARNING, NULL,
434                             msg);
435        } else {
436            STATS_LOCK();
437            stats.curr_conns--;
438            STATS_UNLOCK();
439
440            if (is_listen_disabled()) {
441                notify_dispatcher();
442            }
443        }
444    }
445}
446
447/**
448 * Reset all of the dynamic buffers used by a connection back to their
449 * default sizes. The strategy for resizing the buffers is to allocate a
450 * new one of the correct size and free the old one if the allocation succeeds
451 * instead of using realloc to change the buffer size (because realloc may
452 * not shrink the buffers, and will also copy the memory). If the allocation
453 * fails the buffer will be unchanged.
454 *
455 * @param c the connection to resize the buffers for
456 * @return true if all allocations succeeded, false if one or more of the
457 *         allocations failed.
458 */
459static bool conn_reset_buffersize(conn *c) {
460    bool ret = true;
461
462    if (c->rsize != DATA_BUFFER_SIZE) {
463        void *ptr = malloc(DATA_BUFFER_SIZE);
464        if (ptr != NULL) {
465            free(c->rbuf);
466            c->rbuf = ptr;
467            c->rsize = DATA_BUFFER_SIZE;
468        } else {
469            ret = false;
470        }
471    }
472
473    if (c->wsize != DATA_BUFFER_SIZE) {
474        void *ptr = malloc(DATA_BUFFER_SIZE);
475        if (ptr != NULL) {
476            free(c->wbuf);
477            c->wbuf = ptr;
478            c->wsize = DATA_BUFFER_SIZE;
479        } else {
480            ret = false;
481        }
482    }
483
484    if (c->isize != ITEM_LIST_INITIAL) {
485        void *ptr = malloc(sizeof(item *) * ITEM_LIST_INITIAL);
486        if (ptr != NULL) {
487            free(c->ilist);
488            c->ilist = ptr;
489            c->isize = ITEM_LIST_INITIAL;
490        } else {
491            ret = false;
492        }
493    }
494
495    if (c->temp_alloc_size != TEMP_ALLOC_LIST_INITIAL) {
496        void *ptr = malloc(sizeof(char *) * TEMP_ALLOC_LIST_INITIAL);
497        if (ptr != NULL) {
498            free(c->temp_alloc_list);
499            c->temp_alloc_list = ptr;
500            c->temp_alloc_size = TEMP_ALLOC_LIST_INITIAL;
501        } else {
502            ret = false;
503        }
504    }
505
506    if (c->iovsize != IOV_LIST_INITIAL) {
507        void *ptr = malloc(sizeof(struct iovec) * IOV_LIST_INITIAL);
508        if (ptr != NULL) {
509            free(c->iov);
510            c->iov = ptr;
511            c->iovsize = IOV_LIST_INITIAL;
512        } else {
513            ret = false;
514        }
515    }
516
517    if (c->msgsize != MSG_LIST_INITIAL) {
518        void *ptr = malloc(sizeof(struct msghdr) * MSG_LIST_INITIAL);
519        if (ptr != NULL) {
520            free(c->msglist);
521            c->msglist = ptr;
522            c->msgsize = MSG_LIST_INITIAL;
523        } else {
524            ret = false;
525        }
526    }
527
528    return ret;
529}
530
531/**
532 * Constructor for all memory allocations of connection objects. Initialize
533 * all members and allocate the transfer buffers.
534 *
535 * @param buffer The memory allocated by the object cache
536 * @return 0 on success, 1 if we failed to allocate memory
537 */
538static int conn_constructor(conn *c) {
539    memset(c, 0, sizeof(*c));
540    MEMCACHED_CONN_CREATE(c);
541
542    c->state = conn_immediate_close;
543    c->sfd = INVALID_SOCKET;
544    if (!conn_reset_buffersize(c)) {
545        free(c->rbuf);
546        free(c->wbuf);
547        free(c->ilist);
548        free(c->temp_alloc_list);
549        free(c->iov);
550        free(c->msglist);
551        settings.extensions.logger->log(EXTENSION_LOG_WARNING,
552                                        NULL,
553                                        "Failed to allocate buffers for connection\n");
554        return 1;
555    }
556
557    STATS_LOCK();
558    stats.conn_structs++;
559    STATS_UNLOCK();
560
561    return 0;
562}
563
564/**
565 * Destructor for all connection objects. Release all allocated resources.
566 *
567 * @param buffer The memory allocated by the objec cache
568 */
569static void conn_destructor(conn *c) {
570    free(c->rbuf);
571    free(c->wbuf);
572    free(c->ilist);
573    free(c->temp_alloc_list);
574    free(c->iov);
575    free(c->msglist);
576
577    STATS_LOCK();
578    stats.conn_structs--;
579    STATS_UNLOCK();
580}
581
/*
 * Free list management for connections.
 *
 * free  - head of the singly linked (via conn->next) list of idle
 *         connection objects
 * all   - array of settings.maxconns slots holding every connection
 *         object that has been allocated
 * mutex - guards "free", and "all"/"next" once the server is running
 * next  - index of the first unused slot in "all"
 */
struct connections {
    conn* free;
    conn** all;
    cb_mutex_t mutex;
    int next;
} connections;
591
592static void initialize_connections(void)
593{
594    int preallocate;
595
596    cb_mutex_initialize(&connections.mutex);
597    connections.all = calloc(settings.maxconns, sizeof(conn*));
598    if (connections.all == NULL) {
599        settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
600                                        "Failed to allocate memory for connections");
601        exit(EX_OSERR);
602    }
603
604    preallocate = settings.maxconns / 2;
605    if (preallocate < 1000) {
606        preallocate = settings.maxconns;
607    } else if (preallocate > 5000) {
608        preallocate = 5000;
609    }
610
611    for (connections.next = 0; connections.next < preallocate; ++connections.next) {
612        connections.all[connections.next] = malloc(sizeof(conn));
613        if (conn_constructor(connections.all[connections.next]) != 0) {
614            settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
615                                            "Failed to allocate memory for connections");
616            exit(EX_OSERR);
617        }
618        connections.all[connections.next]->next = connections.free;
619        connections.free = connections.all[connections.next];
620    }
621}
622
623static void destroy_connections(void)
624{
625    int ii;
626    for (ii = 0; ii < settings.maxconns; ++ii) {
627        if (connections.all[ii]) {
628            conn *c = connections.all[ii];
629            conn_destructor(c);
630            free(c);
631        }
632    }
633
634    free(connections.all);
635}
636
637static conn *allocate_connection(void) {
638    conn *ret;
639
640    cb_mutex_enter(&connections.mutex);
641    ret = connections.free;
642    if (ret != NULL) {
643        connections.free = connections.free->next;
644        ret->next = NULL;
645    }
646    cb_mutex_exit(&connections.mutex);
647
648    if (ret == NULL) {
649        ret = malloc(sizeof(conn));
650        if (ret == NULL) {
651            settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
652                                            "Failed to allocate memory for connection");
653            return NULL;
654        }
655
656        if (conn_constructor(ret) != 0) {
657            free(ret);
658            settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
659                                            "Failed to allocate memory for connection");
660            return NULL;
661        }
662
663        cb_mutex_enter(&connections.mutex);
664        if (connections.next == settings.maxconns) {
665            free(ret);
666            ret = NULL;
667        } else {
668            connections.all[connections.next++] = ret;
669        }
670        cb_mutex_exit(&connections.mutex);
671    }
672
673    return ret;
674}
675
/*
 * Return a connection object to the free list. The socket handle is
 * marked invalid first; the push onto the list happens under
 * connections.mutex.
 *
 * @param c the connection to put back
 */
static void release_connection(conn *c) {
    c->sfd = INVALID_SOCKET;
    cb_mutex_enter(&connections.mutex);
    c->next = connections.free;
    connections.free = c;
    cb_mutex_exit(&connections.mutex);
}
683
684static const char *substate_text(enum bin_substates state) {
685    switch (state) {
686    case bin_no_state: return "bin_no_state";
687    case bin_reading_set_header: return "bin_reading_set_header";
688    case bin_reading_cas_header: return "bin_reading_cas_header";
689    case bin_read_set_value: return "bin_read_set_value";
690    case bin_reading_sasl_auth: return "bin_reading_sasl_auth";
691    case bin_reading_sasl_auth_data: return "bin_reading_sasl_auth_data";
692    case bin_reading_packet: return "bin_reading_packet";
693    default:
694        return "illegal";
695    }
696}
697
698static void add_connection_stats(ADD_STAT add_stats, conn *d, conn *c) {
699    append_stat("conn", add_stats, d, "%p", c);
700    if (c->sfd == INVALID_SOCKET) {
701        append_stat("socket", add_stats, d, "disconnected");
702    } else {
703        append_stat("socket", add_stats, d, "%lu", (long)c->sfd);
704        append_stat("protocol", add_stats, d, "%s", "binary");
705        append_stat("transport", add_stats, d, "TCP");
706        append_stat("nevents", add_stats, d, "%u", c->nevents);
707        if (c->sasl_conn != NULL) {
708            append_stat("sasl_conn", add_stats, d, "%p", c->sasl_conn);
709        }
710        append_stat("state", add_stats, d, "%s", state_text(c->state));
711        append_stat("substate", add_stats, d, "%s", substate_text(c->substate));
712        append_stat("registered_in_libevent", add_stats, d, "%d",
713                    (int)c->registered_in_libevent);
714        append_stat("ev_flags", add_stats, d, "%x", c->ev_flags);
715        append_stat("which", add_stats, d, "%x", c->which);
716        append_stat("rbuf", add_stats, d, "%p", c->rbuf);
717        append_stat("rcurr", add_stats, d, "%p", c->rcurr);
718        append_stat("rsize", add_stats, d, "%u", c->rsize);
719        append_stat("rbytes", add_stats, d, "%u", c->rbytes);
720        append_stat("wbuf", add_stats, d, "%p", c->wbuf);
721        append_stat("wcurr", add_stats, d, "%p", c->wcurr);
722        append_stat("wsize", add_stats, d, "%u", c->wsize);
723        append_stat("wbytes", add_stats, d, "%u", c->wbytes);
724        append_stat("write_and_go", add_stats, d, "%p", c->write_and_go);
725        append_stat("write_and_free", add_stats, d, "%p", c->write_and_free);
726        append_stat("ritem", add_stats, d, "%p", c->ritem);
727        append_stat("rlbytes", add_stats, d, "%u", c->rlbytes);
728        append_stat("item", add_stats, d, "%p", c->item);
729        append_stat("store_op", add_stats, d, "%u", c->store_op);
730        append_stat("sbytes", add_stats, d, "%u", c->sbytes);
731        append_stat("iov", add_stats, d, "%p", c->iov);
732        append_stat("iovsize", add_stats, d, "%u", c->iovsize);
733        append_stat("iovused", add_stats, d, "%u", c->iovused);
734        append_stat("msglist", add_stats, d, "%p", c->msglist);
735        append_stat("msgsize", add_stats, d, "%u", c->msgsize);
736        append_stat("msgused", add_stats, d, "%u", c->msgused);
737        append_stat("msgcurr", add_stats, d, "%u", c->msgcurr);
738        append_stat("msgbytes", add_stats, d, "%u", c->msgbytes);
739        append_stat("ilist", add_stats, d, "%p", c->ilist);
740        append_stat("isize", add_stats, d, "%u", c->isize);
741        append_stat("icurr", add_stats, d, "%p", c->icurr);
742        append_stat("ileft", add_stats, d, "%u", c->ileft);
743        append_stat("temp_alloc_list", add_stats, d, "%p", c->temp_alloc_list);
744        append_stat("temp_alloc_size", add_stats, d, "%u", c->temp_alloc_size);
745        append_stat("temp_alloc_curr", add_stats, d, "%p", c->temp_alloc_curr);
746        append_stat("temp_alloc_left", add_stats, d, "%u", c->temp_alloc_left);
747
748        append_stat("noreply", add_stats, d, "%d", c->noreply);
749        append_stat("refcount", add_stats, d, "%u", (int)c->refcount);
750        append_stat("dynamic_buffer.buffer", add_stats, d, "%p",
751                    c->dynamic_buffer.buffer);
752        append_stat("dynamic_buffer.size", add_stats, d, "%zu",
753                    c->dynamic_buffer.size);
754        append_stat("dynamic_buffer.offset", add_stats, d, "%zu",
755                    c->dynamic_buffer.offset);
756        append_stat("engine_storage", add_stats, d, "%p", c->engine_storage);
757        /* @todo we should decode the binary header */
758        append_stat("cas", add_stats, d, "%"PRIu64, c->cas);
759        append_stat("cmd", add_stats, d, "%u", c->cmd);
760        append_stat("opaque", add_stats, d, "%u", c->opaque);
761        append_stat("keylen", add_stats, d, "%u", c->keylen);
762        append_stat("list_state", add_stats, d, "%u", c->list_state);
763        append_stat("next", add_stats, d, "%p", c->next);
764        append_stat("thread", add_stats, d, "%p", c->thread);
765        append_stat("aiostat", add_stats, d, "%u", c->aiostat);
766        append_stat("ewouldblock", add_stats, d, "%u", c->ewouldblock);
767        append_stat("tap_iterator", add_stats, d, "%p", c->tap_iterator);
768    }
769}
770
771/**
772 * Do a full stats of all of the connections.
773 * Do _NOT_ try to follow _ANY_ of the pointers in the conn structure
774 * because we read all of the values _DIRTY_. We preallocated the array
775 * of all of the connection pointers during startup, so we _KNOW_ that
776 * we can iterate through all of them. All of the conn structs will
777 * only appear in the connections.all array when we've allocated them,
778 * and we don't release them so it's safe to look at them.
779 */
780static void connection_stats(ADD_STAT add_stats, conn *c) {
781    int ii;
782    for (ii = 0; ii < settings.maxconns && connections.all[ii]; ++ii) {
783        add_connection_stats(add_stats, c, connections.all[ii]);
784    }
785}
786
787conn *conn_new(const SOCKET sfd, in_port_t parent_port,
788               STATE_FUNC init_state, int event_flags,
789               unsigned int read_buffer_size, struct event_base *base,
790               struct timeval *timeout) {
791    conn *c = allocate_connection();
792    if (c == NULL) {
793        return NULL;
794    }
795
796    c->admin = false;
797    cb_assert(c->thread == NULL);
798
799    if (c->rsize < read_buffer_size) {
800        void *mem = malloc(read_buffer_size);
801        if (mem) {
802            c->rsize = read_buffer_size;
803            free(c->rbuf);
804            c->rbuf = mem;
805        } else {
806            cb_assert(c->thread == NULL);
807            release_connection(c);
808            return NULL;
809        }
810    }
811
812    memset(&c->ssl, 0, sizeof(c->ssl));
813    if (init_state != conn_listening) {
814        int ii;
815        for (ii = 0; ii < settings.num_interfaces; ++ii) {
816            if (parent_port == settings.interfaces[ii].port) {
817                if (settings.interfaces[ii].ssl.cert != NULL) {
818                    const char *cert = settings.interfaces[ii].ssl.cert;
819                    const char *pkey = settings.interfaces[ii].ssl.key;
820
821                    c->ssl.ctx = SSL_CTX_new(SSLv23_server_method());
822
823                    /* @todo don't read files, but use in-memory-copies */
824                    if (!SSL_CTX_use_certificate_chain_file(c->ssl.ctx, cert) ||
825                        !SSL_CTX_use_PrivateKey_file(c->ssl.ctx, pkey, SSL_FILETYPE_PEM)) {
826                        release_connection(c);
827                        return NULL;
828                    }
829
830                    c->ssl.enabled = true;
831                    c->ssl.error = false;
832                    c->ssl.client = NULL;
833
834                    c->ssl.in.buffer = malloc(settings.bio_drain_buffer_sz);
835                    c->ssl.out.buffer = malloc(settings.bio_drain_buffer_sz);
836
837                    if (c->ssl.in.buffer == NULL || c->ssl.out.buffer == NULL) {
838                        release_connection(c);
839                        return NULL;
840                    }
841
842                    c->ssl.in.buffsz = settings.bio_drain_buffer_sz;
843                    c->ssl.out.buffsz = settings.bio_drain_buffer_sz;
844                    BIO_new_bio_pair(&c->ssl.application,
845                                     settings.bio_drain_buffer_sz,
846                                     &c->ssl.network,
847                                     settings.bio_drain_buffer_sz);
848
849                    c->ssl.client = SSL_new(c->ssl.ctx);
850                    SSL_set_bio(c->ssl.client,
851                                c->ssl.application,
852                                c->ssl.application);
853                }
854            }
855        }
856    }
857
858    c->request_addr_size = 0;
859
860    if (settings.verbose > 1) {
861        if (init_state == conn_listening) {
862            settings.extensions.logger->log(EXTENSION_LOG_DEBUG, c,
863                                            "<%d server listening", sfd);
864        } else {
865            settings.extensions.logger->log(EXTENSION_LOG_DEBUG, c,
866                                            "<%d new client connection", sfd);
867        }
868    }
869
870    c->sfd = sfd;
871    c->parent_port = parent_port;
872    c->state = init_state;
873    c->rlbytes = 0;
874    c->cmd = -1;
875    c->rbytes = c->wbytes = 0;
876    c->wcurr = c->wbuf;
877    c->rcurr = c->rbuf;
878    c->ritem = 0;
879    c->icurr = c->ilist;
880    c->temp_alloc_curr = c->temp_alloc_list;
881    c->ileft = 0;
882    c->temp_alloc_left = 0;
883    c->iovused = 0;
884    c->msgcurr = 0;
885    c->msgused = 0;
886    c->next = NULL;
887    c->list_state = 0;
888
889    c->write_and_go = init_state;
890    c->write_and_free = 0;
891    c->item = 0;
892    c->supports_datatype = false;
893    c->noreply = false;
894
895    event_set(&c->event, sfd, event_flags, event_handler, (void *)c);
896    event_base_set(base, &c->event);
897    c->ev_flags = event_flags;
898
899    if (!register_event(c, timeout)) {
900        cb_assert(c->thread == NULL);
901        release_connection(c);
902        return NULL;
903    }
904
905    STATS_LOCK();
906    stats.total_conns++;
907    STATS_UNLOCK();
908
909    c->aiostat = ENGINE_SUCCESS;
910    c->ewouldblock = false;
911    c->refcount = 1;
912
913    MEMCACHED_CONN_ALLOCATE(c->sfd);
914
915    perform_callbacks(ON_CONNECT, NULL, c);
916
917    return c;
918}
919
920static void conn_cleanup(conn *c) {
921    cb_assert(c != NULL);
922    c->admin = false;
923    if (c->item) {
924        settings.engine.v1->release(settings.engine.v0, c, c->item);
925        c->item = 0;
926    }
927
928    if (c->ileft != 0) {
929        for (; c->ileft > 0; c->ileft--,c->icurr++) {
930            settings.engine.v1->release(settings.engine.v0, c, *(c->icurr));
931        }
932    }
933
934    if (c->temp_alloc_left != 0) {
935        for (; c->temp_alloc_left > 0; c->temp_alloc_left--, c->temp_alloc_curr++) {
936            free(*(c->temp_alloc_curr));
937        }
938    }
939
940    if (c->write_and_free) {
941        free(c->write_and_free);
942        c->write_and_free = 0;
943    }
944
945    if (c->sasl_conn) {
946        cbsasl_dispose(&c->sasl_conn);
947        c->sasl_conn = NULL;
948    }
949
950    c->engine_storage = NULL;
951    c->tap_iterator = NULL;
952    c->thread = NULL;
953    cb_assert(c->next == NULL);
954    c->sfd = INVALID_SOCKET;
955    c->dcp = 0;
956    c->start = 0;
957    if (c->ssl.enabled) {
958        BIO_free_all(c->ssl.network);
959        SSL_free(c->ssl.client);
960        c->ssl.enabled = false;
961        c->ssl.error = false;
962        free(c->ssl.in.buffer);
963        free(c->ssl.out.buffer);
964        memset(&c->ssl, 0, sizeof(c->ssl));
965    }
966}
967
968void conn_close(conn *c) {
969    cb_assert(c != NULL);
970    cb_assert(c->sfd == INVALID_SOCKET);
971    cb_assert(c->state == conn_immediate_close);
972
973    cb_assert(c->thread);
974    /* remove from pending-io list */
975    if (settings.verbose > 1 && list_contains(c->thread->pending_io, c)) {
976        settings.extensions.logger->log(EXTENSION_LOG_WARNING, c,
977                                        "Current connection was in the pending-io list.. Nuking it\n");
978    }
979    c->thread->pending_io = list_remove(c->thread->pending_io, c);
980
981    conn_cleanup(c);
982
983    /*
984     * The contract with the object cache is that we should return the
985     * object in a constructed state. Reset the buffers to the default
986     * size
987     */
988    conn_reset_buffersize(c);
989    cb_assert(c->thread == NULL);
990    release_connection(c);
991}
992
993/*
994 * Shrinks a connection's buffers if they're too big.  This prevents
995 * periodic large "get" requests from permanently chewing lots of server
996 * memory.
997 *
998 * This should only be called in between requests since it can wipe output
999 * buffers!
1000 */
1001static void conn_shrink(conn *c) {
1002    cb_assert(c != NULL);
1003
1004    if (c->rsize > READ_BUFFER_HIGHWAT && c->rbytes < DATA_BUFFER_SIZE) {
1005        void *newbuf;
1006
1007        if (c->rcurr != c->rbuf) {
1008            /* Pack the buffer */
1009            memmove(c->rbuf, c->rcurr, (size_t)c->rbytes);
1010        }
1011
1012        newbuf = realloc(c->rbuf, DATA_BUFFER_SIZE);
1013
1014        if (newbuf) {
1015            c->rbuf = newbuf;
1016            c->rsize = DATA_BUFFER_SIZE;
1017        }
1018        c->rcurr = c->rbuf;
1019    }
1020
1021    /* isize is no longer dynamic */
1022    cb_assert(c->isize == ITEM_LIST_INITIAL);
1023
1024    if (c->msgsize > MSG_LIST_HIGHWAT) {
1025        void *newbuf = realloc(c->msglist,
1026                               MSG_LIST_INITIAL * sizeof(c->msglist[0]));
1027        if (newbuf) {
1028            c->msglist = newbuf;
1029            c->msgsize = MSG_LIST_INITIAL;
1030        }
1031    }
1032
1033    if (c->iovsize > IOV_LIST_HIGHWAT) {
1034        void *newbuf = realloc(c->iov, IOV_LIST_INITIAL * sizeof(c->iov[0]));
1035        if (newbuf) {
1036            c->iov = newbuf;
1037            c->iovsize = IOV_LIST_INITIAL;
1038        }
1039    }
1040}
1041
1042/**
1043 * Convert a state name to a human readable form.
1044 */
1045const char *state_text(STATE_FUNC state) {
1046    if (state == conn_listening) {
1047        return "conn_listening";
1048    } else if (state == conn_new_cmd) {
1049        return "conn_new_cmd";
1050    } else if (state == conn_waiting) {
1051        return "conn_waiting";
1052    } else if (state == conn_read) {
1053        return "conn_read";
1054    } else if (state == conn_parse_cmd) {
1055        return "conn_parse_cmd";
1056    } else if (state == conn_write) {
1057        return "conn_write";
1058    } else if (state == conn_nread) {
1059        return "conn_nread";
1060    } else if (state == conn_swallow) {
1061        return "conn_swallow";
1062    } else if (state == conn_closing) {
1063        return "conn_closing";
1064    } else if (state == conn_mwrite) {
1065        return "conn_mwrite";
1066    } else if (state == conn_ship_log) {
1067        return "conn_ship_log";
1068    } else if (state == conn_setup_tap_stream) {
1069        return "conn_setup_tap_stream";
1070    } else if (state == conn_pending_close) {
1071        return "conn_pending_close";
1072    } else if (state == conn_immediate_close) {
1073        return "conn_immediate_close";
1074    } else if (state == conn_refresh_cbsasl) {
1075        return "conn_refresh_cbsasl";
1076    } else if (state == conn_refresh_ssl_certs) {
1077        return "conn_refresh_ssl_cert";
1078    } else {
1079        return "Unknown";
1080    }
1081}
1082
1083/*
1084 * Sets a connection's current state in the state machine. Any special
1085 * processing that needs to happen on certain state transitions can
1086 * happen here.
1087 */
1088void conn_set_state(conn *c, STATE_FUNC state) {
1089    cb_assert(c != NULL);
1090
1091    if (state != c->state) {
1092        /*
1093         * The connections in the "tap thread" behaves differently than
1094         * normal connections because they operate in a full duplex mode.
1095         * New messages may appear from both sides, so we can't block on
1096         * read from the nework / engine
1097         */
1098        if (c->tap_iterator != NULL || c->dcp) {
1099            if (state == conn_waiting) {
1100                c->which = EV_WRITE;
1101                state = conn_ship_log;
1102            }
1103        }
1104
1105        if (settings.verbose > 2 || c->state == conn_closing
1106            || c->state == conn_setup_tap_stream) {
1107            settings.extensions.logger->log(EXTENSION_LOG_DETAIL, c,
1108                                            "%d: going from %s to %s\n",
1109                                            c->sfd, state_text(c->state),
1110                                            state_text(state));
1111        }
1112
1113        if (state == conn_write || state == conn_mwrite) {
1114            if (c->start != 0) {
1115                collect_timing(c->cmd, gethrtime() - c->start);
1116                c->start = 0;
1117            }
1118            MEMCACHED_PROCESS_COMMAND_END(c->sfd, c->wbuf, c->wbytes);
1119        }
1120
1121        c->state = state;
1122    }
1123}
1124
1125/*
1126 * Ensures that there is room for another struct iovec in a connection's
1127 * iov list.
1128 *
1129 * Returns 0 on success, -1 on out-of-memory.
1130 */
1131static int ensure_iov_space(conn *c) {
1132    cb_assert(c != NULL);
1133
1134    if (c->iovused >= c->iovsize) {
1135        int i, iovnum;
1136        struct iovec *new_iov = (struct iovec *)realloc(c->iov,
1137                                (c->iovsize * 2) * sizeof(struct iovec));
1138        if (! new_iov)
1139            return -1;
1140        c->iov = new_iov;
1141        c->iovsize *= 2;
1142
1143        /* Point all the msghdr structures at the new list. */
1144        for (i = 0, iovnum = 0; i < c->msgused; i++) {
1145            c->msglist[i].msg_iov = &c->iov[iovnum];
1146            iovnum += c->msglist[i].msg_iovlen;
1147        }
1148    }
1149
1150    return 0;
1151}
1152
1153
1154/*
1155 * Adds data to the list of pending data that will be written out to a
1156 * connection.
1157 *
1158 * Returns 0 on success, -1 on out-of-memory.
1159 */
1160
1161static int add_iov(conn *c, const void *buf, size_t len) {
1162    struct msghdr *m;
1163    size_t leftover;
1164    bool limit_to_mtu;
1165
1166    cb_assert(c != NULL);
1167
1168    if (len == 0) {
1169        return 0;
1170    }
1171
1172    do {
1173        m = &c->msglist[c->msgused - 1];
1174
1175        /*
1176         * Limit the first payloads of TCP replies, to
1177         * UDP_MAX_PAYLOAD_SIZE bytes.
1178         */
1179        limit_to_mtu = (1 == c->msgused);
1180
1181        /* We may need to start a new msghdr if this one is full. */
1182        if (m->msg_iovlen == IOV_MAX ||
1183            (limit_to_mtu && c->msgbytes >= UDP_MAX_PAYLOAD_SIZE)) {
1184            add_msghdr(c);
1185        }
1186
1187        if (ensure_iov_space(c) != 0)
1188            return -1;
1189
1190        /* If the fragment is too big to fit in the datagram, split it up */
1191        if (limit_to_mtu && len + c->msgbytes > UDP_MAX_PAYLOAD_SIZE) {
1192            leftover = len + c->msgbytes - UDP_MAX_PAYLOAD_SIZE;
1193            len -= leftover;
1194        } else {
1195            leftover = 0;
1196        }
1197
1198        m = &c->msglist[c->msgused - 1];
1199        m->msg_iov[m->msg_iovlen].iov_base = (void *)buf;
1200        m->msg_iov[m->msg_iovlen].iov_len = len;
1201
1202        c->msgbytes += (int)len;
1203        c->iovused++;
1204        m->msg_iovlen++;
1205
1206        buf = ((char *)buf) + len;
1207        len = leftover;
1208    } while (leftover > 0);
1209
1210    return 0;
1211}
1212
1213/**
1214 * get a pointer to the start of the request struct for the current command
1215 */
1216static void* binary_get_request(conn *c) {
1217    char *ret = c->rcurr;
1218    ret -= (sizeof(c->binary_header) + c->binary_header.request.keylen +
1219            c->binary_header.request.extlen);
1220
1221    cb_assert(ret >= c->rbuf);
1222    return ret;
1223}
1224
1225/**
1226 * get a pointer to the key in this request
1227 */
1228static char* binary_get_key(conn *c) {
1229    return c->rcurr - (c->binary_header.request.keylen);
1230}
1231
1232/**
1233 * Insert a key into a buffer, but replace all non-printable characters
1234 * with a '.'.
1235 *
1236 * @param dest where to store the output
1237 * @param destsz size of destination buffer
1238 * @param prefix string to insert before the data
1239 * @param client the client we are serving
1240 * @param from_client set to true if this data is from the client
1241 * @param key the key to add to the buffer
1242 * @param nkey the number of bytes in the key
1243 * @return number of bytes in dest if success, -1 otherwise
1244 */
1245static ssize_t key_to_printable_buffer(char *dest, size_t destsz,
1246                                       SOCKET client, bool from_client,
1247                                       const char *prefix,
1248                                       const char *key,
1249                                       size_t nkey)
1250{
1251    char *ptr;
1252    ssize_t ii;
1253    ssize_t nw = snprintf(dest, destsz, "%c%d %s ", from_client ? '>' : '<',
1254                          (int)client, prefix);
1255    if (nw == -1) {
1256        return -1;
1257    }
1258
1259    ptr = dest + nw;
1260    destsz -= nw;
1261    if (nkey > destsz) {
1262        nkey = destsz;
1263    }
1264
1265    for (ii = 0; ii < nkey; ++ii, ++key, ++ptr) {
1266        if (isgraph(*key)) {
1267            *ptr = *key;
1268        } else {
1269            *ptr = '.';
1270        }
1271    }
1272
1273    *ptr = '\0';
1274    return (ssize_t)(ptr - dest);
1275}
1276
1277/**
1278 * Convert a byte array to a text string
1279 *
1280 * @param dest where to store the output
1281 * @param destsz size of destination buffer
1282 * @param prefix string to insert before the data
1283 * @param client the client we are serving
1284 * @param from_client set to true if this data is from the client
1285 * @param data the data to add to the buffer
1286 * @param size the number of bytes in data to print
1287 * @return number of bytes in dest if success, -1 otherwise
1288 */
1289static ssize_t bytes_to_output_string(char *dest, size_t destsz,
1290                                      SOCKET client, bool from_client,
1291                                      const char *prefix,
1292                                      const char *data,
1293                                      size_t size)
1294{
1295    ssize_t nw = snprintf(dest, destsz, "%c%d %s", from_client ? '>' : '<',
1296                          (int)client, prefix);
1297    ssize_t offset = nw;
1298    ssize_t ii;
1299
1300    if (nw == -1) {
1301        return -1;
1302    }
1303
1304    for (ii = 0; ii < size; ++ii) {
1305        if (ii % 4 == 0) {
1306            if ((nw = snprintf(dest + offset, destsz - offset, "\n%c%d  ",
1307                               from_client ? '>' : '<', client)) == -1) {
1308                return  -1;
1309            }
1310            offset += nw;
1311        }
1312        if ((nw = snprintf(dest + offset, destsz - offset,
1313                           " 0x%02x", (unsigned char)data[ii])) == -1) {
1314            return -1;
1315        }
1316        offset += nw;
1317    }
1318
1319    if ((nw = snprintf(dest + offset, destsz - offset, "\n")) == -1) {
1320        return -1;
1321    }
1322
1323    return offset + nw;
1324}
1325
1326static int add_bin_header(conn *c,
1327                          uint16_t err,
1328                          uint8_t hdr_len,
1329                          uint16_t key_len,
1330                          uint32_t body_len,
1331                          uint8_t datatype) {
1332    protocol_binary_response_header* header;
1333
1334    cb_assert(c);
1335
1336    c->msgcurr = 0;
1337    c->msgused = 0;
1338    c->iovused = 0;
1339    if (add_msghdr(c) != 0) {
1340        return -1;
1341    }
1342
1343    header = (protocol_binary_response_header *)c->wbuf;
1344
1345    header->response.magic = (uint8_t)PROTOCOL_BINARY_RES;
1346    header->response.opcode = c->binary_header.request.opcode;
1347    header->response.keylen = (uint16_t)htons(key_len);
1348
1349    header->response.extlen = (uint8_t)hdr_len;
1350    header->response.datatype = datatype;
1351    header->response.status = (uint16_t)htons(err);
1352
1353    header->response.bodylen = htonl(body_len);
1354    header->response.opaque = c->opaque;
1355    header->response.cas = htonll(c->cas);
1356
1357    if (settings.verbose > 1) {
1358        char buffer[1024];
1359        if (bytes_to_output_string(buffer, sizeof(buffer), c->sfd, false,
1360                                   "Writing bin response:",
1361                                   (const char*)header->bytes,
1362                                   sizeof(header->bytes)) != -1) {
1363            settings.extensions.logger->log(EXTENSION_LOG_DEBUG, c,
1364                                            "%s", buffer);
1365        }
1366    }
1367
1368    return add_iov(c, c->wbuf, sizeof(header->response));
1369}
1370
1371/**
1372 * Convert an error code generated from the storage engine to the corresponding
1373 * error code used by the protocol layer.
1374 * @param e the error code as used in the engine
1375 * @return the error code as used by the protocol layer
1376 */
1377static protocol_binary_response_status engine_error_2_protocol_error(ENGINE_ERROR_CODE e) {
1378    protocol_binary_response_status ret;
1379
1380    switch (e) {
1381    case ENGINE_SUCCESS:
1382        return PROTOCOL_BINARY_RESPONSE_SUCCESS;
1383    case ENGINE_KEY_ENOENT:
1384        return PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
1385    case ENGINE_KEY_EEXISTS:
1386        return PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS;
1387    case ENGINE_ENOMEM:
1388        return PROTOCOL_BINARY_RESPONSE_ENOMEM;
1389    case ENGINE_TMPFAIL:
1390        return PROTOCOL_BINARY_RESPONSE_ETMPFAIL;
1391    case ENGINE_NOT_STORED:
1392        return PROTOCOL_BINARY_RESPONSE_NOT_STORED;
1393    case ENGINE_EINVAL:
1394        return PROTOCOL_BINARY_RESPONSE_EINVAL;
1395    case ENGINE_ENOTSUP:
1396        return PROTOCOL_BINARY_RESPONSE_NOT_SUPPORTED;
1397    case ENGINE_E2BIG:
1398        return PROTOCOL_BINARY_RESPONSE_E2BIG;
1399    case ENGINE_NOT_MY_VBUCKET:
1400        return PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET;
1401    case ENGINE_ERANGE:
1402        return PROTOCOL_BINARY_RESPONSE_ERANGE;
1403    case ENGINE_ROLLBACK:
1404        return PROTOCOL_BINARY_RESPONSE_ROLLBACK;
1405    default:
1406        ret = PROTOCOL_BINARY_RESPONSE_EINTERNAL;
1407    }
1408
1409    return ret;
1410}
1411
1412static ENGINE_ERROR_CODE get_vb_map_cb(const void *cookie,
1413                                       const void *map,
1414                                       size_t mapsize)
1415{
1416    char *buf;
1417    conn *c = (conn*)cookie;
1418    protocol_binary_response_header header;
1419    size_t needed = mapsize+ sizeof(protocol_binary_response_header);
1420    if (!grow_dynamic_buffer(c, needed)) {
1421        if (settings.verbose > 0) {
1422            settings.extensions.logger->log(EXTENSION_LOG_INFO, c,
1423                    "<%d ERROR: Failed to allocate memory for response\n",
1424                    c->sfd);
1425        }
1426        return ENGINE_ENOMEM;
1427    }
1428
1429    buf = c->dynamic_buffer.buffer + c->dynamic_buffer.offset;
1430    memset(&header, 0, sizeof(header));
1431
1432    header.response.magic = (uint8_t)PROTOCOL_BINARY_RES;
1433    header.response.opcode = c->binary_header.request.opcode;
1434    header.response.status = (uint16_t)htons(PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET);
1435    header.response.bodylen = htonl((uint32_t)mapsize);
1436    header.response.opaque = c->opaque;
1437
1438    memcpy(buf, header.bytes, sizeof(header.response));
1439    buf += sizeof(header.response);
1440    memcpy(buf, map, mapsize);
1441    c->dynamic_buffer.offset += needed;
1442
1443    return ENGINE_SUCCESS;
1444}
1445
1446static void write_bin_packet(conn *c, protocol_binary_response_status err, int swallow) {
1447    if (err == PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET) {
1448        ENGINE_ERROR_CODE ret;
1449        cb_assert(swallow == 0);
1450
1451        ret = settings.engine.v1->get_engine_vb_map(settings.engine.v0, c,
1452                                                    get_vb_map_cb);
1453        if (ret == ENGINE_SUCCESS) {
1454            write_and_free(c, c->dynamic_buffer.buffer,
1455                           c->dynamic_buffer.offset);
1456            c->dynamic_buffer.buffer = NULL;
1457        } else {
1458            conn_set_state(c, conn_closing);
1459        }
1460    } else {
1461        ssize_t len = 0;
1462        const char *errtext = NULL;
1463
1464        if (err != PROTOCOL_BINARY_RESPONSE_SUCCESS) {
1465            errtext = memcached_protocol_errcode_2_text(err);
1466            if (errtext != NULL) {
1467                len = (ssize_t)strlen(errtext);
1468            }
1469        }
1470
1471        if (errtext && settings.verbose > 1) {
1472            settings.extensions.logger->log(EXTENSION_LOG_DEBUG, c,
1473                                            ">%d Writing an error: %s\n", c->sfd,
1474                                            errtext);
1475        }
1476
1477        add_bin_header(c, err, 0, 0, len, PROTOCOL_BINARY_RAW_BYTES);
1478        if (errtext) {
1479            add_iov(c, errtext, len);
1480        }
1481        conn_set_state(c, conn_mwrite);
1482        if (swallow > 0) {
1483            c->sbytes = swallow;
1484            c->write_and_go = conn_swallow;
1485        } else {
1486            c->write_and_go = conn_new_cmd;
1487        }
1488    }
1489}
1490
1491/* Form and send a response to a command over the binary protocol */
1492static void write_bin_response(conn *c, const void *d, int hlen, int keylen, int dlen) {
1493    if (!c->noreply || c->cmd == PROTOCOL_BINARY_CMD_GET ||
1494        c->cmd == PROTOCOL_BINARY_CMD_GETK) {
1495        if (add_bin_header(c, 0, hlen, keylen, dlen, PROTOCOL_BINARY_RAW_BYTES) == -1) {
1496            conn_set_state(c, conn_closing);
1497            return;
1498        }
1499        add_iov(c, d, dlen);
1500        conn_set_state(c, conn_mwrite);
1501        c->write_and_go = conn_new_cmd;
1502    } else {
1503        if (c->start != 0) {
1504            collect_timing(c->cmd, gethrtime() - c->start);
1505            c->start = 0;
1506        }
1507        conn_set_state(c, conn_new_cmd);
1508    }
1509}
1510
1511static void complete_update_bin(conn *c) {
1512    protocol_binary_response_status eno = PROTOCOL_BINARY_RESPONSE_EINVAL;
1513    ENGINE_ERROR_CODE ret;
1514    item *it;
1515    item_info_holder info;
1516
1517    cb_assert(c != NULL);
1518    it = c->item;
1519    memset(&info, 0, sizeof(info));
1520    info.info.nvalue = 1;
1521    if (!settings.engine.v1->get_item_info(settings.engine.v0, c, it,
1522                                           (void*)&info)) {
1523        settings.engine.v1->release(settings.engine.v0, c, it);
1524        settings.extensions.logger->log(EXTENSION_LOG_WARNING, c,
1525                                        "%d: Failed to get item info",
1526                                        c->sfd);
1527        write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_EINTERNAL, 0);
1528        return;
1529    }
1530
1531    ret = c->aiostat;
1532    c->aiostat = ENGINE_SUCCESS;
1533    if (ret == ENGINE_SUCCESS) {
1534        if (!c->supports_datatype) {
1535            if (checkUTF8JSON((void*)info.info.value[0].iov_base,
1536                              (int)info.info.value[0].iov_len)) {
1537                info.info.datatype = PROTOCOL_BINARY_DATATYPE_JSON;
1538                if (!settings.engine.v1->set_item_info(settings.engine.v0, c,
1539                                                       it, &info.info)) {
1540                    settings.extensions.logger->log(EXTENSION_LOG_WARNING, c,
1541                            "%d: Failed to set item info",
1542                            c->sfd);
1543                }
1544            }
1545        }
1546        ret = settings.engine.v1->store(settings.engine.v0, c,
1547                                        it, &c->cas, c->store_op,
1548                                        c->binary_header.request.vbucket);
1549    }
1550
1551#ifdef ENABLE_DTRACE
1552    switch (c->cmd) {
1553    case OPERATION_ADD:
1554        MEMCACHED_COMMAND_ADD(c->sfd, info.info.key, info.info.nkey,
1555                              (ret == ENGINE_SUCCESS) ? info.info.nbytes : -1, c->cas);
1556        break;
1557    case OPERATION_REPLACE:
1558        MEMCACHED_COMMAND_REPLACE(c->sfd, info.info.key, info.info.nkey,
1559                                  (ret == ENGINE_SUCCESS) ? info.info.nbytes : -1, c->cas);
1560        break;
1561    case OPERATION_APPEND:
1562        MEMCACHED_COMMAND_APPEND(c->sfd, info.info.key, info.info.nkey,
1563                                 (ret == ENGINE_SUCCESS) ? info.info.nbytes : -1, c->cas);
1564        break;
1565    case OPERATION_PREPEND:
1566        MEMCACHED_COMMAND_PREPEND(c->sfd, info.info.key, info.info.nkey,
1567                                  (ret == ENGINE_SUCCESS) ? info.info.nbytes : -1, c->cas);
1568        break;
1569    case OPERATION_SET:
1570        MEMCACHED_COMMAND_SET(c->sfd, info.info.key, info.info.nkey,
1571                              (ret == ENGINE_SUCCESS) ? info.info.nbytes : -1, c->cas);
1572        break;
1573    }
1574#endif
1575
1576    switch (ret) {
1577    case ENGINE_SUCCESS:
1578        /* Stored */
1579        write_bin_response(c, NULL, 0, 0, 0);
1580        break;
1581    case ENGINE_KEY_EEXISTS:
1582        write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS, 0);
1583        break;
1584    case ENGINE_KEY_ENOENT:
1585        write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, 0);
1586        break;
1587    case ENGINE_ENOMEM:
1588        write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_ENOMEM, 0);
1589        break;
1590    case ENGINE_TMPFAIL:
1591        write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_ETMPFAIL, 0);
1592        break;
1593    case ENGINE_EWOULDBLOCK:
1594        c->ewouldblock = true;
1595        break;
1596    case ENGINE_DISCONNECT:
1597        c->state = conn_closing;
1598        break;
1599    case ENGINE_ENOTSUP:
1600        write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_NOT_SUPPORTED, 0);
1601        break;
1602    case ENGINE_NOT_MY_VBUCKET:
1603        write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET, 0);
1604        break;
1605    case ENGINE_E2BIG:
1606        write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_E2BIG, 0);
1607        break;
1608    default:
1609        if (c->store_op == OPERATION_ADD) {
1610            eno = PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS;
1611        } else if(c->store_op == OPERATION_REPLACE) {
1612            eno = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
1613        } else {
1614            eno = PROTOCOL_BINARY_RESPONSE_NOT_STORED;
1615        }
1616        write_bin_packet(c, eno, 0);
1617    }
1618
1619    if (c->store_op == OPERATION_CAS) {
1620        switch (ret) {
1621        case ENGINE_SUCCESS:
1622            SLAB_INCR(c, cas_hits, info.info.key, info.info.nkey);
1623            break;
1624        case ENGINE_KEY_EEXISTS:
1625            SLAB_INCR(c, cas_badval, info.info.key, info.info.nkey);
1626            break;
1627        case ENGINE_KEY_ENOENT:
1628            STATS_NOKEY(c, cas_misses);
1629            break;
1630        default:
1631            ;
1632        }
1633    } else {
1634        SLAB_INCR(c, cmd_set, info.info.key, info.info.nkey);
1635    }
1636
1637    if (!c->ewouldblock) {
1638        /* release the c->item reference */
1639        settings.engine.v1->release(settings.engine.v0, c, c->item);
1640        c->item = 0;
1641    }
1642}
1643
1644static void process_bin_get(conn *c) {
1645    item *it;
1646    protocol_binary_response_get* rsp = (protocol_binary_response_get*)c->wbuf;
1647    char* key = binary_get_key(c);
1648    size_t nkey = c->binary_header.request.keylen;
1649    uint16_t keylen;
1650    uint32_t bodylen;
1651    item_info_holder info;
1652    int ii;
1653    ENGINE_ERROR_CODE ret;
1654    uint8_t datatype;
1655    bool need_inflate = false;
1656
1657    memset(&info, 0, sizeof(info));
1658    if (settings.verbose > 1) {
1659        char buffer[1024];
1660        if (key_to_printable_buffer(buffer, sizeof(buffer), c->sfd, true,
1661                                    "GET", key, nkey) != -1) {
1662            settings.extensions.logger->log(EXTENSION_LOG_DEBUG, c, "%s\n",
1663                                            buffer);
1664        }
1665    }
1666
1667    ret = c->aiostat;
1668    c->aiostat = ENGINE_SUCCESS;
1669    if (ret == ENGINE_SUCCESS) {
1670        ret = settings.engine.v1->get(settings.engine.v0, c, &it, key, (int)nkey,
1671                                      c->binary_header.request.vbucket);
1672    }
1673
1674    info.info.nvalue = IOV_MAX;
1675    switch (ret) {
1676    case ENGINE_SUCCESS:
1677        STATS_HIT(c, get, key, nkey);
1678
1679        if (!settings.engine.v1->get_item_info(settings.engine.v0, c, it,
1680                                               (void*)&info)) {
1681            settings.engine.v1->release(settings.engine.v0, c, it);
1682            settings.extensions.logger->log(EXTENSION_LOG_WARNING, c,
1683                                            "%d: Failed to get item info",
1684                                            c->sfd);
1685            write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_EINTERNAL, 0);
1686            break;
1687        }
1688
1689        datatype = info.info.datatype;
1690        if (!c->supports_datatype) {
1691            if ((datatype & PROTOCOL_BINARY_DATATYPE_COMPRESSED) == PROTOCOL_BINARY_DATATYPE_COMPRESSED) {
1692                need_inflate = true;
1693            } else {
1694                datatype = PROTOCOL_BINARY_RAW_BYTES;
1695            }
1696        }
1697
1698        keylen = 0;
1699        bodylen = sizeof(rsp->message.body) + info.info.nbytes;
1700
1701        if (c->cmd == PROTOCOL_BINARY_CMD_GETK) {
1702            bodylen += (uint32_t)nkey;
1703            keylen = (uint16_t)nkey;
1704        }
1705
1706        if (need_inflate) {
1707            if (info.info.nvalue != 1) {
1708                write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_EINTERNAL, 0);
1709            } else if (binary_response_handler(key, keylen,
1710                                               &info.info.flags, 4,
1711                                               info.info.value[0].iov_base,
1712                                               (uint32_t)info.info.value[0].iov_len,
1713                                               datatype,
1714                                               PROTOCOL_BINARY_RESPONSE_SUCCESS,
1715                                               info.info.cas, c)) {
1716                write_and_free(c, c->dynamic_buffer.buffer, c->dynamic_buffer.offset);
1717                c->dynamic_buffer.buffer = NULL;
1718                settings.engine.v1->release(settings.engine.v0, c, it);
1719            } else {
1720                write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_EINTERNAL, 0);
1721            }
1722        } else {
1723            if (add_bin_header(c, 0, sizeof(rsp->message.body),
1724                               keylen, bodylen, datatype) == -1) {
1725                conn_set_state(c, conn_closing);
1726                return;
1727            }
1728            rsp->message.header.response.cas = htonll(info.info.cas);
1729
1730            /* add the flags */
1731            rsp->message.body.flags = info.info.flags;
1732            add_iov(c, &rsp->message.body, sizeof(rsp->message.body));
1733
1734            if (c->cmd == PROTOCOL_BINARY_CMD_GETK) {
1735                add_iov(c, info.info.key, nkey);
1736            }
1737
1738            for (ii = 0; ii < info.info.nvalue; ++ii) {
1739                add_iov(c, info.info.value[ii].iov_base,
1740                        info.info.value[ii].iov_len);
1741            }
1742            conn_set_state(c, conn_mwrite);
1743            /* Remember this item so we can garbage collect it later */
1744            c->item = it;
1745        }
1746        break;
1747    case ENGINE_KEY_ENOENT:
1748        STATS_MISS(c, get, key, nkey);
1749
1750        MEMCACHED_COMMAND_GET(c->sfd, key, nkey, -1, 0);
1751
1752        if (c->noreply) {
1753            conn_set_state(c, conn_new_cmd);
1754        } else {
1755            if (c->cmd == PROTOCOL_BINARY_CMD_GETK) {
1756                char *ofs = c->wbuf + sizeof(protocol_binary_response_header);
1757                if (add_bin_header(c, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT,
1758                                   0, (uint16_t)nkey,
1759                                   (uint32_t)nkey, PROTOCOL_BINARY_RAW_BYTES) == -1) {
1760                    conn_set_state(c, conn_closing);
1761                    return;
1762                }
1763                memcpy(ofs, key, nkey);
1764                add_iov(c, ofs, nkey);
1765                conn_set_state(c, conn_mwrite);
1766            } else {
1767                write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, 0);
1768            }
1769        }
1770        break;
1771    case ENGINE_EWOULDBLOCK:
1772        c->ewouldblock = true;
1773        break;
1774    case ENGINE_DISCONNECT:
1775        c->state = conn_closing;
1776        break;
1777    default:
1778        write_bin_packet(c, engine_error_2_protocol_error(ret), 0);
1779    }
1780
1781    if (settings.detail_enabled && ret != ENGINE_EWOULDBLOCK) {
1782        stats_prefix_record_get(key, nkey, ret == ENGINE_SUCCESS);
1783    }
1784}
1785
1786static void append_bin_stats(const char *key, const uint16_t klen,
1787                             const char *val, const uint32_t vlen,
1788                             conn *c) {
1789    char *buf = c->dynamic_buffer.buffer + c->dynamic_buffer.offset;
1790    uint32_t bodylen = klen + vlen;
1791    protocol_binary_response_header header;
1792
1793    memset(&header, 0, sizeof(header));
1794    header.response.magic = (uint8_t)PROTOCOL_BINARY_RES;
1795    header.response.opcode = PROTOCOL_BINARY_CMD_STAT;
1796    header.response.keylen = (uint16_t)htons(klen);
1797    header.response.datatype = (uint8_t)PROTOCOL_BINARY_RAW_BYTES;
1798    header.response.bodylen = htonl(bodylen);
1799    header.response.opaque = c->opaque;
1800
1801    memcpy(buf, header.bytes, sizeof(header.response));
1802    buf += sizeof(header.response);
1803
1804    if (klen > 0) {
1805        cb_assert(key != NULL);
1806        memcpy(buf, key, klen);
1807        buf += klen;
1808
1809        if (vlen > 0) {
1810            memcpy(buf, val, vlen);
1811        }
1812    }
1813
1814    c->dynamic_buffer.offset += sizeof(header.response) + bodylen;
1815}
1816
1817static bool grow_dynamic_buffer(conn *c, size_t needed) {
1818    size_t nsize = c->dynamic_buffer.size;
1819    size_t available = nsize - c->dynamic_buffer.offset;
1820    bool rv = true;
1821
1822    /* Special case: No buffer -- need to allocate fresh */
1823    if (c->dynamic_buffer.buffer == NULL) {
1824        nsize = 1024;
1825        available = c->dynamic_buffer.size = c->dynamic_buffer.offset = 0;
1826    }
1827
1828    while (needed > available) {
1829        cb_assert(nsize > 0);
1830        nsize = nsize << 1;
1831        available = nsize - c->dynamic_buffer.offset;
1832    }
1833
1834    if (nsize != c->dynamic_buffer.size) {
1835        char *ptr = realloc(c->dynamic_buffer.buffer, nsize);
1836        if (ptr) {
1837            c->dynamic_buffer.buffer = ptr;
1838            c->dynamic_buffer.size = nsize;
1839        } else {
1840            rv = false;
1841        }
1842    }
1843
1844    return rv;
1845}
1846
1847static void append_stats(const char *key, const uint16_t klen,
1848                         const char *val, const uint32_t vlen,
1849                         const void *cookie)
1850{
1851    size_t needed;
1852    conn *c = (conn*)cookie;
1853    /* value without a key is invalid */
1854    if (klen == 0 && vlen > 0) {
1855        return ;
1856    }
1857
1858    needed = vlen + klen + sizeof(protocol_binary_response_header);
1859    if (!grow_dynamic_buffer(c, needed)) {
1860        return ;
1861    }
1862    append_bin_stats(key, klen, val, vlen, c);
1863    cb_assert(c->dynamic_buffer.offset <= c->dynamic_buffer.size);
1864}
1865
1866static void bin_read_chunk(conn *c,
1867                           enum bin_substates next_substate,
1868                           uint32_t chunk) {
1869    ptrdiff_t offset;
1870    cb_assert(c);
1871    c->substate = next_substate;
1872    c->rlbytes = chunk;
1873
1874    /* Ok... do we have room for everything in our buffer? */
1875    offset = c->rcurr + sizeof(protocol_binary_request_header) - c->rbuf;
1876    if (c->rlbytes > c->rsize - offset) {
1877        size_t nsize = c->rsize;
1878        size_t size = c->rlbytes + sizeof(protocol_binary_request_header);
1879
1880        while (size > nsize) {
1881            nsize *= 2;
1882        }
1883
1884        if (nsize != c->rsize) {
1885            char *newm;
1886            if (settings.verbose > 1) {
1887                settings.extensions.logger->log(EXTENSION_LOG_DEBUG, c,
1888                        "%d: Need to grow buffer from %lu to %lu\n",
1889                        c->sfd, (unsigned long)c->rsize, (unsigned long)nsize);
1890            }
1891            newm = realloc(c->rbuf, nsize);
1892            if (newm == NULL) {
1893                if (settings.verbose) {
1894                    settings.extensions.logger->log(EXTENSION_LOG_INFO, c,
1895                            "%d: Failed to grow buffer.. closing connection\n",
1896                            c->sfd);
1897                }
1898                conn_set_state(c, conn_closing);
1899                return;
1900            }
1901
1902            c->rbuf= newm;
1903            /* rcurr should point to the same offset in the packet */
1904            c->rcurr = c->rbuf + offset - sizeof(protocol_binary_request_header);
1905            c->rsize = (int)nsize;
1906        }
1907        if (c->rbuf != c->rcurr) {
1908            memmove(c->rbuf, c->rcurr, c->rbytes);
1909            c->rcurr = c->rbuf;
1910            if (settings.verbose > 1) {
1911                settings.extensions.logger->log(EXTENSION_LOG_DEBUG, c,
1912                                                "%d: Repack input buffer\n",
1913                                                c->sfd);
1914            }
1915        }
1916    }
1917
1918    /* preserve the header in the buffer.. */
1919    c->ritem = c->rcurr + sizeof(protocol_binary_request_header);
1920    conn_set_state(c, conn_nread);
1921}
1922
1923static void bin_read_key(conn *c, enum bin_substates next_substate, int extra) {
1924    bin_read_chunk(c, next_substate, c->keylen + extra);
1925}
1926
1927
1928/* Just write an error message and disconnect the client */
1929static void handle_binary_protocol_error(conn *c) {
1930    write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_EINVAL, 0);
1931    if (settings.verbose) {
1932        settings.extensions.logger->log(EXTENSION_LOG_INFO, c,
1933                "%d: Protocol error (opcode %02x), close connection\n",
1934                c->sfd, c->binary_header.request.opcode);
1935    }
1936    c->write_and_go = conn_closing;
1937}
1938
1939static void get_auth_data(const void *cookie, auth_data_t *data) {
1940    conn *c = (conn*)cookie;
1941    if (c->sasl_conn) {
1942        cbsasl_getprop(c->sasl_conn, CBSASL_USERNAME, (void*)&data->username);
1943        cbsasl_getprop(c->sasl_conn, CBSASL_CONFIG, (void*)&data->config);
1944    } else {
1945        data->username = NULL;
1946        data->config = NULL;
1947    }
1948}
1949
/*
 * Temporary holder for a SASL request while its body is being read.
 * process_bin_sasl_auth() allocates it with enough room after the struct
 * for the mechanism name followed by the challenge data, and stores it in
 * c->item until process_bin_complete_sasl_auth() frees it.
 */
struct sasl_tmp {
    int ksize;    /* length of the mechanism name (the request key) */
    int vsize;    /* length of the challenge data (the request value) */
    char data[1]; /* data + ksize == value */
};
1955
1956static void process_bin_sasl_auth(conn *c) {
1957    int nkey;
1958    int vlen;
1959    char *key;
1960    size_t buffer_size;
1961    struct sasl_tmp *data;
1962
1963    cb_assert(c->binary_header.request.extlen == 0);
1964    nkey = c->binary_header.request.keylen;
1965    vlen = c->binary_header.request.bodylen - nkey;
1966
1967    if (nkey > MAX_SASL_MECH_LEN) {
1968        write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_EINVAL, vlen);
1969        c->write_and_go = conn_swallow;
1970        return;
1971    }
1972
1973    key = binary_get_key(c);
1974    cb_assert(key);
1975
1976    buffer_size = sizeof(struct sasl_tmp) + nkey + vlen + 2;
1977    data = calloc(sizeof(struct sasl_tmp) + buffer_size, 1);
1978    if (!data) {
1979        write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_ENOMEM, vlen);
1980        c->write_and_go = conn_swallow;
1981        return;
1982    }
1983
1984    data->ksize = nkey;
1985    data->vsize = vlen;
1986    memcpy(data->data, key, nkey);
1987
1988    c->item = data;
1989    c->ritem = data->data + nkey;
1990    c->rlbytes = vlen;
1991    conn_set_state(c, conn_nread);
1992    c->substate = bin_reading_sasl_auth_data;
1993}
1994
1995static void process_bin_complete_sasl_auth(conn *c) {
1996    auth_data_t data;
1997    const char *out = NULL;
1998    unsigned int outlen = 0;
1999    int nkey;
2000    int vlen;
2001    struct sasl_tmp *stmp;
2002    char mech[1024];
2003    const char *challenge;
2004    int result=-1;
2005
2006    cb_assert(c->item);
2007
2008    nkey = c->binary_header.request.keylen;
2009    if (nkey > 1023) {
2010        /* too big.. */
2011        settings.extensions.logger->log(EXTENSION_LOG_WARNING, c,
2012                "%d: sasl error. key: %d > 1023", c->sfd, nkey);
2013        write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_AUTH_ERROR, 0);
2014        return;
2015    }
2016    vlen = c->binary_header.request.bodylen - nkey;
2017
2018    stmp = c->item;
2019    memcpy(mech, stmp->data, nkey);
2020    mech[nkey] = 0x00;
2021
2022    if (settings.verbose) {
2023        settings.extensions.logger->log(EXTENSION_LOG_DEBUG, c,
2024                "%d: mech: ``%s'' with %d bytes of data\n", c->sfd, mech, vlen);
2025    }
2026
2027    challenge = vlen == 0 ? NULL : (stmp->data + nkey);
2028    switch (c->cmd) {
2029    case PROTOCOL_BINARY_CMD_SASL_AUTH:
2030        result = cbsasl_server_start(&c->sasl_conn, mech,
2031                                     challenge, vlen,
2032                                     (unsigned char **)&out, &outlen);
2033        break;
2034    case PROTOCOL_BINARY_CMD_SASL_STEP:
2035        result = cbsasl_server_step(c->sasl_conn, challenge,
2036                                    vlen, &out, &outlen);
2037        break;
2038    default:
2039        cb_assert(false); /* CMD should be one of the above */
2040        /* This code is pretty much impossible, but makes the compiler
2041           happier */
2042        if (settings.verbose) {
2043            settings.extensions.logger->log(EXTENSION_LOG_WARNING, c,
2044                    "%d: Unhandled command %d with challenge %s\n",
2045                    c->sfd, c->cmd, challenge);
2046        }
2047        break;
2048    }
2049
2050    free(c->item);
2051    c->item = NULL;
2052    c->ritem = NULL;
2053
2054    if (settings.verbose) {
2055        settings.extensions.logger->log(EXTENSION_LOG_INFO, c,
2056                                        "%d: sasl result code:  %d\n",
2057                                        c->sfd, result);
2058    }
2059
2060    switch(result) {
2061    case SASL_OK:
2062        write_bin_response(c, "Authenticated", 0, 0, (uint32_t)strlen("Authenticated"));
2063        get_auth_data(c, &data);
2064        if (settings.disable_admin) {
2065            /* "everyone is admins" */
2066            cookie_set_admin(c);
2067        } else if (settings.admin != NULL && data.username != NULL) {
2068            if (strcmp(settings.admin, data.username) == 0) {
2069                cookie_set_admin(c);
2070            }
2071        }
2072        perform_callbacks(ON_AUTH, (const void*)&data, c);
2073        STATS_NOKEY(c, auth_cmds);
2074        break;
2075    case SASL_CONTINUE:
2076        if (add_bin_header(c, PROTOCOL_BINARY_RESPONSE_AUTH_CONTINUE, 0, 0,
2077                           outlen, PROTOCOL_BINARY_RAW_BYTES) == -1) {
2078            conn_set_state(c, conn_closing);
2079            return;
2080        }
2081        add_iov(c, out, outlen);
2082        conn_set_state(c, conn_mwrite);
2083        c->write_and_go = conn_new_cmd;
2084        break;
2085    case SASL_BADPARAM:
2086        settings.extensions.logger->log(EXTENSION_LOG_WARNING, c,
2087                                        "%d: Bad sasl params: %d\n",
2088                                        c->sfd, result);
2089        write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_EINVAL, 0);
2090        STATS_NOKEY2(c, auth_cmds, auth_errors);
2091        break;
2092    default:
2093        if (result == SASL_NOUSER || result == SASL_PWERR) {
2094            settings.extensions.logger->log(EXTENSION_LOG_WARNING, c,
2095                                            "%d: Invalid username/password combination",
2096                                            c->sfd);
2097        } else {
2098            settings.extensions.logger->log(EXTENSION_LOG_WARNING, c,
2099                                            "%d: Unknown sasl response: %d",
2100                                            c->sfd, result);
2101        }
2102        write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_AUTH_ERROR, 0);
2103        STATS_NOKEY2(c, auth_cmds, auth_errors);
2104    }
2105}
2106
2107static bool authenticated(conn *c) {
2108    bool rv = false;
2109
2110    switch (c->cmd) {
2111    case PROTOCOL_BINARY_CMD_SASL_LIST_MECHS: /* FALLTHROUGH */
2112    case PROTOCOL_BINARY_CMD_SASL_AUTH:       /* FALLTHROUGH */
2113    case PROTOCOL_BINARY_CMD_SASL_STEP:       /* FALLTHROUGH */
2114    case PROTOCOL_BINARY_CMD_VERSION:         /* FALLTHROUGH */
2115    case PROTOCOL_BINARY_CMD_HELLO:
2116        rv = true;
2117        break;
2118    default:
2119        if (c->sasl_conn) {
2120            const void *uname = NULL;
2121            cbsasl_getprop(c->sasl_conn, CBSASL_USERNAME, &uname);
2122            rv = uname != NULL;
2123        }
2124    }
2125
2126    if (settings.verbose > 1) {
2127        settings.extensions.logger->log(EXTENSION_LOG_DEBUG, c,
2128                "%d: authenticated() in cmd 0x%02x is %s\n",
2129                c->sfd, c->cmd, rv ? "true" : "false");
2130    }
2131
2132    return rv;
2133}
2134
2135bool binary_response_handler(const void *key, uint16_t keylen,
2136                             const void *ext, uint8_t extlen,
2137                             const void *body, uint32_t bodylen,
2138                             uint8_t datatype, uint16_t status,
2139                             uint64_t cas, const void *cookie)
2140{
2141    protocol_binary_response_header header;
2142    char *buf;
2143    conn *c = (conn*)cookie;
2144    /* Look at append_bin_stats */
2145    size_t needed;
2146    bool need_inflate = false;
2147    size_t inflated_length;
2148
2149    if (!c->supports_datatype) {
2150        if ((datatype & PROTOCOL_BINARY_DATATYPE_COMPRESSED) == PROTOCOL_BINARY_DATATYPE_COMPRESSED) {
2151            need_inflate = true;
2152        }
2153        /* We may silently drop the knowledge about a JSON item */
2154        datatype = PROTOCOL_BINARY_RAW_BYTES;
2155    }
2156
2157    needed = keylen + extlen + sizeof(protocol_binary_response_header);
2158    if (need_inflate) {
2159        if (snappy_uncompressed_length(body, bodylen,
2160                                       &inflated_length) != SNAPPY_OK) {
2161            settings.extensions.logger->log(EXTENSION_LOG_INFO, c,
2162                    "<%d ERROR: Failed to determine inflated size",
2163                    c->sfd);
2164            return false;
2165        }
2166        needed += inflated_length;
2167    } else {
2168        needed += bodylen;
2169    }
2170
2171    if (!grow_dynamic_buffer(c, needed)) {
2172        if (settings.verbose > 0) {
2173            settings.extensions.logger->log(EXTENSION_LOG_INFO, c,
2174                    "<%d ERROR: Failed to allocate memory for response",
2175                    c->sfd);
2176        }
2177        return false;
2178    }
2179
2180    buf = c->dynamic_buffer.buffer + c->dynamic_buffer.offset;
2181    memset(&header, 0, sizeof(header));
2182    header.response.magic = (uint8_t)PROTOCOL_BINARY_RES;
2183    header.response.opcode = c->binary_header.request.opcode;
2184    header.response.keylen = (uint16_t)htons(keylen);
2185    header.response.extlen = extlen;
2186    header.response.datatype = datatype;
2187    header.response.status = (uint16_t)htons(status);
2188    if (need_inflate) {
2189        header.response.bodylen = htonl((uint32_t)(inflated_length + keylen + extlen));
2190    } else {
2191        header.response.bodylen = htonl(bodylen + keylen + extlen);
2192    }
2193    header.response.opaque = c->opaque;
2194    header.response.cas = htonll(cas);
2195
2196    memcpy(buf, header.bytes, sizeof(header.response));
2197    buf += sizeof(header.response);
2198
2199    if (extlen > 0) {
2200        memcpy(buf, ext, extlen);
2201        buf += extlen;
2202    }
2203
2204    if (keylen > 0) {
2205        cb_assert(key != NULL);
2206        memcpy(buf, key, keylen);
2207        buf += keylen;
2208    }
2209
2210    if (bodylen > 0) {
2211        if (need_inflate) {
2212            if (snappy_uncompress(body, bodylen, buf, &inflated_length) != SNAPPY_OK) {
2213                settings.extensions.logger->log(EXTENSION_LOG_INFO, c,
2214                        "<%d ERROR: Failed to inflate item", c->sfd);
2215                return false;
2216            }
2217        } else {
2218            memcpy(buf, body, bodylen);
2219        }
2220    }
2221
2222    c->dynamic_buffer.offset += needed;
2223    return true;
2224}
2225
/**
 * Tap stats (these are only used by the tap thread, so they don't need
 * to be in the threadlocal struct right now...)
 *
 * One counter per TAP message type. ship_tap_log() bumps the matching
 * counter in tap_stats.sent for each message it queues.
 */
struct tap_cmd_stats {
    uint64_t connect;
    uint64_t mutation;
    uint64_t checkpoint_start;
    uint64_t checkpoint_end;
    uint64_t delete;
    uint64_t flush;
    uint64_t opaque;
    uint64_t vbucket_set;
};
2240
/*
 * Global TAP message counters. ship_tap_log() takes `mutex` around every
 * update of the `sent` counters.
 */
struct tap_stats {
    cb_mutex_t mutex;              /* guards the counters below */
    struct tap_cmd_stats sent;     /* messages queued by ship_tap_log() */
    struct tap_cmd_stats received; /* presumably messages received from a
                                      TAP peer -- not updated in this part
                                      of the file, confirm at the users */
} tap_stats;
2246
2247static void ship_tap_log(conn *c) {
2248    bool more_data = true;
2249    bool send_data = false;
2250    bool disconnect = false;
2251    item *it;
2252    uint32_t bodylen;
2253    int ii = 0;
2254
2255    c->msgcurr = 0;
2256    c->msgused = 0;
2257    c->iovused = 0;
2258    if (add_msghdr(c) != 0) {
2259        if (settings.verbose) {
2260            settings.extensions.logger->log(EXTENSION_LOG_WARNING, c,
2261                                            "%d: Failed to create output headers. Shutting down tap connection\n", c->sfd);
2262        }
2263        conn_set_state(c, conn_closing);
2264        return ;
2265    }
2266    /* @todo add check for buffer overflow of c->wbuf) */
2267    c->wbytes = 0;
2268    c->wcurr = c->wbuf;
2269    c->icurr = c->ilist;
2270    do {
2271        /* @todo fixme! */
2272        void *engine;
2273        uint16_t nengine;
2274        uint8_t ttl;
2275        uint16_t tap_flags;
2276        uint32_t seqno;
2277        uint16_t vbucket;
2278        tap_event_t event;
2279        bool inflate = false;
2280        size_t inflated_length = 0;
2281
2282        union {
2283            protocol_binary_request_tap_mutation mutation;
2284            protocol_binary_request_tap_delete delete;
2285            protocol_binary_request_tap_flush flush;
2286            protocol_binary_request_tap_opaque opaque;
2287            protocol_binary_request_noop noop;
2288        } msg;
2289        item_info_holder info;
2290        memset(&info, 0, sizeof(info));
2291
2292        if (ii++ == 10) {
2293            break;
2294        }
2295
2296        event = c->tap_iterator(settings.engine.v0, c, &it,
2297                                            &engine, &nengine, &ttl,
2298                                            &tap_flags, &seqno, &vbucket);
2299        memset(&msg, 0, sizeof(msg));
2300        msg.opaque.message.header.request.magic = (uint8_t)PROTOCOL_BINARY_REQ;
2301        msg.opaque.message.header.request.opaque = htonl(seqno);
2302        msg.opaque.message.body.tap.enginespecific_length = htons(nengine);
2303        msg.opaque.message.body.tap.ttl = ttl;
2304        msg.opaque.message.body.tap.flags = htons(tap_flags);
2305        msg.opaque.message.header.request.extlen = 8;
2306        msg.opaque.message.header.request.vbucket = htons(vbucket);
2307        info.info.nvalue = IOV_MAX;
2308
2309        switch (event) {
2310        case TAP_NOOP :
2311            send_data = true;
2312            msg.noop.message.header.request.opcode = PROTOCOL_BINARY_CMD_NOOP;
2313            msg.noop.message.header.request.extlen = 0;
2314            msg.noop.message.header.request.bodylen = htonl(0);
2315            memcpy(c->wcurr, msg.noop.bytes, sizeof(msg.noop.bytes));
2316            add_iov(c, c->wcurr, sizeof(msg.noop.bytes));
2317            c->wcurr += sizeof(msg.noop.bytes);
2318            c->wbytes += sizeof(msg.noop.bytes);
2319            break;
2320        case TAP_PAUSE :
2321            more_data = false;
2322            break;
2323        case TAP_CHECKPOINT_START:
2324        case TAP_CHECKPOINT_END:
2325        case TAP_MUTATION:
2326            if (!settings.engine.v1->get_item_info(settings.engine.v0, c, it,
2327                                                   (void*)&info)) {
2328                settings.engine.v1->release(settings.engine.v0, c, it);
2329                settings.extensions.logger->log(EXTENSION_LOG_WARNING, c,
2330                                                "%d: Failed to get item info\n", c->sfd);
2331                break;
2332            }
2333            send_data = true;
2334            c->ilist[c->ileft++] = it;
2335
2336            if (event == TAP_CHECKPOINT_START) {
2337                msg.mutation.message.header.request.opcode =
2338                    PROTOCOL_BINARY_CMD_TAP_CHECKPOINT_START;
2339                cb_mutex_enter(&tap_stats.mutex);
2340                tap_stats.sent.checkpoint_start++;
2341                cb_mutex_exit(&tap_stats.mutex);
2342            } else if (event == TAP_CHECKPOINT_END) {
2343                msg.mutation.message.header.request.opcode =
2344                    PROTOCOL_BINARY_CMD_TAP_CHECKPOINT_END;
2345                cb_mutex_enter(&tap_stats.mutex);
2346                tap_stats.sent.checkpoint_end++;
2347                cb_mutex_exit(&tap_stats.mutex);
2348            } else if (event == TAP_MUTATION) {
2349                msg.mutation.message.header.request.opcode = PROTOCOL_BINARY_CMD_TAP_MUTATION;
2350                cb_mutex_enter(&tap_stats.mutex);
2351                tap_stats.sent.mutation++;
2352                cb_mutex_exit(&tap_stats.mutex);
2353            }
2354
2355            msg.mutation.message.header.request.cas = htonll(info.info.cas);
2356            msg.mutation.message.header.request.keylen = htons(info.info.nkey);
2357            msg.mutation.message.header.request.extlen = 16;
2358            if (c->supports_datatype) {
2359                msg.mutation.message.header.request.datatype = info.info.datatype;
2360            } else {
2361                switch (info.info.datatype) {
2362                case 0:
2363                    break;
2364                case PROTOCOL_BINARY_DATATYPE_JSON:
2365                    break;
2366                case PROTOCOL_BINARY_DATATYPE_COMPRESSED:
2367                case PROTOCOL_BINARY_DATATYPE_COMPRESSED_JSON:
2368                    inflate = true;
2369                    break;
2370                default:
2371                    settings.extensions.logger->log(EXTENSION_LOG_WARNING, c,
2372                                                    "%d: shipping data with"
2373                                                    " an invalid datatype "
2374                                                    "(stripping info)",
2375                                                    c->sfd);
2376                }
2377                msg.mutation.message.header.request.datatype = 0;
2378            }
2379
2380            bodylen = 16 + info.info.nkey + nengine;
2381            if ((tap_flags & TAP_FLAG_NO_VALUE) == 0) {
2382                if (inflate) {
2383                    if (snappy_uncompressed_length(info.info.value[0].iov_base,
2384                                                   info.info.nbytes,
2385                                                   &inflated_length) == SNAPPY_OK) {
2386                        bodylen += inflated_length;
2387                    } else {
2388                        settings.extensions.logger->log(EXTENSION_LOG_INFO, c,
2389                                                        "<%d ERROR: Failed to determine inflated size. Sending as compressed",
2390                                                        c->sfd);
2391                        inflate = false;
2392                        bodylen += info.info.nbytes;
2393                    }
2394                } else {
2395                    bodylen += info.info.nbytes;
2396                }
2397            }
2398            msg.mutation.message.header.request.bodylen = htonl(bodylen);
2399
2400            if ((tap_flags & TAP_FLAG_NETWORK_BYTE_ORDER) == 0) {
2401                msg.mutation.message.body.item.flags = htonl(info.info.flags);
2402            } else {
2403                msg.mutation.message.body.item.flags = info.info.flags;
2404            }
2405            msg.mutation.message.body.item.expiration = htonl(info.info.exptime);
2406            msg.mutation.message.body.tap.enginespecific_length = htons(nengine);
2407            msg.mutation.message.body.tap.ttl = ttl;
2408            msg.mutation.message.body.tap.flags = htons(tap_flags);
2409            memcpy(c->wcurr, msg.mutation.bytes, sizeof(msg.mutation.bytes));
2410
2411            add_iov(c, c->wcurr, sizeof(msg.mutation.bytes));
2412            c->wcurr += sizeof(msg.mutation.bytes);
2413            c->wbytes += sizeof(msg.mutation.bytes);
2414
2415            if (nengine > 0) {
2416                memcpy(c->wcurr, engine, nengine);
2417                add_iov(c, c->wcurr, nengine);
2418                c->wcurr += nengine;
2419                c->wbytes += nengine;
2420            }
2421
2422            add_iov(c, info.info.key, info.info.nkey);
2423            if ((tap_flags & TAP_FLAG_NO_VALUE) == 0) {
2424                if (inflate) {
2425                    void *buf = malloc(inflated_length);
2426                    void *body = info.info.value[0].iov_base;
2427                    size_t bodylen = info.info.value[0].iov_len;
2428                    if (snappy_uncompress(body, bodylen,
2429                                          buf, &inflated_length) == SNAPPY_OK) {
2430                        c->temp_alloc_list[c->temp_alloc_left++] = buf;
2431
2432                        add_iov(c, buf, inflated_length);
2433                    } else {
2434                        free(buf);
2435                        settings.extensions.logger->log(EXTENSION_LOG_WARNING, c,
2436                                                        "%d: FATAL: failed to inflate object. shutitng down connection", c->sfd);
2437                        conn_set_state(c, conn_closing);
2438                        return;
2439                    }
2440                } else {
2441                    int xx;
2442                    for (xx = 0; xx < info.info.nvalue; ++xx) {
2443                        add_iov(c, info.info.value[xx].iov_base,
2444                                info.info.value[xx].iov_len);
2445                    }
2446                }
2447            }
2448
2449            break;
2450        case TAP_DELETION:
2451            /* This is a delete */
2452            if (!settings.engine.v1->get_item_info(settings.engine.v0, c, it,
2453                                                   (void*)&info)) {
2454                settings.engine.v1->release(settings.engine.v0, c, it);
2455                settings.extensions.logger->log(EXTENSION_LOG_WARNING, c,
2456                                                "%d: Failed to get item info\n", c->sfd);
2457                break;
2458            }
2459            send_data = true;
2460            c->ilist[c->ileft++] = it;
2461            msg.delete.message.header.request.opcode = PROTOCOL_BINARY_CMD_TAP_DELETE;
2462            msg.delete.message.header.request.cas = htonll(info.info.cas);
2463            msg.delete.message.header.request.keylen = htons(info.info.nkey);
2464
2465            bodylen = 8 + info.info.nkey + nengine;
2466            if ((tap_flags & TAP_FLAG_NO_VALUE) == 0) {
2467                bodylen += info.info.nbytes;
2468            }
2469            msg.delete.message.header.request.bodylen = htonl(bodylen);
2470
2471            memcpy(c->wcurr, msg.delete.bytes, sizeof(msg.delete.bytes));
2472            add_iov(c, c->wcurr, sizeof(msg.delete.bytes));
2473            c->wcurr += sizeof(msg.delete.bytes);
2474            c->wbytes += sizeof(msg.delete.bytes);
2475
2476            if (nengine > 0) {
2477                memcpy(c->wcurr, engine, nengine);
2478                add_iov(c, c->wcurr, nengine);
2479                c->wcurr += nengine;
2480                c->wbytes += nengine;
2481            }
2482
2483            add_iov(c, info.info.key, info.info.nkey);
2484            if ((tap_flags & TAP_FLAG_NO_VALUE) == 0) {
2485                int xx;
2486                for (xx = 0; xx < info.info.nvalue; ++xx) {
2487                    add_iov(c, info.info.value[xx].iov_base,
2488                            info.info.value[xx].iov_len);
2489                }
2490            }
2491
2492            cb_mutex_enter(&tap_stats.mutex);
2493            tap_stats.sent.delete++;
2494            cb_mutex_exit(&tap_stats.mutex);
2495            break;
2496
2497        case TAP_DISCONNECT:
2498            disconnect = true;
2499            more_data = false;
2500            break;
2501        case TAP_VBUCKET_SET:
2502        case TAP_FLUSH:
2503        case TAP_OPAQUE:
2504            send_data = true;
2505
2506            if (event == TAP_OPAQUE) {
2507                msg.flush.message.header.request.opcode = PROTOCOL_BINARY_CMD_TAP_OPAQUE;
2508                cb_mutex_enter(&tap_stats.mutex);
2509                tap_stats.sent.opaque++;
2510                cb_mutex_exit(&tap_stats.mutex);
2511
2512            } else if (event == TAP_FLUSH) {
2513                msg.flush.message.header.request.opcode = PROTOCOL_BINARY_CMD_TAP_FLUSH;
2514                cb_mutex_enter(&tap_stats.mutex);
2515                tap_stats.sent.flush++;
2516                cb_mutex_exit(&tap_stats.mutex);
2517            } else if (event == TAP_VBUCKET_SET) {
2518                msg.flush.message.header.request.opcode = PROTOCOL_BINARY_CMD_TAP_VBUCKET_SET;
2519                msg.flush.message.body.tap.flags = htons(tap_flags);
2520                cb_mutex_enter(&tap_stats.mutex);
2521                tap_stats.sent.vbucket_set++;
2522                cb_mutex_exit(&tap_stats.mutex);
2523            }
2524
2525            msg.flush.message.header.request.bodylen = htonl(8 + nengine);
2526            memcpy(c->wcurr, msg.flush.bytes, sizeof(msg.flush.bytes));
2527            add_iov(c, c->wcurr, sizeof(msg.flush.bytes));
2528            c->wcurr += sizeof(msg.flush.bytes);
2529            c->wbytes += sizeof(msg.flush.bytes);
2530            if (nengine > 0) {
2531                memcpy(c->wcurr, engine, nengine);
2532                add_iov(c, c->wcurr, nengine);
2533                c->wcurr += nengine;
2534                c->wbytes += nengine;
2535            }
2536            break;
2537        default:
2538            abort();
2539        }
2540    } while (more_data);
2541
2542    c->ewouldblock = false;
2543    if (send_data) {
2544        conn_set_state(c, conn_mwrite);
2545        if (disconnect) {
2546            c->write_and_go = conn_closing;
2547        } else {
2548            c->write_and_go = conn_ship_log;
2549        }
2550    } else {
2551        if (disconnect) {
2552            conn_set_state(c, conn_closing);
2553        } else {
2554            /* No more items to ship to the slave at this time.. suspend.. */
2555            if (settings.verbose > 1) {
2556                settings.extensions.logger->log(EXTENSION_LOG_DEBUG, c,
2557                                                "%d: No more items in tap log.. waiting\n",
2558                                                c->sfd);
2559            }
2560            c->ewouldblock = true;
2561        }
2562    }
2563}
2564
2565static ENGINE_ERROR_CODE default_unknown_command(EXTENSION_BINARY_PROTOCOL_DESCRIPTOR *descriptor,
2566                                                 ENGINE_HANDLE* handle,
2567                                                 const void* cookie,
2568                                                 protocol_binary_request_header *request,
2569                                                 ADD_RESPONSE response)
2570{
2571    const conn *c = (void*)cookie;
2572
2573    if (!c->supports_datatype && request->request.datatype != PROTOCOL_BINARY_RAW_BYTES) {
2574        if (response(NULL, 0, NULL, 0, NULL, 0, PROTOCOL_BINARY_RAW_BYTES,
2575                     PROTOCOL_BINARY_RESPONSE_EINVAL, 0, cookie)) {
2576            return ENGINE_SUCCESS;
2577        } else {
2578            return ENGINE_DISCONNECT;
2579        }
2580    } else {
2581        return settings.engine.v1->unknown_command(handle, cookie,
2582                                                   request, response);
2583    }
2584}
2585
/*
 * One entry of the request dispatch table: the callback that handles an
 * opcode, and the descriptor that is passed to it as its first argument.
 */
struct request_lookup {
    EXTENSION_BINARY_PROTOCOL_DESCRIPTOR *descriptor; /* handed to callback */
    BINARY_COMMAND_CALLBACK callback;                 /* opcode handler */
};
2590
/*
 * Request dispatch table indexed by the one-byte opcode of the request.
 * Entries are installed with setup_binary_lookup_cmd() and looked up by
 * process_bin_unknown_packet().
 */
static struct request_lookup request_handlers[0x100];

/* Signature of a function handling a response packet on a connection */
typedef void (*RESPONSE_HANDLER)(conn*);
/**
 * A map between the response packets op-code and the function to handle
 * the response message.
 */
static RESPONSE_HANDLER response_handlers[0x100];
2599
2600static void setup_binary_lookup_cmd(EXTENSION_BINARY_PROTOCOL_DESCRIPTOR *descriptor,
2601                                    uint8_t cmd,
2602                                    BINARY_COMMAND_CALLBACK new_handler) {
2603    request_handlers[cmd].descriptor = descriptor;
2604    request_handlers[cmd].callback = new_handler;
2605}
2606
2607static void process_bin_unknown_packet(conn *c) {
2608    void *packet = c->rcurr - (c->binary_header.request.bodylen +
2609                               sizeof(c->binary_header));
2610    ENGINE_ERROR_CODE ret = c->aiostat;
2611    c->aiostat = ENGINE_SUCCESS;
2612    c->ewouldblock = false;
2613
2614    if (ret == ENGINE_SUCCESS) {
2615        struct request_lookup *rq = request_handlers + c->binary_header.request.opcode;
2616        ret = rq->callback(rq->descriptor, settings.engine.v0, c, packet,
2617                           binary_response_handler);
2618    }
2619
2620    switch (ret) {
2621    case ENGINE_SUCCESS:
2622        if (c->dynamic_buffer.buffer != NULL) {
2623            write_and_free(c, c->dynamic_buffer.buffer, c->dynamic_buffer.offset);
2624            c->dynamic_buffer.buffer = NULL;
2625        } else {
2626            conn_set_state(c, conn_new_cmd);
2627        }
2628        break;
2629    case ENGINE_EWOULDBLOCK:
2630        c->ewouldblock = true;
2631        break;
2632    case ENGINE_DISCONNECT:
2633        conn_set_state(c, conn_closing);
2634        break;
2635    default:
2636        /* Release the dynamic buffer.. it may be partial.. */
2637        free(c->dynamic_buffer.buffer);
2638        c->dynamic_buffer.buffer = NULL;
2639        write_bin_packet(c, engine_error_2_protocol_error(ret), 0);
2640    }
2641}
2642
2643static void cbsasl_refresh_main(void *c)
2644{
2645    int rv = cbsasl_server_refresh();
2646    if (rv == SASL_OK) {
2647        notify_io_complete(c, ENGINE_SUCCESS);
2648    } else {
2649        notify_io_complete(c, ENGINE_EINVAL);
2650    }
2651}
2652
2653static ENGINE_ERROR_CODE refresh_cbsasl(conn *c)
2654{
2655    cb_thread_t tid;
2656    int err;
2657
2658    err = cb_create_thread(&tid, cbsasl_refresh_main, c, 1);
2659    if (err != 0) {
2660        settings.extensions.logger->log(EXTENSION_LOG_WARNING, c,
2661                                        "Failed to create cbsasl db "
2662                                        "update thread: %s",
2663                                        strerror(err));
2664        return ENGINE_DISCONNECT;
2665    }
2666
2667    return ENGINE_EWOULDBLOCK;
2668}
2669
2670#if 0
2671static void ssl_certs_refresh_main(void *c)
2672{
2673    /* Update the internal certificates */
2674
2675    notify_io_complete(c, ENGINE_SUCCESS);
2676}
2677#endif
2678static ENGINE_ERROR_CODE refresh_ssl_certs(conn *c)
2679{
2680#if 0
2681    cb_thread_t tid;
2682    int err;
2683
2684    err = cb_create_thread(&tid, ssl_certs_refresh_main, c, 1);
2685    if (err != 0) {
2686        settings.extensions.logger->log(EXTENSION_LOG_WARNING, c,
2687                                        "Failed to create ssl_certificate "
2688                                        "update thread: %s",
2689                                        strerror(err));
2690        return ENGINE_DISCONNECT;
2691    }
2692
2693    return ENGINE_EWOULDBLOCK;
2694#endif
2695    return ENGINE_SUCCESS;
2696}
2697
2698static void process_bin_tap_connect(conn *c) {
2699    TAP_ITERATOR iterator;
2700    char *packet = (c->rcurr - (c->binary_header.request.bodylen +
2701                                sizeof(c->binary_header)));
2702    protocol_binary_request_tap_connect *req = (void*)packet;
2703    const char *key = packet + sizeof(req->bytes);
2704    const char *data = key + c->binary_header.request.keylen;
2705    uint32_t flags = 0;
2706    size_t ndata = c->binary_header.request.bodylen -
2707        c->binary_header.request.extlen -
2708        c->binary_header.request.keylen;
2709
2710    if (c->binary_header.request.extlen == 4) {
2711        flags = ntohl(req->message.body.flags);
2712
2713        if (flags & TAP_CONNECT_FLAG_BACKFILL) {
2714            /* the userdata has to be at least 8 bytes! */
2715            if (ndata < 8) {
2716                settings.extensions.logger->log(EXTENSION_LOG_WARNING, c,
2717                                                "%d: ERROR: Invalid tap connect message\n",
2718                                                c->sfd);
2719                conn_set_state(c, conn_closing);
2720                return ;
2721            }
2722        }
2723    } else {
2724        data -= 4;
2725        key -= 4;
2726    }
2727
2728    if (settings.verbose && c->binary_header.request.keylen > 0) {
2729        char buffer[1024];
2730        int len = c->binary_header.request.keylen;
2731        if (len >= sizeof(buffer)) {
2732            len = sizeof(buffer) - 1;
2733        }
2734        memcpy(buffer, key, len);
2735        buffer[len] = '\0';
2736        settings.extensions.logger->log(EXTENSION_LOG_DEBUG, c,
2737                                        "%d: Trying to connect with named tap connection: <%s>\n",
2738                                        c->sfd, buffer);
2739    }
2740
2741    iterator = settings.engine.v1->get_tap_iterator(
2742        settings.engine.v0, c, key, c->binary_header.request.keylen,
2743        flags, data, ndata);
2744
2745    if (iterator == NULL) {
2746        settings.extensions.logger->log(EXTENSION_LOG_WARNING, c,
2747                                        "%d: FATAL: The engine does not support tap\n",
2748                                        c->sfd);
2749        write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_NOT_SUPPORTED, 0);
2750        c->write_and_go = conn_closing;
2751    } else {
2752        c->tap_iterator = iterator;
2753        c->which = EV_WRITE;
2754        conn_set_state(c, conn_ship_log);
2755    }
2756}
2757
2758static void process_bin_tap_packet(tap_event_t event, conn *c) {
2759    char *packet;
2760    protocol_binary_request_tap_no_extras *tap;
2761    uint16_t nengine;
2762    uint16_t tap_flags;
2763    uint32_t seqno;
2764    uint8_t ttl;
2765    char *engine_specific;
2766    char *key;
2767    uint16_t nkey;
2768    char *data;
2769    uint32_t flags;
2770    uint32_t exptime;
2771    uint32_t ndata;
2772    ENGINE_ERROR_CODE ret;
2773
2774    cb_assert(c != NULL);
2775    packet = (c->rcurr - (c->binary_header.request.bodylen +
2776                                sizeof(c->binary_header)));
2777    tap = (void*)packet;
2778    nengine = ntohs(tap->message.body.tap.enginespecific_length);
2779    tap_flags = ntohs(tap->message.body.tap.flags);
2780    seqno = ntohl(tap->message.header.request.opaque);
2781    ttl = tap->message.body.tap.ttl;
2782    engine_specific = packet + sizeof(tap->bytes);
2783    key = engine_specific + nengine;
2784    nkey = c->binary_header.request.keylen;
2785    data = key + nkey;
2786    flags = 0;
2787    exptime = 0;
2788    ndata = c->binary_header.request.bodylen - nengine - nkey - 8;
2789    ret = c->aiostat;
2790
2791    if (ttl == 0) {
2792        ret = ENGINE_EINVAL;
2793    } else {
2794        if (event == TAP_MUTATION || event == TAP_CHECKPOINT_START ||
2795            event == TAP_CHECKPOINT_END) {
2796            protocol_binary_request_tap_mutation *mutation = (void*)tap;
2797
2798            /* engine_specific data in protocol_binary_request_tap_mutation is */
2799            /* at a different offset than protocol_binary_request_tap_no_extras */
2800            engine_specific = packet + sizeof(mutation->bytes);
2801
2802            flags = mutation->message.body.item.flags;
2803            if ((tap_flags & TAP_FLAG_NETWORK_BYTE_ORDER) == 0) {
2804                flags = ntohl(flags);
2805            }
2806
2807            exptime = ntohl(mutation->message.body.item.expiration);
2808            key += 8;
2809            data += 8;
2810            ndata -= 8;
2811        }
2812
2813        if (ret == ENGINE_SUCCESS) {
2814            uint8_t datatype = c->binary_header.request.datatype;
2815            if (event == TAP_MUTATION && !c->supports_datatype) {
2816                if (checkUTF8JSON((void*)data, ndata)) {
2817                    datatype = PROTOCOL_BINARY_DATATYPE_JSON;
2818                }
2819            }
2820
2821            ret = settings.engine.v1->tap_notify(settings.engine.v0, c,
2822                                                 engine_specific, nengine,
2823                                                 ttl - 1, tap_flags,
2824                                                 event, seqno,
2825                                                 key, nkey,
2826                                                 flags, exptime,
2827                                                 ntohll(tap->message.header.request.cas),
2828                                                 datatype,
2829                                                 data, ndata,
2830                                                 c->binary_header.request.vbucket);
2831        }
2832    }
2833
2834    switch (ret) {
2835    case ENGINE_DISCONNECT:
2836        conn_set_state(c, conn_closing);
2837        break;
2838    case ENGINE_EWOULDBLOCK:
2839        c->ewouldblock = true;
2840        break;
2841    default:
2842        if ((tap_flags & TAP_FLAG_ACK) || (ret != ENGINE_SUCCESS)) {
2843            write_bin_packet(c, engine_error_2_protocol_error(ret), 0);
2844        } else {
2845            conn_set_state(c, conn_new_cmd);
2846        }
2847    }
2848}
2849
2850static void process_bin_tap_ack(conn *c) {
2851    char *packet;
2852    protocol_binary_response_no_extras *rsp;
2853    uint32_t seqno;
2854    uint16_t status;
2855    char *key;
2856    ENGINE_ERROR_CODE ret = ENGINE_DISCONNECT;
2857
2858    cb_assert(c != NULL);
2859    packet = (c->rcurr - (c->binary_header.request.bodylen + sizeof(c->binary_header)));
2860    rsp = (void*)packet;
2861    seqno = ntohl(rsp->message.header.response.opaque);
2862    status = ntohs(rsp->message.header.response.status);
2863    key = packet + sizeof(rsp->bytes);
2864
2865    if (settings.engine.v1->tap_notify != NULL) {
2866        ret = settings.engine.v1->tap_notify(settings.engine.v0, c, NULL, 0, 0, status,
2867                                             TAP_ACK, seqno, key,
2868                                             c->binary_header.request.keylen, 0, 0,
2869                                             0, c->binary_header.request.datatype, NULL,
2870                                             0, 0);
2871    }
2872
2873    if (ret == ENGINE_DISCONNECT) {
2874        conn_set_state(c, conn_closing);
2875    } else {
2876        conn_set_state(c, conn_ship_log);
2877    }
2878}
2879
2880/**
2881 * We received a noop response.. just ignore it
2882 */
2883static void process_bin_noop_response(conn *c) {
2884    cb_assert(c != NULL);
2885    conn_set_state(c, conn_new_cmd);
2886}
2887
2888/*******************************************************************************
2889 **                             DCP MESSAGE PRODUCERS                         **
2890 ******************************************************************************/
2891static ENGINE_ERROR_CODE dcp_message_get_failover_log(const void *cookie,
2892                                                      uint32_t opaque,
2893                                                      uint16_t vbucket)
2894{
2895    protocol_binary_request_dcp_get_failover_log packet;
2896    conn *c = (void*)cookie;
2897
2898    if (c->wbytes + sizeof(packet.bytes) >= c->wsize) {
2899        /* We don't have room in the buffer */
2900        return ENGINE_E2BIG;
2901    }
2902
2903    memset(packet.bytes, 0, sizeof(packet.bytes));
2904    packet.message.header.request.magic =  (uint8_t)PROTOCOL_BINARY_REQ;
2905    packet.message.header.request.opcode = (uint8_t)PROTOCOL_BINARY_CMD_DCP_GET_FAILOVER_LOG;
2906    packet.message.header.request.opaque = opaque;
2907    packet.message.header.request.vbucket = htons(vbucket);
2908
2909    memcpy(c->wcurr, packet.bytes, sizeof(packet.bytes));
2910    add_iov(c, c->wcurr, sizeof(packet.bytes));
2911    c->wcurr += sizeof(packet.bytes);
2912    c->wbytes += sizeof(packet.bytes);
2913
2914    return ENGINE_SUCCESS;
2915}
2916
2917static ENGINE_ERROR_CODE dcp_message_stream_req(const void *cookie,
2918                                                uint32_t opaque,
2919                                                uint16_t vbucket,
2920                                                uint32_t flags,
2921                                                uint64_t start_seqno,
2922                                                uint64_t end_seqno,
2923                                                uint64_t vbucket_uuid,
2924                                                uint64_t snap_start_seqno,
2925                                                uint64_t snap_end_seqno)
2926{
2927    protocol_binary_request_dcp_stream_req packet;
2928    conn *c = (void*)cookie;
2929
2930    if (c->wbytes + sizeof(packet.bytes) >= c->wsize) {
2931        /* We don't have room in the buffer */
2932        return ENGINE_E2BIG;
2933    }
2934
2935    memset(packet.bytes, 0, sizeof(packet.bytes));
2936    packet.message.header.request.magic =  (uint8_t)PROTOCOL_BINARY_REQ;
2937    packet.message.header.request.opcode = (uint8_t)PROTOCOL_BINARY_CMD_DCP_STREAM_REQ;
2938    packet.message.header.request.extlen = 48;
2939    packet.message.header.request.bodylen = htonl(48);
2940    packet.message.header.request.opaque = opaque;
2941    packet.message.header.request.vbucket = htons(vbucket);
2942
2943    packet.message.body.flags = ntohl(flags);
2944    packet.message.body.start_seqno = ntohll(start_seqno);
2945    packet.message.body.end_seqno = ntohll(end_seqno);
2946    packet.message.body.vbucket_uuid = ntohll(vbucket_uuid);
2947    packet.message.body.snap_start_seqno = ntohll(snap_start_seqno);
2948    packet.message.body.snap_end_seqno = ntohll(snap_end_seqno);
2949
2950    memcpy(c->wcurr, packet.bytes, sizeof(packet.bytes));
2951    add_iov(c, c->wcurr, sizeof(packet.bytes));
2952    c->wcurr += sizeof(packet.bytes);
2953    c->wbytes += sizeof(packet.bytes);
2954
2955    return ENGINE_SUCCESS;
2956}
2957
2958static ENGINE_ERROR_CODE dcp_message_add_stream_response(const void *cookie,
2959                                                         uint32_t opaque,
2960                                                         uint32_t dialogopaque,
2961                                                         uint8_t status)
2962{
2963    protocol_binary_response_dcp_add_stream packet;
2964    conn *c = (void*)cookie;
2965
2966    if (c->wbytes + sizeof(packet.bytes) >= c->wsize) {
2967        /* We don't have room in the buffer */
2968        return ENGINE_E2BIG;
2969    }
2970
2971    memset(packet.bytes, 0, sizeof(packet.bytes));
2972    packet.message.header.response.magic =  (uint8_t)PROTOCOL_BINARY_RES;
2973    packet.message.header.response.opcode = (uint8_t)PROTOCOL_BINARY_CMD_DCP_ADD_STREAM;
2974    packet.message.header.response.extlen = 4;
2975    packet.message.header.response.status = htons(status);
2976    packet.message.header.response.bodylen = htonl(4);
2977    packet.message.header.response.opaque = opaque;
2978    packet.message.body.opaque = ntohl(dialogopaque);
2979
2980    memcpy(c->wcurr, packet.bytes, sizeof(packet.bytes));
2981    add_iov(c, c->wcurr, sizeof(packet.bytes));
2982    c->wcurr += sizeof(packet.bytes);
2983    c->wbytes += sizeof(packet.bytes);
2984
2985    return ENGINE_SUCCESS;
2986}
2987
2988static ENGINE_ERROR_CODE dcp_message_marker_response(const void *cookie,
2989                                                     uint32_t opaque,
2990                                                     uint8_t status)
2991{
2992    protocol_binary_response_dcp_snapshot_marker packet;
2993    conn *c = (void*)cookie;
2994
2995    if (c->wbytes + sizeof(packet.bytes) >= c->wsize) {
2996        /* We don't have room in the buffer */
2997        return ENGINE_E2BIG;
2998    }
2999
3000    memset(packet.bytes, 0, sizeof(packet.bytes));
3001    packet.message.header.response.magic =  (uint8_t)PROTOCOL_BINARY_RES;
3002    packet.message.header.response.opcode = (uint8_t)PROTOCOL_BINARY_CMD_DCP_SNAPSHOT_MARKER;
3003    packet.message.header.response.extlen = 0;
3004    packet.message.header.response.status = htons(status);
3005    packet.message.header.response.bodylen = 0;
3006    packet.message.header.response.opaque = opaque;
3007
3008    memcpy(c->wcurr, packet.bytes, sizeof(packet.bytes));
3009    add_iov(c, c->wcurr, sizeof(packet.bytes));
3010    c->wcurr += sizeof(packet.bytes);
3011    c->wbytes += sizeof(packet.bytes);
3012
3013    return ENGINE_SUCCESS;
3014}
3015
3016static ENGINE_ERROR_CODE dcp_message_set_vbucket_state_response(const void *cookie,
3017                                                                uint32_t opaque,
3018                                                                uint8_t status)
3019{
3020    protocol_binary_response_dcp_set_vbucket_state packet;
3021    conn *c = (void*)cookie;
3022
3023    if (c->wbytes + sizeof(packet.bytes) >= c->wsize) {
3024        /* We don't have room in the buffer */
3025        return ENGINE_E2BIG;
3026    }
3027
3028    memset(packet.bytes, 0, sizeof(packet.bytes));
3029    packet.message.header.response.magic =  (uint8_t)PROTOCOL_BINARY_RES;
3030    packet.message.header.response.opcode = (uint8_t)PROTOCOL_BINARY_CMD_DCP_SET_VBUCKET_STATE;
3031    packet.message.header.response.extlen = 0;
3032    packet.message.header.response.status = htons(status);
3033    packet.message.header.response.bodylen = 0;
3034    packet.message.header.response.opaque = opaque;
3035
3036    memcpy(c->wcurr, packet.bytes, sizeof(packet.bytes));
3037    add_iov(c, c->wcurr, sizeof(packet.bytes));
3038    c->wcurr += sizeof(packet.bytes);
3039    c->wbytes += sizeof(packet.bytes);
3040
3041    return ENGINE_SUCCESS;
3042}
3043
3044static ENGINE_ERROR_CODE dcp_message_stream_end(const void *cookie,
3045                                                uint32_t opaque,
3046                                                uint16_t vbucket,
3047                                                uint32_t flags)
3048{
3049    protocol_binary_request_dcp_stream_end packet;
3050    conn *c = (void*)cookie;
3051
3052    if (c->wbytes + sizeof(packet.bytes) >= c->wsize) {
3053        /* We don't have room in the buffer */
3054        return ENGINE_E2BIG;
3055    }
3056
3057    memset(packet.bytes, 0, sizeof(packet.bytes));
3058    packet.message.header.request.magic =  (uint8_t)PROTOCOL_BINARY_REQ;
3059    packet.message.header.request.opcode = (uint8_t)PROTOCOL_BINARY_CMD_DCP_STREAM_END;
3060    packet.message.header.request.extlen = 4;
3061    packet.message.header.request.bodylen = htonl(4);
3062    packet.message.header.request.opaque = opaque;
3063    packet.message.header.request.vbucket = htons(vbucket);
3064    packet.message.body.flags = ntohl(flags);
3065
3066    memcpy(c->wcurr, packet.bytes, sizeof(packet.bytes));
3067    add_iov(c, c->wcurr, sizeof(packet.bytes));
3068    c->wcurr += sizeof(packet.bytes);
3069    c->wbytes += sizeof(packet.bytes);
3070
3071    return ENGINE_SUCCESS;
3072}
3073
3074static ENGINE_ERROR_CODE dcp_message_marker(const void *cookie,
3075                                            uint32_t opaque,
3076                                            uint16_t vbucket,
3077                                            uint64_t start_seqno,
3078                                            uint64_t end_seqno,
3079                                            uint32_t flags)
3080{
3081    protocol_binary_request_dcp_snapshot_marker packet;
3082    conn *c = (void*)cookie;
3083
3084    if (c->wbytes + sizeof(packet.bytes) >= c->wsize) {
3085        /* We don't have room in the buffer */
3086        return ENGINE_E2BIG;
3087    }
3088
3089    memset(packet.bytes, 0, sizeof(packet.bytes));
3090    packet.message.header.request.magic =  (uint8_t)PROTOCOL_BINARY_REQ;
3091    packet.message.header.request.opcode = (uint8_t)PROTOCOL_BINARY_CMD_DCP_SNAPSHOT_MARKER;
3092    packet.message.header.request.opaque = opaque;
3093    packet.message.header.request.vbucket = htons(vbucket);
3094    packet.message.header.request.extlen = 20;
3095    packet.message.header.request.bodylen = htonl(20);
3096    packet.message.body.start_seqno = htonll(start_seqno);
3097    packet.message.body.end_seqno = htonll(end_seqno);
3098    packet.message.body.flags = htonl(flags);
3099
3100    memcpy(c->wcurr, packet.bytes, sizeof(packet.bytes));
3101    add_iov(c, c->wcurr, sizeof(packet.bytes));
3102    c->wcurr += sizeof(packet.bytes);
3103    c->wbytes += sizeof(packet.bytes);
3104
3105    return ENGINE_SUCCESS;
3106}
3107
3108static ENGINE_ERROR_CODE dcp_message_mutation(const void* cookie,
3109                                              uint32_t opaque,
3110                                              item *it,
3111                                              uint16_t vbucket,
3112                                              uint64_t by_seqno,
3113                                              uint64_t rev_seqno,
3114                                              uint32_t lock_time,
3115                                              const void *meta,
3116                                              uint16_t nmeta,
3117                                              uint8_t nru)
3118{
3119    conn *c = (void*)cookie;
3120    item_info_holder info;
3121    protocol_binary_request_dcp_mutation packet;
3122    int xx;
3123
3124    if (c->wbytes + sizeof(packet.bytes) + nmeta >= c->wsize) {
3125        /* We don't have room in the buffer */
3126        return ENGINE_E2BIG;
3127    }
3128
3129    memset(&info, 0, sizeof(info));
3130    info.info.nvalue = IOV_MAX;
3131
3132    if (!settings.engine.v1->get_item_info(settings.engine.v0, c, it,
3133                                           (void*)&info)) {
3134        settings.engine.v1->release(settings.engine.v0, c, it);
3135        settings.extensions.logger->log(EXTENSION_LOG_WARNING, c,
3136                                        "%d: Failed to get item info\n", c->sfd);
3137        return ENGINE_FAILED;
3138    }
3139
3140    memset(packet.bytes, 0, sizeof(packet));
3141    packet.message.header.request.magic =  (uint8_t)PROTOCOL_BINARY_REQ;
3142    packet.message.header.request.opcode = (uint8_t)PROTOCOL_BINARY_CMD_DCP_MUTATION;
3143    packet.message.header.request.opaque = opaque;
3144    packet.message.header.request.vbucket = htons(vbucket);
3145    packet.message.header.request.cas = htonll(info.info.cas);
3146    packet.message.header.request.keylen = htons(info.info.nkey);
3147    packet.message.header.request.extlen = 31;
3148    packet.message.header.request.bodylen = ntohl(31 + info.info.nkey + info.info.nbytes + nmeta);
3149    packet.message.header.request.datatype = info.info.datatype;
3150    packet.message.body.by_seqno = htonll(by_seqno);
3151    packet.message.body.rev_seqno = htonll(rev_seqno);
3152    packet.message.body.lock_time = htonl(lock_time);
3153    packet.message.body.flags = info.info.flags;
3154    packet.message.body.expiration = htonl(info.info.exptime);
3155    packet.message.body.nmeta = htons(nmeta);
3156    packet.message.body.nru = nru;
3157
3158    c->ilist[c->ileft++] = it;
3159
3160    memcpy(c->wcurr, packet.bytes, sizeof(packet.bytes));
3161    add_iov(c, c->wcurr, sizeof(packet.bytes));
3162    c->wcurr += sizeof(packet.bytes);
3163    c->wbytes += sizeof(packet.bytes);
3164    add_iov(c, info.info.key, info.info.nkey);
3165    for (xx = 0; xx < info.info.nvalue; ++xx) {
3166        add_iov(c, info.info.value[xx].iov_base, info.info.value[xx].iov_len);
3167    }
3168
3169    memcpy(c->wcurr, meta, nmeta);
3170    add_iov(c, c->wcurr, nmeta);
3171    c->wcurr += nmeta;
3172    c->wbytes += nmeta;
3173
3174    return ENGINE_SUCCESS;
3175}
3176
3177static ENGINE_ERROR_CODE dcp_message_deletion(const void* cookie,
3178                                              uint32_t opaque,
3179                                              const void *key,
3180                                              uint16_t nkey,
3181                                              uint64_t cas,
3182                                              uint16_t vbucket,
3183                                              uint64_t by_seqno,
3184                                              uint64_t rev_seqno,
3185                                              const void *meta,
3186                                              uint16_t nmeta)
3187{
3188    conn *c = (void*)cookie;
3189    protocol_binary_request_dcp_deletion packet;
3190    if (c->wbytes + sizeof(packet.bytes) + nkey + nmeta >= c->wsize) {
3191        return ENGINE_E2BIG;
3192    }
3193
3194    memset(packet.bytes, 0, sizeof(packet));
3195    packet.message.header.request.magic =  (uint8_t)PROTOCOL_BINARY_REQ;
3196    packet.message.header.request.opcode = (uint8_t)PROTOCOL_BINARY_CMD_DCP_DELETION;
3197    packet.message.header.request.opaque = opaque;
3198    packet.message.header.request.vbucket = htons(vbucket);
3199    packet.message.header.request.cas =