xref: /3.0.2-MP2/memcached/daemon/memcached.c (revision 4424e903)
1/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *  memcached - memory caching daemon
4 *
5 *       http://www.danga.com/memcached/
6 *
7 *  Copyright 2003 Danga Interactive, Inc.  All rights reserved.
8 *
9 *  Use and distribution licensed under the BSD license.  See
10 *  the LICENSE file for full text.
11 *
12 *  Authors:
13 *      Anatoly Vorobey <mellon@pobox.com>
14 *      Brad Fitzpatrick <brad@danga.com>
15 */
16#include "config.h"
17#include "memcached.h"
18#include "memcached/extension_loggers.h"
19#include "alloc_hooks.h"
20#include "utilities/engine_loader.h"
21#include "timings.h"
22#include "cmdline.h"
23#include "mc_time.h"
24
25#include <signal.h>
26#include <fcntl.h>
27#include <errno.h>
28#include <stdlib.h>
29#include <stdio.h>
30#include <string.h>
31#include <time.h>
32#include <limits.h>
33#include <ctype.h>
34#include <stdarg.h>
35#include <stddef.h>
36#include <snappy-c.h>
37#include <JSON_checker.h>
38
/*
 * Forward declarations of static helpers that are defined further down in
 * this translation unit.
 */
static bool grow_dynamic_buffer(conn *c, size_t needed);
static void cookie_set_admin(const void *cookie);
static bool cookie_is_admin(const void *cookie);
42
/*
 * Storage for an item_info plus room for (IOV_MAX - 1) additional
 * struct iovec entries directly after it.
 * NOTE(review): this presumably exists because item_info ends with a
 * one-element iovec array that may be indexed past its declared size, so
 * that IOV_MAX iovecs fit in total — confirm against item_info's definition.
 */
typedef union {
    item_info info;
    char bytes[sizeof(item_info) + ((IOV_MAX - 1) * sizeof(struct iovec))];
} item_info_holder;
47
48static void item_set_cas(const void *cookie, item *it, uint64_t cas) {
49    settings.engine.v1->item_set_cas(settings.engine.v0, cookie, it, cas);
50}
51
/* NOTE(review): presumably the maximum length of a SASL mechanism name; it
 * is not referenced in this part of the file — confirm at its use site. */
#define MAX_SASL_MECH_LEN 32

/*
 * Statistics helper macros.
 *
 * The *_GUTS macros are the statement bodies that STATS_INCR1 plugs in.
 * Each takes (conn, thread_stats, slab_op, thread_op) and uses only the
 * arguments it needs; `thread_stats` is the per-thread stats block that
 * STATS_INCR1 has already locked.
 *
 * SLAB_GUTS indexes thread_stats->slab_stats with `info.info.clsid`, so a
 * variable named `info` (an item_info_holder, judging by the `.info`
 * member) must be in scope wherever SLAB_GUTS ends up expanded, i.e. at
 * every SLAB_INCR, SLAB_TWO and STATS_HIT call site.
 */

/* Increment per-slab counter `slab_op` for the slab class of `info`. */
#define SLAB_GUTS(conn, thread_stats, slab_op, thread_op) \
    thread_stats->slab_stats[info.info.clsid].slab_op++;

/* Increment per-thread counter `thread_op`. */
#define THREAD_GUTS(conn, thread_stats, slab_op, thread_op) \
    thread_stats->thread_op++;

/* Increment two per-thread counters, `slab_op` and `thread_op`. */
#define THREAD_GUTS2(conn, thread_stats, slab_op, thread_op) \
    thread_stats->slab_op++; \
    thread_stats->thread_op++;

/* Per-slab `slab_op` (see SLAB_GUTS) plus per-thread `thread_op`. */
#define SLAB_THREAD_GUTS(conn, thread_stats, slab_op, thread_op) \
    SLAB_GUTS(conn, thread_stats, slab_op, thread_op) \
    THREAD_GUTS(conn, thread_stats, slab_op, thread_op)

/*
 * Run GUTS while holding the mutex of the thread_stats returned by
 * get_thread_stats(conn). `key` and `nkey` are accepted but not used by
 * the expansion.
 *
 * NOTE(review): this and the STATS_NOKEY* / STATS_ADD macros expand to a
 * bare { ... } block rather than do { ... } while (0), so they cannot be
 * used as the body of an unbraced if that has an else branch.
 */
#define STATS_INCR1(GUTS, conn, slab_op, thread_op, key, nkey) { \
    struct thread_stats *thread_stats = get_thread_stats(conn); \
    cb_mutex_enter(&thread_stats->mutex); \
    GUTS(conn, thread_stats, slab_op, thread_op); \
    cb_mutex_exit(&thread_stats->mutex); \
}

/* Increment per-thread counter `op`. */
#define STATS_INCR(conn, op, key, nkey) \
    STATS_INCR1(THREAD_GUTS, conn, op, op, key, nkey)

/* Increment per-slab counter `op` (requires `info` in scope). */
#define SLAB_INCR(conn, op, key, nkey) \
    STATS_INCR1(SLAB_GUTS, conn, op, op, key, nkey)

/* Increment per-thread counters `slab_op` and `thread_op`. */
#define STATS_TWO(conn, slab_op, thread_op, key, nkey) \
    STATS_INCR1(THREAD_GUTS2, conn, slab_op, thread_op, key, nkey)

/* Increment per-slab `slab_op` (requires `info`) and per-thread `thread_op`. */
#define SLAB_TWO(conn, slab_op, thread_op, key, nkey) \
    STATS_INCR1(SLAB_THREAD_GUTS, conn, slab_op, thread_op, key, nkey)

/* Hit: per-slab `<op>_hits` and per-thread `cmd_<op>` (requires `info`). */
#define STATS_HIT(conn, op, key, nkey) \
    SLAB_TWO(conn, op##_hits, cmd_##op, key, nkey)

/* Miss: per-thread `<op>_misses` and per-thread `cmd_<op>`. */
#define STATS_MISS(conn, op, key, nkey) \
    STATS_TWO(conn, op##_misses, cmd_##op, key, nkey)

/* Increment one per-thread counter under the thread_stats mutex. */
#define STATS_NOKEY(conn, op) { \
    struct thread_stats *thread_stats = \
        get_thread_stats(conn); \
    cb_mutex_enter(&thread_stats->mutex); \
    thread_stats->op++; \
    cb_mutex_exit(&thread_stats->mutex); \
}

/* Increment two per-thread counters under the thread_stats mutex. */
#define STATS_NOKEY2(conn, op1, op2) { \
    struct thread_stats *thread_stats = \
        get_thread_stats(conn); \
    cb_mutex_enter(&thread_stats->mutex); \
    thread_stats->op1++; \
    thread_stats->op2++; \
    cb_mutex_exit(&thread_stats->mutex); \
}

/* Add `amt` to a per-thread counter under the thread_stats mutex. */
#define STATS_ADD(conn, op, amt) { \
    struct thread_stats *thread_stats = \
        get_thread_stats(conn); \
    cb_mutex_enter(&thread_stats->mutex); \
    thread_stats->op += amt; \
    cb_mutex_exit(&thread_stats->mutex); \
}
117}
118
/* Shutdown request flag.
 * NOTE(review): the volatile sig_atomic_t type suggests it is written from a
 * signal handler; its writer and readers are not visible in this part. */
volatile sig_atomic_t memcached_shutdown;

/* Lock for the global `stats` struct; taken via STATS_LOCK()/STATS_UNLOCK(). */
static cb_mutex_t stats_lock;

/**
 * Structure to save ns_server's session cas token.
 * NOTE(review): field usage is not visible here; presumably `value` holds
 * the token, `ctr` is a related counter and `mutex` guards both — confirm.
 */
static struct session_cas {
    uint64_t value;
    uint64_t ctr;
    cb_mutex_t mutex;
} session_cas;
132
133void STATS_LOCK() {
134    cb_mutex_enter(&stats_lock);
135}
136
137void STATS_UNLOCK() {
138    cb_mutex_exit(&stats_lock);
139}
140
/*
 * Small portable wrappers around the platform's socket error codes.
 * On Windows they take/set WSA error codes, elsewhere errno values.
 */
#ifdef WIN32
/* True if the error means the operation would block. */
static int is_blocking(DWORD dw) {
    return (dw == WSAEWOULDBLOCK);
}
/* True if the error means the process is out of file descriptors. */
static int is_emfile(DWORD dw) {
    return (dw == WSAEMFILE);
}
/* True if the error means the peer connection is gone. (Previously
   `dw == WSAENOTCONN || WSAECONNRESET`, which was always true.) */
static int is_closed_conn(DWORD dw) {
    return (dw == WSAENOTCONN || dw == WSAECONNRESET);
}
/* True if the error means the address is already in use. */
static int is_addrinuse(DWORD dw) {
    return (dw == WSAEADDRINUSE);
}
static void set_ewouldblock(void) {
    WSASetLastError(WSAEWOULDBLOCK);
}
static void set_econnreset(void) {
    WSASetLastError(WSAECONNRESET);
}
#else
/* True if the errno value means the operation would block. */
static int is_blocking(int dw) {
    return (dw == EAGAIN || dw == EWOULDBLOCK);
}
/* True if the errno value means the process is out of file descriptors. */
static int is_emfile(int dw) {
    return (dw == EMFILE);
}
/* True if the errno value means the peer connection is gone. (Previously
   `dw != ECONNRESET`, which reported almost every errno as "closed" and
   ECONNRESET itself as "not closed".) */
static int is_closed_conn(int dw) {
    return (dw == ENOTCONN || dw == ECONNRESET);
}
/* True if the errno value means the address is already in use. */
static int is_addrinuse(int dw) {
    return (dw == EADDRINUSE);
}
static void set_ewouldblock(void) {
    errno = EWOULDBLOCK;
}
static void set_econnreset(void) {
    errno = ECONNRESET;
}
#endif
180
/*
 * forward declarations
 *
 * Static functions defined later in this file that are needed earlier.
 */
static SOCKET new_socket(struct addrinfo *ai);
static int try_read_command(conn *c);
/* Used by stats_reset() to find the stats block to reset. */
static struct thread_stats* get_independent_stats(conn *c);
/* Used by the STATS_* macros to find the stats block to update. */
static struct thread_stats* get_thread_stats(conn *c);
static void register_callback(ENGINE_HANDLE *eh,
                              ENGINE_EVENT_TYPE type,
                              EVENT_CALLBACK cb, const void *cb_data);
static SERVER_HANDLE_V1 *get_server_api(void);
192
193
/** Result codes returned by try_read_network(). */
enum try_read_result {
    READ_DATA_RECEIVED,
    READ_NO_DATA_RECEIVED,
    READ_ERROR,            /**< an error occurred (on the socket) (or client closed connection) */
    READ_MEMORY_ERROR      /**< failed to allocate more memory */
};

static enum try_read_result try_read_network(conn *c);
202
/* stats: initialization and reporting */
static void stats_init(void);
static void server_stats(ADD_STAT add_stats, conn *c, bool aggregate);
static void process_stat_settings(ADD_STAT add_stats, void *c);


/* defaults: fills the global `settings` with built-in values */
static void settings_init(void);

/* event handling, network IO */
static void event_handler(evutil_socket_t fd, short which, void *arg);
static void complete_nread(conn *c);
static void write_and_free(conn *c, char *buf, size_t bytes);
static int ensure_iov_space(conn *c);
static int add_iov(conn *c, const void *buf, size_t len);
/* Returns 0 on success, -1 on out-of-memory. */
static int add_msghdr(conn *c);
219
/** exported globals **/
/* Global server statistics; most updates in this file are made while
   holding STATS_LOCK(). */
struct stats stats;
/* Runtime configuration; defaults are installed by settings_init(). */
struct settings settings;

/** file scope variables **/
/* Head of the list of listening connections, linked through conn->next. */
static conn *listen_conn = NULL;
static struct event_base *main_base;
static struct thread_stats *default_independent_stats;

/* One singly linked list of registered engine event handlers per event
   type; walked by perform_callbacks(). */
static struct engine_event_handler *engine_event_handlers[MAX_ENGINE_EVENT_TYPE + 1];
230
/** Result codes returned by transmit(). */
enum transmit_result {
    TRANSMIT_COMPLETE,   /**< All done writing. */
    TRANSMIT_INCOMPLETE, /**< More data remaining to write. */
    TRANSMIT_SOFT_ERROR, /**< Can't write any more right now. */
    TRANSMIT_HARD_ERROR  /**< Can't write (c->state is set to conn_closing) */
};

static enum transmit_result transmit(conn *c);
239
240
241
242/* Perform all callbacks of a given type for the given connection. */
243void perform_callbacks(ENGINE_EVENT_TYPE type,
244                       const void *data,
245                       const void *c) {
246    struct engine_event_handler *h;
247    for (h = engine_event_handlers[type]; h; h = h->next) {
248        h->cb(c, type, data, h->cb_data);
249    }
250}
251
252/**
253 * Return the TCP or domain socket listening_port structure that
254 * has a given port number
255 */
256static struct listening_port *get_listening_port_instance(const int port) {
257    struct listening_port *port_ins = NULL;
258    int i;
259    for (i = 0; i < settings.num_interfaces; ++i) {
260        if (stats.listening_ports[i].port == port) {
261            port_ins = &stats.listening_ports[i];
262        }
263    }
264    return port_ins;
265}
266
267static void stats_init(void) {
268    stats.daemon_conns = 0;
269    stats.rejected_conns = 0;
270    stats.curr_conns = stats.total_conns = 0;
271    stats.listening_ports = calloc(settings.num_interfaces, sizeof(struct listening_port));
272
273    stats_prefix_init();
274}
275
276static void stats_reset(const void *cookie) {
277    struct conn *conn = (struct conn*)cookie;
278    STATS_LOCK();
279    stats.rejected_conns = 0;
280    stats.total_conns = 0;
281    stats_prefix_clear();
282    STATS_UNLOCK();
283    threadlocal_stats_reset(get_independent_stats(conn));
284    settings.engine.v1->reset_stats(settings.engine.v0, cookie);
285}
286
/**
 * Decide how many worker threads to start.
 *
 * The environment variable MEMCACHED_NUM_CPUS overrides the automatic
 * choice when it holds a positive decimal integer. Surrounding whitespace
 * is allowed. Any other value falls back to 4: empty, non-numeric, zero,
 * negative, trailing garbage, or out of int range. The old atoi() parse
 * let e.g. "-3" through as a negative thread count and gave undefined
 * behaviour on overflow.
 *
 * Without an override the count is derived from the number of online
 * processors: 75% of them (rounded down) when there are more than four,
 * but never fewer than four.
 *
 * @return the number of worker threads to use (always >= 1)
 */
static int get_number_of_worker_threads(void) {
    const int fallback = 4;
    const char *override = getenv("MEMCACHED_NUM_CPUS");
    int ret;

    if (override != NULL) {
        char *end;
        long value;

        errno = 0;
        value = strtol(override, &end, 10);
        if (end == override || errno == ERANGE ||
            value <= 0 || value > INT_MAX) {
            return fallback;
        }
        while (isspace((unsigned char)*end)) {
            ++end;
        }
        if (*end != '\0') {
            return fallback;
        }
        return (int)value;
    }

#ifdef WIN32
    {
        SYSTEM_INFO sysinfo;
        GetSystemInfo(&sysinfo);
        ret = (int)sysinfo.dwNumberOfProcessors;
    }
#else
    /* sysconf() returns -1 on error; the lower bound below covers that. */
    ret = (int)sysconf(_SC_NPROCESSORS_ONLN);
#endif
    if (ret > 4) {
        ret = ret * 3 / 4;
    }
    if (ret < fallback) {
        ret = fallback;
    }
    return ret;
}
313
314static void settings_init(void) {
315    static struct interface default_interface;
316    default_interface.port = 11211;
317    default_interface.maxconn = 1000;
318    default_interface.backlog = 1024;
319
320    settings.num_interfaces = 1;
321    settings.interfaces = &default_interface;
322    settings.daemonize = false;
323    settings.pid_file = NULL;
324    settings.bio_drain_buffer_sz = 8192;
325
326    settings.verbose = 0;
327    settings.num_threads = get_number_of_worker_threads();
328    settings.prefix_delimiter = ':';
329    settings.detail_enabled = 0;
330    settings.allow_detailed = true;
331    settings.reqs_per_event_high_priority = 50;
332    settings.reqs_per_event_med_priority = 5;
333    settings.reqs_per_event_low_priority = 1;
334    settings.default_reqs_per_event = 20;
335    settings.require_sasl = false;
336    settings.extensions.logger = get_stderr_logger();
337    settings.tcp_nodelay = getenv("MEMCACHED_DISABLE_TCP_NODELAY") == NULL;
338    settings.engine_module = "default_engine.so";
339    settings.engine_config = NULL;
340    settings.config = NULL;
341    settings.admin = NULL;
342    settings.disable_admin = false;
343    settings.datatype = false;
344}
345
346/*
347 * Adds a message header to a connection.
348 *
349 * Returns 0 on success, -1 on out-of-memory.
350 */
351static int add_msghdr(conn *c)
352{
353    struct msghdr *msg;
354
355    cb_assert(c != NULL);
356
357    if (c->msgsize == c->msgused) {
358        cb_assert(c->msgsize > 0);
359        msg = realloc(c->msglist, c->msgsize * 2 * sizeof(struct msghdr));
360        if (! msg)
361            return -1;
362        c->msglist = msg;
363        c->msgsize *= 2;
364    }
365
366    msg = c->msglist + c->msgused;
367
368    /* this wipes msg_iovlen, msg_control, msg_controllen, and
369       msg_flags, the last 3 of which aren't defined on solaris: */
370    memset(msg, 0, sizeof(struct msghdr));
371
372    msg->msg_iov = &c->iov[c->iovused];
373
374    if (c->request_addr_size > 0) {
375        msg->msg_name = &c->request_addr;
376        msg->msg_namelen = c->request_addr_size;
377    }
378
379    c->msgbytes = 0;
380    c->msgused++;
381
382    return 0;
383}
384
/*
 * State for temporarily disabling the listening sockets. The accessors in
 * this file read and write it while holding `mutex`.
 *   disabled    - set to true by disable_listen(); read by is_listen_disabled()
 *   count       - set to 10 by disable_listen()
 *                 (NOTE(review): how it is consumed is not visible here)
 *   num_disable - number of times disable_listen() has run
 */
struct {
    cb_mutex_t mutex;
    bool disabled;
    ssize_t count;
    uint64_t num_disable;
} listen_state;
391
392static bool is_listen_disabled(void) {
393    bool ret;
394    cb_mutex_enter(&listen_state.mutex);
395    ret = listen_state.disabled;
396    cb_mutex_exit(&listen_state.mutex);
397    return ret;
398}
399
400static uint64_t get_listen_disabled_num(void) {
401    uint64_t ret;
402    cb_mutex_enter(&listen_state.mutex);
403    ret = listen_state.num_disable;
404    cb_mutex_exit(&listen_state.mutex);
405    return ret;
406}
407
408static void disable_listen(void) {
409    conn *next;
410    cb_mutex_enter(&listen_state.mutex);
411    listen_state.disabled = true;
412    listen_state.count = 10;
413    ++listen_state.num_disable;
414    cb_mutex_exit(&listen_state.mutex);
415
416    for (next = listen_conn; next; next = next->next) {
417        update_event(next, 0);
418        if (listen(next->sfd, 1) != 0) {
419            log_socket_error(EXTENSION_LOG_WARNING, NULL,
420                             "listen() failed: %s");
421        }
422    }
423}
424
425void safe_close(SOCKET sfd) {
426    if (sfd != INVALID_SOCKET) {
427        int rval;
428        while ((rval = closesocket(sfd)) == SOCKET_ERROR &&
429               (errno == EINTR || errno == EAGAIN)) {
430            /* go ahead and retry */
431        }
432
433        if (rval == SOCKET_ERROR) {
434            char msg[80];
435            snprintf(msg, sizeof(msg), "Failed to close socket %d (%%s)!!", (int)sfd);
436            log_socket_error(EXTENSION_LOG_WARNING, NULL,
437                             msg);
438        } else {
439            STATS_LOCK();
440            stats.curr_conns--;
441            STATS_UNLOCK();
442
443            if (is_listen_disabled()) {
444                notify_dispatcher();
445            }
446        }
447    }
448}
449
450/**
451 * Reset all of the dynamic buffers used by a connection back to their
452 * default sizes. The strategy for resizing the buffers is to allocate a
453 * new one of the correct size and free the old one if the allocation succeeds
454 * instead of using realloc to change the buffer size (because realloc may
455 * not shrink the buffers, and will also copy the memory). If the allocation
456 * fails the buffer will be unchanged.
457 *
458 * @param c the connection to resize the buffers for
459 * @return true if all allocations succeeded, false if one or more of the
460 *         allocations failed.
461 */
462static bool conn_reset_buffersize(conn *c) {
463    bool ret = true;
464
465    if (c->rsize != DATA_BUFFER_SIZE) {
466        void *ptr = malloc(DATA_BUFFER_SIZE);
467        if (ptr != NULL) {
468            free(c->rbuf);
469            c->rbuf = ptr;
470            c->rsize = DATA_BUFFER_SIZE;
471        } else {
472            ret = false;
473        }
474    }
475
476    if (c->wsize != DATA_BUFFER_SIZE) {
477        void *ptr = malloc(DATA_BUFFER_SIZE);
478        if (ptr != NULL) {
479            free(c->wbuf);
480            c->wbuf = ptr;
481            c->wsize = DATA_BUFFER_SIZE;
482        } else {
483            ret = false;
484        }
485    }
486
487    if (c->isize != ITEM_LIST_INITIAL) {
488        void *ptr = malloc(sizeof(item *) * ITEM_LIST_INITIAL);
489        if (ptr != NULL) {
490            free(c->ilist);
491            c->ilist = ptr;
492            c->isize = ITEM_LIST_INITIAL;
493        } else {
494            ret = false;
495        }
496    }
497
498    if (c->temp_alloc_size != TEMP_ALLOC_LIST_INITIAL) {
499        void *ptr = malloc(sizeof(char *) * TEMP_ALLOC_LIST_INITIAL);
500        if (ptr != NULL) {
501            free(c->temp_alloc_list);
502            c->temp_alloc_list = ptr;
503            c->temp_alloc_size = TEMP_ALLOC_LIST_INITIAL;
504        } else {
505            ret = false;
506        }
507    }
508
509    if (c->iovsize != IOV_LIST_INITIAL) {
510        void *ptr = malloc(sizeof(struct iovec) * IOV_LIST_INITIAL);
511        if (ptr != NULL) {
512            free(c->iov);
513            c->iov = ptr;
514            c->iovsize = IOV_LIST_INITIAL;
515        } else {
516            ret = false;
517        }
518    }
519
520    if (c->msgsize != MSG_LIST_INITIAL) {
521        void *ptr = malloc(sizeof(struct msghdr) * MSG_LIST_INITIAL);
522        if (ptr != NULL) {
523            free(c->msglist);
524            c->msglist = ptr;
525            c->msgsize = MSG_LIST_INITIAL;
526        } else {
527            ret = false;
528        }
529    }
530
531    return ret;
532}
533
534/**
535 * Constructor for all memory allocations of connection objects. Initialize
536 * all members and allocate the transfer buffers.
537 *
538 * @param buffer The memory allocated by the object cache
539 * @return 0 on success, 1 if we failed to allocate memory
540 */
541static int conn_constructor(conn *c) {
542    memset(c, 0, sizeof(*c));
543    MEMCACHED_CONN_CREATE(c);
544
545    c->state = conn_immediate_close;
546    c->sfd = INVALID_SOCKET;
547    if (!conn_reset_buffersize(c)) {
548        free(c->rbuf);
549        free(c->wbuf);
550        free(c->ilist);
551        free(c->temp_alloc_list);
552        free(c->iov);
553        free(c->msglist);
554        settings.extensions.logger->log(EXTENSION_LOG_WARNING,
555                                        NULL,
556                                        "Failed to allocate buffers for connection\n");
557        return 1;
558    }
559
560    STATS_LOCK();
561    stats.conn_structs++;
562    STATS_UNLOCK();
563
564    return 0;
565}
566
567/**
568 * Destructor for all connection objects. Release all allocated resources.
569 *
570 * @param buffer The memory allocated by the objec cache
571 */
572static void conn_destructor(conn *c) {
573    free(c->rbuf);
574    free(c->wbuf);
575    free(c->ilist);
576    free(c->temp_alloc_list);
577    free(c->iov);
578    free(c->msglist);
579
580    STATS_LOCK();
581    stats.conn_structs--;
582    STATS_UNLOCK();
583}
584
/*
 * Free list management for connections.
 *
 *   free  - singly linked list (through conn->next) of connection objects
 *           ready for reuse
 *   all   - table of settings.maxconns slots holding every connection
 *           object allocated so far, filled from index 0 upwards
 *   mutex - taken by allocate_connection()/release_connection() around
 *           updates; connection_stats() deliberately reads `all` without it
 *   next  - index of the first unused slot in `all`
 */
struct connections {
    conn* free;
    conn** all;
    cb_mutex_t mutex;
    int next;
} connections;
594
595static void initialize_connections(void)
596{
597    int preallocate;
598
599    cb_mutex_initialize(&connections.mutex);
600    connections.all = calloc(settings.maxconns, sizeof(conn*));
601    if (connections.all == NULL) {
602        settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
603                                        "Failed to allocate memory for connections");
604        exit(EX_OSERR);
605    }
606
607    preallocate = settings.maxconns / 2;
608    if (preallocate < 1000) {
609        preallocate = settings.maxconns;
610    } else if (preallocate > 5000) {
611        preallocate = 5000;
612    }
613
614    for (connections.next = 0; connections.next < preallocate; ++connections.next) {
615        connections.all[connections.next] = malloc(sizeof(conn));
616        if (conn_constructor(connections.all[connections.next]) != 0) {
617            settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
618                                            "Failed to allocate memory for connections");
619            exit(EX_OSERR);
620        }
621        connections.all[connections.next]->next = connections.free;
622        connections.free = connections.all[connections.next];
623    }
624}
625
626static void destroy_connections(void)
627{
628    int ii;
629    for (ii = 0; ii < settings.maxconns; ++ii) {
630        if (connections.all[ii]) {
631            conn *c = connections.all[ii];
632            conn_destructor(c);
633            free(c);
634        }
635    }
636
637    free(connections.all);
638}
639
640static conn *allocate_connection(void) {
641    conn *ret;
642
643    cb_mutex_enter(&connections.mutex);
644    ret = connections.free;
645    if (ret != NULL) {
646        connections.free = connections.free->next;
647        ret->next = NULL;
648    }
649    cb_mutex_exit(&connections.mutex);
650
651    if (ret == NULL) {
652        ret = malloc(sizeof(conn));
653        if (ret == NULL) {
654            settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
655                                            "Failed to allocate memory for connection");
656            return NULL;
657        }
658
659        if (conn_constructor(ret) != 0) {
660            free(ret);
661            settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
662                                            "Failed to allocate memory for connection");
663            return NULL;
664        }
665
666        cb_mutex_enter(&connections.mutex);
667        if (connections.next == settings.maxconns) {
668            free(ret);
669            ret = NULL;
670        } else {
671            connections.all[connections.next++] = ret;
672        }
673        cb_mutex_exit(&connections.mutex);
674    }
675
676    return ret;
677}
678
679static void release_connection(conn *c) {
680    c->sfd = INVALID_SOCKET;
681    cb_mutex_enter(&connections.mutex);
682    c->next = connections.free;
683    connections.free = c;
684    cb_mutex_exit(&connections.mutex);
685}
686
687static const char *substate_text(enum bin_substates state) {
688    switch (state) {
689    case bin_no_state: return "bin_no_state";
690    case bin_reading_set_header: return "bin_reading_set_header";
691    case bin_reading_cas_header: return "bin_reading_cas_header";
692    case bin_read_set_value: return "bin_read_set_value";
693    case bin_reading_sasl_auth: return "bin_reading_sasl_auth";
694    case bin_reading_sasl_auth_data: return "bin_reading_sasl_auth_data";
695    case bin_reading_packet: return "bin_reading_packet";
696    default:
697        return "illegal";
698    }
699}
700
701static void add_connection_stats(ADD_STAT add_stats, conn *d, conn *c) {
702    append_stat("conn", add_stats, d, "%p", c);
703    if (c->sfd == INVALID_SOCKET) {
704        append_stat("socket", add_stats, d, "disconnected");
705    } else {
706        append_stat("socket", add_stats, d, "%lu", (long)c->sfd);
707        append_stat("protocol", add_stats, d, "%s", "binary");
708        append_stat("transport", add_stats, d, "TCP");
709        append_stat("nevents", add_stats, d, "%u", c->nevents);
710        if (c->sasl_conn != NULL) {
711            append_stat("sasl_conn", add_stats, d, "%p", c->sasl_conn);
712        }
713        append_stat("state", add_stats, d, "%s", state_text(c->state));
714        append_stat("substate", add_stats, d, "%s", substate_text(c->substate));
715        append_stat("registered_in_libevent", add_stats, d, "%d",
716                    (int)c->registered_in_libevent);
717        append_stat("ev_flags", add_stats, d, "%x", c->ev_flags);
718        append_stat("which", add_stats, d, "%x", c->which);
719        append_stat("rbuf", add_stats, d, "%p", c->rbuf);
720        append_stat("rcurr", add_stats, d, "%p", c->rcurr);
721        append_stat("rsize", add_stats, d, "%u", c->rsize);
722        append_stat("rbytes", add_stats, d, "%u", c->rbytes);
723        append_stat("wbuf", add_stats, d, "%p", c->wbuf);
724        append_stat("wcurr", add_stats, d, "%p", c->wcurr);
725        append_stat("wsize", add_stats, d, "%u", c->wsize);
726        append_stat("wbytes", add_stats, d, "%u", c->wbytes);
727        append_stat("write_and_go", add_stats, d, "%p", c->write_and_go);
728        append_stat("write_and_free", add_stats, d, "%p", c->write_and_free);
729        append_stat("ritem", add_stats, d, "%p", c->ritem);
730        append_stat("rlbytes", add_stats, d, "%u", c->rlbytes);
731        append_stat("item", add_stats, d, "%p", c->item);
732        append_stat("store_op", add_stats, d, "%u", c->store_op);
733        append_stat("sbytes", add_stats, d, "%u", c->sbytes);
734        append_stat("iov", add_stats, d, "%p", c->iov);
735        append_stat("iovsize", add_stats, d, "%u", c->iovsize);
736        append_stat("iovused", add_stats, d, "%u", c->iovused);
737        append_stat("msglist", add_stats, d, "%p", c->msglist);
738        append_stat("msgsize", add_stats, d, "%u", c->msgsize);
739        append_stat("msgused", add_stats, d, "%u", c->msgused);
740        append_stat("msgcurr", add_stats, d, "%u", c->msgcurr);
741        append_stat("msgbytes", add_stats, d, "%u", c->msgbytes);
742        append_stat("ilist", add_stats, d, "%p", c->ilist);
743        append_stat("isize", add_stats, d, "%u", c->isize);
744        append_stat("icurr", add_stats, d, "%p", c->icurr);
745        append_stat("ileft", add_stats, d, "%u", c->ileft);
746        append_stat("temp_alloc_list", add_stats, d, "%p", c->temp_alloc_list);
747        append_stat("temp_alloc_size", add_stats, d, "%u", c->temp_alloc_size);
748        append_stat("temp_alloc_curr", add_stats, d, "%p", c->temp_alloc_curr);
749        append_stat("temp_alloc_left", add_stats, d, "%u", c->temp_alloc_left);
750
751        append_stat("noreply", add_stats, d, "%d", c->noreply);
752        append_stat("refcount", add_stats, d, "%u", (int)c->refcount);
753        append_stat("dynamic_buffer.buffer", add_stats, d, "%p",
754                    c->dynamic_buffer.buffer);
755        append_stat("dynamic_buffer.size", add_stats, d, "%zu",
756                    c->dynamic_buffer.size);
757        append_stat("dynamic_buffer.offset", add_stats, d, "%zu",
758                    c->dynamic_buffer.offset);
759        append_stat("engine_storage", add_stats, d, "%p", c->engine_storage);
760        /* @todo we should decode the binary header */
761        append_stat("cas", add_stats, d, "%"PRIu64, c->cas);
762        append_stat("cmd", add_stats, d, "%u", c->cmd);
763        append_stat("opaque", add_stats, d, "%u", c->opaque);
764        append_stat("keylen", add_stats, d, "%u", c->keylen);
765        append_stat("list_state", add_stats, d, "%u", c->list_state);
766        append_stat("next", add_stats, d, "%p", c->next);
767        append_stat("thread", add_stats, d, "%p", c->thread);
768        append_stat("aiostat", add_stats, d, "%u", c->aiostat);
769        append_stat("ewouldblock", add_stats, d, "%u", c->ewouldblock);
770        append_stat("tap_iterator", add_stats, d, "%p", c->tap_iterator);
771    }
772}
773
774/**
775 * Do a full stats of all of the connections.
776 * Do _NOT_ try to follow _ANY_ of the pointers in the conn structure
777 * because we read all of the values _DIRTY_. We preallocated the array
778 * of all of the connection pointers during startup, so we _KNOW_ that
779 * we can iterate through all of them. All of the conn structs will
780 * only appear in the connections.all array when we've allocated them,
781 * and we don't release them so it's safe to look at them.
782 */
783static void connection_stats(ADD_STAT add_stats, conn *c) {
784    int ii;
785    for (ii = 0; ii < settings.maxconns && connections.all[ii]; ++ii) {
786        add_connection_stats(add_stats, c, connections.all[ii]);
787    }
788}
789
790conn *conn_new(const SOCKET sfd, in_port_t parent_port,
791               STATE_FUNC init_state, int event_flags,
792               unsigned int read_buffer_size, struct event_base *base,
793               struct timeval *timeout) {
794    conn *c = allocate_connection();
795    if (c == NULL) {
796        return NULL;
797    }
798
799    c->admin = false;
800    cb_assert(c->thread == NULL);
801    c->max_reqs_per_event = settings.default_reqs_per_event;
802
803    if (c->rsize < read_buffer_size) {
804        void *mem = malloc(read_buffer_size);
805        if (mem) {
806            c->rsize = read_buffer_size;
807            free(c->rbuf);
808            c->rbuf = mem;
809        } else {
810            cb_assert(c->thread == NULL);
811            release_connection(c);
812            return NULL;
813        }
814    }
815
816    memset(&c->ssl, 0, sizeof(c->ssl));
817    if (init_state != conn_listening) {
818        int ii;
819        for (ii = 0; ii < settings.num_interfaces; ++ii) {
820            if (parent_port == settings.interfaces[ii].port) {
821                if (settings.interfaces[ii].ssl.cert != NULL) {
822                    const char *cert = settings.interfaces[ii].ssl.cert;
823                    const char *pkey = settings.interfaces[ii].ssl.key;
824
825                    c->ssl.ctx = SSL_CTX_new(SSLv23_server_method());
826
827                    /* @todo don't read files, but use in-memory-copies */
828                    if (!SSL_CTX_use_certificate_chain_file(c->ssl.ctx, cert) ||
829                        !SSL_CTX_use_PrivateKey_file(c->ssl.ctx, pkey, SSL_FILETYPE_PEM)) {
830                        release_connection(c);
831                        return NULL;
832                    }
833
834                    c->ssl.enabled = true;
835                    c->ssl.error = false;
836                    c->ssl.client = NULL;
837
838                    c->ssl.in.buffer = malloc(settings.bio_drain_buffer_sz);
839                    c->ssl.out.buffer = malloc(settings.bio_drain_buffer_sz);
840
841                    if (c->ssl.in.buffer == NULL || c->ssl.out.buffer == NULL) {
842                        release_connection(c);
843                        return NULL;
844                    }
845
846                    c->ssl.in.buffsz = settings.bio_drain_buffer_sz;
847                    c->ssl.out.buffsz = settings.bio_drain_buffer_sz;
848                    BIO_new_bio_pair(&c->ssl.application,
849                                     settings.bio_drain_buffer_sz,
850                                     &c->ssl.network,
851                                     settings.bio_drain_buffer_sz);
852
853                    c->ssl.client = SSL_new(c->ssl.ctx);
854                    SSL_set_bio(c->ssl.client,
855                                c->ssl.application,
856                                c->ssl.application);
857                }
858            }
859        }
860    }
861
862    c->request_addr_size = 0;
863
864    if (settings.verbose > 1) {
865        if (init_state == conn_listening) {
866            settings.extensions.logger->log(EXTENSION_LOG_DEBUG, c,
867                                            "<%d server listening", sfd);
868        } else {
869            settings.extensions.logger->log(EXTENSION_LOG_DEBUG, c,
870                                            "<%d new client connection", sfd);
871        }
872    }
873
874    c->sfd = sfd;
875    c->parent_port = parent_port;
876    c->state = init_state;
877    c->rlbytes = 0;
878    c->cmd = -1;
879    c->rbytes = c->wbytes = 0;
880    c->wcurr = c->wbuf;
881    c->rcurr = c->rbuf;
882    c->ritem = 0;
883    c->icurr = c->ilist;
884    c->temp_alloc_curr = c->temp_alloc_list;
885    c->ileft = 0;
886    c->temp_alloc_left = 0;
887    c->iovused = 0;
888    c->msgcurr = 0;
889    c->msgused = 0;
890    c->next = NULL;
891    c->list_state = 0;
892
893    c->write_and_go = init_state;
894    c->write_and_free = 0;
895    c->item = 0;
896    c->supports_datatype = false;
897    c->noreply = false;
898
899    event_set(&c->event, sfd, event_flags, event_handler, (void *)c);
900    event_base_set(base, &c->event);
901    c->ev_flags = event_flags;
902
903    if (!register_event(c, timeout)) {
904        cb_assert(c->thread == NULL);
905        release_connection(c);
906        return NULL;
907    }
908
909    STATS_LOCK();
910    stats.total_conns++;
911    STATS_UNLOCK();
912
913    c->aiostat = ENGINE_SUCCESS;
914    c->ewouldblock = false;
915    c->refcount = 1;
916
917    MEMCACHED_CONN_ALLOCATE(c->sfd);
918
919    perform_callbacks(ON_CONNECT, NULL, c);
920
921    return c;
922}
923
924static void conn_cleanup(conn *c) {
925    cb_assert(c != NULL);
926    c->admin = false;
927    if (c->item) {
928        settings.engine.v1->release(settings.engine.v0, c, c->item);
929        c->item = 0;
930    }
931
932    if (c->ileft != 0) {
933        for (; c->ileft > 0; c->ileft--,c->icurr++) {
934            settings.engine.v1->release(settings.engine.v0, c, *(c->icurr));
935        }
936    }
937
938    if (c->temp_alloc_left != 0) {
939        for (; c->temp_alloc_left > 0; c->temp_alloc_left--, c->temp_alloc_curr++) {
940            free(*(c->temp_alloc_curr));
941        }
942    }
943
944    if (c->write_and_free) {
945        free(c->write_and_free);
946        c->write_and_free = 0;
947    }
948
949    if (c->sasl_conn) {
950        cbsasl_dispose(&c->sasl_conn);
951        c->sasl_conn = NULL;
952    }
953
954    c->engine_storage = NULL;
955    c->tap_iterator = NULL;
956    c->thread = NULL;
957    cb_assert(c->next == NULL);
958    c->sfd = INVALID_SOCKET;
959    c->dcp = 0;
960    c->start = 0;
961    if (c->ssl.enabled) {
962        BIO_free_all(c->ssl.network);
963        SSL_free(c->ssl.client);
964        c->ssl.enabled = false;
965        c->ssl.error = false;
966        free(c->ssl.in.buffer);
967        free(c->ssl.out.buffer);
968        memset(&c->ssl, 0, sizeof(c->ssl));
969    }
970}
971
972void conn_close(conn *c) {
973    cb_assert(c != NULL);
974    cb_assert(c->sfd == INVALID_SOCKET);
975    cb_assert(c->state == conn_immediate_close);
976
977    cb_assert(c->thread);
978    /* remove from pending-io list */
979    if (settings.verbose > 1 && list_contains(c->thread->pending_io, c)) {
980        settings.extensions.logger->log(EXTENSION_LOG_WARNING, c,
981                                        "Current connection was in the pending-io list.. Nuking it\n");
982    }
983    c->thread->pending_io = list_remove(c->thread->pending_io, c);
984
985    conn_cleanup(c);
986
987    /*
988     * The contract with the object cache is that we should return the
989     * object in a constructed state. Reset the buffers to the default
990     * size
991     */
992    conn_reset_buffersize(c);
993    cb_assert(c->thread == NULL);
994    release_connection(c);
995}
996
997/*
998 * Shrinks a connection's buffers if they're too big.  This prevents
999 * periodic large "get" requests from permanently chewing lots of server
1000 * memory.
1001 *
1002 * This should only be called in between requests since it can wipe output
1003 * buffers!
1004 */
1005static void conn_shrink(conn *c) {
1006    cb_assert(c != NULL);
1007
1008    if (c->rsize > READ_BUFFER_HIGHWAT && c->rbytes < DATA_BUFFER_SIZE) {
1009        void *newbuf;
1010
1011        if (c->rcurr != c->rbuf) {
1012            /* Pack the buffer */
1013            memmove(c->rbuf, c->rcurr, (size_t)c->rbytes);
1014        }
1015
1016        newbuf = realloc(c->rbuf, DATA_BUFFER_SIZE);
1017
1018        if (newbuf) {
1019            c->rbuf = newbuf;
1020            c->rsize = DATA_BUFFER_SIZE;
1021        }
1022        c->rcurr = c->rbuf;
1023    }
1024
1025    /* isize is no longer dynamic */
1026    cb_assert(c->isize == ITEM_LIST_INITIAL);
1027
1028    if (c->msgsize > MSG_LIST_HIGHWAT) {
1029        void *newbuf = realloc(c->msglist,
1030                               MSG_LIST_INITIAL * sizeof(c->msglist[0]));
1031        if (newbuf) {
1032            c->msglist = newbuf;
1033            c->msgsize = MSG_LIST_INITIAL;
1034        }
1035    }
1036
1037    if (c->iovsize > IOV_LIST_HIGHWAT) {
1038        void *newbuf = realloc(c->iov, IOV_LIST_INITIAL * sizeof(c->iov[0]));
1039        if (newbuf) {
1040            c->iov = newbuf;
1041            c->iovsize = IOV_LIST_INITIAL;
1042        }
1043    }
1044}
1045
1046/**
1047 * Convert a state name to a human readable form.
1048 */
1049const char *state_text(STATE_FUNC state) {
1050    if (state == conn_listening) {
1051        return "conn_listening";
1052    } else if (state == conn_new_cmd) {
1053        return "conn_new_cmd";
1054    } else if (state == conn_waiting) {
1055        return "conn_waiting";
1056    } else if (state == conn_read) {
1057        return "conn_read";
1058    } else if (state == conn_parse_cmd) {
1059        return "conn_parse_cmd";
1060    } else if (state == conn_write) {
1061        return "conn_write";
1062    } else if (state == conn_nread) {
1063        return "conn_nread";
1064    } else if (state == conn_swallow) {
1065        return "conn_swallow";
1066    } else if (state == conn_closing) {
1067        return "conn_closing";
1068    } else if (state == conn_mwrite) {
1069        return "conn_mwrite";
1070    } else if (state == conn_ship_log) {
1071        return "conn_ship_log";
1072    } else if (state == conn_setup_tap_stream) {
1073        return "conn_setup_tap_stream";
1074    } else if (state == conn_pending_close) {
1075        return "conn_pending_close";
1076    } else if (state == conn_immediate_close) {
1077        return "conn_immediate_close";
1078    } else if (state == conn_refresh_cbsasl) {
1079        return "conn_refresh_cbsasl";
1080    } else if (state == conn_refresh_ssl_certs) {
1081        return "conn_refresh_ssl_cert";
1082    } else {
1083        return "Unknown";
1084    }
1085}
1086
1087/*
1088 * Sets a connection's current state in the state machine. Any special
1089 * processing that needs to happen on certain state transitions can
1090 * happen here.
1091 */
1092void conn_set_state(conn *c, STATE_FUNC state) {
1093    cb_assert(c != NULL);
1094
1095    if (state != c->state) {
1096        /*
1097         * The connections in the "tap thread" behaves differently than
1098         * normal connections because they operate in a full duplex mode.
1099         * New messages may appear from both sides, so we can't block on
1100         * read from the nework / engine
1101         */
1102        if (c->tap_iterator != NULL || c->dcp) {
1103            if (state == conn_waiting) {
1104                c->which = EV_WRITE;
1105                state = conn_ship_log;
1106            }
1107        }
1108
1109        if (settings.verbose > 2 || c->state == conn_closing
1110            || c->state == conn_setup_tap_stream) {
1111            settings.extensions.logger->log(EXTENSION_LOG_DETAIL, c,
1112                                            "%d: going from %s to %s\n",
1113                                            c->sfd, state_text(c->state),
1114                                            state_text(state));
1115        }
1116
1117        if (state == conn_write || state == conn_mwrite) {
1118            if (c->start != 0) {
1119                collect_timing(c->cmd, gethrtime() - c->start);
1120                c->start = 0;
1121            }
1122            MEMCACHED_PROCESS_COMMAND_END(c->sfd, c->wbuf, c->wbytes);
1123        }
1124
1125        c->state = state;
1126    }
1127}
1128
1129/*
1130 * Ensures that there is room for another struct iovec in a connection's
1131 * iov list.
1132 *
1133 * Returns 0 on success, -1 on out-of-memory.
1134 */
1135static int ensure_iov_space(conn *c) {
1136    cb_assert(c != NULL);
1137
1138    if (c->iovused >= c->iovsize) {
1139        int i, iovnum;
1140        struct iovec *new_iov = (struct iovec *)realloc(c->iov,
1141                                (c->iovsize * 2) * sizeof(struct iovec));
1142        if (! new_iov)
1143            return -1;
1144        c->iov = new_iov;
1145        c->iovsize *= 2;
1146
1147        /* Point all the msghdr structures at the new list. */
1148        for (i = 0, iovnum = 0; i < c->msgused; i++) {
1149            c->msglist[i].msg_iov = &c->iov[iovnum];
1150            iovnum += c->msglist[i].msg_iovlen;
1151        }
1152    }
1153
1154    return 0;
1155}
1156
1157
1158/*
1159 * Adds data to the list of pending data that will be written out to a
1160 * connection.
1161 *
1162 * Returns 0 on success, -1 on out-of-memory.
1163 */
1164
1165static int add_iov(conn *c, const void *buf, size_t len) {
1166    struct msghdr *m;
1167    size_t leftover;
1168    bool limit_to_mtu;
1169
1170    cb_assert(c != NULL);
1171
1172    if (len == 0) {
1173        return 0;
1174    }
1175
1176    do {
1177        m = &c->msglist[c->msgused - 1];
1178
1179        /*
1180         * Limit the first payloads of TCP replies, to
1181         * UDP_MAX_PAYLOAD_SIZE bytes.
1182         */
1183        limit_to_mtu = (1 == c->msgused);
1184
1185        /* We may need to start a new msghdr if this one is full. */
1186        if (m->msg_iovlen == IOV_MAX ||
1187            (limit_to_mtu && c->msgbytes >= UDP_MAX_PAYLOAD_SIZE)) {
1188            add_msghdr(c);
1189        }
1190
1191        if (ensure_iov_space(c) != 0)
1192            return -1;
1193
1194        /* If the fragment is too big to fit in the datagram, split it up */
1195        if (limit_to_mtu && len + c->msgbytes > UDP_MAX_PAYLOAD_SIZE) {
1196            leftover = len + c->msgbytes - UDP_MAX_PAYLOAD_SIZE;
1197            len -= leftover;
1198        } else {
1199            leftover = 0;
1200        }
1201
1202        m = &c->msglist[c->msgused - 1];
1203        m->msg_iov[m->msg_iovlen].iov_base = (void *)buf;
1204        m->msg_iov[m->msg_iovlen].iov_len = len;
1205
1206        c->msgbytes += (int)len;
1207        c->iovused++;
1208        m->msg_iovlen++;
1209
1210        buf = ((char *)buf) + len;
1211        len = leftover;
1212    } while (leftover > 0);
1213
1214    return 0;
1215}
1216
1217/**
1218 * get a pointer to the start of the request struct for the current command
1219 */
1220static void* binary_get_request(conn *c) {
1221    char *ret = c->rcurr;
1222    ret -= (sizeof(c->binary_header) + c->binary_header.request.keylen +
1223            c->binary_header.request.extlen);
1224
1225    cb_assert(ret >= c->rbuf);
1226    return ret;
1227}
1228
1229/**
1230 * get a pointer to the key in this request
1231 */
1232static char* binary_get_key(conn *c) {
1233    return c->rcurr - (c->binary_header.request.keylen);
1234}
1235
1236/**
1237 * Insert a key into a buffer, but replace all non-printable characters
1238 * with a '.'.
1239 *
1240 * @param dest where to store the output
1241 * @param destsz size of destination buffer
1242 * @param prefix string to insert before the data
1243 * @param client the client we are serving
1244 * @param from_client set to true if this data is from the client
1245 * @param key the key to add to the buffer
1246 * @param nkey the number of bytes in the key
1247 * @return number of bytes in dest if success, -1 otherwise
1248 */
1249static ssize_t key_to_printable_buffer(char *dest, size_t destsz,
1250                                       SOCKET client, bool from_client,
1251                                       const char *prefix,
1252                                       const char *key,
1253                                       size_t nkey)
1254{
1255    char *ptr;
1256    ssize_t ii;
1257    ssize_t nw = snprintf(dest, destsz, "%c%d %s ", from_client ? '>' : '<',
1258                          (int)client, prefix);
1259    if (nw == -1) {
1260        return -1;
1261    }
1262
1263    ptr = dest + nw;
1264    destsz -= nw;
1265    if (nkey > destsz) {
1266        nkey = destsz;
1267    }
1268
1269    for (ii = 0; ii < nkey; ++ii, ++key, ++ptr) {
1270        if (isgraph(*key)) {
1271            *ptr = *key;
1272        } else {
1273            *ptr = '.';
1274        }
1275    }
1276
1277    *ptr = '\0';
1278    return (ssize_t)(ptr - dest);
1279}
1280
1281/**
1282 * Convert a byte array to a text string
1283 *
1284 * @param dest where to store the output
1285 * @param destsz size of destination buffer
1286 * @param prefix string to insert before the data
1287 * @param client the client we are serving
1288 * @param from_client set to true if this data is from the client
1289 * @param data the data to add to the buffer
1290 * @param size the number of bytes in data to print
1291 * @return number of bytes in dest if success, -1 otherwise
1292 */
1293static ssize_t bytes_to_output_string(char *dest, size_t destsz,
1294                                      SOCKET client, bool from_client,
1295                                      const char *prefix,
1296                                      const char *data,
1297                                      size_t size)
1298{
1299    ssize_t nw = snprintf(dest, destsz, "%c%d %s", from_client ? '>' : '<',
1300                          (int)client, prefix);
1301    ssize_t offset = nw;
1302    ssize_t ii;
1303
1304    if (nw == -1) {
1305        return -1;
1306    }
1307
1308    for (ii = 0; ii < size; ++ii) {
1309        if (ii % 4 == 0) {
1310            if ((nw = snprintf(dest + offset, destsz - offset, "\n%c%d  ",
1311                               from_client ? '>' : '<', client)) == -1) {
1312                return  -1;
1313            }
1314            offset += nw;
1315        }
1316        if ((nw = snprintf(dest + offset, destsz - offset,
1317                           " 0x%02x", (unsigned char)data[ii])) == -1) {
1318            return -1;
1319        }
1320        offset += nw;
1321    }
1322
1323    if ((nw = snprintf(dest + offset, destsz - offset, "\n")) == -1) {
1324        return -1;
1325    }
1326
1327    return offset + nw;
1328}
1329
1330static int add_bin_header(conn *c,
1331                          uint16_t err,
1332                          uint8_t hdr_len,
1333                          uint16_t key_len,
1334                          uint32_t body_len,
1335                          uint8_t datatype) {
1336    protocol_binary_response_header* header;
1337
1338    cb_assert(c);
1339
1340    c->msgcurr = 0;
1341    c->msgused = 0;
1342    c->iovused = 0;
1343    if (add_msghdr(c) != 0) {
1344        return -1;
1345    }
1346
1347    header = (protocol_binary_response_header *)c->wbuf;
1348
1349    header->response.magic = (uint8_t)PROTOCOL_BINARY_RES;
1350    header->response.opcode = c->binary_header.request.opcode;
1351    header->response.keylen = (uint16_t)htons(key_len);
1352
1353    header->response.extlen = (uint8_t)hdr_len;
1354    header->response.datatype = datatype;
1355    header->response.status = (uint16_t)htons(err);
1356
1357    header->response.bodylen = htonl(body_len);
1358    header->response.opaque = c->opaque;
1359    header->response.cas = htonll(c->cas);
1360
1361    if (settings.verbose > 1) {
1362        char buffer[1024];
1363        if (bytes_to_output_string(buffer, sizeof(buffer), c->sfd, false,
1364                                   "Writing bin response:",
1365                                   (const char*)header->bytes,
1366                                   sizeof(header->bytes)) != -1) {
1367            settings.extensions.logger->log(EXTENSION_LOG_DEBUG, c,
1368                                            "%s", buffer);
1369        }
1370    }
1371
1372    return add_iov(c, c->wbuf, sizeof(header->response));
1373}
1374
1375/**
1376 * Convert an error code generated from the storage engine to the corresponding
1377 * error code used by the protocol layer.
1378 * @param e the error code as used in the engine
1379 * @return the error code as used by the protocol layer
1380 */
1381static protocol_binary_response_status engine_error_2_protocol_error(ENGINE_ERROR_CODE e) {
1382    protocol_binary_response_status ret;
1383
1384    switch (e) {
1385    case ENGINE_SUCCESS:
1386        return PROTOCOL_BINARY_RESPONSE_SUCCESS;
1387    case ENGINE_KEY_ENOENT:
1388        return PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
1389    case ENGINE_KEY_EEXISTS:
1390        return PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS;
1391    case ENGINE_ENOMEM:
1392        return PROTOCOL_BINARY_RESPONSE_ENOMEM;
1393    case ENGINE_TMPFAIL:
1394        return PROTOCOL_BINARY_RESPONSE_ETMPFAIL;
1395    case ENGINE_NOT_STORED:
1396        return PROTOCOL_BINARY_RESPONSE_NOT_STORED;
1397    case ENGINE_EINVAL:
1398        return PROTOCOL_BINARY_RESPONSE_EINVAL;
1399    case ENGINE_ENOTSUP:
1400        return PROTOCOL_BINARY_RESPONSE_NOT_SUPPORTED;
1401    case ENGINE_E2BIG:
1402        return PROTOCOL_BINARY_RESPONSE_E2BIG;
1403    case ENGINE_NOT_MY_VBUCKET:
1404        return PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET;
1405    case ENGINE_ERANGE:
1406        return PROTOCOL_BINARY_RESPONSE_ERANGE;
1407    case ENGINE_ROLLBACK:
1408        return PROTOCOL_BINARY_RESPONSE_ROLLBACK;
1409    default:
1410        ret = PROTOCOL_BINARY_RESPONSE_EINTERNAL;
1411    }
1412
1413    return ret;
1414}
1415
1416static ENGINE_ERROR_CODE get_vb_map_cb(const void *cookie,
1417                                       const void *map,
1418                                       size_t mapsize)
1419{
1420    char *buf;
1421    conn *c = (conn*)cookie;
1422    protocol_binary_response_header header;
1423    size_t needed = mapsize+ sizeof(protocol_binary_response_header);
1424    if (!grow_dynamic_buffer(c, needed)) {
1425        if (settings.verbose > 0) {
1426            settings.extensions.logger->log(EXTENSION_LOG_INFO, c,
1427                    "<%d ERROR: Failed to allocate memory for response\n",
1428                    c->sfd);
1429        }
1430        return ENGINE_ENOMEM;
1431    }
1432
1433    buf = c->dynamic_buffer.buffer + c->dynamic_buffer.offset;
1434    memset(&header, 0, sizeof(header));
1435
1436    header.response.magic = (uint8_t)PROTOCOL_BINARY_RES;
1437    header.response.opcode = c->binary_header.request.opcode;
1438    header.response.status = (uint16_t)htons(PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET);
1439    header.response.bodylen = htonl((uint32_t)mapsize);
1440    header.response.opaque = c->opaque;
1441
1442    memcpy(buf, header.bytes, sizeof(header.response));
1443    buf += sizeof(header.response);
1444    memcpy(buf, map, mapsize);
1445    c->dynamic_buffer.offset += needed;
1446
1447    return ENGINE_SUCCESS;
1448}
1449
1450static void write_bin_packet(conn *c, protocol_binary_response_status err, int swallow) {
1451    if (err == PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET) {
1452        ENGINE_ERROR_CODE ret;
1453        cb_assert(swallow == 0);
1454
1455        ret = settings.engine.v1->get_engine_vb_map(settings.engine.v0, c,
1456                                                    get_vb_map_cb);
1457        if (ret == ENGINE_SUCCESS) {
1458            write_and_free(c, c->dynamic_buffer.buffer,
1459                           c->dynamic_buffer.offset);
1460            c->dynamic_buffer.buffer = NULL;
1461        } else {
1462            conn_set_state(c, conn_closing);
1463        }
1464    } else {
1465        ssize_t len = 0;
1466        const char *errtext = NULL;
1467
1468        if (err != PROTOCOL_BINARY_RESPONSE_SUCCESS) {
1469            errtext = memcached_protocol_errcode_2_text(err);
1470            if (errtext != NULL) {
1471                len = (ssize_t)strlen(errtext);
1472            }
1473        }
1474
1475        if (errtext && settings.verbose > 1) {
1476            settings.extensions.logger->log(EXTENSION_LOG_DEBUG, c,
1477                                            ">%d Writing an error: %s\n", c->sfd,
1478                                            errtext);
1479        }
1480
1481        add_bin_header(c, err, 0, 0, len, PROTOCOL_BINARY_RAW_BYTES);
1482        if (errtext) {
1483            add_iov(c, errtext, len);
1484        }
1485        conn_set_state(c, conn_mwrite);
1486        if (swallow > 0) {
1487            c->sbytes = swallow;
1488            c->write_and_go = conn_swallow;
1489        } else {
1490            c->write_and_go = conn_new_cmd;
1491        }
1492    }
1493}
1494
1495/* Form and send a response to a command over the binary protocol */
1496static void write_bin_response(conn *c, const void *d, int hlen, int keylen, int dlen) {
1497    if (!c->noreply || c->cmd == PROTOCOL_BINARY_CMD_GET ||
1498        c->cmd == PROTOCOL_BINARY_CMD_GETK) {
1499        if (add_bin_header(c, 0, hlen, keylen, dlen, PROTOCOL_BINARY_RAW_BYTES) == -1) {
1500            conn_set_state(c, conn_closing);
1501            return;
1502        }
1503        add_iov(c, d, dlen);
1504        conn_set_state(c, conn_mwrite);
1505        c->write_and_go = conn_new_cmd;
1506    } else {
1507        if (c->start != 0) {
1508            collect_timing(c->cmd, gethrtime() - c->start);
1509            c->start = 0;
1510        }
1511        conn_set_state(c, conn_new_cmd);
1512    }
1513}
1514
1515static void complete_update_bin(conn *c) {
1516    protocol_binary_response_status eno = PROTOCOL_BINARY_RESPONSE_EINVAL;
1517    ENGINE_ERROR_CODE ret;
1518    item *it;
1519    item_info_holder info;
1520
1521    cb_assert(c != NULL);
1522    it = c->item;
1523    memset(&info, 0, sizeof(info));
1524    info.info.nvalue = 1;
1525    if (!settings.engine.v1->get_item_info(settings.engine.v0, c, it,
1526                                           (void*)&info)) {
1527        settings.engine.v1->release(settings.engine.v0, c, it);
1528        settings.extensions.logger->log(EXTENSION_LOG_WARNING, c,
1529                                        "%d: Failed to get item info",
1530                                        c->sfd);
1531        write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_EINTERNAL, 0);
1532        return;
1533    }
1534
1535    ret = c->aiostat;
1536    c->aiostat = ENGINE_SUCCESS;
1537    if (ret == ENGINE_SUCCESS) {
1538        if (!c->supports_datatype) {
1539            if (checkUTF8JSON((void*)info.info.value[0].iov_base,
1540                              (int)info.info.value[0].iov_len)) {
1541                info.info.datatype = PROTOCOL_BINARY_DATATYPE_JSON;
1542                if (!settings.engine.v1->set_item_info(settings.engine.v0, c,
1543                                                       it, &info.info)) {
1544                    settings.extensions.logger->log(EXTENSION_LOG_WARNING, c,
1545                            "%d: Failed to set item info",
1546                            c->sfd);
1547                }
1548            }
1549        }
1550        ret = settings.engine.v1->store(settings.engine.v0, c,
1551                                        it, &c->cas, c->store_op,
1552                                        c->binary_header.request.vbucket);
1553    }
1554
1555#ifdef ENABLE_DTRACE
1556    switch (c->cmd) {
1557    case OPERATION_ADD:
1558        MEMCACHED_COMMAND_ADD(c->sfd, info.info.key, info.info.nkey,
1559                              (ret == ENGINE_SUCCESS) ? info.info.nbytes : -1, c->cas);
1560        break;
1561    case OPERATION_REPLACE:
1562        MEMCACHED_COMMAND_REPLACE(c->sfd, info.info.key, info.info.nkey,
1563                                  (ret == ENGINE_SUCCESS) ? info.info.nbytes : -1, c->cas);
1564        break;
1565    case OPERATION_APPEND:
1566        MEMCACHED_COMMAND_APPEND(c->sfd, info.info.key, info.info.nkey,
1567                                 (ret == ENGINE_SUCCESS) ? info.info.nbytes : -1, c->cas);
1568        break;
1569    case OPERATION_PREPEND:
1570        MEMCACHED_COMMAND_PREPEND(c->sfd, info.info.key, info.info.nkey,
1571                                  (ret == ENGINE_SUCCESS) ? info.info.nbytes : -1, c->cas);
1572        break;
1573    case OPERATION_SET:
1574        MEMCACHED_COMMAND_SET(c->sfd, info.info.key, info.info.nkey,
1575                              (ret == ENGINE_SUCCESS) ? info.info.nbytes : -1, c->cas);
1576        break;
1577    }
1578#endif
1579
1580    switch (ret) {
1581    case ENGINE_SUCCESS:
1582        /* Stored */
1583        write_bin_response(c, NULL, 0, 0, 0);
1584        break;
1585    case ENGINE_KEY_EEXISTS:
1586        write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS, 0);
1587        break;
1588    case ENGINE_KEY_ENOENT:
1589        write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, 0);
1590        break;
1591    case ENGINE_ENOMEM:
1592        write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_ENOMEM, 0);
1593        break;
1594    case ENGINE_TMPFAIL:
1595        write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_ETMPFAIL, 0);
1596        break;
1597    case ENGINE_EWOULDBLOCK:
1598        c->ewouldblock = true;
1599        break;
1600    case ENGINE_DISCONNECT:
1601        c->state = conn_closing;
1602        break;
1603    case ENGINE_ENOTSUP:
1604        write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_NOT_SUPPORTED, 0);
1605        break;
1606    case ENGINE_NOT_MY_VBUCKET:
1607        write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET, 0);
1608        break;
1609    case ENGINE_E2BIG:
1610        write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_E2BIG, 0);
1611        break;
1612    default:
1613        if (c->store_op == OPERATION_ADD) {
1614            eno = PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS;
1615        } else if(c->store_op == OPERATION_REPLACE) {
1616            eno = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
1617        } else {
1618            eno = PROTOCOL_BINARY_RESPONSE_NOT_STORED;
1619        }
1620        write_bin_packet(c, eno, 0);
1621    }
1622
1623    if (c->store_op == OPERATION_CAS) {
1624        switch (ret) {
1625        case ENGINE_SUCCESS:
1626            SLAB_INCR(c, cas_hits, info.info.key, info.info.nkey);
1627            break;
1628        case ENGINE_KEY_EEXISTS:
1629            SLAB_INCR(c, cas_badval, info.info.key, info.info.nkey);
1630            break;
1631        case ENGINE_KEY_ENOENT:
1632            STATS_NOKEY(c, cas_misses);
1633            break;
1634        default:
1635            ;
1636        }
1637    } else {
1638        SLAB_INCR(c, cmd_set, info.info.key, info.info.nkey);
1639    }
1640
1641    if (!c->ewouldblock) {
1642        /* release the c->item reference */
1643        settings.engine.v1->release(settings.engine.v0, c, c->item);
1644        c->item = 0;
1645    }
1646}
1647
1648static void process_bin_get(conn *c) {
1649    item *it;
1650    protocol_binary_response_get* rsp = (protocol_binary_response_get*)c->wbuf;
1651    char* key = binary_get_key(c);
1652    size_t nkey = c->binary_header.request.keylen;
1653    uint16_t keylen;
1654    uint32_t bodylen;
1655    item_info_holder info;
1656    int ii;
1657    ENGINE_ERROR_CODE ret;
1658    uint8_t datatype;
1659    bool need_inflate = false;
1660
1661    memset(&info, 0, sizeof(info));
1662    if (settings.verbose > 1) {
1663        char buffer[1024];
1664        if (key_to_printable_buffer(buffer, sizeof(buffer), c->sfd, true,
1665                                    "GET", key, nkey) != -1) {
1666            settings.extensions.logger->log(EXTENSION_LOG_DEBUG, c, "%s\n",
1667                                            buffer);
1668        }
1669    }
1670
1671    ret = c->aiostat;
1672    c->aiostat = ENGINE_SUCCESS;
1673    if (ret == ENGINE_SUCCESS) {
1674        ret = settings.engine.v1->get(settings.engine.v0, c, &it, key, (int)nkey,
1675                                      c->binary_header.request.vbucket);
1676    }
1677
1678    info.info.nvalue = IOV_MAX;
1679    switch (ret) {
1680    case ENGINE_SUCCESS:
1681        STATS_HIT(c, get, key, nkey);
1682
1683        if (!settings.engine.v1->get_item_info(settings.engine.v0, c, it,
1684                                               (void*)&info)) {
1685            settings.engine.v1->release(settings.engine.v0, c, it);
1686            settings.extensions.logger->log(EXTENSION_LOG_WARNING, c,
1687                                            "%d: Failed to get item info",
1688                                            c->sfd);
1689            write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_EINTERNAL, 0);
1690            break;
1691        }
1692
1693        datatype = info.info.datatype;
1694        if (!c->supports_datatype) {
1695            if ((datatype & PROTOCOL_BINARY_DATATYPE_COMPRESSED) == PROTOCOL_BINARY_DATATYPE_COMPRESSED) {
1696                need_inflate = true;
1697            } else {
1698                datatype = PROTOCOL_BINARY_RAW_BYTES;
1699            }
1700        }
1701
1702        keylen = 0;
1703        bodylen = sizeof(rsp->message.body) + info.info.nbytes;
1704
1705        if (c->cmd == PROTOCOL_BINARY_CMD_GETK) {
1706            bodylen += (uint32_t)nkey;
1707            keylen = (uint16_t)nkey;
1708        }
1709
1710        if (need_inflate) {
1711            if (info.info.nvalue != 1) {
1712                write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_EINTERNAL, 0);
1713            } else if (binary_response_handler(key, keylen,
1714                                               &info.info.flags, 4,
1715                                               info.info.value[0].iov_base,
1716                                               (uint32_t)info.info.value[0].iov_len,
1717                                               datatype,
1718                                               PROTOCOL_BINARY_RESPONSE_SUCCESS,
1719                                               info.info.cas, c)) {
1720                write_and_free(c, c->dynamic_buffer.buffer, c->dynamic_buffer.offset);
1721                c->dynamic_buffer.buffer = NULL;
1722                settings.engine.v1->release(settings.engine.v0, c, it);
1723            } else {
1724                write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_EINTERNAL, 0);
1725            }
1726        } else {
1727            if (add_bin_header(c, 0, sizeof(rsp->message.body),
1728                               keylen, bodylen, datatype) == -1) {
1729                conn_set_state(c, conn_closing);
1730                return;
1731            }
1732            rsp->message.header.response.cas = htonll(info.info.cas);
1733
1734            /* add the flags */
1735            rsp->message.body.flags = info.info.flags;
1736            add_iov(c, &rsp->message.body, sizeof(rsp->message.body));
1737
1738            if (c->cmd == PROTOCOL_BINARY_CMD_GETK) {
1739                add_iov(c, info.info.key, nkey);
1740            }
1741
1742            for (ii = 0; ii < info.info.nvalue; ++ii) {
1743                add_iov(c, info.info.value[ii].iov_base,
1744                        info.info.value[ii].iov_len);
1745            }
1746            conn_set_state(c, conn_mwrite);
1747            /* Remember this item so we can garbage collect it later */
1748            c->item = it;
1749        }
1750        break;
1751    case ENGINE_KEY_ENOENT:
1752        STATS_MISS(c, get, key, nkey);
1753
1754        MEMCACHED_COMMAND_GET(c->sfd, key, nkey, -1, 0);
1755
1756        if (c->noreply) {
1757            conn_set_state(c, conn_new_cmd);
1758        } else {
1759            if (c->cmd == PROTOCOL_BINARY_CMD_GETK) {
1760                char *ofs = c->wbuf + sizeof(protocol_binary_response_header);
1761                if (add_bin_header(c, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT,
1762                                   0, (uint16_t)nkey,
1763                                   (uint32_t)nkey, PROTOCOL_BINARY_RAW_BYTES) == -1) {
1764                    conn_set_state(c, conn_closing);
1765                    return;
1766                }
1767                memcpy(ofs, key, nkey);
1768                add_iov(c, ofs, nkey);
1769                conn_set_state(c, conn_mwrite);
1770            } else {
1771                write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, 0);
1772            }
1773        }
1774        break;
1775    case ENGINE_EWOULDBLOCK:
1776        c->ewouldblock = true;
1777        break;
1778    case ENGINE_DISCONNECT:
1779        c->state = conn_closing;
1780        break;
1781    default:
1782        write_bin_packet(c, engine_error_2_protocol_error(ret), 0);
1783    }
1784
1785    if (settings.detail_enabled && ret != ENGINE_EWOULDBLOCK) {
1786        stats_prefix_record_get(key, nkey, ret == ENGINE_SUCCESS);
1787    }
1788}
1789
1790static void append_bin_stats(const char *key, const uint16_t klen,
1791                             const char *val, const uint32_t vlen,
1792                             conn *c) {
1793    char *buf = c->dynamic_buffer.buffer + c->dynamic_buffer.offset;
1794    uint32_t bodylen = klen + vlen;
1795    protocol_binary_response_header header;
1796
1797    memset(&header, 0, sizeof(header));
1798    header.response.magic = (uint8_t)PROTOCOL_BINARY_RES;
1799    header.response.opcode = PROTOCOL_BINARY_CMD_STAT;
1800    header.response.keylen = (uint16_t)htons(klen);
1801    header.response.datatype = (uint8_t)PROTOCOL_BINARY_RAW_BYTES;
1802    header.response.bodylen = htonl(bodylen);
1803    header.response.opaque = c->opaque;
1804
1805    memcpy(buf, header.bytes, sizeof(header.response));
1806    buf += sizeof(header.response);
1807
1808    if (klen > 0) {
1809        cb_assert(key != NULL);
1810        memcpy(buf, key, klen);
1811        buf += klen;
1812
1813        if (vlen > 0) {
1814            memcpy(buf, val, vlen);
1815        }
1816    }
1817
1818    c->dynamic_buffer.offset += sizeof(header.response) + bodylen;
1819}
1820
1821static bool grow_dynamic_buffer(conn *c, size_t needed) {
1822    size_t nsize = c->dynamic_buffer.size;
1823    size_t available = nsize - c->dynamic_buffer.offset;
1824    bool rv = true;
1825
1826    /* Special case: No buffer -- need to allocate fresh */
1827    if (c->dynamic_buffer.buffer == NULL) {
1828        nsize = 1024;
1829        available = c->dynamic_buffer.size = c->dynamic_buffer.offset = 0;
1830    }
1831
1832    while (needed > available) {
1833        cb_assert(nsize > 0);
1834        nsize = nsize << 1;
1835        available = nsize - c->dynamic_buffer.offset;
1836    }
1837
1838    if (nsize != c->dynamic_buffer.size) {
1839        char *ptr = realloc(c->dynamic_buffer.buffer, nsize);
1840        if (ptr) {
1841            c->dynamic_buffer.buffer = ptr;
1842            c->dynamic_buffer.size = nsize;
1843        } else {
1844            rv = false;
1845        }
1846    }
1847
1848    return rv;
1849}
1850
1851static void append_stats(const char *key, const uint16_t klen,
1852                         const char *val, const uint32_t vlen,
1853                         const void *cookie)
1854{
1855    size_t needed;
1856    conn *c = (conn*)cookie;
1857    /* value without a key is invalid */
1858    if (klen == 0 && vlen > 0) {
1859        return ;
1860    }
1861
1862    needed = vlen + klen + sizeof(protocol_binary_response_header);
1863    if (!grow_dynamic_buffer(c, needed)) {
1864        return ;
1865    }
1866    append_bin_stats(key, klen, val, vlen, c);
1867    cb_assert(c->dynamic_buffer.offset <= c->dynamic_buffer.size);
1868}
1869
/*
 * Prepare the connection to read `chunk` more bytes of the current binary
 * packet (the bytes following the request header) into the read buffer.
 *
 * The new substate is recorded. If the bytes do not fit after the header at
 * rcurr, the buffer is first grown (doubling rsize until header + chunk
 * fits). The pending data is then moved to the start of rbuf. Finally ritem
 * is pointed just past the header and the state is switched to conn_nread.
 *
 * On realloc failure the connection is moved to conn_closing and nothing
 * else is changed.
 */
static void bin_read_chunk(conn *c,
                           enum bin_substates next_substate,
                           uint32_t chunk) {
    ptrdiff_t offset;
    cb_assert(c);
    c->substate = next_substate;
    c->rlbytes = chunk;

    /* Ok... do we have room for everything in our buffer? */
    /* offset: position just past the request header, relative to rbuf */
    offset = c->rcurr + sizeof(protocol_binary_request_header) - c->rbuf;
    if (c->rlbytes > c->rsize - offset) {
        size_t nsize = c->rsize;
        /* header + body chunk; the data is repacked to rbuf[0] below */
        size_t size = c->rlbytes + sizeof(protocol_binary_request_header);

        /* NOTE(review): assumes rsize > 0, otherwise this never terminates */
        while (size > nsize) {
            nsize *= 2;
        }

        if (nsize != c->rsize) {
            char *newm;
            if (settings.verbose > 1) {
                settings.extensions.logger->log(EXTENSION_LOG_DEBUG, c,
                        "%d: Need to grow buffer from %lu to %lu\n",
                        c->sfd, (unsigned long)c->rsize, (unsigned long)nsize);
            }
            newm = realloc(c->rbuf, nsize);
            if (newm == NULL) {
                if (settings.verbose) {
                    settings.extensions.logger->log(EXTENSION_LOG_INFO, c,
                            "%d: Failed to grow buffer.. closing connection\n",
                            c->sfd);
                }
                conn_set_state(c, conn_closing);
                return;
            }

            c->rbuf= newm;
            /* rcurr should point to the same offset in the packet */
            c->rcurr = c->rbuf + offset - sizeof(protocol_binary_request_header);
            /* rsize is stored as an int (see cast) */
            c->rsize = (int)nsize;
        }
        /* Slide the unread bytes (header included) to the front so the
         * whole chunk fits behind the header. Regions may overlap, hence
         * memmove. */
        if (c->rbuf != c->rcurr) {
            memmove(c->rbuf, c->rcurr, c->rbytes);
            c->rcurr = c->rbuf;
            if (settings.verbose > 1) {
                settings.extensions.logger->log(EXTENSION_LOG_DEBUG, c,
                                                "%d: Repack input buffer\n",
                                                c->sfd);
            }
        }
    }

    /* preserve the header in the buffer.. */
    c->ritem = c->rcurr + sizeof(protocol_binary_request_header);
    conn_set_state(c, conn_nread);
}
1926
1927static void bin_read_key(conn *c, enum bin_substates next_substate, int extra) {
1928    bin_read_chunk(c, next_substate, c->keylen + extra);
1929}
1930
1931
1932/* Just write an error message and disconnect the client */
1933static void handle_binary_protocol_error(conn *c) {
1934    write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_EINVAL, 0);
1935    if (settings.verbose) {
1936        settings.extensions.logger->log(EXTENSION_LOG_INFO, c,
1937                "%d: Protocol error (opcode %02x), close connection\n",
1938                c->sfd, c->binary_header.request.opcode);
1939    }
1940    c->write_and_go = conn_closing;
1941}
1942
1943static void get_auth_data(const void *cookie, auth_data_t *data) {
1944    conn *c = (conn*)cookie;
1945    if (c->sasl_conn) {
1946        cbsasl_getprop(c->sasl_conn, CBSASL_USERNAME, (void*)&data->username);
1947        cbsasl_getprop(c->sasl_conn, CBSASL_CONFIG, (void*)&data->config);
1948    } else {
1949        data->username = NULL;
1950        data->config = NULL;
1951    }
1952}
1953
/*
 * Temporary heap buffer holding a SASL request while its body is read.
 * `data` holds the mechanism name (ksize bytes) immediately followed by the
 * challenge (vsize bytes). It is allocated by process_bin_sasl_auth, stored
 * in c->item, and freed by process_bin_complete_sasl_auth.
 */
struct sasl_tmp {
    int ksize;  /* length of the mechanism name (packet key) */
    int vsize;  /* length of the challenge (packet value) */
    char data[1]; /* data + ksize == value */
};
1959
1960static void process_bin_sasl_auth(conn *c) {
1961    int nkey;
1962    int vlen;
1963    char *key;
1964    size_t buffer_size;
1965    struct sasl_tmp *data;
1966
1967    cb_assert(c->binary_header.request.extlen == 0);
1968    nkey = c->binary_header.request.keylen;
1969    vlen = c->binary_header.request.bodylen - nkey;
1970
1971    if (nkey > MAX_SASL_MECH_LEN) {
1972        write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_EINVAL, vlen);
1973        c->write_and_go = conn_swallow;
1974        return;
1975    }
1976
1977    key = binary_get_key(c);
1978    cb_assert(key);
1979
1980    buffer_size = sizeof(struct sasl_tmp) + nkey + vlen + 2;
1981    data = calloc(sizeof(struct sasl_tmp) + buffer_size, 1);
1982    if (!data) {
1983        write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_ENOMEM, vlen);
1984        c->write_and_go = conn_swallow;
1985        return;
1986    }
1987
1988    data->ksize = nkey;
1989    data->vsize = vlen;
1990    memcpy(data->data, key, nkey);
1991
1992    c->item = data;
1993    c->ritem = data->data + nkey;
1994    c->rlbytes = vlen;
1995    conn_set_state(c, conn_nread);
1996    c->substate = bin_reading_sasl_auth_data;
1997}
1998
1999static void process_bin_complete_sasl_auth(conn *c) {
2000    auth_data_t data;
2001    const char *out = NULL;
2002    unsigned int outlen = 0;
2003    int nkey;
2004    int vlen;
2005    struct sasl_tmp *stmp;
2006    char mech[1024];
2007    const char *challenge;
2008    int result=-1;
2009
2010    cb_assert(c->item);
2011
2012    nkey = c->binary_header.request.keylen;
2013    if (nkey > 1023) {
2014        /* too big.. */
2015        settings.extensions.logger->log(EXTENSION_LOG_WARNING, c,
2016                "%d: sasl error. key: %d > 1023", c->sfd, nkey);
2017        write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_AUTH_ERROR, 0);
2018        return;
2019    }
2020    vlen = c->binary_header.request.bodylen - nkey;
2021
2022    stmp = c->item;
2023    memcpy(mech, stmp->data, nkey);
2024    mech[nkey] = 0x00;
2025
2026    if (settings.verbose) {
2027        settings.extensions.logger->log(EXTENSION_LOG_DEBUG, c,
2028                "%d: mech: ``%s'' with %d bytes of data\n", c->sfd, mech, vlen);
2029    }
2030
2031    challenge = vlen == 0 ? NULL : (stmp->data + nkey);
2032    switch (c->cmd) {
2033    case PROTOCOL_BINARY_CMD_SASL_AUTH:
2034        result = cbsasl_server_start(&c->sasl_conn, mech,
2035                                     challenge, vlen,
2036                                     (unsigned char **)&out, &outlen);
2037        break;
2038    case PROTOCOL_BINARY_CMD_SASL_STEP:
2039        result = cbsasl_server_step(c->sasl_conn, challenge,
2040                                    vlen, &out, &outlen);
2041        break;
2042    default:
2043        cb_assert(false); /* CMD should be one of the above */
2044        /* This code is pretty much impossible, but makes the compiler
2045           happier */
2046        if (settings.verbose) {
2047            settings.extensions.logger->log(EXTENSION_LOG_WARNING, c,
2048                    "%d: Unhandled command %d with challenge %s\n",
2049                    c->sfd, c->cmd, challenge);
2050        }
2051        break;
2052    }
2053
2054    free(c->item);
2055    c->item = NULL;
2056    c->ritem = NULL;
2057
2058    if (settings.verbose) {
2059        settings.extensions.logger->log(EXTENSION_LOG_INFO, c,
2060                                        "%d: sasl result code:  %d\n",
2061                                        c->sfd, result);
2062    }
2063
2064    switch(result) {
2065    case SASL_OK:
2066        write_bin_response(c, "Authenticated", 0, 0, (uint32_t)strlen("Authenticated"));
2067        get_auth_data(c, &data);
2068        if (settings.disable_admin) {
2069            /* "everyone is admins" */
2070            cookie_set_admin(c);
2071        } else if (settings.admin != NULL && data.username != NULL) {
2072            if (strcmp(settings.admin, data.username) == 0) {
2073                cookie_set_admin(c);
2074            }
2075        }
2076        perform_callbacks(ON_AUTH, (const void*)&data, c);
2077        STATS_NOKEY(c, auth_cmds);
2078        break;
2079    case SASL_CONTINUE:
2080        if (add_bin_header(c, PROTOCOL_BINARY_RESPONSE_AUTH_CONTINUE, 0, 0,
2081                           outlen, PROTOCOL_BINARY_RAW_BYTES) == -1) {
2082            conn_set_state(c, conn_closing);
2083            return;
2084        }
2085        add_iov(c, out, outlen);
2086        conn_set_state(c, conn_mwrite);
2087        c->write_and_go = conn_new_cmd;
2088        break;
2089    case SASL_BADPARAM:
2090        settings.extensions.logger->log(EXTENSION_LOG_WARNING, c,
2091                                        "%d: Bad sasl params: %d\n",
2092                                        c->sfd, result);
2093        write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_EINVAL, 0);
2094        STATS_NOKEY2(c, auth_cmds, auth_errors);
2095        break;
2096    default:
2097        if (result == SASL_NOUSER || result == SASL_PWERR) {
2098            settings.extensions.logger->log(EXTENSION_LOG_WARNING, c,
2099                                            "%d: Invalid username/password combination",
2100                                            c->sfd);
2101        } else {
2102            settings.extensions.logger->log(EXTENSION_LOG_WARNING, c,
2103                                            "%d: Unknown sasl response: %d",
2104                                            c->sfd, result);
2105        }
2106        write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_AUTH_ERROR, 0);
2107        STATS_NOKEY2(c, auth_cmds, auth_errors);
2108    }
2109}
2110
2111static bool authenticated(conn *c) {
2112    bool rv = false;
2113
2114    switch (c->cmd) {
2115    case PROTOCOL_BINARY_CMD_SASL_LIST_MECHS: /* FALLTHROUGH */
2116    case PROTOCOL_BINARY_CMD_SASL_AUTH:       /* FALLTHROUGH */
2117    case PROTOCOL_BINARY_CMD_SASL_STEP:       /* FALLTHROUGH */
2118    case PROTOCOL_BINARY_CMD_VERSION:         /* FALLTHROUGH */
2119    case PROTOCOL_BINARY_CMD_HELLO:
2120        rv = true;
2121        break;
2122    default:
2123        if (c->sasl_conn) {
2124            const void *uname = NULL;
2125            cbsasl_getprop(c->sasl_conn, CBSASL_USERNAME, &uname);
2126            rv = uname != NULL;
2127        }
2128    }
2129
2130    if (settings.verbose > 1) {
2131        settings.extensions.logger->log(EXTENSION_LOG_DEBUG, c,
2132                "%d: authenticated() in cmd 0x%02x is %s\n",
2133                c->sfd, c->cmd, rv ? "true" : "false");
2134    }
2135
2136    return rv;
2137}
2138
2139bool binary_response_handler(const void *key, uint16_t keylen,
2140                             const void *ext, uint8_t extlen,
2141                             const void *body, uint32_t bodylen,
2142                             uint8_t datatype, uint16_t status,
2143                             uint64_t cas, const void *cookie)
2144{
2145    protocol_binary_response_header header;
2146    char *buf;
2147    conn *c = (conn*)cookie;
2148    /* Look at append_bin_stats */
2149    size_t needed;
2150    bool need_inflate = false;
2151    size_t inflated_length;
2152
2153    if (!c->supports_datatype) {
2154        if ((datatype & PROTOCOL_BINARY_DATATYPE_COMPRESSED) == PROTOCOL_BINARY_DATATYPE_COMPRESSED) {
2155            need_inflate = true;
2156        }
2157        /* We may silently drop the knowledge about a JSON item */
2158        datatype = PROTOCOL_BINARY_RAW_BYTES;
2159    }
2160
2161    needed = keylen + extlen + sizeof(protocol_binary_response_header);
2162    if (need_inflate) {
2163        if (snappy_uncompressed_length(body, bodylen,
2164                                       &inflated_length) != SNAPPY_OK) {
2165            settings.extensions.logger->log(EXTENSION_LOG_INFO, c,
2166                    "<%d ERROR: Failed to determine inflated size",
2167                    c->sfd);
2168            return false;
2169        }
2170        needed += inflated_length;
2171    } else {
2172        needed += bodylen;
2173    }
2174
2175    if (!grow_dynamic_buffer(c, needed)) {
2176        if (settings.verbose > 0) {
2177            settings.extensions.logger->log(EXTENSION_LOG_INFO, c,
2178                    "<%d ERROR: Failed to allocate memory for response",
2179                    c->sfd);
2180        }
2181        return false;
2182    }
2183
2184    buf = c->dynamic_buffer.buffer + c->dynamic_buffer.offset;
2185    memset(&header, 0, sizeof(header));
2186    header.response.magic = (uint8_t)PROTOCOL_BINARY_RES;
2187    header.response.opcode = c->binary_header.request.opcode;
2188    header.response.keylen = (uint16_t)htons(keylen);
2189    header.response.extlen = extlen;
2190    header.response.datatype = datatype;
2191    header.response.status = (uint16_t)htons(status);
2192    if (need_inflate) {
2193        header.response.bodylen = htonl((uint32_t)(inflated_length + keylen + extlen));
2194    } else {
2195        header.response.bodylen = htonl(bodylen + keylen + extlen);
2196    }
2197    header.response.opaque = c->opaque;
2198    header.response.cas = htonll(cas);
2199
2200    memcpy(buf, header.bytes, sizeof(header.response));
2201    buf += sizeof(header.response);
2202
2203    if (extlen > 0) {
2204        memcpy(buf, ext, extlen);
2205        buf += extlen;
2206    }
2207
2208    if (keylen > 0) {
2209        cb_assert(key != NULL);
2210        memcpy(buf, key, keylen);
2211        buf += keylen;
2212    }
2213
2214    if (bodylen > 0) {
2215        if (need_inflate) {
2216            if (snappy_uncompress(body, bodylen, buf, &inflated_length) != SNAPPY_OK) {
2217                settings.extensions.logger->log(EXTENSION_LOG_INFO, c,
2218                        "<%d ERROR: Failed to inflate item", c->sfd);
2219                return false;
2220            }
2221        } else {
2222            memcpy(buf, body, bodylen);
2223        }
2224    }
2225
2226    c->dynamic_buffer.offset += needed;
2227    return true;
2228}
2229
2230/**
2231 * Tap stats (these are only used by the tap thread, so they don't need
2232 * to be in the threadlocal struct right now...
2233 */
2234struct tap_cmd_stats {
2235    uint64_t connect;
2236    uint64_t mutation;
2237    uint64_t checkpoint_start;
2238    uint64_t checkpoint_end;
2239    uint64_t delete;
2240    uint64_t flush;
2241    uint64_t opaque;
2242    uint64_t vbucket_set;
2243};
2244
2245struct tap_stats {
2246    cb_mutex_t mutex;
2247    struct tap_cmd_stats sent;
2248    struct tap_cmd_stats received;
2249} tap_stats;
2250
2251static void ship_tap_log(conn *c) {
2252    bool more_data = true;
2253    bool send_data = false;
2254    bool disconnect = false;
2255    item *it;
2256    uint32_t bodylen;
2257    int ii = 0;
2258
2259    c->msgcurr = 0;
2260    c->msgused = 0;
2261    c->iovused = 0;
2262    if (add_msghdr(c) != 0) {
2263        if (settings.verbose) {
2264            settings.extensions.logger->log(EXTENSION_LOG_WARNING, c,
2265                                            "%d: Failed to create output headers. Shutting down tap connection\n", c->sfd);
2266        }
2267        conn_set_state(c, conn_closing);
2268        return ;
2269    }
2270    /* @todo add check for buffer overflow of c->wbuf) */
2271    c->wbytes = 0;
2272    c->wcurr = c->wbuf;
2273    c->icurr = c->ilist;
2274    do {
2275        /* @todo fixme! */
2276        void *engine;
2277        uint16_t nengine;
2278        uint8_t ttl;
2279        uint16_t tap_flags;
2280        uint32_t seqno;
2281        uint16_t vbucket;
2282        tap_event_t event;
2283        bool inflate = false;
2284        size_t inflated_length = 0;
2285
2286        union {
2287            protocol_binary_request_tap_mutation mutation;
2288            protocol_binary_request_tap_delete delete;
2289            protocol_binary_request_tap_flush flush;
2290            protocol_binary_request_tap_opaque opaque;
2291            protocol_binary_request_noop noop;
2292        } msg;
2293        item_info_holder info;
2294        memset(&info, 0, sizeof(info));
2295
2296        if (ii++ == 10) {
2297            break;
2298        }
2299
2300        event = c->tap_iterator(settings.engine.v0, c, &it,
2301                                            &engine, &nengine, &ttl,
2302                                            &tap_flags, &seqno, &vbucket);
2303        memset(&msg, 0, sizeof(msg));
2304        msg.opaque.message.header.request.magic = (uint8_t)PROTOCOL_BINARY_REQ;
2305        msg.opaque.message.header.request.opaque = htonl(seqno);
2306        msg.opaque.message.body.tap.enginespecific_length = htons(nengine);
2307        msg.opaque.message.body.tap.ttl = ttl;
2308        msg.opaque.message.body.tap.flags = htons(tap_flags);
2309        msg.opaque.message.header.request.extlen = 8;
2310        msg.opaque.message.header.request.vbucket = htons(vbucket);
2311        info.info.nvalue = IOV_MAX;
2312
2313        switch (event) {
2314        case TAP_NOOP :
2315            send_data = true;
2316            msg.noop.message.header.request.opcode = PROTOCOL_BINARY_CMD_NOOP;
2317            msg.noop.message.header.request.extlen = 0;
2318            msg.noop.message.header.request.bodylen = htonl(0);
2319            memcpy(c->wcurr, msg.noop.bytes, sizeof(msg.noop.bytes));
2320            add_iov(c, c->wcurr, sizeof(msg.noop.bytes));
2321            c->wcurr += sizeof(msg.noop.bytes);
2322            c->wbytes += sizeof(msg.noop.bytes);
2323            break;
2324        case TAP_PAUSE :
2325            more_data = false;
2326            break;
2327        case TAP_CHECKPOINT_START:
2328        case TAP_CHECKPOINT_END:
2329        case TAP_MUTATION:
2330            if (!settings.engine.v1->get_item_info(settings.engine.v0, c, it,
2331                                                   (void*)&info)) {
2332                settings.engine.v1->release(settings.engine.v0, c, it);
2333                settings.extensions.logger->log(EXTENSION_LOG_WARNING, c,
2334                                                "%d: Failed to get item info\n", c->sfd);
2335                break;
2336            }
2337            send_data = true;
2338            c->ilist[c->ileft++] = it;
2339
2340            if (event == TAP_CHECKPOINT_START) {
2341                msg.mutation.message.header.request.opcode =
2342                    PROTOCOL_BINARY_CMD_TAP_CHECKPOINT_START;
2343                cb_mutex_enter(&tap_stats.mutex);
2344                tap_stats.sent.checkpoint_start++;
2345                cb_mutex_exit(&tap_stats.mutex);
2346            } else if (event == TAP_CHECKPOINT_END) {
2347                msg.mutation.message.header.request.opcode =
2348                    PROTOCOL_BINARY_CMD_TAP_CHECKPOINT_END;
2349                cb_mutex_enter(&tap_stats.mutex);
2350                tap_stats.sent.checkpoint_end++;
2351                cb_mutex_exit(&tap_stats.mutex);
2352            } else if (event == TAP_MUTATION) {
2353                msg.mutation.message.header.request.opcode = PROTOCOL_BINARY_CMD_TAP_MUTATION;
2354                cb_mutex_enter(&tap_stats.mutex);
2355                tap_stats.sent.mutation++;
2356                cb_mutex_exit(&tap_stats.mutex);
2357            }
2358
2359            msg.mutation.message.header.request.cas = htonll(info.info.cas);
2360            msg.mutation.message.header.request.keylen = htons(info.info.nkey);
2361            msg.mutation.message.header.request.extlen = 16;
2362            if (c->supports_datatype) {
2363                msg.mutation.message.header.request.datatype = info.info.datatype;
2364            } else {
2365                switch (info.info.datatype) {
2366                case 0:
2367                    break;
2368                case PROTOCOL_BINARY_DATATYPE_JSON:
2369                    break;
2370                case PROTOCOL_BINARY_DATATYPE_COMPRESSED:
2371                case PROTOCOL_BINARY_DATATYPE_COMPRESSED_JSON:
2372                    inflate = true;
2373                    break;
2374                default:
2375                    settings.extensions.logger->log(EXTENSION_LOG_WARNING, c,
2376                                                    "%d: shipping data with"
2377                                                    " an invalid datatype "
2378                                                    "(stripping info)",
2379                                                    c->sfd);
2380                }
2381                msg.mutation.message.header.request.datatype = 0;
2382            }
2383
2384            bodylen = 16 + info.info.nkey + nengine;
2385            if ((tap_flags & TAP_FLAG_NO_VALUE) == 0) {
2386                if (inflate) {
2387                    if (snappy_uncompressed_length(info.info.value[0].iov_base,
2388                                                   info.info.nbytes,
2389                                                   &inflated_length) == SNAPPY_OK) {
2390                        bodylen += inflated_length;
2391                    } else {
2392                        settings.extensions.logger->log(EXTENSION_LOG_INFO, c,
2393                                                        "<%d ERROR: Failed to determine inflated size. Sending as compressed",
2394                                                        c->sfd);
2395                        inflate = false;
2396                        bodylen += info.info.nbytes;
2397                    }
2398                } else {
2399                    bodylen += info.info.nbytes;
2400                }
2401            }
2402            msg.mutation.message.header.request.bodylen = htonl(bodylen);
2403
2404            if ((tap_flags & TAP_FLAG_NETWORK_BYTE_ORDER) == 0) {
2405                msg.mutation.message.body.item.flags = htonl(info.info.flags);
2406            } else {
2407                msg.mutation.message.body.item.flags = info.info.flags;
2408            }
2409            msg.mutation.message.body.item.expiration = htonl(info.info.exptime);
2410            msg.mutation.message.body.tap.enginespecific_length = htons(nengine);
2411            msg.mutation.message.body.tap.ttl = ttl;
2412            msg.mutation.message.body.tap.flags = htons(tap_flags);
2413            memcpy(c->wcurr, msg.mutation.bytes, sizeof(msg.mutation.bytes));
2414
2415            add_iov(c, c->wcurr, sizeof(msg.mutation.bytes));
2416            c->wcurr += sizeof(msg.mutation.bytes);
2417            c->wbytes += sizeof(msg.mutation.bytes);
2418
2419            if (nengine > 0) {
2420                memcpy(c->wcurr, engine, nengine);
2421                add_iov(c, c->wcurr, nengine);
2422                c->wcurr += nengine;
2423                c->wbytes += nengine;
2424            }
2425
2426            add_iov(c, info.info.key, info.info.nkey);
2427            if ((tap_flags & TAP_FLAG_NO_VALUE) == 0) {
2428                if (inflate) {
2429                    void *buf = malloc(inflated_length);
2430                    void *body = info.info.value[0].iov_base;
2431                    size_t bodylen = info.info.value[0].iov_len;
2432                    if (snappy_uncompress(body, bodylen,
2433                                          buf, &inflated_length) == SNAPPY_OK) {
2434                        c->temp_alloc_list[c->temp_alloc_left++] = buf;
2435
2436                        add_iov(c, buf, inflated_length);
2437                    } else {
2438                        free(buf);
2439                        settings.extensions.logger->log(EXTENSION_LOG_WARNING, c,
2440                                                        "%d: FATAL: failed to inflate object. shutitng down connection", c->sfd);
2441                        conn_set_state(c, conn_closing);
2442                        return;
2443                    }
2444                } else {
2445                    int xx;
2446                    for (xx = 0; xx < info.info.nvalue; ++xx) {
2447                        add_iov(c, info.info.value[xx].iov_base,
2448                                info.info.value[xx].iov_len);
2449                    }
2450                }
2451            }
2452
2453            break;
2454        case TAP_DELETION:
2455            /* This is a delete */
2456            if (!settings.engine.v1->get_item_info(settings.engine.v0, c, it,
2457                                                   (void*)&info)) {
2458                settings.engine.v1->release(settings.engine.v0, c, it);
2459                settings.extensions.logger->log(EXTENSION_LOG_WARNING, c,
2460                                                "%d: Failed to get item info\n", c->sfd);
2461                break;
2462            }
2463            send_data = true;
2464            c->ilist[c->ileft++] = it;
2465            msg.delete.message.header.request.opcode = PROTOCOL_BINARY_CMD_TAP_DELETE;
2466            msg.delete.message.header.request.cas = htonll(info.info.cas);
2467            msg.delete.message.header.request.keylen = htons(info.info.nkey);
2468
2469            bodylen = 8 + info.info.nkey + nengine;
2470            if ((tap_flags & TAP_FLAG_NO_VALUE) == 0) {
2471                bodylen += info.info.nbytes;
2472            }
2473            msg.delete.message.header.request.bodylen = htonl(bodylen);
2474
2475            memcpy(c->wcurr, msg.delete.bytes, sizeof(msg.delete.bytes));
2476            add_iov(c, c->wcurr, sizeof(msg.delete.bytes));
2477            c->wcurr += sizeof(msg.delete.bytes);
2478            c->wbytes += sizeof(msg.delete.bytes);
2479
2480            if (nengine > 0) {
2481                memcpy(c->wcurr, engine, nengine);
2482                add_iov(c, c->wcurr, nengine);
2483                c->wcurr += nengine;
2484                c->wbytes += nengine;
2485            }
2486
2487            add_iov(c, info.info.key, info.info.nkey);
2488            if ((tap_flags & TAP_FLAG_NO_VALUE) == 0) {
2489                int xx;
2490                for (xx = 0; xx < info.info.nvalue; ++xx) {
2491                    add_iov(c, info.info.value[xx].iov_base,
2492                            info.info.value[xx].iov_len);
2493                }
2494            }
2495
2496            cb_mutex_enter(&tap_stats.mutex);
2497            tap_stats.sent.delete++;
2498            cb_mutex_exit(&tap_stats.mutex);
2499            break;
2500
2501        case TAP_DISCONNECT:
2502            disconnect = true;
2503            more_data = false;
2504            break;
2505        case TAP_VBUCKET_SET:
2506        case TAP_FLUSH:
2507        case TAP_OPAQUE:
2508            send_data = true;
2509
2510            if (event == TAP_OPAQUE) {
2511                msg.flush.message.header.request.opcode = PROTOCOL_BINARY_CMD_TAP_OPAQUE;
2512                cb_mutex_enter(&tap_stats.mutex);
2513                tap_stats.sent.opaque++;
2514                cb_mutex_exit(&tap_stats.mutex);
2515
2516            } else if (event == TAP_FLUSH) {
2517                msg.flush.message.header.request.opcode = PROTOCOL_BINARY_CMD_TAP_FLUSH;
2518                cb_mutex_enter(&tap_stats.mutex);
2519                tap_stats.sent.flush++;
2520                cb_mutex_exit(&tap_stats.mutex);
2521            } else if (event == TAP_VBUCKET_SET) {
2522                msg.flush.message.header.request.opcode = PROTOCOL_BINARY_CMD_TAP_VBUCKET_SET;
2523                msg.flush.message.body.tap.flags = htons(tap_flags);
2524                cb_mutex_enter(&tap_stats.mutex);
2525                tap_stats.sent.vbucket_set++;
2526                cb_mutex_exit(&tap_stats.mutex);
2527            }
2528
2529            msg.flush.message.header.request.bodylen = htonl(8 + nengine);
2530            memcpy(c->wcurr, msg.flush.bytes, sizeof(msg.flush.bytes));
2531            add_iov(c, c->wcurr, sizeof(msg.flush.bytes));
2532            c->wcurr += sizeof(msg.flush.bytes);
2533            c->wbytes += sizeof(msg.flush.bytes);
2534            if (nengine > 0) {
2535                memcpy(c->wcurr, engine, nengine);
2536                add_iov(c, c->wcurr, nengine);
2537                c->wcurr += nengine;
2538                c->wbytes += nengine;
2539            }
2540            break;
2541        default:
2542            abort();
2543        }
2544    } while (more_data);
2545
2546    c->ewouldblock = false;
2547    if (send_data) {
2548        conn_set_state(c, conn_mwrite);
2549        if (disconnect) {
2550            c->write_and_go = conn_closing;
2551        } else {
2552            c->write_and_go = conn_ship_log;
2553        }
2554    } else {
2555        if (disconnect) {
2556            conn_set_state(c, conn_closing);
2557        } else {
2558            /* No more items to ship to the slave at this time.. suspend.. */
2559            if (settings.verbose > 1) {
2560                settings.extensions.logger->log(EXTENSION_LOG_DEBUG, c,
2561                                                "%d: No more items in tap log.. waiting\n",
2562                                                c->sfd);
2563            }
2564            c->ewouldblock = true;
2565        }
2566    }
2567}
2568
2569static ENGINE_ERROR_CODE default_unknown_command(EXTENSION_BINARY_PROTOCOL_DESCRIPTOR *descriptor,
2570                                                 ENGINE_HANDLE* handle,
2571                                                 const void* cookie,
2572                                                 protocol_binary_request_header *request,
2573                                                 ADD_RESPONSE response)
2574{
2575    const conn *c = (void*)cookie;
2576
2577    if (!c->supports_datatype && request->request.datatype != PROTOCOL_BINARY_RAW_BYTES) {
2578        if (response(NULL, 0, NULL, 0, NULL, 0, PROTOCOL_BINARY_RAW_BYTES,
2579                     PROTOCOL_BINARY_RESPONSE_EINVAL, 0, cookie)) {
2580            return ENGINE_SUCCESS;
2581        } else {
2582            return ENGINE_DISCONNECT;
2583        }
2584    } else {
2585        return settings.engine.v1->unknown_command(handle, cookie,
2586                                                   request, response);
2587    }
2588}
2589
/*
 * One entry of the request dispatch table: the extension descriptor that
 * registered the handler, and the callback invoked for that opcode (see
 * process_bin_unknown_packet).
 */
struct request_lookup {
    EXTENSION_BINARY_PROTOCOL_DESCRIPTOR *descriptor;
    BINARY_COMMAND_CALLBACK callback;
};
2594
/* Dispatch table indexed by the 8-bit binary request opcode. */
static struct request_lookup request_handlers[0x100];
2596
/* Handler invoked for an incoming binary *response* packet on a connection. */
typedef void (*RESPONSE_HANDLER)(conn*);
/**
 * A map between the response packet's op-code and the function that
 * handles the response message (indexed by the 8-bit opcode).
 */
static RESPONSE_HANDLER response_handlers[0x100];
2603
2604static void setup_binary_lookup_cmd(EXTENSION_BINARY_PROTOCOL_DESCRIPTOR *descriptor,
2605                                    uint8_t cmd,
2606                                    BINARY_COMMAND_CALLBACK new_handler) {
2607    request_handlers[cmd].descriptor = descriptor;
2608    request_handlers[cmd].callback = new_handler;
2609}
2610
2611static void process_bin_unknown_packet(conn *c) {
2612    void *packet = c->rcurr - (c->binary_header.request.bodylen +
2613                               sizeof(c->binary_header));
2614    ENGINE_ERROR_CODE ret = c->aiostat;
2615    c->aiostat = ENGINE_SUCCESS;
2616    c->ewouldblock = false;
2617
2618    if (ret == ENGINE_SUCCESS) {
2619        struct request_lookup *rq = request_handlers + c->binary_header.request.opcode;
2620        ret = rq->callback(rq->descriptor, settings.engine.v0, c, packet,
2621                           binary_response_handler);
2622    }
2623
2624    switch (ret) {
2625    case ENGINE_SUCCESS:
2626        if (c->dynamic_buffer.buffer != NULL) {
2627            write_and_free(c, c->dynamic_buffer.buffer, c->dynamic_buffer.offset);
2628            c->dynamic_buffer.buffer = NULL;
2629        } else {
2630            conn_set_state(c, conn_new_cmd);
2631        }
2632        break;
2633    case ENGINE_EWOULDBLOCK:
2634        c->ewouldblock = true;
2635        break;
2636    case ENGINE_DISCONNECT:
2637        conn_set_state(c, conn_closing);
2638        break;
2639    default:
2640        /* Release the dynamic buffer.. it may be partial.. */
2641        free(c->dynamic_buffer.buffer);
2642        c->dynamic_buffer.buffer = NULL;
2643        write_bin_packet(c, engine_error_2_protocol_error(ret), 0);
2644    }
2645}
2646
2647static void cbsasl_refresh_main(void *c)
2648{
2649    int rv = cbsasl_server_refresh();
2650    if (rv == SASL_OK) {
2651        notify_io_complete(c, ENGINE_SUCCESS);
2652    } else {
2653        notify_io_complete(c, ENGINE_EINVAL);
2654    }
2655}
2656
2657static ENGINE_ERROR_CODE refresh_cbsasl(conn *c)
2658{
2659    cb_thread_t tid;
2660    int err;
2661
2662    err = cb_create_thread(&tid, cbsasl_refresh_main, c, 1);
2663    if (err != 0) {
2664        settings.extensions.logger->log(EXTENSION_LOG_WARNING, c,
2665                                        "Failed to create cbsasl db "
2666                                        "update thread: %s",
2667                                        strerror(err));
2668        return ENGINE_DISCONNECT;
2669    }
2670
2671    return ENGINE_EWOULDBLOCK;
2672}
2673
2674#if 0
2675static void ssl_certs_refresh_main(void *c)
2676{
2677    /* Update the internal certificates */
2678
2679    notify_io_complete(c, ENGINE_SUCCESS);
2680}
2681#endif
2682static ENGINE_ERROR_CODE refresh_ssl_certs(conn *c)
2683{
2684#if 0
2685    cb_thread_t tid;
2686    int err;
2687
2688    err = cb_create_thread(&tid, ssl_certs_refresh_main, c, 1);
2689    if (err != 0) {
2690        settings.extensions.logger->log(EXTENSION_LOG_WARNING, c,
2691                                        "Failed to create ssl_certificate "
2692                                        "update thread: %s",
2693                                        strerror(err));
2694        return ENGINE_DISCONNECT;
2695    }
2696
2697    return ENGINE_EWOULDBLOCK;
2698#endif
2699    return ENGINE_SUCCESS;
2700}
2701
/*
 * Handle a TAP_CONNECT request: ask the engine for a tap iterator and, if
 * one is returned, switch this connection into the tap-log shipping state.
 *
 * Body layout: optional 4-byte flags extras (when extlen == 4), then the
 * key (the tap connection name, keylen bytes), then the remaining body
 * bytes, which are passed to the engine as `data`/`ndata`.
 */
static void process_bin_tap_connect(conn *c) {
    TAP_ITERATOR iterator;
    /* rcurr is assumed to point just past the body; step back to the header */
    char *packet = (c->rcurr - (c->binary_header.request.bodylen +
                                sizeof(c->binary_header)));
    protocol_binary_request_tap_connect *req = (void*)packet;
    /* sizeof(req->bytes) includes the 4-byte flags extras; key and data are
     * moved back 4 bytes below when the flags are absent. */
    const char *key = packet + sizeof(req->bytes);
    const char *data = key + c->binary_header.request.keylen;
    uint32_t flags = 0;
    /* NOTE(review): unsigned subtraction -- relies on earlier validation
     * guaranteeing bodylen >= extlen + keylen; confirm. */
    size_t ndata = c->binary_header.request.bodylen -
        c->binary_header.request.extlen -
        c->binary_header.request.keylen;

    if (c->binary_header.request.extlen == 4) {
        flags = ntohl(req->message.body.flags);

        if (flags & TAP_CONNECT_FLAG_BACKFILL) {
            /* the userdata has to be at least 8 bytes! */
            if (ndata < 8) {
                settings.extensions.logger->log(EXTENSION_LOG_WARNING, c,
                                                "%d: ERROR: Invalid tap connect message\n",
                                                c->sfd);
                conn_set_state(c, conn_closing);
                return ;
            }
        }
    } else {
        /* No flags extras: key/data start 4 bytes earlier.
         * NOTE(review): this is only correct for extlen == 0; any other
         * extlen would leave key misaligned -- presumably rejected by packet
         * validation before we get here; confirm. */
        data -= 4;
        key -= 4;
    }

    if (settings.verbose && c->binary_header.request.keylen > 0) {
        /* Log the (NUL-terminated, truncated to 1023 bytes) connection name */
        char buffer[1024];
        int len = c->binary_header.request.keylen;
        if (len >= sizeof(buffer)) {
            len = sizeof(buffer) - 1;
        }
        memcpy(buffer, key, len);
        buffer[len] = '\0';
        settings.extensions.logger->log(EXTENSION_LOG_DEBUG, c,
                                        "%d: Trying to connect with named tap connection: <%s>\n",
                                        c->sfd, buffer);
    }

    iterator = settings.engine.v1->get_tap_iterator(
        settings.engine.v0, c, key, c->binary_header.request.keylen,
        flags, data, ndata);

    if (iterator == NULL) {
        /* Engine has no tap support: reply NOT_SUPPORTED and set
         * write_and_go so the connection goes to conn_closing after that
         * write (presumably once the response is sent). */
        settings.extensions.logger->log(EXTENSION_LOG_WARNING, c,
                                        "%d: FATAL: The engine does not support tap\n",
                                        c->sfd);
        write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_NOT_SUPPORTED, 0);
        c->write_and_go = conn_closing;
    } else {
        /* Become a tap producer: use the high-priority per-event request
         * budget, wait for writability and start shipping the tap log. */
        c->max_reqs_per_event = settings.reqs_per_event_high_priority;
        c->tap_iterator = iterator;
        c->which = EV_WRITE;
        conn_set_state(c, conn_ship_log);
    }
}
2762
/*
 * Handle an incoming TAP message of kind `event` by passing it to the
 * engine's tap_notify().
 *
 * A reply is written only if the sender set TAP_FLAG_ACK or the operation
 * failed; otherwise the connection moves on to the next command.
 * ENGINE_DISCONNECT closes the connection and ENGINE_EWOULDBLOCK parks it.
 */
static void process_bin_tap_packet(tap_event_t event, conn *c) {
    char *packet;
    protocol_binary_request_tap_no_extras *tap;
    uint16_t nengine;
    uint16_t tap_flags;
    uint32_t seqno;
    uint8_t ttl;
    char *engine_specific;
    char *key;
    uint16_t nkey;
    char *data;
    uint32_t flags;
    uint32_t exptime;
    uint32_t ndata;
    ENGINE_ERROR_CODE ret;

    cb_assert(c != NULL);
    /* rcurr is assumed to point just past the body; step back to the header */
    packet = (c->rcurr - (c->binary_header.request.bodylen +
                                sizeof(c->binary_header)));
    tap = (void*)packet;
    nengine = ntohs(tap->message.body.tap.enginespecific_length);
    tap_flags = ntohs(tap->message.body.tap.flags);
    /* The TAP sequence number travels in the header's opaque field */
    seqno = ntohl(tap->message.header.request.opaque);
    ttl = tap->message.body.tap.ttl;
    /* Layout without item extras: header + tap extras, engine-specific
     * data (nengine bytes), key (nkey bytes), value (ndata bytes). */
    engine_specific = packet + sizeof(tap->bytes);
    key = engine_specific + nengine;
    nkey = c->binary_header.request.keylen;
    data = key + nkey;
    flags = 0;
    exptime = 0;
    /* 8 is presumably the size of the common tap extras.
     * NOTE(review): unsigned arithmetic -- relies on earlier validation
     * guaranteeing bodylen >= nengine + nkey + 8; confirm. */
    ndata = c->binary_header.request.bodylen - nengine - nkey - 8;
    /* tap_notify() is only called when this is ENGINE_SUCCESS; any other
     * value (presumably the result of an earlier EWOULDBLOCK) is handled
     * directly by the switch below. */
    ret = c->aiostat;

    if (ttl == 0) {
        /* ttl is decremented before being passed on (ttl - 1 below), so a
         * zero ttl is rejected as invalid. */
        ret = ENGINE_EINVAL;
    } else {
        if (event == TAP_MUTATION || event == TAP_CHECKPOINT_START ||
            event == TAP_CHECKPOINT_END) {
            /* These events carry 8 more extras bytes (item flags and
             * expiration), shifting everything after them. */
            protocol_binary_request_tap_mutation *mutation = (void*)tap;

            /* engine_specific data in protocol_binary_request_tap_mutation is */
            /* at a different offset than protocol_binary_request_tap_no_extras */
            engine_specific = packet + sizeof(mutation->bytes);

            /* Item flags are only passed through ntohl() when the sender did
             * NOT set TAP_FLAG_NETWORK_BYTE_ORDER; otherwise they are kept
             * exactly as received. */
            flags = mutation->message.body.item.flags;
            if ((tap_flags & TAP_FLAG_NETWORK_BYTE_ORDER) == 0) {
                flags = ntohl(flags);
            }

            exptime = ntohl(mutation->message.body.item.expiration);
            key += 8;
            data += 8;
            ndata -= 8;
        }

        if (ret == ENGINE_SUCCESS) {
            uint8_t datatype = c->binary_header.request.datatype;
            /* Connection without datatype support: detect JSON values
             * ourselves so the item is tagged correctly. */
            if (event == TAP_MUTATION && !c->supports_datatype) {
                if (checkUTF8JSON((void*)data, ndata)) {
                    datatype = PROTOCOL_BINARY_DATATYPE_JSON;
                }
            }

            ret = settings.engine.v1->tap_notify(settings.engine.v0, c,
                                                 engine_specific, nengine,
                                                 ttl - 1, tap_flags,
                                                 event, seqno,
                                                 key, nkey,
                                                 flags, exptime,
                                                 ntohll(tap->message.header.request.cas),
                                                 datatype,
                                                 data, ndata,
                                                 c->binary_header.request.vbucket);
        }
    }

    switch (ret) {
    case ENGINE_DISCONNECT:
        conn_set_state(c, conn_closing);
        break;
    case ENGINE_EWOULDBLOCK:
        c->ewouldblock = true;
        break;
    default:
        /* Reply only if an ack was requested or the operation failed */
        if ((tap_flags & TAP_FLAG_ACK) || (ret != ENGINE_SUCCESS)) {
            write_bin_packet(c, engine_error_2_protocol_error(ret), 0);
        } else {
            conn_set_state(c, conn_new_cmd);
        }
    }
}
2854
2855static void process_bin_tap_ack(conn *c) {
2856    char *packet;
2857    protocol_binary_response_no_extras *rsp;
2858    uint32_t seqno;
2859    uint16_t status;
2860    char *key;
2861    ENGINE_ERROR_CODE ret = ENGINE_DISCONNECT;
2862
2863    cb_assert(c != NULL);
2864    packet = (c->rcurr - (c->binary_header.request.bodylen + sizeof(c->binary_header)));
2865    rsp = (void*)packet;
2866    seqno = ntohl(rsp->message.header.response.opaque);
2867    status = ntohs(rsp->message.header.response.status);
2868    key = packet + sizeof(rsp->bytes);
2869
2870    if (settings.engine.v1->tap_notify != NULL) {
2871        ret = settings.engine.v1->tap_notify(settings.engine.v0, c, NULL, 0, 0, status,
2872                                             TAP_ACK, seqno, key,
2873                                             c->binary_header.request.keylen, 0, 0,
2874                                             0, c->binary_header.request.datatype, NULL,
2875                                             0, 0);
2876    }
2877
2878    if (ret == ENGINE_DISCONNECT) {
2879        conn_set_state(c, conn_closing);
2880    } else {
2881        conn_set_state(c, conn_ship_log);
2882    }
2883}
2884
2885/**
2886 * We received a noop response.. just ignore it
2887 */
2888static void process_bin_noop_response(conn *c) {
2889    cb_assert(c != NULL);
2890    conn_set_state(c, conn_new_cmd);
2891}
2892
2893/*******************************************************************************
2894 **                             DCP MESSAGE PRODUCERS                         **
2895 ******************************************************************************/
2896static ENGINE_ERROR_CODE dcp_message_get_failover_log(const void *cookie,
2897                                                      uint32_t opaque,
2898                                                      uint16_t vbucket)
2899{
2900    protocol_binary_request_dcp_get_failover_log packet;
2901    conn *c = (void*)cookie;
2902
2903    if (c->wbytes + sizeof(packet.bytes) >= c->wsize) {
2904        /* We don't have room in the buffer */
2905        return ENGINE_E2BIG;
2906    }
2907
2908    memset(packet.bytes, 0, sizeof(packet.bytes));
2909    packet.message.header.request.magic =  (uint8_t)PROTOCOL_BINARY_REQ;
2910    packet.message.header.request.opcode = (uint8_t)PROTOCOL_BINARY_CMD_DCP_GET_FAILOVER_LOG;
2911    packet.message.header.request.opaque = opaque;
2912    packet.message.header.request.vbucket = htons(vbucket);
2913
2914    memcpy(c->wcurr, packet.bytes, sizeof(packet.bytes));
2915    add_iov(c, c->wcurr, sizeof(packet.bytes));
2916    c->wcurr += sizeof(packet.bytes);
2917    c->wbytes += sizeof(packet.bytes);
2918
2919    return ENGINE_SUCCESS;
2920}
2921
2922static ENGINE_ERROR_CODE dcp_message_stream_req(const void *cookie,
2923                                                uint32_t opaque,
2924                                                uint16_t vbucket,
2925                                                uint32_t flags,
2926                                                uint64_t start_seqno,
2927                                                uint64_t end_seqno,
2928                                                uint64_t vbucket_uuid,
2929                                                uint64_t snap_start_seqno,
2930                                                uint64_t snap_end_seqno)
2931{
2932    protocol_binary_request_dcp_stream_req packet;
2933    conn *c = (void*)cookie;
2934
2935    if (c->wbytes + sizeof(packet.bytes) >= c->wsize) {
2936        /* We don't have room in the buffer */
2937        return ENGINE_E2BIG;
2938    }
2939
2940    memset(packet.bytes, 0, sizeof(packet.bytes));
2941    packet.message.header.request.magic =  (uint8_t)PROTOCOL_BINARY_REQ;
2942    packet.message.header.request.opcode = (uint8_t)PROTOCOL_BINARY_CMD_DCP_STREAM_REQ;
2943    packet.message.header.request.extlen = 48;
2944    packet.message.header.request.bodylen = htonl(48);
2945    packet.message.header.request.opaque = opaque;
2946    packet.message.header.request.vbucket = htons(vbucket);
2947
2948    packet.message.body.flags = ntohl(flags);
2949    packet.message.body.start_seqno = ntohll(start_seqno);
2950    packet.message.body.end_seqno = ntohll(end_seqno);
2951    packet.message.body.vbucket_uuid = ntohll(vbucket_uuid);
2952    packet.message.body.snap_start_seqno = ntohll(snap_start_seqno);
2953    packet.message.body.snap_end_seqno = ntohll(snap_end_seqno);
2954
2955    memcpy(c->wcurr, packet.bytes, sizeof(packet.bytes));
2956    add_iov(c, c->wcurr, sizeof(packet.bytes));
2957    c->wcurr += sizeof(packet.bytes);
2958    c->wbytes += sizeof(packet.bytes);
2959
2960    return ENGINE_SUCCESS;
2961}
2962
2963static ENGINE_ERROR_CODE dcp_message_add_stream_response(const void *cookie,
2964                                                         uint32_t opaque,
2965                                                         uint32_t dialogopaque,
2966                                                         uint8_t status)
2967{
2968    protocol_binary_response_dcp_add_stream packet;
2969    conn *c = (void*)cookie;
2970
2971    if (c->wbytes + sizeof(packet.bytes) >= c->wsize) {
2972        /* We don't have room in the buffer */
2973        return ENGINE_E2BIG;
2974    }
2975
2976    memset(packet.bytes, 0, sizeof(packet.bytes));
2977    packet.message.header.response.magic =  (uint8_t)PROTOCOL_BINARY_RES;
2978    packet.message.header.response.opcode = (uint8_t)PROTOCOL_BINARY_CMD_DCP_ADD_STREAM;
2979    packet.message.header.response.extlen = 4;
2980    packet.message.header.response.status = htons(status);
2981    packet.message.header.response.bodylen = htonl(4);
2982    packet.message.header.response.opaque = opaque;
2983    packet.message.body.opaque = ntohl(dialogopaque);
2984
2985    memcpy(c->wcurr, packet.bytes, sizeof(packet.bytes));
2986    add_iov(c, c->wcurr, sizeof(packet.bytes));
2987    c->wcurr += sizeof(packet.bytes);
2988    c->wbytes += sizeof(packet.bytes);
2989
2990    return ENGINE_SUCCESS;
2991}
2992
2993static ENGINE_ERROR_CODE dcp_message_marker_response(const void *cookie,
2994                                                     uint32_t opaque,
2995                                                     uint8_t status)
2996{
2997    protocol_binary_response_dcp_snapshot_marker packet;
2998    conn *c = (void*)cookie;
2999
3000    if (c->wbytes + sizeof(packet.bytes) >= c->wsize) {
3001        /* We don't have room in the buffer */
3002        return ENGINE_E2BIG;
3003    }
3004
3005    memset(packet.bytes, 0, sizeof(packet.bytes));
3006    packet.message.header.response.magic =  (uint8_t)PROTOCOL_BINARY_RES;
3007    packet.message.header.response.opcode = (uint8_t)PROTOCOL_BINARY_CMD_DCP_SNAPSHOT_MARKER;
3008    packet.message.header.response.extlen = 0;
3009    packet.message.header.response.status = htons(status);
3010    packet.message.header.response.bodylen = 0;
3011    packet.message.header.response.opaque = opaque;
3012
3013    memcpy(c->wcurr, packet.bytes, sizeof(packet.bytes));
3014    add_iov(c, c->wcurr, sizeof(packet.bytes));
3015    c->wcurr += sizeof(packet.bytes);
3016    c->wbytes += sizeof(packet.bytes);
3017
3018    return ENGINE_SUCCESS;
3019}
3020
3021static ENGINE_ERROR_CODE dcp_message_set_vbucket_state_response(const void *cookie,
3022                                                                uint32_t opaque,
3023                                                                uint8_t status)
3024{
3025    protocol_binary_response_dcp_set_vbucket_state packet;
3026    conn *c = (void*)cookie;
3027
3028    if (c->wbytes + sizeof(packet.bytes) >= c->wsize) {
3029        /* We don't have room in the buffer */
3030        return ENGINE_E2BIG;
3031    }
3032
3033    memset(packet.bytes, 0, sizeof(packet.bytes));
3034    packet.message.header.response.magic =  (uint8_t)PROTOCOL_BINARY_RES;
3035    packet.message.header.response.opcode = (uint8_t)PROTOCOL_BINARY_CMD_DCP_SET_VBUCKET_STATE;
3036    packet.message.header.response.extlen = 0;
3037    packet.message.header.response.status = htons(status);
3038    packet.message.header.response.bodylen = 0;
3039    packet.message.header.response.opaque = opaque;
3040
3041    memcpy(c->wcurr, packet.bytes, sizeof(packet.bytes));
3042    add_iov(c, c->wcurr, sizeof(packet.bytes));
3043    c->wcurr += sizeof(packet.bytes);
3044    c->wbytes += sizeof(packet.bytes);
3045
3046    return ENGINE_SUCCESS;
3047}
3048
3049static ENGINE_ERROR_CODE dcp_message_stream_end(const void *cookie,
3050                                                uint32_t opaque,
3051                                                uint16_t vbucket,
3052                                                uint32_t flags)
3053{
3054    protocol_binary_request_dcp_stream_end packet;
3055    conn *c = (void*)cookie;
3056
3057    if (c->wbytes + sizeof(packet.bytes) >= c->wsize) {
3058        /* We don't have room in the buffer */
3059        return ENGINE_E2BIG;
3060    }
3061
3062    memset(packet.bytes, 0, sizeof(packet.bytes));
3063    packet.message.header.request.magic =  (uint8_t)PROTOCOL_BINARY_REQ;
3064    packet.message.header.request.opcode = (uint8_t)PROTOCOL_BINARY_CMD_DCP_STREAM_END;
3065    packet.message.header.request.extlen = 4;
3066    packet.message.header.request.bodylen = htonl(4);
3067    packet.message.header.request.opaque = opaque;
3068    packet.message.header.request.vbucket = htons(vbucket);
3069    packet.message.body.flags = ntohl(flags);
3070
3071    memcpy(c->wcurr, packet.bytes, sizeof(packet.bytes));
3072    add_iov(c, c->wcurr, sizeof(packet.bytes));
3073    c->wcurr += sizeof(packet.bytes);
3074    c->wbytes += sizeof(packet.bytes);
3075
3076    return ENGINE_SUCCESS;
3077}
3078
3079static ENGINE_ERROR_CODE dcp_message_marker(const void *cookie,
3080                                            uint32_t opaque,
3081                                            uint16_t vbucket,
3082                                            uint64_t start_seqno,
3083                                            uint64_t end_seqno,
3084                                            uint32_t flags)
3085{
3086    protocol_binary_request_dcp_snapshot_marker packet;
3087    conn *c = (void*)cookie;
3088
3089    if (c->wbytes + sizeof(packet.bytes) >= c->wsize) {
3090        /* We don't have room in the buffer */
3091        return ENGINE_E2BIG;
3092    }
3093
3094    memset(packet.bytes, 0, sizeof(packet.bytes));
3095    packet.message.header.request.magic =  (uint8_t)PROTOCOL_BINARY_REQ;
3096    packet.message.header.request.opcode = (uint8_t)PROTOCOL_BINARY_CMD_DCP_SNAPSHOT_MARKER;
3097    packet.message.header.request.opaque = opaque;
3098    packet.message.header.request.vbucket = htons(vbucket);
3099    packet.message.header.request.extlen = 20;
3100    packet.message.header.request.bodylen = htonl(20);
3101    packet.message.body.start_seqno = htonll(start_seqno);
3102    packet.message.body.end_seqno = htonll(end_seqno);
3103    packet.message.body.flags = htonl(flags);
3104
3105    memcpy(c->wcurr, packet.bytes, sizeof(packet.bytes));
3106    add_iov(c, c->wcurr, sizeof(packet.bytes));
3107    c->wcurr += sizeof(packet.bytes);
3108    c->wbytes += sizeof(packet.bytes);
3109
3110    return ENGINE_SUCCESS;
3111}
3112
3113static ENGINE_ERROR_CODE dcp_message_mutation(const void* cookie,
3114                                              uint32_t opaque,
3115                                              item *it,
3116                                              uint16_t vbucket,
3117                                              uint64_t by_seqno,
3118                                              uint64_t rev_seqno,
3119                                              uint32_t lock_time,
3120                                              const void *meta,
3121                                              uint16_t nmeta,
3122                                              uint8_t nru)
3123{
3124    conn *c = (void*)cookie;
3125    item_info_holder info;
3126    protocol_binary_request_dcp_mutation packet;
3127    int xx;
3128
3129    if (c->wbytes + sizeof(packet.bytes) + nmeta >= c->wsize) {
3130        /* We don't have room in the buffer */
3131        return ENGINE_E2BIG;
3132    }
3133
3134    memset(&info, 0, sizeof(info));
3135    info.info.nvalue = IOV_MAX;
3136
3137    if (!settings.engine.v1->get_item_info(settings.engine.v0, c, it,
3138                                           (void*)&info)) {
3139        settings.engine.v1->release(settings.engine.v0, c, it);
3140        settings.extensions.logger->log(EXTENSION_LOG_WARNING, c,
3141                                        "%d: Failed to get item info\n", c->sfd);
3142        return ENGINE_FAILED;
3143    }
3144
3145    memset(packet.bytes, 0, sizeof(packet));
3146    packet.message.header.request.magic =  (uint8_t)PROTOCOL_BINARY_REQ;
3147    packet.message.header.request.opcode = (uint8_t)PROTOCOL_BINARY_CMD_DCP_MUTATION;
3148    packet.message.header.request.opaque = opaque;
3149    packet.message.header.request.vbucket = htons(vbucket);
3150    packet.message.header.request.cas = htonll(info.info.cas);
3151    packet.message.header.request.keylen = htons(info.info.nkey);
3152    packet.message.header.request.extlen = 31;
3153    packet.message.header.request.bodylen = ntohl(31 + info.info.nkey + info.info.nbytes + nmeta);
3154    packet.message.header.request.datatype = info.info.datatype;
3155    packet.message.body.by_seqno = htonll(by_seqno);
3156    packet.message.body.rev_seqno = htonll(rev_seqno);
3157    packet.message.body.lock_time = htonl(lock_time);
3158    packet.message.body.flags = info.info.flags;
3159    packet.message.body.expiration = htonl(info.info.exptime);
3160    packet.message.body.nmeta = htons(nmeta);
3161    packet.message.body.nru = nru;
3162
3163    c->ilist[c->ileft++] = it;
3164
3165    memcpy(c->wcurr, packet.bytes, sizeof(packet.bytes));
3166    add_iov(c, c->wcurr, sizeof(packet.bytes));
3167    c->wcurr += sizeof(packet.bytes);
3168    c->wbytes += sizeof(packet.bytes);
3169    add_iov(c, info.info.key, info.info.nkey);
3170    for (xx = 0; xx < info.info.nvalue; ++xx) {
3171        add_iov(c, info.info.value[xx].iov_base, info.info.value[xx].iov_len);
3172    }
3173
3174    memcpy(c->wcurr, meta, nmeta);
3175    add_iov(c, c->wcurr, nmeta);
3176    c->wcurr += nmeta;
3177    c->wbytes += nmeta;
3178
3179    return ENGINE_SUCCESS;
3180}
3181
3182static ENGINE_ERROR_CODE dcp_message_deletion(const void* cookie,
3183                                              uint32_t opaque,
3184                                              const void *key,
3185                                              uint16_t nkey,
3186                                              uint64_t cas,
3187                                              uint16_t vbucket,
3188                                              uint64_t by_seqno,
3189                                              uint64_t rev_seqno,
3190                                              const void *meta,
3191                                              uint16_t nmeta)
3192{
3193    conn *c = (void*)cookie;
3194    protocol_binary_request_dcp_deletion packet;
3195    if (c->wbytes + sizeof(packet.bytes) + nkey + nmeta >= c->wsize) {
3196        return ENGINE_E2BIG;
3197    }
3198
3199    memset(packet.bytes, 0, sizeof(packet));
3200    packet.message.header.request.magic =  (uint8_t)PROTOCOL_BINARY_REQ;
3201    packet.message.header.request.opcode = (uint8_t)PROTOCOL_BINARY_CMD_DCP_DELETION;
3202    packet.message.header.