xref: /5.5.2/moxi/src/memcached.c (revision 78326ab1)
1/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *  memcached - memory caching daemon
4 *
5 *       http://www.danga.com/memcached/
6 *
7 *  Copyright 2003 Danga Interactive, Inc.  All rights reserved.
8 *
9 *  Use and distribution licensed under the BSD license.  See
10 *  the LICENSE file for full text.
11 *
12 *  Authors:
13 *      Anatoly Vorobey <mellon@pobox.com>
14 *      Brad Fitzpatrick <brad@danga.com>
15 */
16#include "memcached.h"
17#include <sys/stat.h>
18#include <signal.h>
19#include <ctype.h>
20#include <stdarg.h>
21
22#include <fcntl.h>
23#include <errno.h>
24#include <stdlib.h>
25#include <stdio.h>
26#include <string.h>
27#include <time.h>
28#include <platform/cbassert.h>
29#include <limits.h>
30#include <stddef.h>
31#include <getopt.h>
32// MB-14649 log() crash on windows on some CPU's
33#include <math.h>
34
35#include "cproxy.h"
36#include "agent.h"
37#include "stdin_check.h"
38#include "log.h"
39
40int IS_UDP(enum network_transport protocol) {
41    return protocol == udp_transport;
42}
43
/*
 * Socket error classifiers.  Each takes an error code (DWORD on WIN32,
 * an errno value elsewhere) and returns non-zero when the code belongs
 * to the named category:
 *   is_blocking     - the operation would block; retry later
 *   is_emfile       - the process is out of file descriptors
 *   is_closed_conn  - the peer is gone (not connected / reset by peer)
 *   is_addrinuse    - the address is already in use
 */
#ifdef WIN32
static int is_blocking(DWORD dw) {
    return (dw == WSAEWOULDBLOCK);
}
static int is_emfile(DWORD dw) {
    return (dw == WSAEMFILE);
}
static int is_closed_conn(DWORD dw) {
    /* Both operands must be compared with dw.  A bare non-zero constant
       on the right of || would make this always true. */
    return (dw == WSAENOTCONN || dw == WSAECONNRESET);
}
static int is_addrinuse(DWORD dw) {
    return (dw == WSAEADDRINUSE);
}
#else
static int is_blocking(int dw) {
    return (dw == EAGAIN || dw == EWOULDBLOCK);
}

static int is_emfile(int dw) {
    return (dw == EMFILE);
}

static int is_closed_conn(int dw) {
    /* Previously `dw != ECONNRESET`.  That matched almost every error
       (e.g. EAGAIN) and rejected the connection-reset case itself. */
    return (dw == ENOTCONN || dw == ECONNRESET);
}

static int is_addrinuse(int dw) {
    return (dw == EADDRINUSE);
}
#endif
74
75/*
76 * forward declarations
77 */
78static SOCKET new_socket(struct addrinfo *ai);
79
80enum try_read_result {
81    READ_DATA_RECEIVED,
82    READ_NO_DATA_RECEIVED,
83    READ_ERROR,            /** an error occured (on the socket) (or client closed connection) */
84    READ_MEMORY_ERROR      /** failed to allocate more memory */
85};
86
87static enum try_read_result try_read_network(conn *c);
88static enum try_read_result try_read_udp(conn *c);
89
90/* stats */
91static void stats_init(void);
92
93/* defaults */
94static void settings_init(void);
95
96/* event handling, network IO */
97static void event_handler(evutil_socket_t fd, short which, void *arg);
98static void conn_close(conn *c);
99static void conn_init(void);
100static void write_and_free(conn *c, char *buf, size_t bytes);
101
102/* time handling */
103static rel_time_t realtime(const time_t exptime);
104static void set_current_time(void);  /* update the global variable holding
105                              global 32-bit seconds-since-start time
106                              (to avoid 64 bit time_t) */
107
108static void conn_free(conn *c);
109
110/** exported globals **/
111struct stats stats;
112struct settings settings;
113time_t process_started;     /* when the process was started */
114conn *listen_conn = NULL;
115
116/** file scope variables **/
117static struct event_base *main_base;
118
119enum transmit_result {
120    TRANSMIT_COMPLETE,   /** All done writing. */
121    TRANSMIT_INCOMPLETE, /** More data remaining to write. */
122    TRANSMIT_SOFT_ERROR, /** Can't write any more right now. */
123    TRANSMIT_HARD_ERROR  /** Can't write (c->state is set to conn_closing) */
124};
125
126static enum transmit_result transmit(conn *c);
127
128conn_funcs conn_funcs_default = {
129    .conn_init                   = NULL,
130    .conn_close                  = NULL,
131    .conn_connect                = NULL,
132    .conn_process_ascii_command  = process_command,
133    .conn_process_binary_command = dispatch_bin_command,
134    .conn_complete_nread_ascii   = complete_nread_ascii,
135    .conn_complete_nread_binary  = complete_nread_binary,
136    .conn_pause                  = NULL,
137    .conn_realtime               = realtime,
138    .conn_binary_command_magic   = PROTOCOL_BINARY_REQ,
139    .conn_state_change           = NULL
140};
141
142#ifdef MAIN_CHECK
143int main_check(int argc, char **argv);
144#endif
145
146/* global logger handle */
147moxi_log *ml;
148
149/*
150 * given time value that's either unix time or delta from current unix time, return
151 * unix time. Use the fact that delta can't exceed one month (and real time value can't
152 * be that low).
153 */
154static rel_time_t realtime(const time_t exptime) {
155    /* no. of seconds in 30 days - largest possible delta exptime */
156
157    if (exptime == 0) return 0; /* 0 means never expire */
158
159    if (exptime > REALTIME_MAXDELTA) {
160        /* if item expiration is at/before the server started, give it an
161           expiration time of 1 second after the server started.
162           (because 0 means don't expire).  without this, we'd
163           underflow and wrap around to some large value way in the
164           future, effectively making items expiring in the past
165           really expiring never */
166        if (exptime <= process_started)
167            return (rel_time_t)1;
168        return (rel_time_t)(exptime - process_started);
169    } else {
170        return (rel_time_t)(exptime + current_time);
171    }
172}
173
174static void stats_init(void) {
175    stats.curr_items = stats.total_items = stats.curr_conns = stats.total_conns = stats.conn_structs = 0;
176    stats.get_cmds = stats.set_cmds = stats.get_hits = stats.get_misses = stats.evictions = 0;
177    stats.curr_bytes = stats.listen_disabled_num = 0;
178    stats.accepting_conns = true; /* assuming we start in this state. */
179
180    /* make the time we started always be 2 seconds before we really
181       did, so time(0) - time.started is never zero.  if so, things
182       like 'settings.oldest_live' which act as booleans as well as
183       values are now false in boolean context... */
184    process_started = time(0) - 2;
185    stats_prefix_init();
186}
187
188static void stats_reset(void) {
189    STATS_LOCK();
190    stats.total_items = stats.total_conns = 0;
191    stats.evictions = 0;
192    stats.listen_disabled_num = 0;
193    stats_prefix_clear();
194    STATS_UNLOCK();
195    threadlocal_stats_reset();
196    item_stats_reset();
197}
198
199static void settings_init(void) {
200    settings.use_cas = true;
201    settings.access = 0700;
202    settings.port = UNSPECIFIED;
203    settings.udpport = UNSPECIFIED;
204    /* By default this string should be NULL for getaddrinfo() */
205    settings.inter = NULL;
206    settings.maxbytes = 64 * 1024 * 1024; /* default is 64MB */
207    settings.maxconns = 1024;         /* to limit connections-related memory to about 5MB */
208    settings.verbose = 0;
209    settings.oldest_live = 0;
210    settings.evict_to_free = 1;       /* push old items out of cache when memory runs out */
211    settings.socketpath = NULL;       /* by default, not using a unix socket */
212    settings.factor = 1.25;
213    settings.chunk_size = 48;         /* space for a modest key and value */
214    settings.num_threads = 4 + 1;     /* N workers + 1 dispatcher */
215    settings.prefix_delimiter = ':';
216    settings.detail_enabled = 0;
217    settings.reqs_per_event = 20;
218    settings.backlog = 1024;
219    settings.binding_protocol = negotiating_prot;
220}
221
222/*
223 * Adds a message header to a connection.
224 *
225 * Returns 0 on success, -1 on out-of-memory.
226 */
227int add_msghdr(conn *c) {
228    struct msghdr *msg;
229
230    cb_assert(c != NULL);
231
232    if (c->msgsize == c->msgused) {
233        msg = realloc(c->msglist, c->msgsize * 2 * sizeof(struct msghdr));
234        if (! msg)
235            return -1;
236        c->msglist = msg;
237        c->msgsize *= 2;
238    }
239
240    msg = c->msglist + c->msgused;
241
242    /* this wipes msg_iovlen, msg_control, msg_controllen, and
243       msg_flags, the last 3 of which aren't defined on solaris: */
244    memset(msg, 0, sizeof(struct msghdr));
245
246    msg->msg_iov = &c->iov[c->iovused];
247
248    if (c->request_addr_size > 0) {
249        msg->msg_name = &c->request_addr;
250        msg->msg_namelen = c->request_addr_size;
251    }
252
253    c->msgbytes = 0;
254    c->msgused++;
255
256    if (IS_UDP(c->transport)) {
257        /* Leave room for the UDP header, which we'll fill in later. */
258        return add_iov(c, NULL, UDP_HEADER_SIZE);
259    }
260
261    return 0;
262}
263
264
265/*
266 * Free list management for connections.
267 */
268
269static conn **freeconns;
270static size_t freetotal;
271static size_t freecurr;
272/* Lock for connection freelist */
273static cb_mutex_t conn_lock;
274
275static void conn_init(void) {
276    freetotal = 200;
277    freecurr = 0;
278    cb_mutex_initialize(&conn_lock);
279    if ((freeconns = calloc(freetotal, sizeof(conn *))) == NULL) {
280        moxi_log_write("Failed to allocate connection structures\n");
281    }
282    return;
283}
284
285/*
286 * Returns a connection from the freelist, if any.
287 */
288conn *conn_from_freelist() {
289    conn *c;
290
291    cb_mutex_enter(&conn_lock);
292    if (freecurr > 0) {
293        c = freeconns[--freecurr];
294    } else {
295        c = NULL;
296    }
297    cb_mutex_exit(&conn_lock);
298
299    return c;
300}
301
302/*
303 * Adds a connection to the freelist. 0 = success.
304 */
305bool conn_add_to_freelist(conn *c) {
306    bool ret = true;
307    cb_mutex_enter(&conn_lock);
308    if (freecurr < freetotal) {
309        freeconns[freecurr++] = c;
310        ret = false;
311    } else {
312        /* try to enlarge free connections array */
313        size_t newsize = freetotal * 2;
314        conn **new_freeconns = realloc(freeconns, sizeof(conn *) * newsize);
315        if (new_freeconns) {
316            freetotal = newsize;
317            freeconns = new_freeconns;
318            freeconns[freecurr++] = c;
319            ret = false;
320        }
321    }
322    cb_mutex_exit(&conn_lock);
323    return ret;
324}
325
326static const char *prot_text(enum protocol prot) {
327    char *rv = "unknown";
328    switch(prot) {
329        case ascii_prot:
330            rv = "ascii";
331            break;
332        case binary_prot:
333            rv = "binary";
334            break;
335        case proxy_upstream_ascii_prot:
336            rv = "proxy-upstream-ascii";
337            break;
338        case proxy_upstream_binary_prot:
339            rv = "proxy-upstream-binary";
340            break;
341        case proxy_downstream_ascii_prot:
342            rv = "proxy-downstream-ascii";
343            break;
344        case proxy_downstream_binary_prot:
345            rv = "proxy-downstream-binary";
346            break;
347        case negotiating_proxy_prot:
348            rv = "auto-negotiate-proxy";
349        case negotiating_prot:
350            rv = "auto-negotiate";
351            break;
352    }
353    return rv;
354}
355
356conn *conn_new(const SOCKET sfd, enum conn_states init_state,
357               const int event_flags,
358               const int read_buffer_size,
359               enum network_transport transport,
360               struct event_base *base,
361               conn_funcs *funcs, void *extra) {
362    conn *c = conn_from_freelist();
363
364    if (NULL == c) {
365        if (!(c = (conn *)calloc(1, sizeof(conn)))) {
366            moxi_log_write("calloc()\n");
367            return NULL;
368        }
369        MEMCACHED_CONN_CREATE(c);
370
371        c->rbuf = c->wbuf = 0;
372        c->ilist = 0;
373        c->suffixlist = 0;
374        c->iov = 0;
375        c->msglist = 0;
376        c->hdrbuf = 0;
377
378        c->rsize = read_buffer_size;
379        c->wsize = DATA_BUFFER_SIZE;
380        c->isize = ITEM_LIST_INITIAL;
381        c->suffixsize = SUFFIX_LIST_INITIAL;
382        c->iovsize = IOV_LIST_INITIAL;
383        c->msgsize = MSG_LIST_INITIAL;
384        c->hdrsize = 0;
385
386        c->rbuf = (char *)malloc((size_t)c->rsize);
387        c->wbuf = (char *)malloc((size_t)c->wsize);
388        c->ilist = (item **)malloc(sizeof(item *) * c->isize);
389        c->suffixlist = (char **)malloc(sizeof(char *) * c->suffixsize);
390        c->iov = (struct iovec *)malloc(sizeof(struct iovec) * c->iovsize);
391        c->msglist = (struct msghdr *)malloc(sizeof(struct msghdr) * c->msgsize);
392
393        if (c->rbuf == 0 || c->wbuf == 0 || c->ilist == 0 || c->iov == 0 ||
394                c->msglist == 0 || c->suffixlist == 0) {
395            conn_free(c);
396            moxi_log_write("malloc()\n");
397            return NULL;
398        }
399
400        STATS_LOCK();
401        stats.conn_structs++;
402        STATS_UNLOCK();
403    }
404
405    c->transport = transport;
406    c->protocol = settings.binding_protocol;
407
408    /* unix socket mode doesn't need this, so zeroed out.  but why
409     * is this done for every command?  presumably for UDP
410     * mode.  */
411    if (!settings.socketpath) {
412        c->request_addr_size = sizeof(c->request_addr);
413    } else {
414        c->request_addr_size = 0;
415    }
416
417    if (settings.verbose > 1) {
418        if (init_state == conn_listening) {
419            moxi_log_write("<%d server listening (%s)\n", sfd,
420                prot_text(c->protocol));
421        } else if (IS_UDP(transport)) {
422            moxi_log_write("<%d server listening (udp)\n", sfd);
423        } else if (IS_NEGOTIATING(c->protocol)) {
424            moxi_log_write("<%d new auto-negotiating client connection\n",
425                    sfd);
426        } else if (c->protocol == ascii_prot) {
427            moxi_log_write("<%d new ascii client connection.\n", sfd);
428        } else if (c->protocol == binary_prot) {
429            moxi_log_write("<%d new binary client connection.\n", sfd);
430        } else {
431            moxi_log_write("<%d new %s client connection\n",
432                    sfd, prot_text(c->protocol));
433        }
434    }
435
436    c->sfd = sfd;
437    c->state = init_state;
438    c->rlbytes = 0;
439    c->cmd = -1;
440    c->rbytes = c->wbytes = 0;
441    c->wcurr = c->wbuf;
442    c->rcurr = c->rbuf;
443    c->ritem = 0;
444    c->icurr = c->ilist;
445    c->suffixcurr = c->suffixlist;
446    c->ileft = 0;
447    c->suffixleft = 0;
448    c->iovused = 0;
449    c->msgcurr = 0;
450    c->msgused = 0;
451
452    c->write_and_go = init_state;
453    c->write_and_free = 0;
454    c->item = 0;
455
456    c->noreply = false;
457
458    c->funcs = funcs;
459    if (c->funcs == NULL) {
460        c->funcs = &conn_funcs_default;
461        if (settings.verbose > 1)
462            moxi_log_write( "<%d initialized conn_funcs to default\n", sfd);
463    }
464
465    c->cmd_curr = -1;
466    c->cmd_start = NULL;
467    c->cmd_start_time = 0;
468    c->cmd_retries = 0;
469    c->corked = NULL;
470    c->host_ident = NULL;
471    c->peer_host = NULL;
472    c->peer_protocol = 0;
473    c->peer_port = 0;
474    c->update_diag = NULL;
475
476    c->extra = extra;
477
478    event_set(&c->event, sfd, event_flags, event_handler, (void *)c);
479    event_base_set(base, &c->event);
480    c->ev_flags = event_flags;
481
482    if (event_add(&c->event, 0) == -1) {
483        event_del(&c->event);
484        conn_free(c);
485        perror("event_add\n");
486        return NULL;
487    }
488
489    if (c->funcs->conn_init != NULL &&
490        c->funcs->conn_init(c) == false) {
491        event_del(&c->event);
492        conn_free(c);
493        return NULL;
494    }
495
496    STATS_LOCK();
497    stats.curr_conns++;
498    stats.total_conns++;
499    STATS_UNLOCK();
500
501    MEMCACHED_CONN_ALLOCATE(c->sfd);
502
503    return c;
504}
505
506static void conn_cleanup(conn *c) {
507    cb_assert(c != NULL);
508
509    if (c->item) {
510        item_remove(c->item);
511        c->item = 0;
512    }
513
514    if (c->ileft != 0) {
515        for (; c->ileft > 0; c->ileft--, c->icurr++) {
516            item_remove(*(c->icurr));
517        }
518    }
519
520    if (c->suffixleft != 0) {
521        for (; c->suffixleft > 0; c->suffixleft--, c->suffixcurr++) {
522            cache_free(c->thread->suffix_cache, *(c->suffixcurr));
523        }
524    }
525
526    if (c->write_and_free) {
527        free(c->write_and_free);
528        c->write_and_free = 0;
529    }
530}
531
532/*
533 * Frees a connection.
534 */
535void conn_free(conn *c) {
536    if (c) {
537        MEMCACHED_CONN_DESTROY(c);
538        if (c->hdrbuf)
539            free(c->hdrbuf);
540        if (c->msglist)
541            free(c->msglist);
542        if (c->rbuf)
543            free(c->rbuf);
544        if (c->wbuf)
545            free(c->wbuf);
546        if (c->ilist)
547            free(c->ilist);
548        if (c->suffixlist)
549            free(c->suffixlist);
550        if (c->iov)
551            free(c->iov);
552        if (c->host_ident)
553            free(c->host_ident);
554
555        while (c->corked != NULL) {
556            bin_cmd *bc = c->corked;
557            c->corked = c->corked->next;
558            if (bc->request_item != NULL) {
559                item_remove(bc->request_item);
560            }
561            if (bc->response_item != NULL) {
562                item_remove(bc->response_item);
563            }
564            free(bc);
565        }
566
567        free(c);
568    }
569}
570
571static void conn_close(conn *c) {
572    cb_assert(c != NULL);
573
574    /* delete the event, the socket and the conn */
575    event_del(&c->event);
576
577    if (settings.verbose > 1)
578        moxi_log_write("<%d connection closed.\n", c->sfd);
579
580    MEMCACHED_CONN_RELEASE(c->sfd);
581    closesocket(c->sfd);
582    accept_new_conns(true);
583    conn_cleanup(c);
584
585    /* if the connection has big buffers, just free it */
586    if (c->rsize > READ_BUFFER_HIGHWAT || conn_add_to_freelist(c)) {
587        conn_free(c);
588    }
589
590    STATS_LOCK();
591    stats.curr_conns--;
592    STATS_UNLOCK();
593
594    return;
595}
596
597/*
598 * Shrinks a connection's buffers if they're too big.  This prevents
599 * periodic large "get" requests from permanently chewing lots of server
600 * memory.
601 *
602 * This should only be called in between requests since it can wipe output
603 * buffers!
604 */
605static void conn_shrink(conn *c) {
606    cb_assert(c != NULL);
607
608    if (IS_UDP(c->transport))
609        return;
610
611    if (c->rsize > READ_BUFFER_HIGHWAT && c->rbytes < DATA_BUFFER_SIZE) {
612        char *newbuf;
613
614        if (c->rcurr != c->rbuf)
615            memmove(c->rbuf, c->rcurr, (size_t)c->rbytes);
616
617        newbuf = (char *)realloc((void *)c->rbuf, DATA_BUFFER_SIZE);
618
619        if (newbuf) {
620            c->rbuf = newbuf;
621            c->rsize = DATA_BUFFER_SIZE;
622        }
623        /* TODO check other branch... */
624        c->rcurr = c->rbuf;
625    }
626
627    if (c->isize > ITEM_LIST_HIGHWAT) {
628        item **newbuf = (item**) realloc((void *)c->ilist, ITEM_LIST_INITIAL * sizeof(c->ilist[0]));
629        if (newbuf) {
630            c->ilist = newbuf;
631            c->isize = ITEM_LIST_INITIAL;
632        }
633    /* TODO check error condition? */
634    }
635
636    if (c->msgsize > MSG_LIST_HIGHWAT) {
637        struct msghdr *newbuf = (struct msghdr *) realloc((void *)c->msglist, MSG_LIST_INITIAL * sizeof(c->msglist[0]));
638        if (newbuf) {
639            c->msglist = newbuf;
640            c->msgsize = MSG_LIST_INITIAL;
641        }
642    /* TODO check error condition? */
643    }
644
645    if (c->iovsize > IOV_LIST_HIGHWAT) {
646        struct iovec *newbuf = (struct iovec *) realloc((void *)c->iov, IOV_LIST_INITIAL * sizeof(c->iov[0]));
647        if (newbuf) {
648            c->iov = newbuf;
649            c->iovsize = IOV_LIST_INITIAL;
650        }
651    /* TODO check return value */
652    }
653}
654
655/**
656 * Convert a state name to a human readable form.
657 */
658const char *state_text(enum conn_states state) {
659    const char* const statenames[] = { "conn_listening",
660                                       "conn_new_cmd",
661                                       "conn_waiting",
662                                       "conn_read",
663                                       "conn_parse_cmd",
664                                       "conn_write",
665                                       "conn_nread",
666                                       "conn_swallow",
667                                       "conn_closing",
668                                       "conn_mwrite",
669                                       "conn_pause",
670                                       "conn_connecting" };
671    return statenames[state];
672}
673
674/*
675 * Sets a connection's current state in the state machine. Any special
676 * processing that needs to happen on certain state transitions can
677 * happen here.
678 */
679void conn_set_state(conn *c, enum conn_states state) {
680    cb_assert(c != NULL);
681    cb_assert(state < conn_max_state);
682
683    if (state != c->state) {
684        if (c->funcs != NULL &&
685            c->funcs->conn_state_change != NULL) {
686            c->funcs->conn_state_change(c, state);
687        }
688
689        if (settings.verbose > 2) {
690            moxi_log_write("%d: going from %s to %s\n",
691                    c->sfd, state_text(c->state),
692                    state_text(state));
693        }
694
695        c->state = state;
696
697        if (state == conn_write || state == conn_mwrite) {
698            MEMCACHED_PROCESS_COMMAND_END(c->sfd, c->wbuf, c->wbytes);
699        }
700    }
701}
702
703/*
704 * Ensures that there is room for another struct iovec in a connection's
705 * iov list.
706 *
707 * Returns 0 on success, -1 on out-of-memory.
708 */
709int ensure_iov_space(conn *c) {
710    cb_assert(c != NULL);
711
712    if (c->iovused >= c->iovsize) {
713        int i, iovnum;
714        struct iovec *new_iov = (struct iovec *)realloc(c->iov,
715                                (c->iovsize * 2) * sizeof(struct iovec));
716        if (! new_iov)
717            return -1;
718        c->iov = new_iov;
719        c->iovsize *= 2;
720
721        /* Point all the msghdr structures at the new list. */
722        for (i = 0, iovnum = 0; i < c->msgused; i++) {
723            c->msglist[i].msg_iov = &c->iov[iovnum];
724            iovnum += c->msglist[i].msg_iovlen;
725        }
726    }
727
728    return 0;
729}
730
731
732/*
733 * Adds data to the list of pending data that will be written out to a
734 * connection.
735 *
736 * Returns 0 on success, -1 on out-of-memory.
737 */
738int add_iov(conn *c, const void *buf, int len) {
739    struct msghdr *m;
740    int leftover;
741    bool limit_to_mtu;
742
743    cb_assert(c != NULL);
744    cb_assert(c->msgused > 0);
745
746    do {
747        m = &c->msglist[c->msgused - 1];
748
749        /*
750         * Limit UDP packets, and the first payloads of TCP replies, to
751         * UDP_MAX_PAYLOAD_SIZE bytes.
752         */
753        limit_to_mtu = IS_UDP(c->transport) || (1 == c->msgused);
754
755        /* We may need to start a new msghdr if this one is full. */
756        if (m->msg_iovlen == IOV_MAX ||
757            (limit_to_mtu && c->msgbytes >= UDP_MAX_PAYLOAD_SIZE)) {
758            add_msghdr(c);
759            m = &c->msglist[c->msgused - 1];
760        }
761
762        if (ensure_iov_space(c) != 0)
763            return -1;
764
765        /* If the fragment is too big to fit in the datagram, split it up */
766        if (limit_to_mtu && len + c->msgbytes > UDP_MAX_PAYLOAD_SIZE) {
767            leftover = len + c->msgbytes - UDP_MAX_PAYLOAD_SIZE;
768            len -= leftover;
769        } else {
770            leftover = 0;
771        }
772
773        m = &c->msglist[c->msgused - 1];
774        m->msg_iov[m->msg_iovlen].iov_base = (void *)buf;
775        m->msg_iov[m->msg_iovlen].iov_len = len;
776
777        c->msgbytes += len;
778        c->iovused++;
779        m->msg_iovlen++;
780
781        buf = ((char *)buf) + len;
782        len = leftover;
783    } while (leftover > 0);
784
785    return 0;
786}
787
788
789/*
790 * Constructs a set of UDP headers and attaches them to the outgoing messages.
791 */
792static int build_udp_headers(conn *c) {
793    int i;
794    unsigned char *hdr;
795
796    cb_assert(c != NULL);
797
798    if (c->msgused > c->hdrsize) {
799        void *new_hdrbuf;
800        if (c->hdrbuf)
801            new_hdrbuf = realloc(c->hdrbuf, c->msgused * 2 * UDP_HEADER_SIZE);
802        else
803            new_hdrbuf = malloc(c->msgused * 2 * UDP_HEADER_SIZE);
804        if (! new_hdrbuf)
805            return -1;
806        c->hdrbuf = (unsigned char *)new_hdrbuf;
807        c->hdrsize = c->msgused * 2;
808    }
809
810    hdr = c->hdrbuf;
811    for (i = 0; i < c->msgused; i++) {
812        c->msglist[i].msg_iov[0].iov_base = (void*)hdr;
813        c->msglist[i].msg_iov[0].iov_len = UDP_HEADER_SIZE;
814        *hdr++ = c->request_id / 256;
815        *hdr++ = c->request_id % 256;
816        *hdr++ = i / 256;
817        *hdr++ = i % 256;
818        *hdr++ = c->msgused / 256;
819        *hdr++ = c->msgused % 256;
820        *hdr++ = 0;
821        *hdr++ = 0;
822        cb_assert((void *) hdr == (caddr_t)c->msglist[i].msg_iov[0].iov_base + UDP_HEADER_SIZE);
823    }
824
825    return 0;
826}
827
828
829void out_string(conn *c, const char *str) {
830    int len;
831
832    cb_assert(c != NULL);
833
834    if (c->noreply) {
835        if (settings.verbose > 1)
836            moxi_log_write(">%d NOREPLY %s\n", c->sfd, str);
837        c->noreply = false;
838        conn_set_state(c, conn_new_cmd);
839        return;
840    }
841
842    if (settings.verbose > 1)
843        moxi_log_write(">%d %s\n", c->sfd, str);
844
845    len = (int)strlen(str);
846    if ((len + 2) > c->wsize) {
847        /* ought to be always enough. just fail for simplicity */
848        str = "SERVER_ERROR output line too long";
849        len = (int)strlen(str);
850    }
851
852    memcpy(c->wbuf, str, len);
853    memcpy(c->wbuf + len, "\r\n", 2);
854    c->wbytes = len + 2;
855    c->wcurr = c->wbuf;
856
857    conn_set_state(c, conn_write);
858    c->write_and_go = conn_new_cmd;
859    return;
860}
861
862/*
863 * we get here after reading the value in set/add/replace commands. The command
864 * has been stored in c->cmd, and the item is ready in c->item.
865 */
866void complete_nread_ascii(conn *c) {
867    cb_assert(c != NULL);
868
869    item *it = c->item;
870    int comm = c->cmd;
871    enum store_item_type ret;
872
873    cb_mutex_enter(&c->thread->stats.mutex);
874    c->thread->stats.slab_stats[it->slabs_clsid].set_cmds++;
875    cb_mutex_exit(&c->thread->stats.mutex);
876
877    if (strncmp(ITEM_data(it) + it->nbytes - 2, "\r\n", 2) != 0) {
878        out_string(c, "CLIENT_ERROR bad data chunk");
879    } else {
880      ret = store_item(it, comm, c);
881
882#ifdef ENABLE_DTRACE
883      uint64_t cas = ITEM_get_cas(it);
884      switch (c->cmd) {
885      case NREAD_ADD:
886          MEMCACHED_COMMAND_ADD(c->sfd, ITEM_key(it), it->nkey,
887                                (ret == 1) ? it->nbytes : -1, cas);
888          break;
889      case NREAD_REPLACE:
890          MEMCACHED_COMMAND_REPLACE(c->sfd, ITEM_key(it), it->nkey,
891                                    (ret == 1) ? it->nbytes : -1, cas);
892          break;
893      case NREAD_APPEND:
894          MEMCACHED_COMMAND_APPEND(c->sfd, ITEM_key(it), it->nkey,
895                                   (ret == 1) ? it->nbytes : -1, cas);
896          break;
897      case NREAD_PREPEND:
898          MEMCACHED_COMMAND_PREPEND(c->sfd, ITEM_key(it), it->nkey,
899                                    (ret == 1) ? it->nbytes : -1, cas);
900          break;
901      case NREAD_SET:
902          MEMCACHED_COMMAND_SET(c->sfd, ITEM_key(it), it->nkey,
903                                (ret == 1) ? it->nbytes : -1, cas);
904          break;
905      case NREAD_CAS:
906          MEMCACHED_COMMAND_CAS(c->sfd, ITEM_key(it), it->nkey, it->nbytes,
907                                cas);
908          break;
909      }
910#endif
911
912      switch (ret) {
913      case STORED:
914          out_string(c, "STORED");
915          break;
916      case EXISTS:
917          out_string(c, "EXISTS");
918          break;
919      case NOT_FOUND:
920          out_string(c, "NOT_FOUND");
921          break;
922      case NOT_STORED:
923          out_string(c, "NOT_STORED");
924          break;
925      default:
926          out_string(c, "SERVER_ERROR Unhandled storage type.");
927      }
928
929    }
930
931    item_remove(c->item);       /* release the c->item reference */
932    c->item = 0;
933}
934
935/**
936 * get a pointer to the start of the request struct for the current command
937 */
938void* binary_get_request(conn *c) {
939    char *ret = c->rcurr;
940    ret -= (sizeof(c->binary_header) + c->binary_header.request.keylen +
941            c->binary_header.request.extlen);
942
943    cb_assert(ret >= c->rbuf);
944    return ret;
945}
946
947/**
948 * get a pointer to the key in this request
949 */
950char* binary_get_key(conn *c) {
951    return c->rcurr - (c->binary_header.request.keylen);
952}
953
954static void add_bin_header(conn *c, uint16_t err, uint8_t hdr_len, uint16_t key_len, uint32_t body_len) {
955    protocol_binary_response_header* header;
956
957    cb_assert(c);
958
959    c->msgcurr = 0;
960    c->msgused = 0;
961    c->iovused = 0;
962    if (add_msghdr(c) != 0) {
963        /* XXX:  out_string is inappropriate here */
964        out_string(c, "SERVER_ERROR out of memory");
965        return;
966    }
967
968    header = (protocol_binary_response_header *)c->wbuf;
969
970    header->response.magic = (uint8_t)PROTOCOL_BINARY_RES;
971    header->response.opcode = c->binary_header.request.opcode;
972    header->response.keylen = (uint16_t)htons(key_len);
973
974    header->response.extlen = (uint8_t)hdr_len;
975    header->response.datatype = (uint8_t)PROTOCOL_BINARY_RAW_BYTES;
976    header->response.status = (uint16_t)htons(err);
977
978    header->response.bodylen = htonl(body_len);
979    header->response.opaque = c->opaque;
980    header->response.cas = mc_swap64(c->cas);
981
982    if (settings.verbose > 1) {
983        moxi_log_write(">%d Writing bin response:\n", c->sfd);
984        cproxy_dump_header(c->sfd, (char *) header->bytes);
985    }
986
987    add_iov(c, c->wbuf, sizeof(header->response));
988}
989
990void write_bin_error(conn *c, protocol_binary_response_status err, int swallow) {
991    const char *errstr = "Unknown error";
992    size_t len;
993
994    switch (err) {
995    case PROTOCOL_BINARY_RESPONSE_ENOMEM:
996        errstr = "Out of memory";
997        break;
998    case PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND:
999        errstr = "Unknown command";
1000        break;
1001    case PROTOCOL_BINARY_RESPONSE_KEY_ENOENT:
1002        errstr = "Not found";
1003        break;
1004    case PROTOCOL_BINARY_RESPONSE_EINVAL:
1005        errstr = "Invalid arguments";
1006        break;
1007    case PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS:
1008        errstr = "Data exists for key.";
1009        break;
1010    case PROTOCOL_BINARY_RESPONSE_E2BIG:
1011        errstr = "Too large.";
1012        break;
1013    case PROTOCOL_BINARY_RESPONSE_DELTA_BADVAL:
1014        errstr = "Non-numeric server-side value for incr or decr";
1015        break;
1016    case PROTOCOL_BINARY_RESPONSE_NOT_STORED:
1017        errstr = "Not stored.";
1018        break;
1019    case PROTOCOL_BINARY_RESPONSE_AUTH_ERROR:
1020        errstr = "Auth failure";
1021        break;
1022    case PROTOCOL_BINARY_RESPONSE_AUTH_CONTINUE:
1023        errstr = "Auth continue";
1024        break;
1025    case PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET:
1026        errstr = "Not my vbucket";
1027        break;
1028    case PROTOCOL_BINARY_RESPONSE_NOT_SUPPORTED:
1029        errstr = "Not supported";
1030        break;
1031    case PROTOCOL_BINARY_RESPONSE_EINTERNAL:
1032        errstr = "Internal error";
1033        break;
1034    case PROTOCOL_BINARY_RESPONSE_EBUSY:
1035        errstr = "System is busy";
1036        break;
1037    case PROTOCOL_BINARY_RESPONSE_ETMPFAIL:
1038        errstr = "Temporary failure";
1039        break;
1040    default:
1041        cb_assert(false);
1042        errstr = "UNHANDLED ERROR";
1043        moxi_log_write(">%d UNHANDLED ERROR: %d\n", c->sfd, err);
1044    }
1045
1046    if (settings.verbose > 1) {
1047        moxi_log_write(">%d Writing an error: %d %s\n", c->sfd, (int) err, errstr);
1048    }
1049
1050    len = strlen(errstr);
1051    add_bin_header(c, err, 0, 0, (uint32_t)len);
1052    if (len > 0) {
1053        add_iov(c, errstr, (uint32_t)len);
1054    }
1055    conn_set_state(c, conn_mwrite);
1056    if(swallow > 0) {
1057        c->sbytes = swallow;
1058        c->write_and_go = conn_swallow;
1059    } else {
1060        c->write_and_go = conn_new_cmd;
1061    }
1062}
1063
1064/* Form and send a response to a command over the binary protocol */
1065void write_bin_response(conn *c, void *d, int hlen, int keylen, int dlen) {
1066    if (!c->noreply || c->cmd == PROTOCOL_BINARY_CMD_GET ||
1067        c->cmd == PROTOCOL_BINARY_CMD_GETK ||
1068        c->cmd == PROTOCOL_BINARY_CMD_GETL) {
1069        add_bin_header(c, 0, hlen, keylen, dlen);
1070        if(dlen > 0) {
1071            add_iov(c, d, dlen);
1072        }
1073        conn_set_state(c, conn_mwrite);
1074        c->write_and_go = conn_new_cmd;
1075    } else {
1076        conn_set_state(c, conn_new_cmd);
1077    }
1078}
1079
/*
 * Convert a 64-bit value between host and network (big-endian) byte order.
 *
 * With ENDIAN_LITTLE defined, the eight bytes are reversed; otherwise the
 * value is returned unchanged.  The operation is its own inverse, so the
 * same function is used both to encode outgoing values and to decode
 * incoming ones.
 */
uint64_t mc_swap64(uint64_t in) {
#ifdef ENDIAN_LITTLE
    /* Accumulate in an unsigned type: left-shifting a signed 64-bit value
     * whose high bits become set is undefined behaviour. */
    uint64_t rv = 0;
    int i;
    for (i = 0; i < 8; i++) {
        rv = (rv << 8) | (in & 0xff);
        in >>= 8;
    }
    return rv;
#else
    /* big-endian machines don't need byte swapping */
    return in;
#endif
}
1097
1098static void complete_incr_bin(conn *c) {
1099    item *it;
1100    char *key;
1101    size_t nkey;
1102
1103    protocol_binary_response_incr* rsp = (protocol_binary_response_incr*)c->wbuf;
1104    protocol_binary_request_incr* req = binary_get_request(c);
1105
1106    cb_assert(c != NULL);
1107    cb_assert(c->wsize >= (int) sizeof(*rsp));
1108
1109    /* fix byteorder in the request */
1110    req->message.body.delta = mc_swap64(req->message.body.delta);
1111    req->message.body.initial = mc_swap64(req->message.body.initial);
1112    req->message.body.expiration = ntohl(req->message.body.expiration);
1113    key = binary_get_key(c);
1114    nkey = c->binary_header.request.keylen;
1115
1116    if (settings.verbose) {
1117        size_t i;
1118        moxi_log_write("incr ");
1119
1120        for (i = 0; i < nkey; i++) {
1121            moxi_log_write("%c", key[i]);
1122        }
1123        moxi_log_write(" %lld, %llu, %d\n",
1124                (long long)req->message.body.delta,
1125                (long long)req->message.body.initial,
1126                req->message.body.expiration);
1127    }
1128
1129    it = item_get(key, nkey);
1130    if (it && (c->binary_header.request.cas == 0 ||
1131               c->binary_header.request.cas == ITEM_get_cas(it))) {
1132        /* Weird magic in add_delta forces me to pad here */
1133        char tmpbuf[INCR_MAX_STORAGE_LEN];
1134        protocol_binary_response_status st = PROTOCOL_BINARY_RESPONSE_SUCCESS;
1135
1136        switch(add_delta(c, it, c->cmd == PROTOCOL_BINARY_CMD_INCREMENT,
1137                         req->message.body.delta, tmpbuf)) {
1138        case OK:
1139            break;
1140        case NON_NUMERIC:
1141            st = PROTOCOL_BINARY_RESPONSE_DELTA_BADVAL;
1142            break;
1143        case EOM:
1144            st = PROTOCOL_BINARY_RESPONSE_ENOMEM;
1145            break;
1146        }
1147
1148        if (st != PROTOCOL_BINARY_RESPONSE_SUCCESS) {
1149            write_bin_error(c, st, 0);
1150        } else {
1151            rsp->message.body.value = mc_swap64(strtoull(tmpbuf, NULL, 10));
1152            c->cas = ITEM_get_cas(it);
1153            write_bin_response(c, &rsp->message.body, 0, 0,
1154                               sizeof(rsp->message.body.value));
1155        }
1156
1157        item_remove(it);         /* release our reference */
1158    } else if (!it && req->message.body.expiration != 0xffffffff) {
1159        /* Save some room for the response */
1160        rsp->message.body.value = mc_swap64(req->message.body.initial);
1161        it = item_alloc(key, nkey, 0, realtime(req->message.body.expiration),
1162                        INCR_MAX_STORAGE_LEN);
1163
1164        if (it != NULL) {
1165            snprintf(ITEM_data(it), INCR_MAX_STORAGE_LEN, "%llu",
1166                     (unsigned long long)req->message.body.initial);
1167
1168            if (store_item(it, NREAD_SET, c)) {
1169                c->cas = ITEM_get_cas(it);
1170                write_bin_response(c, &rsp->message.body, 0, 0, sizeof(rsp->message.body.value));
1171            } else {
1172                write_bin_error(c, PROTOCOL_BINARY_RESPONSE_NOT_STORED, 0);
1173            }
1174            item_remove(it);         /* release our reference */
1175        } else {
1176            write_bin_error(c, PROTOCOL_BINARY_RESPONSE_ENOMEM, 0);
1177        }
1178    } else if (it) {
1179        /* incorrect CAS */
1180        item_remove(it);         /* release our reference */
1181        write_bin_error(c, PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS, 0);
1182    } else {
1183
1184        cb_mutex_enter(&c->thread->stats.mutex);
1185        if (c->cmd == PROTOCOL_BINARY_CMD_INCREMENT) {
1186            c->thread->stats.incr_misses++;
1187        } else {
1188            c->thread->stats.decr_misses++;
1189        }
1190        cb_mutex_exit(&c->thread->stats.mutex);
1191
1192        write_bin_error(c, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, 0);
1193    }
1194}
1195
/*
 * Finish a binary SET/ADD/REPLACE/APPEND/PREPEND (or its CAS form) once the
 * value has been read into c->item.  Stores the item with store_item(),
 * writes the matching binary response, and then drops this connection's
 * reference to c->item.
 *
 * c->cmd holds the NREAD_* code chosen by process_bin_update() or
 * process_bin_append_prepend() before the value was read.
 */
static void complete_update_bin(conn *c) {
    protocol_binary_response_status eno = PROTOCOL_BINARY_RESPONSE_EINVAL;
    enum store_item_type ret = NOT_STORED;
    cb_assert(c != NULL);

    item *it = c->item;

    /* Count the set against the item's slab class. */
    cb_mutex_enter(&c->thread->stats.mutex);
    c->thread->stats.slab_stats[it->slabs_clsid].set_cmds++;
    cb_mutex_exit(&c->thread->stats.mutex);

    /* We don't actually receive the trailing two characters in the bin
     * protocol, so we're going to just set them here */
    *(ITEM_data(it) + it->nbytes - 2) = '\r';
    *(ITEM_data(it) + it->nbytes - 1) = '\n';

    ret = store_item(it, c->cmd, c);

#ifdef ENABLE_DTRACE
    /* Report the outcome to DTrace; size is -1 when nothing was stored. */
    uint64_t cas = ITEM_get_cas(it);
    switch (c->cmd) {
    case NREAD_ADD:
        MEMCACHED_COMMAND_ADD(c->sfd, ITEM_key(it), it->nkey,
                              (ret == STORED) ? it->nbytes : -1, cas);
        break;
    case NREAD_REPLACE:
        MEMCACHED_COMMAND_REPLACE(c->sfd, ITEM_key(it), it->nkey,
                                  (ret == STORED) ? it->nbytes : -1, cas);
        break;
    case NREAD_APPEND:
        MEMCACHED_COMMAND_APPEND(c->sfd, ITEM_key(it), it->nkey,
                                 (ret == STORED) ? it->nbytes : -1, cas);
        break;
    case NREAD_PREPEND:
        MEMCACHED_COMMAND_PREPEND(c->sfd, ITEM_key(it), it->nkey,
                                 (ret == STORED) ? it->nbytes : -1, cas);
        break;
    case NREAD_SET:
        MEMCACHED_COMMAND_SET(c->sfd, ITEM_key(it), it->nkey,
                              (ret == STORED) ? it->nbytes : -1, cas);
        break;
    }
#endif

    switch (ret) {
    case STORED:
        /* Stored */
        write_bin_response(c, NULL, 0, 0, 0);
        break;
    case EXISTS:
        write_bin_error(c, PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS, 0);
        break;
    case NOT_FOUND:
        write_bin_error(c, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, 0);
        break;
    case NOT_STORED:
        /* Translate the generic failure into the status expected for the
         * specific command: ADD fails because the key exists, REPLACE
         * because it does not. */
        if (c->cmd == NREAD_ADD) {
            eno = PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS;
        } else if(c->cmd == NREAD_REPLACE) {
            eno = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
        } else {
            eno = PROTOCOL_BINARY_RESPONSE_NOT_STORED;
        }
        write_bin_error(c, eno, 0);
    }

    item_remove(c->item);       /* release the c->item reference */
    c->item = 0;
}
1265
/*
 * Handle a binary GET / GETK / GETL (quiet variants arrive here with
 * c->noreply set and c->cmd mapped to the normal opcode).
 *
 * Hit: the response is queued as an iov chain of header + 4-byte flags
 * extras + (key, for GETK/GETL) + value without its trailing "\r\n"; the
 * item reference is kept in c->item until the write completes.
 * Miss: quiet requests produce no output; GETK/GETL reply KEY_ENOENT with
 * the key echoed back; plain GET replies with a KEY_ENOENT error.
 */
static void process_bin_get(conn *c) {
    item *it;

    /* The response header and extras are built in place in c->wbuf. */
    protocol_binary_response_get* rsp = (protocol_binary_response_get*)c->wbuf;
    char* key = binary_get_key(c);
    size_t nkey = c->binary_header.request.keylen;

    if (settings.verbose) {
        size_t ii;
        moxi_log_write("<%d GET ", c->sfd);
        /* The key is not NUL-terminated; print exactly nkey bytes. */
        for (ii = 0; ii < nkey; ++ii) {
            moxi_log_write("%c", key[ii]);
        }
        moxi_log_write("\n");
    }

    it = item_get(key, nkey);
    if (it) {
        /* the length has two unnecessary bytes ("\r\n") */
        uint16_t keylen = 0;
        uint32_t bodylen = sizeof(rsp->message.body) + (it->nbytes - 2);

        cb_mutex_enter(&c->thread->stats.mutex);
        c->thread->stats.get_cmds++;
        c->thread->stats.slab_stats[it->slabs_clsid].get_hits++;
        cb_mutex_exit(&c->thread->stats.mutex);

        MEMCACHED_COMMAND_GET(c->sfd, ITEM_key(it), it->nkey,
                              it->nbytes, ITEM_get_cas(it));

        /* GETK/GETL echo the key back in the response body. */
        if (c->cmd == PROTOCOL_BINARY_CMD_GETK ||
            c->cmd == PROTOCOL_BINARY_CMD_GETL) {
            bodylen += (uint32_t)nkey;
            keylen = (uint16_t)nkey;
        }
        add_bin_header(c, 0, sizeof(rsp->message.body), keylen, bodylen);
        /* add_bin_header() filled the header at the start of c->wbuf with
         * c->cas; overwrite it with the item's CAS. */
        rsp->message.header.response.cas = mc_swap64(ITEM_get_cas(it));

        /* add the flags */
        rsp->message.body.flags = htonl(strtoul(ITEM_suffix(it), NULL, 10));
        add_iov(c, &rsp->message.body, sizeof(rsp->message.body));

        if (c->cmd == PROTOCOL_BINARY_CMD_GETK ||
            c->cmd == PROTOCOL_BINARY_CMD_GETL) {
            add_iov(c, ITEM_key(it), (uint32_t)nkey);
        }

        /* Add the data minus the CRLF */
        add_iov(c, ITEM_data(it), it->nbytes - 2);
        conn_set_state(c, conn_mwrite);
        /* Remember this command so we can garbage collect it later */
        c->item = it;
    } else {
        cb_mutex_enter(&c->thread->stats.mutex);
        c->thread->stats.get_cmds++;
        c->thread->stats.get_misses++;
        cb_mutex_exit(&c->thread->stats.mutex);

        MEMCACHED_COMMAND_GET(c->sfd, key, nkey, -1, 0);

        if (c->noreply) {
            /* Quiet gets are silent on a miss. */
            conn_set_state(c, conn_new_cmd);
        } else {
            if (c->cmd == PROTOCOL_BINARY_CMD_GETK ||
                c->cmd == PROTOCOL_BINARY_CMD_GETL) {
                /* Copy the key into c->wbuf right after the response
                 * header so it can be sent back as the body. */
                char *ofs = c->wbuf + sizeof(protocol_binary_response_header);
                add_bin_header(c, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT,
                        0, (uint16_t)nkey, (uint32_t)nkey);
                memcpy(ofs, key, nkey);
                add_iov(c, ofs, (int)nkey);
                conn_set_state(c, conn_mwrite);
            } else {
                write_bin_error(c, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, 0);
            }
        }
    }

    if (settings.detail_enabled) {
        stats_prefix_record_get(key, nkey, NULL != it);
    }
}
1347
1348static void append_bin_stats(const char *key, const uint16_t klen,
1349                             const char *val, const uint32_t vlen,
1350                             conn *c) {
1351    char *buf = c->stats.buffer + c->stats.offset;
1352    uint32_t bodylen = klen + vlen;
1353    protocol_binary_response_header header = {
1354        .response = {
1355            .magic = (uint8_t)PROTOCOL_BINARY_RES,
1356            .opcode = PROTOCOL_BINARY_CMD_STAT,
1357            .keylen = (uint16_t)htons(klen),
1358            .datatype = (uint8_t)PROTOCOL_BINARY_RAW_BYTES,
1359            .bodylen = htonl(bodylen),
1360            .opaque = c->opaque
1361        }
1362    };
1363
1364    memcpy(buf, header.bytes, sizeof(header.response));
1365    buf += sizeof(header.response);
1366
1367    if (klen > 0) {
1368        memcpy(buf, key, klen);
1369        buf += klen;
1370
1371        if (vlen > 0) {
1372            memcpy(buf, val, vlen);
1373        }
1374    }
1375
1376    c->stats.offset += sizeof(header.response) + bodylen;
1377}
1378
1379static void append_ascii_stats(const char *key, const uint16_t klen,
1380                               const char *val, const uint32_t vlen,
1381                               conn *c) {
1382    char *pos = c->stats.buffer + c->stats.offset;
1383    uint32_t nbytes = 0;
1384    size_t remaining = c->stats.size - c->stats.offset;
1385    size_t room = remaining - 1;
1386
1387    if (klen == 0 && vlen == 0) {
1388        nbytes = snprintf(pos, room, "END\r\n");
1389    } else if (vlen == 0) {
1390        nbytes = snprintf(pos, room, "STAT %s\r\n", key);
1391    } else {
1392        nbytes = snprintf(pos, room, "STAT %s %s\r\n", key, val);
1393    }
1394
1395    c->stats.offset += nbytes;
1396}
1397
1398static bool grow_stats_buf(conn *c, size_t needed) {
1399    size_t nsize = c->stats.size;
1400    size_t available = nsize - c->stats.offset;
1401    bool rv = true;
1402
1403    /* Special case: No buffer -- need to allocate fresh */
1404    if (c->stats.buffer == NULL) {
1405        nsize = 1024;
1406        available = c->stats.size = c->stats.offset = 0;
1407    }
1408
1409    while (needed > available) {
1410        cb_assert(nsize > 0);
1411        nsize = nsize << 1;
1412        available = nsize - c->stats.offset;
1413    }
1414
1415    if (nsize != c->stats.size) {
1416        char *ptr = realloc(c->stats.buffer, nsize);
1417        if (ptr) {
1418            c->stats.buffer = ptr;
1419            c->stats.size = nsize;
1420        } else {
1421            rv = false;
1422        }
1423    }
1424
1425    return rv;
1426}
1427
1428static void append_stats(const char *key, const uint16_t klen,
1429                  const char *val, const uint32_t vlen,
1430                  const void *cookie)
1431{
1432    /* value without a key is invalid */
1433    if (klen == 0 && vlen > 0) {
1434        return ;
1435    }
1436
1437    conn *c = (conn*)cookie;
1438
1439    if (IS_BINARY(c->protocol)) {
1440        size_t needed = vlen + klen + sizeof(protocol_binary_response_header);
1441        if (!grow_stats_buf(c, needed)) {
1442            return ;
1443        }
1444        append_bin_stats(key, klen, val, vlen, c);
1445    } else {
1446        size_t needed = vlen + klen + 10; /* 10 == "STAT = \r\n" */
1447        if (!grow_stats_buf(c, needed)) {
1448            return ;
1449        }
1450        append_ascii_stats(key, klen, val, vlen, c);
1451    }
1452
1453    cb_assert(c->stats.offset <= c->stats.size);
1454}
1455
1456void process_bin_proxy_stats(conn *c) {
1457    struct proxy_stats_cmd_info psci = {
1458        .do_info = false,
1459        .do_settings = false,
1460        .do_behaviors = false,
1461        .do_frontcache = false,
1462        .do_keystats = false,
1463        .do_stats = true,
1464        .do_zeros = true
1465    };
1466
1467    /* proxy_stats_dump_proxy_main(&append_stats, c, &psci); */
1468    proxy_stats_dump_proxies(&append_stats, c, &psci);
1469
1470    /* Append termination package and start the transfer */
1471    append_stats(NULL, 0, NULL, 0, c);
1472    if (c->stats.buffer == NULL) {
1473        write_bin_error(c, PROTOCOL_BINARY_RESPONSE_ENOMEM, 0);
1474    } else {
1475        write_and_free(c, c->stats.buffer, c->stats.offset);
1476        c->stats.buffer = NULL;
1477    }
1478}
1479
1480static void process_bin_stat(conn *c) {
1481    char *subcommand = binary_get_key(c);
1482    size_t nkey = c->binary_header.request.keylen;
1483
1484    if (settings.verbose) {
1485        size_t ii;
1486        moxi_log_write("<%d STATS ", c->sfd);
1487        for (ii = 0; ii < nkey; ++ii) {
1488            moxi_log_write("%c", subcommand[ii]);
1489        }
1490        moxi_log_write("\n");
1491    }
1492
1493    if (nkey == 0) {
1494        /* request all statistics */
1495        server_stats(&append_stats, c, NULL);
1496        (void)get_stats(NULL, 0, &append_stats, c);
1497    } else if (strncmp(subcommand, "reset", 5) == 0) {
1498        stats_reset();
1499    } else if (strncmp(subcommand, "settings", 8) == 0) {
1500        process_stat_settings(&append_stats, c, NULL);
1501    } else if (strncmp(subcommand, "detail", 6) == 0) {
1502        char *subcmd_pos = subcommand + 6;
1503        if (strncmp(subcmd_pos, " dump", 5) == 0) {
1504            int len;
1505            char *dump_buf = stats_prefix_dump(&len);
1506            if (dump_buf == NULL || len <= 0) {
1507                write_bin_error(c, PROTOCOL_BINARY_RESPONSE_ENOMEM, 0);
1508                return ;
1509            } else {
1510                append_stats("detailed", (uint16_t)strlen("detailed"), dump_buf, len, c);
1511                free(dump_buf);
1512            }
1513        } else if (strncmp(subcmd_pos, " on", 3) == 0) {
1514            settings.detail_enabled = 1;
1515        } else if (strncmp(subcmd_pos, " off", 4) == 0) {
1516            settings.detail_enabled = 0;
1517        } else {
1518            write_bin_error(c, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, 0);
1519            return;
1520        }
1521    } else {
1522        if (get_stats(subcommand, (int)nkey, &append_stats, c)) {
1523            if (c->stats.buffer == NULL) {
1524                write_bin_error(c, PROTOCOL_BINARY_RESPONSE_ENOMEM, 0);
1525            } else {
1526                write_and_free(c, c->stats.buffer, c->stats.offset);
1527                c->stats.buffer = NULL;
1528            }
1529        } else {
1530            write_bin_error(c, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, 0);
1531        }
1532
1533        return;
1534    }
1535
1536    /* Append termination package and start the transfer */
1537    append_stats(NULL, 0, NULL, 0, c);
1538    if (c->stats.buffer == NULL) {
1539        write_bin_error(c, PROTOCOL_BINARY_RESPONSE_ENOMEM, 0);
1540    } else {
1541        write_and_free(c, c->stats.buffer, c->stats.offset);
1542        c->stats.buffer = NULL;
1543    }
1544}
1545
/*
 * Arrange for the bytes that follow the binary request header (c->keylen
 * bytes of key plus `extra` bytes of extras) to be read into the input
 * buffer directly after the header, then switch the connection to
 * conn_nread.  `next_substate` is stored in c->substate to select the
 * handler that runs once the bytes have arrived.
 *
 * If rbuf lacks room for header + c->rlbytes starting at rcurr, the buffer
 * is grown by doubling (the connection is closed if realloc() fails) and/or
 * the pending bytes are moved to the start of rbuf.
 * NOTE(review): c->keylen is presumably the request's key length, set by
 * the caller; the doubling loop also assumes c->rsize > 0 -- confirm.
 */
void bin_read_key(conn *c, enum bin_substates next_substate, int extra) {
    cb_assert(c);
    c->substate = next_substate;
    c->rlbytes = c->keylen + extra;

    /* Ok... do we have room for the extras and the key in the input buffer? */
    ptrdiff_t offset = c->rcurr + sizeof(protocol_binary_request_header) - c->rbuf;
    if (c->rlbytes > c->rsize - offset) {
        int nsize = c->rsize;
        int size = c->rlbytes + sizeof(protocol_binary_request_header);

        /* Smallest power-of-two multiple of the current size that can hold
         * the header plus the bytes still to be read. */
        while (size > nsize) {
            nsize *= 2;
        }

        if (nsize != c->rsize) {
            if (settings.verbose) {
                moxi_log_write("%d: Need to grow buffer from %lu to %lu\n",
                        c->sfd, (unsigned long)c->rsize, (unsigned long)nsize);
            }
            char *newm = realloc(c->rbuf, nsize);
            if (newm == NULL) {
                if (settings.verbose) {
                    moxi_log_write("%d: Failed to grow buffer.. closing connection\n",
                            c->sfd);
                }
                conn_set_state(c, conn_closing);
                return;
            }

            c->rbuf= newm;
            /* rcurr should point to the same offset in the packet */
            c->rcurr = c->rbuf + offset - sizeof(protocol_binary_request_header);
            c->rsize = nsize;
        }
        /* Move the pending bytes (starting with the header) to the front
         * so header + key/extras fit contiguously. */
        if (c->rbuf != c->rcurr) {
            memmove(c->rbuf, c->rcurr, c->rbytes);
            c->rcurr = c->rbuf;
            if (settings.verbose) {
                moxi_log_write("%d: Repack input buffer\n", c->sfd);
            }
        }
    }

    /* preserve the header in the buffer.. */
    c->ritem = c->rcurr + sizeof(protocol_binary_request_header);
    conn_set_state(c, conn_nread);
}
1594
1595void process_bin_noreply(conn *c) {
1596    cb_assert(c);
1597    c->noreply = true;
1598    switch (c->binary_header.request.opcode) {
1599    case PROTOCOL_BINARY_CMD_SETQ:
1600        c->cmd = PROTOCOL_BINARY_CMD_SET;
1601        break;
1602    case PROTOCOL_BINARY_CMD_ADDQ:
1603        c->cmd = PROTOCOL_BINARY_CMD_ADD;
1604        break;
1605    case PROTOCOL_BINARY_CMD_REPLACEQ:
1606        c->cmd = PROTOCOL_BINARY_CMD_REPLACE;
1607        break;
1608    case PROTOCOL_BINARY_CMD_DELETEQ:
1609        c->cmd = PROTOCOL_BINARY_CMD_DELETE;
1610        break;
1611    case PROTOCOL_BINARY_CMD_INCREMENTQ:
1612        c->cmd = PROTOCOL_BINARY_CMD_INCREMENT;
1613        break;
1614    case PROTOCOL_BINARY_CMD_DECREMENTQ:
1615        c->cmd = PROTOCOL_BINARY_CMD_DECREMENT;
1616        break;
1617    case PROTOCOL_BINARY_CMD_QUITQ:
1618        c->cmd = PROTOCOL_BINARY_CMD_QUIT;
1619        break;
1620    case PROTOCOL_BINARY_CMD_FLUSHQ:
1621        c->cmd = PROTOCOL_BINARY_CMD_FLUSH;
1622        break;
1623    case PROTOCOL_BINARY_CMD_APPENDQ:
1624        c->cmd = PROTOCOL_BINARY_CMD_APPEND;
1625        break;
1626    case PROTOCOL_BINARY_CMD_PREPENDQ:
1627        c->cmd = PROTOCOL_BINARY_CMD_PREPEND;
1628        break;
1629    case PROTOCOL_BINARY_CMD_GETQ:
1630        c->cmd = PROTOCOL_BINARY_CMD_GET;
1631        break;
1632    case PROTOCOL_BINARY_CMD_GETKQ:
1633        c->cmd = PROTOCOL_BINARY_CMD_GETK;
1634        break;
1635    default:
1636        c->noreply = false;
1637    }
1638}
1639
1640void dispatch_bin_command(conn *c) {
1641    int protocol_error = 0;
1642
1643    uint32_t extlen = c->binary_header.request.extlen;
1644    uint32_t keylen = c->binary_header.request.keylen;
1645    uint32_t bodylen = c->binary_header.request.bodylen;
1646
1647    MEMCACHED_PROCESS_COMMAND_START(c->sfd, c->rcurr, c->rbytes);
1648
1649    process_bin_noreply(c);
1650
1651    switch (c->cmd) {
1652        case PROTOCOL_BINARY_CMD_VERSION:
1653            if (extlen == 0 && keylen == 0 && bodylen == 0) {
1654                write_bin_response(c, VERSION, 0, 0, (int)strlen(VERSION));
1655            } else {
1656                protocol_error = 1;
1657            }
1658            break;
1659        case PROTOCOL_BINARY_CMD_FLUSH:
1660            if (keylen == 0 && bodylen == extlen && (extlen == 0 || extlen == 4)) {
1661                bin_read_key(c, bin_read_flush_exptime, extlen);
1662            } else {
1663                protocol_error = 1;
1664            }
1665            break;
1666        case PROTOCOL_BINARY_CMD_NOOP:
1667            if (extlen == 0 && keylen == 0 && bodylen == 0) {
1668                write_bin_response(c, NULL, 0, 0, 0);
1669            } else {
1670                protocol_error = 1;
1671            }
1672            break;
1673        case PROTOCOL_BINARY_CMD_SET: /* FALLTHROUGH */
1674        case PROTOCOL_BINARY_CMD_ADD: /* FALLTHROUGH */
1675        case PROTOCOL_BINARY_CMD_REPLACE:
1676            if (extlen == 8 && keylen != 0 && bodylen >= (keylen + 8)) {
1677                bin_read_key(c, bin_reading_set_header, 8);
1678            } else {
1679                protocol_error = 1;
1680            }
1681            break;
1682        case PROTOCOL_BINARY_CMD_GETQ:  /* FALLTHROUGH */
1683        case PROTOCOL_BINARY_CMD_GET:   /* FALLTHROUGH */
1684        case PROTOCOL_BINARY_CMD_GETKQ: /* FALLTHROUGH */
1685        case PROTOCOL_BINARY_CMD_GETK:
1686        case PROTOCOL_BINARY_CMD_GETL:
1687            if (extlen == 0 && bodylen == keylen && keylen > 0) {
1688                bin_read_key(c, bin_reading_get_key, 0);
1689            } else {
1690                protocol_error = 1;
1691            }
1692            break;
1693        case PROTOCOL_BINARY_CMD_DELETE:
1694            if (keylen > 0 && extlen == 0 && bodylen == keylen) {
1695                bin_read_key(c, bin_reading_del_header, extlen);
1696            } else {
1697                protocol_error = 1;
1698            }
1699            break;
1700        case PROTOCOL_BINARY_CMD_INCREMENT:
1701        case PROTOCOL_BINARY_CMD_DECREMENT:
1702            if (keylen > 0 && extlen == 20 && bodylen == (keylen + extlen)) {
1703                bin_read_key(c, bin_reading_incr_header, 20);
1704            } else {
1705                protocol_error = 1;
1706            }
1707            break;
1708        case PROTOCOL_BINARY_CMD_APPEND:
1709        case PROTOCOL_BINARY_CMD_PREPEND:
1710            if (keylen > 0 && extlen == 0) {
1711                bin_read_key(c, bin_reading_set_header, 0);
1712            } else {
1713                protocol_error = 1;
1714            }
1715            break;
1716        case PROTOCOL_BINARY_CMD_STAT:
1717            if (extlen == 0) {
1718                bin_read_key(c, bin_reading_stat, 0);
1719            } else {
1720                protocol_error = 1;
1721            }
1722            break;
1723        case PROTOCOL_BINARY_CMD_QUIT:
1724            if (keylen == 0 && extlen == 0 && bodylen == 0) {
1725                write_bin_response(c, NULL, 0, 0, 0);
1726                c->write_and_go = conn_closing;
1727                if (c->noreply) {
1728                    conn_set_state(c, conn_closing);
1729                }
1730            } else {
1731                protocol_error = 1;
1732            }
1733            break;
1734        default:
1735            write_bin_error(c, PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND, bodylen);
1736    }
1737
1738    if (protocol_error) {
1739        /* Just write an error message and disconnect the client */
1740        write_bin_error(c, PROTOCOL_BINARY_RESPONSE_EINVAL, 0);
1741        if (settings.verbose) {
1742            moxi_log_write("Protocol error (opcode %02x), close connection %d\n",
1743                    c->binary_header.request.opcode, c->sfd);
1744        }
1745        c->write_and_go = conn_closing;
1746    }
1747}
1748
/*
 * Called once the key and the 8 bytes of extras (flags, expiration) of a
 * binary SET/ADD/REPLACE are in the read buffer.  Allocates an item sized
 * for the value plus 2 bytes (complete_update_bin() writes "\r\n" there)
 * and switches the connection to read the value straight into the item
 * (substate bin_read_set_value).  c->cmd is rewritten to the matching
 * NREAD_* code, or NREAD_CAS when the request carries a non-zero CAS.
 *
 * If allocation fails, an E2BIG or ENOMEM error is written and the value
 * is swallowed.  For SET, any existing item under the key is also
 * unlinked, so stale data does not outlive the failed update.
 */
static void process_bin_update(conn *c) {
    char *key;
    int nkey;
    int vlen;
    item *it;
    protocol_binary_request_set* req = binary_get_request(c);

    cb_assert(c != NULL);

    key = binary_get_key(c);
    nkey = c->binary_header.request.keylen;

    /* fix byteorder in the request */
    req->message.body.flags = ntohl(req->message.body.flags);
    req->message.body.expiration = ntohl(req->message.body.expiration);

    /* Value length = body minus key and extras. */
    vlen = c->binary_header.request.bodylen - (nkey + c->binary_header.request.extlen);

    if (settings.verbose) {
        int ii;
        if (c->cmd == PROTOCOL_BINARY_CMD_ADD) {
            moxi_log_write("<%d ADD ", c->sfd);
        } else if (c->cmd == PROTOCOL_BINARY_CMD_SET) {
            moxi_log_write("<%d SET ", c->sfd);
        } else {
            moxi_log_write("<%d REPLACE ", c->sfd);
        }
        /* The key is not NUL-terminated; print exactly nkey bytes. */
        for (ii = 0; ii < nkey; ++ii) {
            moxi_log_write("%c", key[ii]);
        }

        if (settings.verbose > 1) {
            moxi_log_write(" Value len is %d", vlen);
        }
        moxi_log_write("\n");
    }

    if (settings.detail_enabled) {
        stats_prefix_record_set(key, nkey);
    }

    it = item_alloc(key, nkey, req->message.body.flags,
            c->funcs->conn_realtime(req->message.body.expiration), vlen+2);

    if (it == 0) {
        if (! item_size_ok(nkey, req->message.body.flags, vlen + 2)) {
            write_bin_error(c, PROTOCOL_BINARY_RESPONSE_E2BIG, vlen);
        } else {
            write_bin_error(c, PROTOCOL_BINARY_RESPONSE_ENOMEM, vlen);
        }

        /* Avoid stale data persisting in cache because we failed alloc.
         * Unacceptable for SET. Anywhere else too? */
        if (c->cmd == PROTOCOL_BINARY_CMD_SET) {
            it = item_get(key, nkey);
            if (it) {
                item_unlink(it);
                item_remove(it);
            }
        }

        /* swallow the data line */
        /* NOTE(review): write_bin_error() only sets c->sbytes when vlen > 0;
         * for vlen == 0 this forces conn_swallow with whatever c->sbytes
         * already holds -- confirm that is always 0 here. */
        c->write_and_go = conn_swallow;
        return;
    }

    ITEM_set_cas(it, c->binary_header.request.cas);

    switch (c->cmd) {
        case PROTOCOL_BINARY_CMD_ADD:
            c->cmd = NREAD_ADD;
            break;
        case PROTOCOL_BINARY_CMD_SET:
            c->cmd = NREAD_SET;
            break;
        case PROTOCOL_BINARY_CMD_REPLACE:
            c->cmd = NREAD_REPLACE;
            break;
        default:
            cb_assert(0);
    }

    /* A non-zero request CAS turns the store into a compare-and-swap. */
    if (ITEM_get_cas(it) != 0) {
        c->cmd = NREAD_CAS;
    }

    /* Read the value directly into the item's data area. */
    c->item = it;
    c->ritem = ITEM_data(it);
    c->rlbytes = vlen;
    conn_set_state(c, conn_nread);
    c->substate = bin_read_set_value;
}
1841
/*
 * Called once the key of a binary APPEND/PREPEND is in the read buffer
 * (these commands carry no extras).  Allocates an item for the value plus
 * 2 bytes (complete_update_bin() writes "\r\n" there), rewrites c->cmd to
 * NREAD_APPEND / NREAD_PREPEND and switches the connection to read the
 * value straight into the item (substate bin_read_set_value).
 *
 * If allocation fails, an E2BIG or ENOMEM error is written and the value
 * is swallowed.
 */
static void process_bin_append_prepend(conn *c) {
    char *key;
    int nkey;
    int vlen;
    item *it;

    cb_assert(c != NULL);

    key = binary_get_key(c);
    nkey = c->binary_header.request.keylen;
    /* No extras: everything after the key is the value. */
    vlen = c->binary_header.request.bodylen - nkey;

    if (settings.verbose > 1) {
        moxi_log_write("Value len is %d\n", vlen);
    }

    if (settings.detail_enabled) {
        stats_prefix_record_set(key, nkey);
    }

    it = item_alloc(key, nkey, 0, 0, vlen+2);

    if (it == 0) {
        if (! item_size_ok(nkey, 0, vlen + 2)) {
            write_bin_error(c, PROTOCOL_BINARY_RESPONSE_E2BIG, vlen);
        } else {
            write_bin_error(c, PROTOCOL_BINARY_RESPONSE_ENOMEM, vlen);
        }
        /* swallow the data line */
        c->write_and_go = conn_swallow;
        return;
    }

    /* The request CAS is recorded on the item; unlike process_bin_update()
     * the command is not turned into NREAD_CAS here. */
    ITEM_set_cas(it, c->binary_header.request.cas);

    switch (c->cmd) {
        case PROTOCOL_BINARY_CMD_APPEND:
            c->cmd = NREAD_APPEND;
            break;
        case PROTOCOL_BINARY_CMD_PREPEND:
            c->cmd = NREAD_PREPEND;
            break;
        default:
            cb_assert(0);
    }

    /* Read the value directly into the item's data area. */
    c->item = it;
    c->ritem = ITEM_data(it);
    c->rlbytes = vlen;
    conn_set_state(c, conn_nread);
    c->substate = bin_read_set_value;
}
1894
1895static void process_bin_flush(conn *c) {
1896    time_t exptime = 0;
1897    protocol_binary_request_flush* req = binary_get_request(c);
1898
1899    if (c->binary_header.request.extlen == sizeof(req->message.body)) {
1900        exptime = ntohl(req->message.body.expiration);
1901    }
1902
1903    set_current_time();
1904
1905    if (exptime > 0) {
1906        settings.oldest_live = c->funcs->conn_realtime(exptime) - 1;
1907    } else {
1908        settings.oldest_live = current_time - 1;
1909    }
1910    item_flush_expired();
1911
1912    cb_mutex_enter(&c->thread->stats.mutex);
1913    c->thread->stats.flush_cmds++;
1914    cb_mutex_exit(&c->thread->stats.mutex);
1915
1916    write_bin_response(c, NULL, 0, 0, 0);
1917}
1918
1919static void process_bin_delete(conn *c) {
1920    item *it;
1921
1922    protocol_binary_request_delete* req = binary_get_request(c);
1923
1924    char* key = binary_get_key(c);
1925    size_t nkey = c->binary_header.request.keylen;
1926
1927    cb_assert(c != NULL);
1928
1929    if (settings.verbose) {
1930        moxi_log_write("Deleting %s\n", key);
1931    }
1932
1933    if (settings.detail_enabled) {
1934        stats_prefix_record_delete(key, nkey);
1935    }
1936
1937    it = item_get(key, nkey);
1938    if (it) {
1939        uint64_t cas=mc_swap64(req->message.header.request.cas);
1940        if (cas == 0 || cas == ITEM_get_cas(it)) {
1941            MEMCACHED_COMMAND_DELETE(c->sfd, ITEM_key(it), it->nkey);
1942            item_unlink(it);
1943            write_bin_response(c, NULL, 0, 0, 0);
1944        } else {
1945            write_bin_error(c, PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS, 0);
1946        }
1947        item_remove(it);      /* release our reference */
1948    } else {
1949        write_bin_error(c, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, 0);
1950    }
1951}
1952
1953void complete_nread_binary(conn *c) {
1954    cb_assert(c != NULL);
1955    cb_assert(c->cmd >= 0);
1956
1957    switch(c->substate) {
1958    case bin_reading_set_header:
1959        if (c->cmd == PROTOCOL_BINARY_CMD_APPEND ||
1960                c->cmd == PROTOCOL_BINARY_CMD_PREPEND) {
1961            process_bin_append_prepend(c);
1962        } else {
1963            process_bin_update(c);
1964        }
1965        break;
1966    case bin_read_set_value:
1967        complete_update_bin(c);
1968        break;
1969    case bin_reading_get_key:
1970        process_bin_get(c);
1971        break;
1972    case bin_reading_stat:
1973        process_bin_stat(c);
1974        break;
1975    case bin_reading_del_header:
1976        process_bin_delete(c);
1977        break;
1978    case bin_reading_incr_header:
1979        complete_incr_bin(c);
1980        break;
1981    case bin_read_flush_exptime:
1982        process_bin_flush(c);
1983        break;
1984    default:
1985        moxi_log_write("Not handling substate %d\n", c->substate);
1986        cb_assert(0);
1987    }
1988}
1989
1990void reset_cmd_handler(conn *c) {
1991    c->cmd = -1;
1992    c->cmd_curr = -1;
1993    c->substate = bin_no_state;
1994    conn_cleanup(c);
1995    conn_shrink(c);
1996    if (c->rbytes > 0) {
1997        conn_set_state(c, conn_parse_cmd);
1998    } else {
1999        conn_set_state(c, conn_waiting);
2000    }
2001}
2002
2003void complete_nread(conn *c) {
2004    cb_assert(c != NULL);
2005    cb_assert(c->funcs != NULL);
2006
2007    if (IS_ASCII(c->protocol)) {
2008        c->funcs->conn_complete_nread_ascii(c);
2009    } else if (IS_BINARY(c->protocol)) {
2010        c->funcs->conn_complete_nread_binary(c);
2011    }
2012}
2013
2014/*
2015 * Stores an item in the cache according to the semantics of one of the set
2016 * commands. In threaded mode, this is protected by the cache lock.
2017 *
2018 * Returns the state of storage.
2019 */
2020enum store_item_type do_store_item(item *it, int comm, conn *c) {
2021    char *key = ITEM_key(it);
2022    item *old_it = do_item_get(key, it->nkey);
2023    enum store_item_type stored = NOT_STORED;
2024
2025    item *new_it = NULL;
2026    int flags;
2027
2028    if (old_it != NULL && comm == NREAD_ADD) {
2029        /* add only adds a nonexistent item, but promote to head of LRU */
2030        do_item_update(old_it);
2031    } else if (!old_it && (comm == NREAD_REPLACE
2032        || comm == NREAD_APPEND || comm == NREAD_PREPEND))
2033    {
2034        /* replace only replaces an existing value; don't store */
2035    } else if (comm == NREAD_CAS) {
2036        /* validate cas operation */
2037        if(old_it == NULL) {
2038            /* LRU expired */
2039            stored = NOT_FOUND;
2040            cb_mutex_enter(&c->thread->stats.mutex);
2041            c->thread->stats.cas_misses++;
2042            cb_mutex_exit(&c->thread->stats.mutex);
2043        }
2044        else if (ITEM_get_cas(it) == ITEM_get_cas(old_it)) {
2045            /* cas validates */
2046            /* it and old_it may belong to different classes. */
2047            /* I'm updating the stats for the one that's getting pushed out */
2048            cb_mutex_enter(&c->thread->stats.mutex);
2049            c->thread->stats.slab_stats[old_it->slabs_clsid].cas_hits++;
2050            cb_mutex_exit(&c->thread->stats.mutex);
2051
2052            item_replace(old_it, it);
2053            stored = STORED;
2054        } else {
2055            cb_mutex_enter(&c->thread->stats.mutex);
2056            c->thread->stats.slab_stats[old_it->slabs_clsid].cas_badval++;
2057            cb_mutex_exit(&c->thread->stats.mutex);
2058
2059            if(settings.verbose > 1) {
2060                moxi_log_write("CAS:  failure: expected %llu, got %llu\n",
2061                        (unsigned long long)ITEM_get_cas(old_it),
2062                        (unsigned long long)ITEM_get_cas(it));
2063            }
2064            stored = EXISTS;
2065        }
2066    } else {
2067        /*
2068         * Append - combine new and old record into single one. Here it's
2069         * atomic and thread-safe.
2070         */
2071        if (comm == NREAD_APPEND || comm == NREAD_PREPEND) {
2072            /*
2073             * Validate CAS
2074             */
2075            if (ITEM_get_cas(it) != 0) {
2076                /* CAS much be equal */
2077                if (ITEM_get_cas(it) != ITEM_get_cas(old_it)) {
2078                    stored = EXISTS;
2079                }
2080            }
2081
2082            if (stored == NOT_STORED) {
2083                /* we have it and old_it here - alloc memory to hold both */
2084                /* flags was already lost - so recover them from ITEM_suffix(it) */
2085
2086                flags = (int) strtol(ITEM_suffix(old_it), (char **) NULL, 10);
2087
2088                new_it = do_item_alloc(key, it->nkey, flags, old_it->exptime, it->nbytes + old_it->nbytes - 2 /* CRLF */);
2089
2090                if (new_it == NULL) {
2091                    /* SERVER_ERROR out of memory */
2092                    if (old_it != NULL)
2093                        do_item_remove(old_it);
2094
2095                    return NOT_STORED;
2096                }
2097
2098                /* copy data from it and old_it to new_it */
2099
2100                if (comm == NREAD_APPEND) {
2101                    memcpy(ITEM_data(new_it), ITEM_data(old_it), old_it->nbytes);
2102                    memcpy(ITEM_data(new_it) + old_it->nbytes - 2 /* CRLF */, ITEM_data(it), it->nbytes);
2103                } else {
2104                    /* NREAD_PREPEND */
2105                    memcpy(ITEM_data(new_it), ITEM_data(it), it->nbytes);
2106                    memcpy(ITEM_data(new_it) + it->nbytes - 2 /* CRLF */, ITEM_data(old_it), old_it->nbytes);
2107                }
2108
2109                it = new_it;
2110            }
2111        }
2112
2113        if (stored == NOT_STORED) {
2114            if (old_it != NULL)
2115                item_replace(old_it, it);
2116            else
2117                do_item_link(it);
2118
2119            c->cas = ITEM_get_cas(it);
2120
2121            stored = STORED;
2122        }
2123    }
2124
2125    if (old_it != NULL)
2126        do_item_remove(old_it);         /* release our reference */
2127    if (new_it != NULL)
2128        do_item_remove(new_it);
2129
2130    if (stored == STORED) {
2131        c->cas = ITEM_get_cas(it);
2132    }
2133
2134    return stored;
2135}
2136
/* Index of the command name (e.g. "get", "stats") in a token array. */
#define COMMAND_TOKEN 0
/* Index of a subcommand, e.g. "detail" in "stats detail on". */
#define SUBCOMMAND_TOKEN 1
/* Index of the first key, e.g. "foo" in "get foo". */
#define KEY_TOKEN 1

/* Max tokens produced per tokenize_command() pass, including the terminal
   token that points at any unprocessed remainder of the line. */
#define MAX_TOKENS 8
2142
2143/*
2144 * Tokenize the command string by replacing whitespace with '\0' and update
2145 * the token array tokens with pointer to start of each token and length.
2146 * Returns total number of tokens.  The last valid token is the terminal
2147 * token (value points to the first unprocessed character of the string and
2148 * length zero).
2149 *
2150 * Usage example:
2151 *
2152 *  while(tokenize_command(command, ncommand, tokens, max_tokens) > 0) {
2153 *      for(int ix = 0; tokens[ix].length != 0; ix++) {
2154 *          ...
2155 *      }
2156 *      ncommand = tokens[ix].value - command;
2157 *      command  = tokens[ix].value;
2158 *   }
2159 */
2160size_t tokenize_command(char *command, token_t *tokens, const size_t max_tokens) {
2161    char *s, *e;
2162    size_t ntokens = 0;
2163
2164    cb_assert(command != NULL && tokens != NULL && max_tokens > 1);
2165
2166    for (s = e = command; ntokens < max_tokens - 1; ++e) {
2167        if (*e == ' ') {
2168            if (s != e) {
2169                tokens[ntokens].value = s;
2170                tokens[ntokens].length = e - s;
2171                ntokens++;
2172                *e = '\0';
2173            }
2174            s = e + 1;
2175        }
2176        else if (*e == '\0') {
2177            if (s != e) {
2178                tokens[ntokens].value = s;
2179                tokens[ntokens].length = e - s;
2180                ntokens++;
2181            }
2182
2183            break; /* string end */
2184        }
2185    }
2186
2187    /*
2188     * If we scanned the whole string, the terminal value pointer is null,
2189     * otherwise it is the first unprocessed character.
2190     */
2191    tokens[ntokens].value =  *e == '\0' ? NULL : e;
2192    tokens[ntokens].length = 0;
2193    ntokens++;
2194
2195    return ntokens;
2196}
2197
2198/* set up a connection to write a buffer then free it, used for stats */
2199static void write_and_free(conn *c, char *buf, size_t bytes) {
2200    if (buf) {
2201        c->write_and_free = buf;
2202        c->wcurr = buf;
2203        c->wbytes = (int)bytes;
2204        conn_set_state(c, conn_write);
2205        c->write_and_go = conn_new_cmd;
2206    } else {
2207        out_string(c, "SERVER_ERROR out of memory writing stats");
2208    }
2209}
2210
2211void set_noreply_maybe(conn *c, token_t *tokens, size_t ntokens) {
2212    int noreply_index = (int)ntokens - 2;
2213
2214    cb_assert(noreply_index >= 0);
2215
2216    /*
2217      NOTE: this function is not the first place where we are going to
2218      send the reply.  We could send it instead from process_command()
2219      if the request line has wrong number of tokens.  However parsing
2220      malformed line for "noreply" option is not reliable anyway, so
2221      it can't be helped.
2222    */
2223    if (tokens[noreply_index].value
2224        && strcmp(tokens[noreply_index].value, "noreply") == 0) {
2225        c->noreply = true;
2226    }
2227}
2228
2229void append_stat(const char *name, ADD_STAT add_stats, void *c,
2230                 const char *fmt, ...) {
2231    char val_str[STAT_VAL_LEN];
2232    int vlen;
2233    va_list ap;
2234
2235    cb_assert(name);
2236    cb_assert(add_stats);
2237    cb_assert(c);
2238    cb_assert(fmt);
2239
2240    va_start(ap, fmt);
2241    vlen = vsnprintf(val_str, sizeof(val_str) - 1, fmt, ap);
2242    va_end(ap);
2243
2244    add_stats(name, (uint16_t)strlen(name), val_str, vlen, c);
2245}
2246
2247void append_prefix_stat(const char *prefix, const char *name, ADD_STAT add_stats, void *c,
2248                        const char *fmt, ...) {
2249    char val_str[STAT_VAL_LEN];
2250    int vlen;
2251    char *val_free = 0;
2252    char *val;
2253    va_list ap;
2254
2255    cb_assert(name);
2256    cb_assert(add_stats);
2257    cb_assert(c);
2258    cb_assert(fmt);
2259
2260    va_start(ap, fmt);
2261    vlen = vsnprintf(val_str, sizeof(val_str), fmt, ap);
2262    va_end(ap);
2263
2264    if (vlen > (int)sizeof(val_str)-1) {
2265        val_free = malloc(vlen+1);
2266        if (val_free != 0) {
2267            val = val_free;
2268            va_start(ap, fmt);
2269            vsnprintf(val_free, vlen+1, fmt, ap);
2270            va_end(ap);
2271        } else {
2272            val = val_str;
2273            vlen = sizeof(val_str)-1;
2274        }
2275    } else {
2276        val = val_str;
2277    }
2278
2279    if (prefix == NULL) {
2280        add_stats(name, (uint16_t)strlen(name), val, vlen, c);
2281    } else {
2282        char key_str[STAT_KEY_LEN];
2283        strcpy(key_str, prefix); strcat(key_str, name);
2284        add_stats(key_str, (uint16_t)strlen(key_str), val, vlen, c);
2285    }
2286
2287    free(val_free);
2288}
2289
2290static void process_stats_detail(conn *c, const char *command) {
2291    cb_assert(c != NULL);
2292
2293    if (strcmp(command, "on") == 0) {
2294        settings.detail_enabled = 1;
2295        out_string(c, "OK");
2296    }
2297    else if (strcmp(command, "off") == 0) {
2298        settings.detail_enabled = 0;
2299        out_string(c, "OK");
2300    }
2301    else if (strcmp(command, "dump") == 0) {
2302        int len;
2303        char *stats_dump = stats_prefix_dump(&len);
2304        write_and_free(c, stats_dump, len);
2305    }
2306    else {
2307        out_string(c, "CLIENT_ERROR usage: stats detail on|off|dump");
2308    }
2309}
2310
2311/* return server specific stats only */
2312void server_stats(ADD_STAT add_stats, void *c, const char *prefix) {
2313#ifndef _MSC_VER
2314    pid_t pid = getpid();
2315#endif
2316    rel_time_t now = current_time;
2317
2318    struct thread_stats thread_stats;
2319    threadlocal_stats_aggregate(&thread_stats);
2320    struct slab_stats slab_stats;
2321    slab_stats_aggregate(&thread_stats, &slab_stats);
2322
2323#ifndef WIN32
2324    struct rusage usage;
2325    getrusage(RUSAGE_SELF, &usage);
2326#endif /* !WIN32 */
2327
2328    STATS_LOCK();
2329#ifndef _MSC_VER
2330    APPEND_PREFIX_STAT("pid", "%lu", (long)pid);
2331#endif
2332    APPEND_PREFIX_STAT("uptime", "%u", now);
2333    APPEND_PREFIX_STAT("time", "%ld", now + (long)process_started);
2334    APPEND_PREFIX_STAT("version", "%s", VERSION);
2335    APPEND_PREFIX_STAT("pointer_size", "%d", (int)(8 * sizeof(void *)));
2336
2337#ifndef WIN32
2338    if (1) {
2339        char rusage_buf[128];
2340        snprintf(rusage_buf, sizeof(rusage_buf),  "%ld.%06ld",
2341                 (long)usage.ru_utime.tv_sec, (long)usage.ru_utime.tv_usec);
2342        APPEND_PREFIX_STAT("rusage_user", "%s", rusage_buf);
2343        snprintf(rusage_buf, sizeof(rusage_buf),  "%ld.%06ld",
2344                 (long)usage.ru_stime.tv_sec, (long)usage.ru_stime.tv_usec);
2345        APPEND_PREFIX_STAT("rusage_system", "%s", rusage_buf);
2346    }
2347#endif
2348
2349    APPEND_PREFIX_STAT("curr_connections", "%u", stats.curr_conns - 1);
2350    APPEND_PREFIX_STAT("total_connections", "%u", stats.total_conns);
2351    APPEND_PREFIX_STAT("connection_structures", "%u", stats.conn_structs);
2352    APPEND_PREFIX_STAT("cmd_get", "%llu", (unsigned long long)thread_stats.get_cmds);
2353    APPEND_PREFIX_STAT("cmd_set", "%llu", (unsigned long long)slab_stats.set_cmds);
2354    APPEND_PREFIX_STAT("cmd_flush", "%llu", (unsigned long long)thread_stats.flush_cmds);
2355    APPEND_PREFIX_STAT("get_hits", "%llu", (unsigned long long)slab_stats.get_hits);
2356    APPEND_PREFIX_STAT("get_misses", "%llu", (unsigned long long)thread_stats.get_misses);
2357    APPEND_PREFIX_STAT("delete_misses", "%llu", (unsigned long long)thread_stats.delete_misses);
2358    APPEND_PREFIX_STAT("delete_hits", "%llu", (unsigned long long)slab_stats.delete_hits);
2359    APPEND_PREFIX_STAT("incr_misses", "%llu", (unsigned long long)thread_stats.incr_misses);
2360    APPEND_PREFIX_STAT("incr_hits", "%llu", (unsigned long long)slab_stats.incr_hits);
2361    APPEND_PREFIX_STAT("decr_misses", "%llu", (unsigned long long)thread_stats.decr_misses);
2362    APPEND_PREFIX_STAT("decr_hits", "%llu", (unsigned long long)slab_stats.decr_hits);
2363    APPEND_PREFIX_STAT("cas_misses", "%llu", (unsigned long long)thread_stats.cas_misses);
2364    APPEND_PREFIX_STAT("cas_hits", "%llu", (unsigned long long)slab_stats.cas_hits);
2365    APPEND_PREFIX_STAT("cas_badval", "%llu", (unsigned long long)slab_stats.cas_badval);
2366    APPEND_PREFIX_STAT("bytes_read", "%llu", (unsigned long long)thread_stats.bytes_read);
2367    APPEND_PREFIX_STAT("bytes_written", "%llu", (unsigned long long)thread_stats.bytes_written);
2368    APPEND_PREFIX_STAT("limit_maxbytes", "%llu", (unsigned long long)settings.maxbytes);
2369    APPEND_PREFIX_STAT("accepting_conns", "%u", stats.accepting_conns);
2370    APPEND_PREFIX_STAT("listen_disabled_num", "%llu", (unsigned long long)stats.listen_disabled_num);
2371    APPEND_PREFIX_STAT("threads", "%d", settings.num_threads);
2372    APPEND_PREFIX_STAT("conn_yields", "%llu", (unsigned long long)thread_stats.conn_yields);
2373
2374    STATS_UNLOCK();
2375}
2376
2377void process_stat_settings(ADD_STAT add_stats, void *c, const char *prefix) {
2378    cb_assert(add_stats);
2379    APPEND_PREFIX_STAT("maxbytes", "%u", (unsigned int)settings.maxbytes);
2380    APPEND_PREFIX_STAT("maxconns", "%d", settings.maxconns);
2381    APPEND_PREFIX_STAT("tcpport", "%d", settings.port);
2382    APPEND_PREFIX_STAT("udpport", "%d", settings.udpport);
2383    APPEND_PREFIX_STAT("inter", "%s", settings.inter ? settings.inter : "NULL");
2384    APPEND_PREFIX_STAT("verbosity", "%d", settings.verbose);
2385    APPEND_PREFIX_STAT("oldest", "%lu", (unsigned long)settings.oldest_live);
2386    APPEND_PREFIX_STAT("evictions", "%s", settings.evict_to_free ? "on" : "off");
2387    APPEND_PREFIX_STAT("domain_socket", "%s",
2388                settings.socketpath ? settings.socketpath : "NULL");
2389    APPEND_PREFIX_STAT("umask", "%o", settings.access);
2390    APPEND_PREFIX_STAT("growth_factor", "%.2f", settings.factor);
2391    APPEND_PREFIX_STAT("chunk_size", "%d", settings.chunk_size);
2392    APPEND_PREFIX_STAT("num_threads", "%d", settings.num_threads);
2393    APPEND_PREFIX_STAT("stat_key_prefix", "%c", settings.prefix_delimiter);
2394    APPEND_PREFIX_STAT("detail_enabled", "%s",
2395                settings.detail_enabled ? "yes" : "no");
2396    APPEND_PREFIX_STAT("reqs_per_event", "%d", settings.reqs_per_event);
2397    APPEND_PREFIX_STAT("cas_enabled", "%s", settings.use_cas ? "yes" : "no");
2398    APPEND_PREFIX_STAT("tcp_backlog", "%d", settings.backlog);
2399    APPEND_PREFIX_STAT("binding_protocol", "%s",
2400                prot_text(settings.binding_protocol));
2401}
2402
2403static void process_stat(conn *c, token_t *tokens, const size_t ntokens) {
2404    const char *subcommand = tokens[SUBCOMMAND_TOKEN].value;
2405    cb_assert(c != NULL);
2406
2407    if (ntokens < 2) {
2408        out_string(c, "CLIENT_ERROR bad command line");
2409        return;
2410    }
2411
2412    if (ntokens == 2) {
2413        server_stats(&append_stats, c, NULL);
2414        (void)get_stats(NULL, 0, &append_stats, c);
2415    } else if (strcmp(subcommand, "reset") == 0) {
2416        stats_reset();
2417        out_string(c, "RESET");
2418        return ;
2419    } else if (strcmp(subcommand, "detail") == 0) {
2420        /* NOTE: how to tackle detail with binary? */
2421        if (ntokens < 4)
2422            process_stats_detail(c, "");  /* outputs the error message */
2423        else
2424            process_stats_detail(c, tokens[2].value);
2425        /* Output already generated */
2426        return ;
2427    } else if (strcmp(subcommand, "settings") == 0) {
2428        process_stat_settings(&append_stats, c, NULL);
2429    } else if (strcmp(subcommand, "cachedump") == 0) {
2430        char *buf;
2431        unsigned int bytes, id, limit = 0;
2432
2433        if (ntokens < 5) {
2434            out_string(c, "CLIENT_ERROR bad command line");
2435            return;
2436        }
2437
2438        if (!safe_strtoul(tokens[2].value, &id) ||
2439            !safe_strtoul(tokens[3].value, &limit)) {
2440            out_string(c, "CLIENT_ERROR bad command line format");
2441            return;
2442        }
2443
2444        buf = item_cachedump(id, limit, &bytes);
2445        write_and_free(c, buf, bytes);
2446        return ;
2447    } else {
2448        /* getting here means that the subcommand is either engine specific or
2449           is invalid. query the engine and see. */
2450        if (get_stats(subcommand, (int)strlen(subcommand), &append_stats, c)) {
2451            if (c->stats.buffer == NULL) {
2452                out_string(c, "SERVER_ERROR out of memory writing stats");
2453            } else {
2454                write_and_free(c, c->stats.buffer, c->stats.offset);
2455                c->stats.buffer = NULL;
2456            }
2457        } else {
2458            out_string(c, "ERROR");
2459        }
2460        return ;
2461    }
2462
2463    /* append terminator and start the transfer */
2464    append_stats(NULL, 0, NULL, 0, c);
2465
2466    if (c->stats.buffer == NULL) {
2467        out_string(c, "SERVER_ERROR out of memory writing stats");
2468    } else {
2469        write_and_free(c, c->stats.buffer, c->stats.offset);
2470        c->stats.buffer = NULL;
2471    }
2472}
2473
2474/* ntokens is overwritten here... shrug.. */
2475static void process_get_command(conn *c, token_t *tokens, size_t ntokens, bool return_cas) {
2476    char *key;
2477    size_t nkey;
2478    int i = 0, sid = 0;
2479    item *it;
2480    token_t *key_token = &tokens[KEY_TOKEN];
2481    char *suffix;
2482    int stats_get_cmds   = 0;
2483    int stats_get_misses = 0;
2484    int stats_get_hits[MAX_NUMBER_OF_SLAB_CLASSES];
2485    cb_assert(c != NULL);
2486
2487    memset(&stats_get_hits, 0, sizeof(stats_get_hits));
2488
2489    do {
2490        while(key_token->length != 0) {
2491
2492            key = key_token->value;
2493            nkey = key_token->length;
2494
2495            if(nkey > KEY_MAX_LENGTH) {
2496                cb_mutex_enter(&c->thread->stats.mutex);
2497                c->thread->stats.get_cmds   += stats_get_cmds;
2498                c->thread->stats.get_misses += stats_get_misses;
2499                for(sid = 0; sid < MAX_NUMBER_OF_SLAB_CLASSES; sid++) {
2500                    c->thread->stats.slab_stats[sid].get_hits += stats_get_hits[sid];
2501                }
2502                cb_mutex_exit(&c->thread->stats.mutex);
2503                out_string(c, "CLIENT_ERROR bad command line format");
2504                return;
2505            }
2506
2507            stats_get_cmds++;
2508            it = item_get(key, nkey);
2509            if (settings.detail_enabled) {
2510                stats_prefix_record_get(key, nkey, NULL != it);
2511            }
2512            if (it) {
2513                if (i >= c->isize) {
2514                    item **new_list = realloc(c->ilist, sizeof(item *) * c->isize * 2);
2515                    if (new_list) {
2516                        c->isize *= 2;
2517                        c->ilist = new_list;
2518                    } else {
2519                        item_remove(it);
2520                        break;
2521                    }
2522                }
2523
2524                /*
2525                 * Construct the response. Each hit adds three elements to the
2526                 * outgoing data list:
2527                 *   "VALUE "
2528                 *   key
2529                 *   " " + flags + " " + data length + "\r\n" + data (with \r\n)
2530                 */
2531
2532                if (return_cas)
2533                {
2534                  MEMCACHED_COMMAND_GET(c->sfd, ITEM_key(it), it->nkey,
2535                                        it->nbytes, ITEM_get_cas(it));
2536                  /* Goofy mid-flight realloc. */
2537                  if (i >= c->suffixsize) {
2538                    char **new_suffix_list = realloc(c->suffixlist,
2539                                           sizeof(char *) * c->suffixsize * 2);
2540                    if (new_suffix_list) {
2541                        c->suffixsize *= 2;
2542                        c->suffixlist  = new_suffix_list;
2543                    } else {
2544                        item_remove(it);
2545                        break;
2546                    }
2547                  }
2548
2549                  suffix = cache_alloc(c->thread->suffix_cache);
2550                  if (suffix == NULL) {
2551                    cb_mutex_enter(&c->thread->stats.mutex);
2552                    c->thread->stats.get_cmds   += stats_get_cmds;
2553                    c->thread->stats.get_misses += stats_get_misses;
2554                    for(sid = 0; sid < MAX_NUMBER_OF_SLAB_CLASSES; sid++) {
2555                        c->thread->stats.slab_stats[sid].get_hits += stats_get_hits[sid];
2556                    }
2557                    cb_mutex_exit(&c->thread->stats.mutex);
2558                    out_string(c, "SERVER_ERROR out of memory making CAS suffix");
2559                    item_remove(it);
2560                    return;
2561                  }
2562                  *(c->suffixlist + i) = suffix;
2563                  int suffix_len = snprintf(suffix, SUFFIX_SIZE,
2564                                            " %llu\r\n",
2565                                            (unsigned long long)ITEM_get_cas(it));
2566                  if (add_iov(c, "VALUE ", 6) != 0 ||
2567                      add_iov(c, ITEM_key(it), it->nkey) != 0 ||
2568                      add_iov(c, ITEM_suffix(it), it->nsuffix - 2) != 0 ||
2569                      add_iov(c, suffix, suffix_len) != 0 ||
2570                      add_iov(c, ITEM_data(it), it->nbytes) != 0)
2571                      {
2572                          item_remove(it);
2573                          break;
2574                      }
2575                }
2576                else
2577                {
2578                  MEMCACHED_COMMAND_GET(c->sfd, ITEM_key(it), it->nkey,
2579                                        it->nbytes, ITEM_get_cas(it));
2580                  if (add_iov(c, "VALUE ", 6) != 0 ||
2581                      add_iov(c, ITEM_key(it), it->nkey) != 0 ||
2582                      add_iov(c, ITEM_suffix(it), it->nsuffix + it->nbytes) != 0)
2583                      {
2584                          item_remove(it);
2585                          break;
2586                      }
2587                }
2588
2589
2590                if (settings.verbose > 1)
2591                    moxi_log_write(">%d sending key %s\n", c->sfd, ITEM_key(it));
2592
2593                /* item_get() has incremented it->refcount for us */
2594                stats_get_hits[it->slabs_clsid]++;
2595                item_update(it);
2596                *(c->ilist + i) = it;
2597                i++;
2598
2599            } else {
2600                stats_get_misses++;
2601                MEMCACHED_COMMAND_GET(c->sfd, key, nkey, -1, 0);
2602            }
2603
2604            key_token++;
2605        }
2606
2607        /*
2608         * If the command string hasn't been fully processed, get the next set
2609         * of tokens.
2610         */
2611        if(key_token->value != NULL) {
2612            ntokens = tokenize_command(key_token->value, tokens, MAX_TOKENS);
2613            key_token = tokens;
2614        }
2615
2616    } while(key_token->value != NULL);
2617
2618    c->icurr = c->ilist;
2619    c->ileft = i;
2620    if (return_cas) {
2621        c->suffixcurr = c->suffixlist;
2622        c->suffixleft = i;
2623    }
2624
2625    if (settings.verbose > 1)
2626        moxi_log_write(">%d END\n", c->sfd);
2627
2628    /*
2629        If the loop was terminated because of out-of-memory, it is not
2630        reliable to add END\r\n to the buffer, because it might not end
2631        in \r\n. So we send SERVER_ERROR instead.
2632    */
2633    if (key_token->value != NULL || add_iov(c, "END\r\n", 5) != 0
2634        || (IS_UDP(c->transport) && build_udp_headers(c) != 0)) {
2635        out_string(c, "SERVER_ERROR out of memory writing get response");
2636    }
2637    else {
2638        conn_set_state(c, conn_mwrite);
2639        c->write_and_go = conn_new_cmd;
2640        c->msgcurr = 0;
2641    }
2642
2643    cb_mutex_enter(&c->thread->stats.mutex);
2644    c->thread->stats.get_cmds   += stats_get_cmds;
2645    c->thread->stats.get_misses += stats_get_misses;
2646    for(sid = 0; sid < MAX_NUMBER_OF_SLAB_CLASSES; sid++) {
2647        c->thread->stats.slab_stats[sid].get_hits += stats_get_hits[sid];
2648    }
2649    cb_mutex_exit(&c->thread->stats.mutex);
2650
2651    return;
2652}
2653
2654void process_update_command(conn *c, token_t *tokens, const size_t ntokens, int comm, bool handle_cas) {
2655    char *key;
2656    size_t nkey;
2657    unsigned int flags;
2658    int32_t exptime_int = 0;
2659    time_t exptime;
2660    int vlen;
2661    uint64_t req_cas_id=0;
2662    item *it;
2663
2664    cb_assert(c != NULL);
2665
2666    set_noreply_maybe(c, tokens, ntokens);
2667
2668    if (tokens[KEY_TOKEN].length > KEY_MAX_LENGTH) {
2669        out_string(c, "CLIENT_ERROR bad command line format");
2670        return;
2671    }
2672
2673    key = tokens[KEY_TOKEN].value;
2674    nkey = tokens[KEY_TOKEN].length;
2675
2676    if (! (safe_strtoul(tokens[2].value, (uint32_t *)&flags)
2677           && safe_strtol(tokens[3].value, &exptime_int)
2678           && safe_strtol(tokens[4].value, (int32_t *)&vlen))) {
2679        out_string(c, "CLIENT_ERROR bad command line format");
2680        return;
2681    }
2682
2683    /* Ubuntu 8.04 breaks when I pass exptime to safe_strtol */
2684    exptime = exptime_int;
2685
2686    /* does cas value exist? */
2687    if (handle_cas) {
2688        if (!safe_strtoull(tokens[5].value, &req_cas_id)) {
2689            out_string(c, "CLIENT_ERROR bad command line format");
2690            return;
2691        }
2692    }
2693
2694    vlen += 2;
2695    if (vlen < 0 || vlen - 2 < 0) {
2696        out_string(c, "CLIENT_ERROR bad command line format");
2697        return;
2698    }
2699
2700    if (settings.detail_enabled) {
2701        stats_prefix_record_set(key, nkey);
2702    }
2703
2704    it = item_alloc(key, nkey, flags, c->funcs->conn_realtime(exptime), vlen);
2705
2706    if (it == 0) {
2707        if (! item_size_ok(nkey, flags, vlen))
2708            out_string(c, "SERVER_ERROR object too large for cache");
2709        else
2710            out_string(c, "SERVER_ERROR out of memory storing object");
2711        /* swallow the data line */
2712        c->write_and_go = conn_swallow;
2713        c->sbytes = vlen;
2714
2715        /* Avoid stale data persisting in cache because we failed alloc.
2716         * Unacceptable for SET. Anywhere else too? */
2717        if (comm == NREAD_SET) {
2718            it = item_get(key, nkey);
2719            if (it) {
2720                item_unlink(it);
2721                item_remove(it);
2722            }
2723        }
2724
2725        return;
2726    }
2727    ITEM_set_cas(it, req_cas_id);
2728
2729    c->item = it;
2730    c->ritem = ITEM_data(it);
2731    c->rlbytes = it->nbytes;
2732    c->cmd = comm;
2733    conn_set_state(c, conn_nread);
2734}
2735
2736static void process_arithmetic_command(conn *c, token_t *tokens, const size_t ntokens, const bool incr) {
2737    char temp[INCR_MAX_STORAGE_LEN];
2738    item *it;
2739    uint64_t delta;
2740    char *key;
2741    size_t nkey;
2742
2743    cb_assert(c != NULL);
2744
2745    set_noreply_maybe(c, tokens, ntokens);
2746
2747    if (tokens[KEY_TOKEN].length > KEY_MAX_LENGTH) {
2748        out_string(c, "CLIENT_ERROR bad command line format");
2749        return;
2750    }
2751
2752    key = tokens[KEY_TOKEN].value;
2753    nkey = tokens[KEY_TOKEN].length;
2754
2755    if (!safe_strtoull(tokens[2].value, &delta)) {
2756        out_string(c, "CLIENT_ERROR invalid numeric delta argument");
2757        return;
2758    }
2759
2760    it = item_get(key, nkey);
2761    if (!it) {
2762        cb_mutex_enter(&c->thread->stats.mutex);
2763        if (incr) {
2764            c->thread->stats.incr_misses++;
2765        } else {
2766            c->thread->stats.decr_misses++;
2767        }
2768        cb_mutex_exit(&c->thread->stats.mutex);
2769
2770        out_string(c, "NOT_FOUND");
2771        return;
2772    }
2773
2774    switch(add_delta(c, it, incr, delta, temp)) {
2775    case OK:
2776        out_string(c, temp);
2777        break;
2778    case NON_NUMERIC:
2779        out_string(c, "CLIENT_ERROR cannot increment or decrement non-numeric value");
2780        break;
2781    case EOM:
2782        out_string(c, "SERVER_ERROR out of memory");
2783        break;
2784    }
2785    item_remove(it);         /* release our reference */
2786}
2787
2788/*
2789 * adds a delta value to a numeric item.
2790 *
2791 * c     connection requesting the operation
2792 * it    item to adjust
2793 * incr  true to increment value, false to decrement
2794 * delta amount to adjust value by
2795 * buf   buffer for response string
2796 *
2797 * returns a response string to send back to the client.
2798 */
2799enum delta_result_type do_add_delta(conn *c, item *it, const bool incr,
2800                                    const int64_t delta, char *buf) {
2801    char *ptr;
2802    int64_t value;
2803    int res;
2804
2805    ptr = ITEM_data(it);
2806
2807    if (!safe_strtoull(ptr, (uint64_t *)&value)) {
2808        return NON_NUMERIC;
2809    }
2810
2811    if (incr) {
2812        value += delta;
2813        MEMCACHED_COMMAND_INCR(c->sfd, ITEM_key(it), it->nkey, value);
2814    } else {
2815        if(delta > value) {
2816            value = 0;
2817        } else {
2818            value -= delta;
2819        }
2820        MEMCACHED_COMMAND_DECR(c->sfd, ITEM_key(it), it->nkey, value);
2821    }
2822
2823    cb_mutex_enter(&c->thread->stats.mutex);
2824    if (incr) {
2825        c->thread->stats.slab_stats[it->slabs_clsid].incr_hits++;
2826    } else {
2827        c->thread->stats.slab_stats[it->slabs_clsid].decr_hits++;
2828    }
2829    cb_mutex_exit(&c->thread->stats.mutex);
2830
2831    snprintf(buf, INCR_MAX_STORAGE_LEN, "%llu", (unsigned long long)value);
2832    res = (int)strlen(buf);
2833    if (res + 2 > it->nbytes) { /* need to realloc */
2834        item *new_it;
2835        new_it = do_item_alloc(ITEM_key(it), it->nkey, atoi(ITEM_suffix(it) + 1), it->exptime, res + 2 );
2836        if (new_it == 0) {
2837            return EOM;
2838        }
2839        memcpy(ITEM_data(new_it), buf, res);
2840        memcpy(ITEM_data(new_it) + res, "\r\n", 2);
2841        item_replace(it, new_it);
2842        do_item_remove(new_it);       /* release our reference */
2843    } else { /* replace in-place */
2844        /* When changing the value without replacing the item, we
2845           need to update the CAS on the existing item. */
2846        ITEM_set_cas(it, (settings.use_cas) ? get_cas_id() : 0);
2847
2848        memcpy(ITEM_data(it), buf, res);
2849        memset(ITEM_data(it) + res, ' ', it->nbytes - res - 2);
2850    }
2851
2852    return OK;
2853}
2854
2855static void process_delete_command(conn *c, token_t *tokens, const size_t ntokens) {
2856    char *key;
2857    size_t nkey;
2858    item *it;
2859
2860    cb_assert(c != NULL);
2861
2862    set_noreply_maybe(c, tokens, ntokens);
2863
2864    key = tokens[KEY_TOKEN].value;
2865    nkey = tokens[KEY_TOKEN].length;
2866
2867    if(nkey > KEY_MAX_LENGTH) {
2868        out_string(c, "CLIENT_ERROR bad command line format");
2869        return;
2870    }
2871
2872    if (settings.detail_enabled) {
2873        stats_prefix_record_delete(key, nkey);
2874    }
2875
2876    it = item_get(key, nkey);
2877    if (it) {
2878        MEMCACHED_COMMAND_DELETE(c->sfd, ITEM_key(it), it->nkey);
2879
2880        cb_mutex_enter(&c->thread->stats.mutex);
2881        c->thread->stats.slab_stats[it->slabs_clsid].delete_hits++;
2882        cb_mutex_exit(&c->thread->stats.mutex);
2883
2884        item_unlink(it);
2885        item_remove(it);      /* release our reference */
2886        out_string(c, "DELETED");
2887    } else {
2888        cb_mutex_enter(&c->thread->stats.mutex);
2889        c->thread->stats.delete_misses++;
2890        cb_mutex_exit(&c->thread->stats.mutex);
2891
2892        out_string(c, "NOT_FOUND");
2893    }
2894}
2895
2896void process_verbosity_command(conn *c, token_t *tokens, const size_t ntokens) {
2897    unsigned int level;
2898
2899    cb_assert(c != NULL);
2900
2901    set_noreply_maybe(c, tokens, ntokens);
2902    if (c->noreply && ntokens == 3) {
2903        /* "verbosity noreply" is not according to the correct syntax */
2904        c->noreply = false;
2905        out_string(c, "ERROR");
2906        return;
2907    }
2908
2909    if (safe_strtoul(tokens[1].value, &level)) {
2910        settings.verbose = level > MAX_VERBOSITY_LEVEL ? MAX_VERBOSITY_LEVEL : level;
2911        out_string(c, "OK");
2912    } else {
2913        out_string(c, "ERROR");
2914    }
2915}
2916
2917void process_command(conn *c, char *command) {
2918
2919    token_t tokens[MAX_TOKENS];
2920    size_t ntokens;
2921    int comm;
2922
2923    cb_assert(c != NULL);
2924
2925    MEMCACHED_PROCESS_COMMAND_START(c->sfd, c->rcurr, c->rbytes);
2926
2927    if (settings.verbose > 1)
2928        moxi_log_write("<%d %s\n", c->sfd, command);
2929
2930    /*
2931     * for commands set/add/replace, we build an item and read the data
2932     * directly into it, then continue in nread_complete().
2933     */
2934
2935    c->msgcurr = 0;
2936    c->msgused = 0;
2937    c->iovused = 0;
2938    if (add_msghdr(c) != 0) {
2939        out_string(c, "SERVER_ERROR out of memory preparing response");
2940        return;
2941    }
2942
2943    ntokens = tokenize_command(command, tokens, MAX_TOKENS);
2944    if (ntokens >= 3 &&
2945        ((strcmp(tokens[COMMAND_TOKEN].value, "get") == 0) ||
2946         (strcmp(tokens[COMMAND_TOKEN].value, "bget") == 0))) {
2947
2948        process_get_command(c, tokens, ntokens, false);
2949
2950    } else if ((ntokens == 6 || ntokens == 7) &&
2951               ((strcmp(tokens[COMMAND_TOKEN].value, "add") == 0 && (comm = NREAD_ADD)) ||
2952                (strcmp(tokens[COMMAND_TOKEN].value, "set") == 0 && (comm = NREAD_SET)) ||
2953                (strcmp(tokens[COMMAND_TOKEN].value, "replace") == 0 && (comm = NREAD_REPLACE)) ||
2954                (strcmp(tokens[COMMAND_TOKEN].value, "prepend") == 0 && (comm = NREAD_PREPEND)) ||
2955                (strcmp(tokens[COMMAND_TOKEN].value, "append") == 0 && (comm = NREAD_APPEND)) )) {
2956
2957        process_update_command(c, tokens, ntokens, comm, false);
2958
2959    } else if ((ntokens == 7 || ntokens == 8) && (strcmp(tokens[COMMAND_TOKEN].value, "cas") == 0 && (comm = NREAD_CAS))) {
2960
2961        process_update_command(c, tokens, ntokens, comm, true);
2962
2963    } else if ((ntokens == 4 || ntokens == 5) && (strcmp(tokens[COMMAND_TOKEN].value, "incr") == 0)) {
2964
2965        process_arithmetic_command(c, tokens, ntokens, 1);
2966
2967    } else if (ntokens >= 3 && (strcmp(tokens[COMMAND_TOKEN].value, "gets") == 0)) {
2968
2969        process_get_command(c, tokens, ntokens, true);
2970
2971    } else if ((ntokens == 4 || ntokens == 5) && (strcmp(tokens[COMMAND_TOKEN].value, "decr") == 0)) {
2972
2973        process_arithmetic_command(c, tokens, ntokens, 0);
2974
2975    } else if (ntokens >= 3 && ntokens <= 4 && (strcmp(tokens[COMMAND_TOKEN].value, "delete") == 0)) {
2976
2977        process_delete_command(c, tokens, ntokens);
2978
2979    } else if (ntokens >= 2 && (strcmp(tokens[COMMAND_TOKEN].value, "stats") == 0)) {
2980
2981        process_stat(c, tokens, ntokens);
2982
2983    } else if (ntokens >= 2 && ntokens <= 4 && (strcmp(tokens[COMMAND_TOKEN].value, "flush_all") == 0)) {
2984        time_t exptime = 0;
2985        set_current_time();
2986
2987        set_noreply_maybe(c, tokens, ntokens);
2988
2989        cb_mutex_enter(&c->thread->stats.mutex);
2990        c->thread->stats.flush_cmds++;
2991        cb_mutex_exit(&c->thread->stats.mutex);
2992
2993        if(ntokens == (c->noreply ? 3 : 2)) {
2994            settings.oldest_live = current_time - 1;
2995            item_flush_expired();
2996            out_string(c, "OK");
2997            return;
2998        }
2999
3000        exptime = strtol(tokens[1].value, NULL, 10);
3001        if(errno == ERANGE) {
3002            out_string(c, "CLIENT_ERROR bad command line format");
3003            return;
3004        }
3005
3006        /*
3007          If exptime is zero realtime() would return zero too, and
3008          realtime(exptime) - 1 would overflow to the max unsigned
3009          value.  So we process exptime == 0 the same way we do when
3010          no delay is given at all.
3011        */
3012        if (exptime > 0)
3013            settings.oldest_live = c->funcs->conn_realtime(exptime) - 1;
3014        else /* exptime == 0 */
3015            settings.oldest_live = current_time - 1;
3016        item_flush_expired();
3017        out_string(c, "OK");
3018        return;
3019
3020    } else if (ntokens == 2 && (strcmp(tokens[COMMAND_TOKEN].value, "version") == 0)) {
3021
3022        out_string(c, "VERSION " VERSION);
3023
3024    } else if (ntokens == 2 && (strcmp(tokens[COMMAND_TOKEN].value, "quit") == 0)) {
3025
3026        conn_set_state(c, conn_closing);
3027
3028    } else if (ntokens == 5 && (strcmp(tokens[COMMAND_TOKEN].value, "slabs") == 0 &&
3029                                strcmp(tokens[COMMAND_TOKEN + 1].value, "reassign") == 0)) {
3030#ifdef ALLOW_SLABS_REASSIGN
3031
3032        int src, dst, rv;
3033
3034        src = strtol(tokens[2].value, NULL, 10);
3035        dst  = strtol(tokens[3].value, NULL, 10);
3036
3037        if(errno == ERANGE) {
3038            out_string(c, "CLIENT_ERROR bad command line format");
3039            return;
3040        }
3041
3042        rv = slabs_reassign(src, dst);
3043        if (rv == 1) {
3044            out_string(c, "DONE");
3045            return;
3046        }
3047        if (rv == 0) {
3048            out_string(c, "CANT");
3049            return;
3050        }
3051        if (rv == -1) {
3052            out_string(c, "BUSY");
3053            return;
3054        }
3055#else
3056        out_string(c, "CLIENT_ERROR Slab reassignment not supported");
3057#endif
3058    } else if ((ntokens == 3 || ntokens == 4) && (strcmp(tokens[COMMAND_TOKEN].value, "verbosity") == 0)) {
3059        process_verbosity_command(c, tokens, ntokens);
3060    } else {
3061        out_string(c, "ERROR");
3062    }
3063    return;
3064}
3065
3066void process_stats_proxy_command(conn *c, token_t *tokens, const size_t ntokens) {
3067    if (ntokens == 4 && strcmp(tokens[2].value, "reset") == 0) {
3068        proxy_td *ptd = c->extra;
3069        if (ptd != NULL) {
3070            proxy_stats_reset(ptd->proxy->main);
3071        }
3072
3073        out_string(c, "OK");
3074        return;
3075    }
3076
3077    if (ntokens == 4 && strcmp(tokens[2].value, "timings") == 0) {
3078        proxy_stats_dump_timings(&append_stats, c);
3079    } else if (ntokens == 4 && strcmp(tokens[2].value, "config") == 0) {
3080        proxy_stats_dump_config(&append_stats, c);
3081    } else {
3082        bool do_all = (ntokens == 3 || strcmp(tokens[2].value, "all") == 0);
3083        struct proxy_stats_cmd_info psci = {
3084            .do_info       = (do_all || strcmp(tokens[2].value, "info") == 0),
3085            .do_settings   = (do_all || strcmp(tokens[2].value, "settings") == 0),
3086            .do_behaviors  = (do_all || strcmp(tokens[2].value, "behaviors") == 0),
3087            .do_frontcache = (do_all || strcmp(tokens[2].value, "frontcache") == 0),
3088            .do_keystats   = (do_all || strcmp(tokens[2].value, "keystats") == 0),
3089            .do_stats      = (do_all || strcmp(tokens[2].value, "stats") == 0),
3090            .do_zeros      = (do_all || ntokens == 4)
3091        };
3092
3093        if (psci.do_info) {
3094            proxy_stats_dump_basic(&append_stats, c, "basic:");
3095        }
3096
3097        if (psci.do_settings) {
3098            process_stat_settings(&append_stats, c, "memcached:settings:");
3099        }
3100
3101        if (psci.do_stats) {
3102            server_stats(&append_stats, c, "memcached:stats:" );
3103        }
3104
3105        proxy_stats_dump_proxy_main(&append_stats, c, &psci);
3106
3107        proxy_stats_dump_proxies(&append_stats, c, &psci);
3108    }
3109
3110    /* append terminator and start the transfer */
3111    append_stats(NULL, 0, NULL, 0, c);
3112
3113    if (c->stats.buffer == NULL) {
3114        out_string(c, "SERVER_ERROR out of memory writing stats");
3115    } else {
3116        write_and_free(c, c->stats.buffer, c->stats.offset);
3117        c->stats.buffer = NULL;
3118    }
3119}
3120
3121/*
3122 * if we have a complete line in the buffer, process it.
3123 */
3124int try_read_command(conn *c) {
3125    cb_assert(c != NULL);
3126    cb_assert(c->rcurr <= (c->rbuf + c->rsize));
3127    cb_assert(c->rbytes > 0);
3128
3129    if (IS_NEGOTIATING(c->protocol) || c->transport == udp_transport)  {
3130        if ((unsigned char)c->rbuf[0] == (unsigned char)PROTOCOL_BINARY_REQ) {
3131            c->protocol = IS_PROXY(c->protocol) ? proxy_upstream_binary_prot : binary_prot;
3132        } else {
3133            c->protocol = IS_PROXY(c->protocol) ? proxy_upstream_ascii_prot : ascii_prot;
3134        }
3135
3136        if (settings.verbose) {
3137            moxi_log_write("%d: Client using the %s protocol\n", c->sfd,
3138                    prot_text(c->protocol));
3139        }
3140    }
3141
3142    if (IS_BINARY(c->protocol)) {
3143        /* Do we have the complete packet header? */
3144        if (c->rbytes < (int)sizeof(c->binary_header)) {
3145            /* need more data! */
3146            return 0;
3147        } else {
3148#ifdef NEED_ALIGN
3149            if (((long)(c->rcurr)) % 8 != 0) {
3150                /* must realign input buffer */
3151                memmove(c->rbuf, c->rcurr, c->rbytes);
3152                c->rcurr = c->rbuf;
3153                if (settings.verbose) {
3154                    moxi_log_write("%d: Realign input buffer\n", c->sfd);
3155                }
3156            }
3157#endif
3158            protocol_binary_request_header* req;
3159            req = (protocol_binary_request_header*)c->rcurr;
3160
3161            if (settings.verbose > 1) {
3162                /* Dump the packet before we convert it to host order */
3163                moxi_log_write("<%d Read binary protocol data:\n", c->sfd);
3164                cproxy_dump_header(c->sfd, (char *) req->bytes);
3165            }
3166
3167            c->binary_header = *req;
3168            c->binary_header.request.keylen = ntohs(req->request.keylen);
3169            c->binary_header.request.bodylen = ntohl(req->request.bodylen);
3170            c->binary_header.request.cas = mc_swap64(req->request.cas);
3171
3172            if (c->binary_header.request.magic != c->funcs->conn_binary_command_magic) {
3173                if (settings.verbose) {
3174                    moxi_log_write("Invalid magic:  %x\n",
3175                            c->binary_header.request.magic);
3176                }
3177                conn_set_state(c, conn_closing);
3178                return -1;
3179            }
3180
3181            c->msgcurr = 0;
3182            c->msgused = 0;
3183            c->iovused = 0;
3184            if (add_msghdr(c) != 0) {
3185                out_string(c, "SERVER_ERROR out of memory");
3186                return 0;
3187            }
3188
3189            c->cmd = c->binary_header.request.opcode;
3190            c->keylen = c->binary_header.request.keylen;
3191            c->opaque = c->binary_header.request.opaque;
3192            /* clear the returned cas value */
3193            c->cas = 0;
3194
3195            c->funcs->conn_process_binary_command(c);
3196
3197            c->rbytes -= sizeof(c->binary_header);
3198            c->rcurr += sizeof(c->binary_header);
3199        }
3200    } else {
3201        char *el, *cont;
3202
3203        if (c->rbytes == 0)
3204            return 0;
3205        el = memchr(c->rcurr, '\n', c->rbytes);
3206        if (!el)
3207            return 0;
3208        cont = el + 1;
3209        if ((el - c->rcurr) > 1 && *(el - 1) == '\r') {
3210            el--;
3211        }
3212        *el = '\0';
3213
3214        cb_assert(cont <= (c->rcurr + c->rbytes));
3215
3216        c->funcs->conn_process_ascii_command(c, c->rcurr);
3217
3218        c->rbytes -= (int)(cont - c->rcurr);
3219        c->rcurr = cont;
3220
3221        cb_assert(c->rcurr <= (c->rbuf + c->rsize));
3222    }
3223
3224    return 1;
3225}
3226
3227/*
3228 * read a UDP request.
3229 */
3230static enum try_read_result try_read_udp(conn *c) {
3231    int res;
3232
3233    cb_assert(c != NULL);
3234
3235    c->request_addr_size = sizeof(c->request_addr);
3236    res = recvfrom(c->sfd, c->rbuf, c->rsize,
3237                   0, &c->request_addr, &c->request_addr_size);
3238    if (res > 8) {
3239        unsigned char *buf = (unsigned char *)c->rbuf;
3240
3241        cb_mutex_enter(&c->thread->stats.mutex);
3242        c->thread->stats.bytes_read += res;
3243        cb_mutex_exit(&c->thread->stats.mutex);
3244
3245        add_bytes_read(c, res);
3246
3247        /* Beginning of UDP packet is the request ID; save it. */
3248        c->request_id = buf[0] * 256 + buf[1];
3249
3250        /* If this is a multi-packet request, drop it. */
3251        if (buf[4] != 0 || buf[5] != 1) {
3252            out_string(c, "SERVER_ERROR multi-packet request not supported");
3253            return READ_NO_DATA_RECEIVED;
3254        }
3255
3256        /* Don't care about any of the rest of the header. */
3257        res -= 8;
3258        memmove(c->rbuf, c->rbuf + 8, res);
3259
3260        c->rbytes += res;
3261        c->rcurr = c->rbuf;
3262        return READ_DATA_RECEIVED;
3263    }
3264    return READ_NO_DATA_RECEIVED;
3265}
3266
3267/*
3268 * read from network as much as we can, handle buffer overflow and connection
3269 * close.
3270 * before reading, move the remaining incomplete fragment of a command
3271 * (if any) to the beginning of the buffer.
3272 * @return enum try_read_result
3273 */
3274static enum try_read_result try_read_network(conn *c) {
3275    enum try_read_result gotdata = READ_NO_DATA_RECEIVED;
3276    int res;
3277
3278    cb_assert(c != NULL);
3279
3280    if (c->rcurr != c->rbuf) {
3281        if (c->rbytes != 0) /* otherwise there's nothing to copy */
3282            memmove(c->rbuf, c->rcurr, c->rbytes);
3283        c->rcurr = c->rbuf;
3284    }
3285
3286    while (1) {
3287        if (c->rbytes >= c->rsize) {
3288            char *new_rbuf = realloc(c->rbuf, c->rsize * 2);
3289            if (!new_rbuf) {
3290                if (settings.verbose > 0)
3291                    moxi_log_write("Couldn't realloc input buffer\n");
3292                c->rbytes = 0; /* ignore what we read */
3293                out_string(c, "SERVER_ERROR out of memory reading request");
3294                c->write_and_go = conn_closing;
3295                return READ_MEMORY_ERROR;
3296            }
3297            c->rcurr = c->rbuf = new_rbuf;
3298            c->rsize *= 2;
3299        }
3300
3301        int avail = c->rsize - c->rbytes;
3302        cb_assert(avail > 0);
3303        res = recv(c->sfd, c->rbuf + c->rbytes, avail, 0);
3304        if (res > 0) {
3305            cb_mutex_enter(&c->thread->stats.mutex);
3306            c->thread->stats.bytes_read += res;
3307            cb_mutex_exit(&c->thread->stats.mutex);
3308
3309            add_bytes_read(c, res);
3310
3311            gotdata = READ_DATA_RECEIVED;
3312            c->rbytes += res;
3313            if (res == avail) {
3314                continue;
3315            } else {
3316                break;
3317            }
3318        }
3319        if (res == 0) {
3320            return READ_ERROR;
3321        }
3322        if (res == -1) {
3323            if (is_blocking(errno)) {
3324                break;
3325            }
3326            return READ_ERROR;
3327        }
3328    }
3329    return gotdata;
3330}
3331
3332bool update_event_real(conn *c, const int new_flags, const char *update_diag) {
3333    return update_event_timed_real(c, new_flags, NULL, update_diag);
3334}
3335
3336bool update_event_timed_real(conn *c, const int new_flags, struct timeval *timeout, const char *update_diag) {
3337    cb_assert(c != NULL);
3338
3339    struct event_base *base = c->event.ev_base;
3340    if (c->ev_flags == new_flags && timeout == NULL)
3341        return true;
3342
3343    c->update_diag = update_diag;
3344
3345    if (event_del(&c->event) == -1)
3346        return false;
3347    c->ev_flags = new_flags;
3348    if (new_flags == 0 && timeout == NULL)
3349        return true;
3350    event_set(&c->event, c->sfd, new_flags, event_handler, (void *)c);
3351    event_base_set(base, &c->event);
3352    if (event_add(&c->event, timeout) == -1)
3353        return false;
3354    return true;
3355}
3356
3357/*
3358 * Sets whether we are listening for new connections or not.
3359 */
3360void do_accept_new_conns(const bool do_accept) {
3361    conn *next;
3362
3363    for (next = listen_conn; next; next = next->next) {
3364        if (do_accept) {
3365            update_event(next, EV_READ | EV_PERSIST);
3366            if (listen(next->sfd, settings.backlog) != 0) {
3367                perror("listen");
3368            }
3369        }
3370        else {
3371            update_event(next, 0);
3372            if (listen(next->sfd, 0) != 0) {
3373                perror("listen");
3374            }
3375        }
3376    }
3377
3378    if (do_accept) {
3379        STATS_LOCK();
3380        stats.accepting_conns = true;
3381        STATS_UNLOCK();
3382    } else {
3383        STATS_LOCK();
3384        stats.accepting_conns = false;
3385        stats.listen_disabled_num++;
3386        STATS_UNLOCK();
3387    }
3388}
3389
3390/*
3391 * Transmit the next chunk of data from our list of msgbuf structures.
3392 *
3393 * Returns:
3394 *   TRANSMIT_COMPLETE   All done writing.
3395 *   TRANSMIT_INCOMPLETE More data remaining to write.
3396 *   TRANSMIT_SOFT_ERROR Can't write any more right now.
3397 *   TRANSMIT_HARD_ERROR Can't write (c->state is set to conn_closing)
3398 */
3399static enum transmit_result transmit(conn *c) {
3400    cb_assert(c != NULL);
3401
3402    if (c->msgcurr < c->msgused &&
3403            c->msglist[c->msgcurr].msg_iovlen == 0) {
3404        /* Finished writing the current msg; advance to the next. */
3405        c->msgcurr++;
3406    }
3407    if (c->msgcurr < c->msgused) {
3408        ssize_t res;
3409        struct msghdr *m = &c->msglist[c->msgcurr];
3410#ifdef WIN32
3411        DWORD error;
3412#else
3413        int error;
3414#endif
3415
3416        res = sendmsg(c->sfd, m, 0);
3417#ifdef WIN32
3418        error = WSAGetLastError();
3419#else
3420        error = errno;
3421#endif
3422        if (res > 0) {
3423            cb_mutex_enter(&c->thread->stats.mutex);
3424            c->thread->stats.bytes_written += res;
3425            cb_mutex_exit(&c->thread->stats.mutex);
3426
3427            /* We've written some of the data. Remove the completed
3428               iovec entries from the list of pending writes. */
3429            while (m->msg_iovlen > 0 && res >= (ssize_t)(m->msg_iov->iov_len)) {
3430                res -= (ssize_t)m->msg_iov->iov_len;
3431                m->msg_iovlen--;
3432                m->msg_iov++;
3433            }
3434
3435            /* Might have written just part of the last iovec entry;
3436               adjust it so the next write will do the rest. */
3437            if (res > 0) {
3438                m->msg_iov->iov_base = (caddr_t)m->msg_iov->iov_base + res;
3439                m->msg_iov->iov_len -= res;
3440            }
3441            return TRANSMIT_INCOMPLETE;
3442        }
3443
3444        if (res == -1 && is_blocking(error)) {
3445            if (!update_event(c, EV_WRITE | EV_PERSIST)) {
3446                if (settings.verbose > 0)
3447                    moxi_log_write("Couldn't update event\n");
3448                conn_set_state(c, conn_closing);
3449                return TRANSMIT_HARD_ERROR;
3450            }
3451            return TRANSMIT_SOFT_ERROR;
3452        }
3453        /* if res == 0 or res == -1 and error is not EAGAIN or EWOULDBLOCK,
3454           we have a real error, on which we close the connection */
3455        if (settings.verbose > 0)
3456            perror("Failed to write, and not due to blocking");
3457
3458        if (IS_UDP(c->transport))
3459            conn_set_state(c, conn_read);
3460        else
3461            conn_set_state(c, conn_closing);
3462        return TRANSMIT_HARD_ERROR;
3463    } else {
3464        return TRANSMIT_COMPLETE;
3465    }
3466}
3467
3468void drive_machine(conn *c) {
3469    bool stop = false;
3470    SOCKET sfd;
3471    socklen_t addrlen;
3472    struct sockaddr_storage addr;
3473    int nreqs = settings.reqs_per_event;
3474    int res;
3475#ifdef WIN32
3476    DWORD error;
3477#else
3478    int error;
3479#endif
3480
3481    cb_assert(c != NULL);
3482
3483    while (!stop) {
3484        if (settings.verbose > 2) {
3485            moxi_log_write("%d: drive_machine %s\n",
3486                    c->sfd, state_text(c->state));
3487        }
3488
3489        switch(c->state) {
3490        case conn_listening:
3491            addrlen = sizeof(addr);
3492            if ((sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen)) == INVALID_SOCKET) {
3493#ifdef WIN32
3494                error = WSAGetLastError();
3495#else
3496                error = errno;
3497#endif
3498                if (is_blocking(error)) {
3499                    /* these are transient, so don't log anything */
3500                    stop = true;
3501                } else if (is_emfile(error)) {
3502                    if (settings.verbose > 0)
3503                        moxi_log_write("Too many open connections\n");
3504                    accept_new_conns(false);
3505                    stop = true;
3506                } else {
3507                    perror("accept()");
3508                    stop = true;
3509                }
3510                break;
3511            }
3512            if (evutil_make_socket_nonblocking(sfd) == -1) {
3513                perror("setting O_NONBLOCK");
3514                closesocket(sfd);
3515                break;
3516            }
3517
3518            dispatch_conn_new(sfd, conn_new_cmd, EV_READ | EV_PERSIST,
3519                              DATA_BUFFER_SIZE,
3520                              c->protocol,
3521                              tcp_transport,
3522                              c->funcs, c->extra);
3523            stop = true;
3524            break;
3525
3526        case conn_connecting:
3527            if (c->funcs->conn_connect != NULL) {
3528                if (c->funcs->conn_connect(c) == true) {
3529                    stop = true;
3530                }
3531            } else {
3532                conn_set_state(c, conn_closing);
3533                update_event(c, 0);
3534            }
3535            break;
3536
3537        case conn_waiting:
3538            if (!update_event(c, EV_READ | EV_PERSIST)) {
3539                if (settings.verbose > 0)
3540                    moxi_log_write("Couldn't update event\n");
3541                conn_set_state(c, conn_closing);
3542                break;
3543            }
3544
3545            conn_set_state(c, conn_read);
3546            stop = true;
3547            break;
3548
3549        case conn_read:
3550            res = IS_UDP(c->transport) ? try_read_udp(c) : try_read_network(c);
3551
3552            switch (res) {
3553            case READ_NO_DATA_RECEIVED:
3554                conn_set_state(c, conn_waiting);
3555                break;
3556            case READ_DATA_RECEIVED:
3557                conn_set_state(c, conn_parse_cmd);
3558                break;
3559            case READ_ERROR:
3560                conn_set_state(c, conn_closing);
3561                break;
3562            case READ_MEMORY_ERROR: /* Failed to allocate more memory */
3563                /* State already set by try_read_network */
3564                break;
3565            }
3566            break;
3567
3568        case conn_parse_cmd:
3569            if (try_read_command(c) == 0) {
3570                /* wee need more data! */
3571                conn_set_state(c, conn_waiting);
3572            }
3573
3574            break;
3575
3576        case conn_new_cmd:
3577            /* Only process nreqs at a time to avoid starving other
3578               connections */
3579
3580            --nreqs;
3581            if (IS_DOWNSTREAM(c->protocol) || nreqs >= 0) {
3582                reset_cmd_handler(c);
3583            } else {
3584                cb_mutex_enter(&c->thread->stats.mutex);
3585                c->thread->stats.conn_yields++;
3586                cb_mutex_exit(&c->thread->stats.mutex);
3587                if (c->rbytes > 0) {
3588                    /* We have already read in data into the input buffer,
3589                       so libevent will most likely not signal read events
3590                       on the socket (unless more data is available. As a
3591                       hack we should just put in a request to write data,
3592                       because that should be possible ;-)
3593                    */
3594                    if (!update_event(c, EV_WRITE | EV_PERSIST)) {
3595                        if (settings.verbose > 0) {
3596                            moxi_log_write("Couldn't update event\n");
3597                        }
3598
3599                        conn_set_state(c, conn_closing);
3600                    }
3601                }
3602                stop = true;
3603            }
3604            break;
3605
3606        case conn_nread:
3607            cb_assert(c->rlbytes >= 0);
3608            if (c->rlbytes == 0) {
3609                complete_nread(c);
3610                break;
3611            }
3612            /* first check if we have leftovers in the conn_read buffer */
3613            if (c->rbytes > 0) {
3614                int tocopy = c->rbytes > c->rlbytes ? c->rlbytes : c->rbytes;
3615                if (c->ritem != c->rcurr) {
3616                    memmove(c->ritem, c->rcurr, tocopy);
3617                }
3618                c->ritem += tocopy;
3619                c->rlbytes -= tocopy;
3620                c->rcurr += tocopy;
3621                c->rbytes -= tocopy;
3622                if (c->rlbytes == 0) {
3623                    break;
3624                }
3625            }
3626
3627            /*  now try reading from the socket */
3628            res = recv(c->sfd, c->ritem, c->rlbytes, 0);
3629#ifdef WIN32
3630            error = WSAGetLastError();
3631#else
3632            error = errno;
3633#endif
3634
3635            if (res > 0) {
3636                add_bytes_read(c, res);
3637
3638                if (c->rcurr == c->ritem) {
3639                    c->rcurr += res;
3640                }
3641                c->ritem += res;
3642                c->rlbytes -= res;
3643                break;
3644            }
3645            if (res == 0) { /* end of stream */
3646                conn_set_state(c, conn_closing);
3647                break;
3648            }
3649            if (res == -1 && is_blocking(error)) {
3650                if (!update_event(c, EV_READ | EV_PERSIST)) {
3651                    if (settings.verbose > 0)
3652                        moxi_log_write( "Couldn't update event\n");
3653                    conn_set_state(c, conn_closing);
3654                    break;
3655                }
3656                stop = true;
3657                break;
3658            }
3659            /* otherwise we have a real error, on which we close the connection */
3660            if (!is_closed_conn(error) && settings.verbose > 0) {
3661                moxi_log_write("Failed to read, and not due to blocking:\n"
3662                        "errno: %d %s \n"
3663                        "rcurr=%lx ritem=%lx rbuf=%lx rlbytes=%d rsize=%d\n",
3664                        errno, strerror(errno),
3665                        (long)c->rcurr, (long)c->ritem, (long)c->rbuf,
3666                        (int)c->rlbytes, (int)c->rsize);
3667            }
3668            conn_set_state(c, conn_closing);
3669            break;
3670
3671        case conn_swallow:
3672            /* we are reading sbytes and throwing them away */
3673            if (c->sbytes == 0) {
3674                conn_set_state(c, conn_new_cmd);
3675                break;
3676            }
3677
3678            /* first check if we have leftovers in the conn_read buffer */
3679            if (c->rbytes > 0) {
3680                int tocopy = c->rbytes > c->sbytes ? c->sbytes : c->rbytes;
3681                c->sbytes -= tocopy;
3682                c->rcurr += tocopy;
3683                c->rbytes -= tocopy;
3684                break;
3685            }
3686
3687            /*  now try reading from the socket */
3688            res = recv(c->sfd, c->rbuf, c->rsize > c->sbytes ? c->sbytes : c->rsize, 0);
3689#ifdef WIN32
3690            error = WSAGetLastError();
3691#else
3692            error = errno;
3693#endif
3694
3695            if (res > 0) {
3696                cb_mutex_enter(&c->thread->stats.mutex);
3697                c->thread->stats.bytes_read += res;
3698                cb_mutex_exit(&c->thread->stats.mutex);
3699                add_bytes_read(c, res);
3700                c->sbytes -= res;
3701                break;
3702            }
3703            if (res == 0) { /* end of stream */
3704                conn_set_state(c, conn_closing);
3705                break;
3706            }
3707            if (res == -1 && is_blocking(error)) {
3708                if (!update_event(c, EV_READ | EV_PERSIST)) {
3709                    if (settings.verbose > 0)
3710                        moxi_log_write("Couldn't update event\n");
3711                    conn_set_state(c, conn_closing);
3712                    break;
3713                }
3714                stop = true;
3715                break;
3716            }
3717            /* otherwise we have a real error, on which we close the connection */
3718            if (!is_closed_conn(error) && settings.verbose > 0) {
3719                moxi_log_write("Failed to read, and not due to blocking\n");
3720            }
3721            conn_set_state(c, conn_closing);
3722            break;
3723
3724        case conn_write:
3725            /*
3726             * We want to write out a simple response. If we haven't already,
3727             * assemble it into a msgbuf list (this will be a single-entry
3728             * list for TCP or a two-entry list for UDP).
3729             */
3730            if (c->iovused == 0 || (IS_UDP(c->transport) && c->iovused == 1)) {
3731                if (add_iov(c, c->wcurr, c->wbytes) != 0) {
3732                    if (settings.verbose > 0)
3733                        moxi_log_write("Couldn't build response\n");
3734                    conn_set_state(c, conn_closing);
3735                    break;
3736                }
3737            }
3738
3739            /* fall through... */
3740
3741        case conn_mwrite:
3742          if (IS_UDP(c->transport) && c->msgcurr == 0 && build_udp_headers(c) != 0) {
3743            if (settings.verbose > 0)
3744              moxi_log_write("Failed to build UDP headers\n");
3745            conn_set_state(c, conn_closing);
3746            break;
3747          }
3748            switch (transmit(c)) {
3749            case TRANSMIT_COMPLETE:
3750                if (c->state == conn_mwrite) {
3751                    while (c->ileft > 0) {
3752                        item *it = *(c->icurr);
3753                        cb_assert((it->it_flags & ITEM_SLABBED) == 0);
3754                        item_remove(it);
3755                        c->icurr++;
3756                        c->ileft--;
3757                    }
3758                    while (c->suffixleft > 0) {
3759                        char *suffix = *(c->suffixcurr);
3760                        cache_free(c->thread->suffix_cache, suffix);
3761                        c->suffixcurr++;
3762                        c->suffixleft--;
3763                    }
3764                    conn_set_state(c, c->write_and_go);
3765                } else if (c->state == conn_write) {
3766                    if (c->write_and_free) {
3767                        free(c->write_and_free);
3768                        c->write_and_free = 0;
3769                    }
3770                    conn_set_state(c, c->write_and_go);
3771                } else {
3772                    if (settings.verbose > 0)
3773                        moxi_log_write("Unexpected state %d\n", c->state);
3774                    conn_set_state(c, conn_closing);
3775                }
3776                break;
3777
3778            case TRANSMIT_INCOMPLETE:
3779            case TRANSMIT_HARD_ERROR:
3780                break;                   /* Continue in state machine. */
3781
3782            case TRANSMIT_SOFT_ERROR:
3783                stop = true;
3784                break;
3785            }
3786            break;
3787
3788        case conn_pause:
3789            /* In case whoever put us into conn_pause didn't clear out */
3790            /* libevent registration, do so now. */
3791
3792            update_event(c, 0);
3793
3794            if (c->funcs->conn_pause != NULL)
3795                c->funcs->conn_pause(c);
3796
3797            stop = true;
3798            break;
3799
3800        case conn_closing:
3801            if (c->funcs->conn_close != NULL)
3802                c->funcs->conn_close(c);
3803
3804            if (IS_UDP(c->transport))
3805                conn_cleanup(c);
3806            else
3807                conn_close(c);
3808            stop = true;
3809            break;
3810
3811        case conn_max_state:
3812            cb_assert(false);
3813            break;
3814        }
3815    }
3816
3817    return;
3818}
3819
3820void add_bytes_read(conn *c, int bytes_read) {
3821    cb_assert(c != NULL);
3822    cb_mutex_enter(&c->thread->stats.mutex);
3823    c->thread->stats.bytes_read += bytes_read;
3824    cb_mutex_exit(&c->thread->stats.mutex);
3825}
3826
3827static void event_handler(evutil_socket_t fd, short which, void *arg) {
3828    conn *c;
3829
3830    c = (conn *)arg;
3831    cb_assert(c != NULL);
3832
3833    c->which = which;
3834
3835    /* sanity */
3836    if (fd != c->sfd) {
3837        if (settings.verbose > 0)
3838            moxi_log_write("Catastrophic: event fd doesn't match conn fd!\n");
3839        conn_close(c);
3840        return;
3841    }
3842
3843    c->update_diag = "working";
3844
3845    drive_machine(c);
3846
3847    /* wait for next event */
3848    return;
3849}
3850
3851static SOCKET new_socket(struct addrinfo *ai) {
3852    SOCKET sfd;
3853
3854    if ((sfd = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol)) == INVALID_SOCKET) {
3855        return INVALID_SOCKET;
3856    }
3857
3858    if (evutil_make_socket_nonblocking(sfd) == -1) {
3859        perror("setting O_NONBLOCK");
3860        closesocket(sfd);
3861        return INVALID_SOCKET;
3862    }
3863    return sfd;
3864}
3865
3866
3867/*
3868 * Sets a socket's send buffer size to the maximum allowed by the system.
3869 */
3870static void maximize_sndbuf(const SOCKET sfd) {
3871    socklen_t intsize = sizeof(int);
3872    int last_good = 0;
3873    int min, max, avg;
3874    int old_size;
3875
3876    /* Start with the default size. */
3877    if (getsockopt(sfd, SOL_SOCKET, SO_SNDBUF, (void*)&old_size, &intsize) != 0) {
3878        if (settings.verbose > 0)
3879            perror("getsockopt(SO_SNDBUF)");
3880        return;
3881    }
3882
3883    /* Binary-search for the real maximum. */
3884    min = old_size;
3885    max = MAX_SENDBUF_SIZE;
3886
3887    while (min <= max) {
3888        avg = ((unsigned int)(min + max)) / 2;
3889        if (setsockopt(sfd, SOL_SOCKET, SO_SNDBUF, (void *)&avg, intsize) == 0) {
3890            last_good = avg;
3891            min = avg + 1;
3892        } else {
3893            max = avg - 1;
3894        }
3895    }
3896
3897    if (settings.verbose > 1)
3898        moxi_log_write("<%d send buffer was %d, now %d\n", sfd, old_size, last_good);
3899}
3900
3901/**
3902 * Create a socket and bind it to a specific port number
3903 * @param port the port number to bind to
3904 * @param transport the transport protocol (TCP / UDP)
3905 * @param portnumber_file A filepointer to write the port numbers to
3906 *        when they are successfully added to the list of ports we
3907 *        listen on.
3908 */
3909int server_socket(int port, enum network_transport transport,
3910                  FILE *portnumber_file) {
3911    SOCKET sfd;
3912    struct linger ling = {0, 0};
3913    struct</