xref: /6.0.3/moxi/src/memcached.c (revision 9add9f2e)
1/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *  memcached - memory caching daemon
4 *
5 *       http://www.danga.com/memcached/
6 *
7 *  Copyright 2003 Danga Interactive, Inc.  All rights reserved.
8 *
9 *  Use and distribution licensed under the BSD license.  See
10 *  the LICENSE file for full text.
11 *
12 *  Authors:
13 *      Anatoly Vorobey <mellon@pobox.com>
14 *      Brad Fitzpatrick <brad@danga.com>
15 */
16#include "memcached.h"
17#include <sys/stat.h>
18#include <signal.h>
19#include <ctype.h>
20#include <stdarg.h>
21
22#include <fcntl.h>
23#include <errno.h>
24#include <stdlib.h>
25#include <stdio.h>
26#include <string.h>
27#include <time.h>
28#include <platform/cbassert.h>
29#include <limits.h>
30#include <stddef.h>
31#include <getopt.h>
32// MB-14649 log() crash on windows on some CPU's
33#include <math.h>
34
35#include "cproxy.h"
36#include "agent.h"
37#include "stdin_check.h"
38#include "log.h"
39
40int IS_UDP(enum network_transport protocol) {
41    return protocol == udp_transport;
42}
43
44#ifdef WIN32
45static int is_blocking(DWORD dw) {
46    return (dw == WSAEWOULDBLOCK);
47}
48static int is_emfile(DWORD dw) {
49    return (dw == WSAEMFILE);
50}
51static int is_closed_conn(DWORD dw) {
52    return (dw == WSAENOTCONN || WSAECONNRESET);
53}
54static int is_addrinuse(DWORD dw) {
55    return (dw == WSAEADDRINUSE);
56}
57#else
/* Returns 1 when dw is EAGAIN or EWOULDBLOCK, 0 otherwise. */
static int is_blocking(int dw) {
    if (dw == EAGAIN) {
        return 1;
    }
    return dw == EWOULDBLOCK;
}
61
/* Returns 1 when dw is EMFILE, 0 otherwise. */
static int is_emfile(int dw) {
    return (EMFILE == dw) ? 1 : 0;
}
65
/*
 * Returns non-zero when dw is an errno value meaning the connection is
 * gone: ENOTCONN or ECONNRESET.  Every other value (including 0) yields 0.
 */
static int is_closed_conn(int dw) {
    return (dw == ENOTCONN || dw == ECONNRESET);
}
69
/* Returns 1 when dw is EADDRINUSE, 0 otherwise. */
static int is_addrinuse(int dw) {
    return (EADDRINUSE == dw) ? 1 : 0;
}
73#endif
74
75/*
76 * forward declarations
77 */
78static SOCKET new_socket(struct addrinfo *ai);
79
/* Result codes returned by try_read_network() and try_read_udp(). */
enum try_read_result {
    READ_DATA_RECEIVED    = 0,
    READ_NO_DATA_RECEIVED = 1,
    READ_ERROR            = 2, /** an error occurred (on the socket) (or client closed connection) */
    READ_MEMORY_ERROR     = 3  /** failed to allocate more memory */
};
86
87static enum try_read_result try_read_network(conn *c);
88static enum try_read_result try_read_udp(conn *c);
89
90/* stats */
91static void stats_init(void);
92
93/* defaults */
94static void settings_init(void);
95
96/* event handling, network IO */
97static void event_handler(evutil_socket_t fd, short which, void *arg);
98static void conn_close(conn *c);
99static void conn_init(void);
100static void write_and_free(conn *c, char *buf, size_t bytes);
101
102/* time handling */
103static rel_time_t realtime(const time_t exptime);
104static void set_current_time(void);  /* update the global variable holding
105                              global 32-bit seconds-since-start time
106                              (to avoid 64 bit time_t) */
107
108static void conn_free(conn *c);
109
110/** exported globals **/
111struct stats stats;
112struct settings settings;
113time_t process_started;     /* when the process was started */
114conn *listen_conn = NULL;
115
116/** file scope variables **/
117static struct event_base *main_base;
118
/* Result codes returned by transmit(). */
enum transmit_result {
    TRANSMIT_COMPLETE   = 0, /** All done writing. */
    TRANSMIT_INCOMPLETE = 1, /** More data remaining to write. */
    TRANSMIT_SOFT_ERROR = 2, /** Can't write any more right now. */
    TRANSMIT_HARD_ERROR = 3  /** Can't write (c->state is set to conn_closing) */
};
125
126static enum transmit_result transmit(conn *c);
127
128conn_funcs conn_funcs_default = {
129    .conn_init                   = NULL,
130    .conn_close                  = NULL,
131    .conn_connect                = NULL,
132    .conn_process_ascii_command  = process_command,
133    .conn_process_binary_command = dispatch_bin_command,
134    .conn_complete_nread_ascii   = complete_nread_ascii,
135    .conn_complete_nread_binary  = complete_nread_binary,
136    .conn_pause                  = NULL,
137    .conn_realtime               = realtime,
138    .conn_binary_command_magic   = PROTOCOL_BINARY_REQ,
139    .conn_state_change           = NULL
140};
141
142#ifdef MAIN_CHECK
143int main_check(int argc, char **argv);
144#endif
145
146/* global logger handle */
147moxi_log *ml;
148
149/*
150 * given time value that's either unix time or delta from current unix time, return
151 * unix time. Use the fact that delta can't exceed one month (and real time value can't
152 * be that low).
153 */
154static rel_time_t realtime(const time_t exptime) {
155    /* no. of seconds in 30 days - largest possible delta exptime */
156
157    if (exptime == 0) return 0; /* 0 means never expire */
158
159    if (exptime > REALTIME_MAXDELTA) {
160        /* if item expiration is at/before the server started, give it an
161           expiration time of 1 second after the server started.
162           (because 0 means don't expire).  without this, we'd
163           underflow and wrap around to some large value way in the
164           future, effectively making items expiring in the past
165           really expiring never */
166        if (exptime <= process_started)
167            return (rel_time_t)1;
168        return (rel_time_t)(exptime - process_started);
169    } else {
170        return (rel_time_t)(exptime + current_time);
171    }
172}
173
174static void stats_init(void) {
175    stats.curr_items = stats.total_items = stats.curr_conns = stats.total_conns = stats.conn_structs = 0;
176    stats.get_cmds = stats.set_cmds = stats.get_hits = stats.get_misses = stats.evictions = 0;
177    stats.curr_bytes = stats.listen_disabled_num = 0;
178    stats.accepting_conns = true; /* assuming we start in this state. */
179
180    /* make the time we started always be 2 seconds before we really
181       did, so time(0) - time.started is never zero.  if so, things
182       like 'settings.oldest_live' which act as booleans as well as
183       values are now false in boolean context... */
184    process_started = time(0) - 2;
185    stats_prefix_init();
186}
187
188static void stats_reset(void) {
189    STATS_LOCK();
190    stats.total_items = stats.total_conns = 0;
191    stats.evictions = 0;
192    stats.listen_disabled_num = 0;
193    stats_prefix_clear();
194    STATS_UNLOCK();
195    threadlocal_stats_reset();
196    item_stats_reset();
197}
198
199static void settings_init(void) {
200    settings.use_cas = true;
201    settings.access = 0700;
202    settings.port = UNSPECIFIED;
203    settings.udpport = UNSPECIFIED;
204    /* By default this string should be NULL for getaddrinfo() */
205    settings.inter = NULL;
206    settings.maxbytes = 64 * 1024 * 1024; /* default is 64MB */
207    settings.maxconns = 1024;         /* to limit connections-related memory to about 5MB */
208    settings.verbose = 0;
209    settings.oldest_live = 0;
210    settings.evict_to_free = 1;       /* push old items out of cache when memory runs out */
211    settings.socketpath = NULL;       /* by default, not using a unix socket */
212    settings.factor = 1.25;
213    settings.chunk_size = 48;         /* space for a modest key and value */
214    settings.num_threads = 4 + 1;     /* N workers + 1 dispatcher */
215    settings.prefix_delimiter = ':';
216    settings.detail_enabled = 0;
217    settings.reqs_per_event = 20;
218    settings.backlog = 1024;
219    settings.binding_protocol = negotiating_prot;
220}
221
222/*
223 * Adds a message header to a connection.
224 *
225 * Returns 0 on success, -1 on out-of-memory.
226 */
227int add_msghdr(conn *c) {
228    struct msghdr *msg;
229
230    cb_assert(c != NULL);
231
232    if (c->msgsize == c->msgused) {
233        msg = realloc(c->msglist, c->msgsize * 2 * sizeof(struct msghdr));
234        if (! msg)
235            return -1;
236        c->msglist = msg;
237        c->msgsize *= 2;
238    }
239
240    msg = c->msglist + c->msgused;
241
242    /* this wipes msg_iovlen, msg_control, msg_controllen, and
243       msg_flags, the last 3 of which aren't defined on solaris: */
244    memset(msg, 0, sizeof(struct msghdr));
245
246    msg->msg_iov = &c->iov[c->iovused];
247
248    if (c->request_addr_size > 0) {
249        msg->msg_name = &c->request_addr;
250        msg->msg_namelen = c->request_addr_size;
251    }
252
253    c->msgbytes = 0;
254    c->msgused++;
255
256    if (IS_UDP(c->transport)) {
257        /* Leave room for the UDP header, which we'll fill in later. */
258        return add_iov(c, NULL, UDP_HEADER_SIZE);
259    }
260
261    return 0;
262}
263
264
265/*
266 * Free list management for connections.
267 */
268
269static conn **freeconns;
270static size_t freetotal;
271static size_t freecurr;
272/* Lock for connection freelist */
273static cb_mutex_t conn_lock;
274
275static void conn_init(void) {
276    freetotal = 200;
277    freecurr = 0;
278    cb_mutex_initialize(&conn_lock);
279    if ((freeconns = calloc(freetotal, sizeof(conn *))) == NULL) {
280        moxi_log_write("Failed to allocate connection structures\n");
281    }
282    return;
283}
284
285/*
286 * Returns a connection from the freelist, if any.
287 */
288conn *conn_from_freelist() {
289    conn *c;
290
291    cb_mutex_enter(&conn_lock);
292    if (freecurr > 0) {
293        c = freeconns[--freecurr];
294    } else {
295        c = NULL;
296    }
297    cb_mutex_exit(&conn_lock);
298
299    return c;
300}
301
302/*
303 * Adds a connection to the freelist. 0 = success.
304 */
305bool conn_add_to_freelist(conn *c) {
306    bool ret = true;
307    cb_mutex_enter(&conn_lock);
308    if (freecurr < freetotal) {
309        freeconns[freecurr++] = c;
310        ret = false;
311    } else {
312        /* try to enlarge free connections array */
313        size_t newsize = freetotal * 2;
314        conn **new_freeconns = realloc(freeconns, sizeof(conn *) * newsize);
315        if (new_freeconns) {
316            freetotal = newsize;
317            freeconns = new_freeconns;
318            freeconns[freecurr++] = c;
319            ret = false;
320        }
321    }
322    cb_mutex_exit(&conn_lock);
323    return ret;
324}
325
326static const char *prot_text(enum protocol prot) {
327    char *rv = "unknown";
328    switch(prot) {
329        case ascii_prot:
330            rv = "ascii";
331            break;
332        case binary_prot:
333            rv = "binary";
334            break;
335        case proxy_upstream_ascii_prot:
336            rv = "proxy-upstream-ascii";
337            break;
338        case proxy_upstream_binary_prot:
339            rv = "proxy-upstream-binary";
340            break;
341        case proxy_downstream_ascii_prot:
342            rv = "proxy-downstream-ascii";
343            break;
344        case proxy_downstream_binary_prot:
345            rv = "proxy-downstream-binary";
346            break;
347        case negotiating_proxy_prot:
348            rv = "auto-negotiate-proxy";
349        case negotiating_prot:
350            rv = "auto-negotiate";
351            break;
352    }
353    return rv;
354}
355
356conn *conn_new(const SOCKET sfd, enum conn_states init_state,
357               const int event_flags,
358               const int read_buffer_size,
359               enum network_transport transport,
360               struct event_base *base,
361               conn_funcs *funcs, void *extra) {
362    conn *c = conn_from_freelist();
363
364    if (NULL == c) {
365        if (!(c = (conn *)calloc(1, sizeof(conn)))) {
366            moxi_log_write("calloc()\n");
367            return NULL;
368        }
369        MEMCACHED_CONN_CREATE(c);
370
371        c->rbuf = c->wbuf = 0;
372        c->ilist = 0;
373        c->suffixlist = 0;
374        c->iov = 0;
375        c->msglist = 0;
376        c->hdrbuf = 0;
377
378        c->rsize = read_buffer_size;
379        c->wsize = DATA_BUFFER_SIZE;
380        c->isize = ITEM_LIST_INITIAL;
381        c->suffixsize = SUFFIX_LIST_INITIAL;
382        c->iovsize = IOV_LIST_INITIAL;
383        c->msgsize = MSG_LIST_INITIAL;
384        c->hdrsize = 0;
385
386        c->rbuf = (char *)malloc((size_t)c->rsize);
387        c->wbuf = (char *)malloc((size_t)c->wsize);
388        c->ilist = (item **)malloc(sizeof(item *) * c->isize);
389        c->suffixlist = (char **)malloc(sizeof(char *) * c->suffixsize);
390        c->iov = (struct iovec *)malloc(sizeof(struct iovec) * c->iovsize);
391        c->msglist = (struct msghdr *)malloc(sizeof(struct msghdr) * c->msgsize);
392
393        if (c->rbuf == 0 || c->wbuf == 0 || c->ilist == 0 || c->iov == 0 ||
394                c->msglist == 0 || c->suffixlist == 0) {
395            conn_free(c);
396            moxi_log_write("malloc()\n");
397            return NULL;
398        }
399
400        STATS_LOCK();
401        stats.conn_structs++;
402        STATS_UNLOCK();
403    }
404
405    c->transport = transport;
406    c->protocol = settings.binding_protocol;
407
408    /* unix socket mode doesn't need this, so zeroed out.  but why
409     * is this done for every command?  presumably for UDP
410     * mode.  */
411    if (!settings.socketpath) {
412        c->request_addr_size = sizeof(c->request_addr);
413    } else {
414        c->request_addr_size = 0;
415    }
416
417    if (settings.verbose > 1) {
418        if (init_state == conn_listening) {
419            moxi_log_write("<%d server listening (%s)\n", sfd,
420                prot_text(c->protocol));
421        } else if (IS_UDP(transport)) {
422            moxi_log_write("<%d server listening (udp)\n", sfd);
423        } else if (IS_NEGOTIATING(c->protocol)) {
424            moxi_log_write("<%d new auto-negotiating client connection\n",
425                    sfd);
426        } else if (c->protocol == ascii_prot) {
427            moxi_log_write("<%d new ascii client connection.\n", sfd);
428        } else if (c->protocol == binary_prot) {
429            moxi_log_write("<%d new binary client connection.\n", sfd);
430        } else {
431            moxi_log_write("<%d new %s client connection\n",
432                    sfd, prot_text(c->protocol));
433        }
434    }
435
436    c->sfd = sfd;
437    c->state = init_state;
438    c->rlbytes = 0;
439    c->cmd = -1;
440    c->rbytes = c->wbytes = 0;
441    c->wcurr = c->wbuf;
442    c->rcurr = c->rbuf;
443    c->ritem = 0;
444    c->icurr = c->ilist;
445    c->suffixcurr = c->suffixlist;
446    c->ileft = 0;
447    c->suffixleft = 0;
448    c->iovused = 0;
449    c->msgcurr = 0;
450    c->msgused = 0;
451
452    c->write_and_go = init_state;
453    c->write_and_free = 0;
454    c->item = 0;
455
456    c->noreply = false;
457
458    c->funcs = funcs;
459    if (c->funcs == NULL) {
460        c->funcs = &conn_funcs_default;
461        if (settings.verbose > 1)
462            moxi_log_write( "<%d initialized conn_funcs to default\n", sfd);
463    }
464
465    c->cmd_curr = -1;
466    c->cmd_start = NULL;
467    c->cmd_start_time = 0;
468    c->cmd_retries = 0;
469    c->corked = NULL;
470    c->host_ident = NULL;
471    c->peer_host = NULL;
472    c->peer_protocol = 0;
473    c->peer_port = 0;
474    c->update_diag = NULL;
475
476    c->extra = extra;
477
478    event_set(&c->event, sfd, event_flags, event_handler, (void *)c);
479    event_base_set(base, &c->event);
480    c->ev_flags = event_flags;
481
482    if (event_add(&c->event, 0) == -1) {
483        event_del(&c->event);
484        conn_free(c);
485        perror("event_add\n");
486        return NULL;
487    }
488
489    if (c->funcs->conn_init != NULL &&
490        c->funcs->conn_init(c) == false) {
491        event_del(&c->event);
492        conn_free(c);
493        return NULL;
494    }
495
496    STATS_LOCK();
497    stats.curr_conns++;
498    stats.total_conns++;
499    STATS_UNLOCK();
500
501    MEMCACHED_CONN_ALLOCATE(c->sfd);
502
503    return c;
504}
505
506static void conn_cleanup(conn *c) {
507    cb_assert(c != NULL);
508
509    if (c->item) {
510        item_remove(c->item);
511        c->item = 0;
512    }
513
514    if (c->ileft != 0) {
515        for (; c->ileft > 0; c->ileft--, c->icurr++) {
516            item_remove(*(c->icurr));
517        }
518    }
519
520    if (c->suffixleft != 0) {
521        for (; c->suffixleft > 0; c->suffixleft--, c->suffixcurr++) {
522            cache_free(c->thread->suffix_cache, *(c->suffixcurr));
523        }
524    }
525
526    if (c->write_and_free) {
527        free(c->write_and_free);
528        c->write_and_free = 0;
529    }
530}
531
532/*
533 * Frees a connection.
534 */
535void conn_free(conn *c) {
536    if (c) {
537        MEMCACHED_CONN_DESTROY(c);
538        if (c->hdrbuf)
539            free(c->hdrbuf);
540        if (c->msglist)
541            free(c->msglist);
542        if (c->rbuf)
543            free(c->rbuf);
544        if (c->wbuf)
545            free(c->wbuf);
546        if (c->ilist)
547            free(c->ilist);
548        if (c->suffixlist)
549            free(c->suffixlist);
550        if (c->iov)
551            free(c->iov);
552        if (c->host_ident)
553            free(c->host_ident);
554
555        while (c->corked != NULL) {
556            bin_cmd *bc = c->corked;
557            c->corked = c->corked->next;
558            if (bc->request_item != NULL) {
559                item_remove(bc->request_item);
560            }
561            if (bc->response_item != NULL) {
562                item_remove(bc->response_item);
563            }
564            free(bc);
565        }
566
567        free(c);
568    }
569}
570
571static void conn_close(conn *c) {
572    cb_assert(c != NULL);
573
574    /* delete the event, the socket and the conn */
575    event_del(&c->event);
576
577    if (settings.verbose > 1)
578        moxi_log_write("<%d connection closed.\n", c->sfd);
579
580    MEMCACHED_CONN_RELEASE(c->sfd);
581    closesocket(c->sfd);
582    accept_new_conns(true);
583    conn_cleanup(c);
584
585    /* if the connection has big buffers, just free it */
586    if (c->rsize > READ_BUFFER_HIGHWAT || conn_add_to_freelist(c)) {
587        conn_free(c);
588    }
589
590    STATS_LOCK();
591    stats.curr_conns--;
592    STATS_UNLOCK();
593
594    return;
595}
596
597/*
598 * Shrinks a connection's buffers if they're too big.  This prevents
599 * periodic large "get" requests from permanently chewing lots of server
600 * memory.
601 *
602 * This should only be called in between requests since it can wipe output
603 * buffers!
604 */
605static void conn_shrink(conn *c) {
606    cb_assert(c != NULL);
607
608    if (IS_UDP(c->transport))
609        return;
610
611    if (c->rsize > READ_BUFFER_HIGHWAT && c->rbytes < DATA_BUFFER_SIZE) {
612        char *newbuf;
613
614        if (c->rcurr != c->rbuf)
615            memmove(c->rbuf, c->rcurr, (size_t)c->rbytes);
616
617        newbuf = (char *)realloc((void *)c->rbuf, DATA_BUFFER_SIZE);
618
619        if (newbuf) {
620            c->rbuf = newbuf;
621            c->rsize = DATA_BUFFER_SIZE;
622        }
623        /* TODO check other branch... */
624        c->rcurr = c->rbuf;
625    }
626
627    if (c->isize > ITEM_LIST_HIGHWAT) {
628        item **newbuf = (item**) realloc((void *)c->ilist, ITEM_LIST_INITIAL * sizeof(c->ilist[0]));
629        if (newbuf) {
630            c->ilist = newbuf;
631            c->isize = ITEM_LIST_INITIAL;
632        }
633    /* TODO check error condition? */
634    }
635
636    if (c->msgsize > MSG_LIST_HIGHWAT) {
637        struct msghdr *newbuf = (struct msghdr *) realloc((void *)c->msglist, MSG_LIST_INITIAL * sizeof(c->msglist[0]));
638        if (newbuf) {
639            c->msglist = newbuf;
640            c->msgsize = MSG_LIST_INITIAL;
641        }
642    /* TODO check error condition? */
643    }
644
645    if (c->iovsize > IOV_LIST_HIGHWAT) {
646        struct iovec *newbuf = (struct iovec *) realloc((void *)c->iov, IOV_LIST_INITIAL * sizeof(c->iov[0]));
647        if (newbuf) {
648            c->iov = newbuf;
649            c->iovsize = IOV_LIST_INITIAL;
650        }
651    /* TODO check return value */
652    }
653}
654
655/**
656 * Convert a state name to a human readable form.
657 */
658const char *state_text(enum conn_states state) {
659    const char* const statenames[] = { "conn_listening",
660                                       "conn_new_cmd",
661                                       "conn_waiting",
662                                       "conn_read",
663                                       "conn_parse_cmd",
664                                       "conn_write",
665                                       "conn_nread",
666                                       "conn_swallow",
667                                       "conn_closing",
668                                       "conn_mwrite",
669                                       "conn_pause",
670                                       "conn_connecting" };
671    return statenames[state];
672}
673
674/*
675 * Sets a connection's current state in the state machine. Any special
676 * processing that needs to happen on certain state transitions can
677 * happen here.
678 */
679void conn_set_state(conn* c, enum conn_states state) {
680    cb_assert(c != NULL);
681    cb_assert(state < conn_max_state);
682
683    if (state != c->state) {
684        if (c->funcs != NULL &&
685            c->funcs->conn_state_change != NULL) {
686            c->funcs->conn_state_change(c, state);
687        }
688
689        if (settings.verbose > 2) {
690            moxi_log_write("%d: going from %s to %s\n",
691                    c->sfd, state_text(c->state),
692                    state_text(state));
693        }
694
695        c->state = state;
696
697        if (state == conn_write || state == conn_mwrite) {
698            MEMCACHED_PROCESS_COMMAND_END(c->sfd, c->wbuf, c->wbytes);
699        }
700    }
701}
702
703/*
704 * Ensures that there is room for another struct iovec in a connection's
705 * iov list.
706 *
707 * Returns 0 on success, -1 on out-of-memory.
708 */
709int ensure_iov_space(conn *c) {
710    cb_assert(c != NULL);
711
712    if (c->iovused >= c->iovsize) {
713        int i, iovnum;
714        struct iovec *new_iov = (struct iovec *)realloc(c->iov,
715                                (c->iovsize * 2) * sizeof(struct iovec));
716        if (! new_iov)
717            return -1;
718        c->iov = new_iov;
719        c->iovsize *= 2;
720
721        /* Point all the msghdr structures at the new list. */
722        for (i = 0, iovnum = 0; i < c->msgused; i++) {
723            c->msglist[i].msg_iov = &c->iov[iovnum];
724            iovnum += c->msglist[i].msg_iovlen;
725        }
726    }
727
728    return 0;
729}
730
731
732/*
733 * Adds data to the list of pending data that will be written out to a
734 * connection.
735 *
736 * Returns 0 on success, -1 on out-of-memory.
737 */
738int add_iov(conn *c, const void *buf, int len) {
739    struct msghdr *m;
740    int leftover;
741    bool limit_to_mtu;
742
743    cb_assert(c != NULL);
744    cb_assert(c->msgused > 0);
745
746    do {
747        m = &c->msglist[c->msgused - 1];
748
749        /*
750         * Limit UDP packets, and the first payloads of TCP replies, to
751         * UDP_MAX_PAYLOAD_SIZE bytes.
752         */
753        limit_to_mtu = IS_UDP(c->transport) || (1 == c->msgused);
754
755        /* We may need to start a new msghdr if this one is full. */
756        if (m->msg_iovlen == IOV_MAX ||
757            (limit_to_mtu && c->msgbytes >= UDP_MAX_PAYLOAD_SIZE)) {
758            add_msghdr(c);
759            m = &c->msglist[c->msgused - 1];
760        }
761
762        if (ensure_iov_space(c) != 0)
763            return -1;
764
765        /* If the fragment is too big to fit in the datagram, split it up */
766        if (limit_to_mtu && len + c->msgbytes > UDP_MAX_PAYLOAD_SIZE) {
767            leftover = len + c->msgbytes - UDP_MAX_PAYLOAD_SIZE;
768            len -= leftover;
769        } else {
770            leftover = 0;
771        }
772
773        m = &c->msglist[c->msgused - 1];
774        m->msg_iov[m->msg_iovlen].iov_base = (void *)buf;
775        m->msg_iov[m->msg_iovlen].iov_len = len;
776
777        c->msgbytes += len;
778        c->iovused++;
779        m->msg_iovlen++;
780
781        buf = ((char *)buf) + len;
782        len = leftover;
783    } while (leftover > 0);
784
785    return 0;
786}
787
788
789/*
790 * Constructs a set of UDP headers and attaches them to the outgoing messages.
791 */
792static int build_udp_headers(conn *c) {
793    int i;
794    unsigned char *hdr;
795
796    cb_assert(c != NULL);
797
798    if (c->msgused > c->hdrsize) {
799        void *new_hdrbuf;
800        if (c->hdrbuf)
801            new_hdrbuf = realloc(c->hdrbuf, c->msgused * 2 * UDP_HEADER_SIZE);
802        else
803            new_hdrbuf = malloc(c->msgused * 2 * UDP_HEADER_SIZE);
804        if (! new_hdrbuf)
805            return -1;
806        c->hdrbuf = (unsigned char *)new_hdrbuf;
807        c->hdrsize = c->msgused * 2;
808    }
809
810    hdr = c->hdrbuf;
811    for (i = 0; i < c->msgused; i++) {
812        c->msglist[i].msg_iov[0].iov_base = (void*)hdr;
813        c->msglist[i].msg_iov[0].iov_len = UDP_HEADER_SIZE;
814        *hdr++ = c->request_id / 256;
815        *hdr++ = c->request_id % 256;
816        *hdr++ = i / 256;
817        *hdr++ = i % 256;
818        *hdr++ = c->msgused / 256;
819        *hdr++ = c->msgused % 256;
820        *hdr++ = 0;
821        *hdr++ = 0;
822        cb_assert((void *) hdr == (caddr_t)c->msglist[i].msg_iov[0].iov_base + UDP_HEADER_SIZE);
823    }
824
825    return 0;
826}
827
828
829void out_string(conn *c, const char *str) {
830    int len;
831
832    cb_assert(c != NULL);
833
834    if (c->noreply) {
835        if (settings.verbose > 1)
836            moxi_log_write(">%d NOREPLY %s\n", c->sfd, str);
837        c->noreply = false;
838        conn_set_state(c, conn_new_cmd);
839        return;
840    }
841
842    if (settings.verbose > 1)
843        moxi_log_write(">%d %s\n", c->sfd, str);
844
845    len = (int)strlen(str);
846    if ((len + 2) > c->wsize) {
847        /* ought to be always enough. just fail for simplicity */
848        str = "SERVER_ERROR output line too long";
849        len = (int)strlen(str);
850    }
851
852    memcpy(c->wbuf, str, len);
853    memcpy(c->wbuf + len, "\r\n", 2);
854    c->wbytes = len + 2;
855    c->wcurr = c->wbuf;
856
857    conn_set_state(c, conn_write);
858    c->write_and_go = conn_new_cmd;
859    return;
860}
861
862/*
863 * we get here after reading the value in set/add/replace commands. The command
864 * has been stored in c->cmd, and the item is ready in c->item.
865 */
866void complete_nread_ascii(conn *c) {
867    cb_assert(c != NULL);
868
869    item *it = c->item;
870    int comm = c->cmd;
871    enum store_item_type ret;
872
873    cb_mutex_enter(&c->thread->stats.mutex);
874    c->thread->stats.slab_stats[it->slabs_clsid].set_cmds++;
875    cb_mutex_exit(&c->thread->stats.mutex);
876
877    if (strncmp(ITEM_data(it) + it->nbytes - 2, "\r\n", 2) != 0) {
878        out_string(c, "CLIENT_ERROR bad data chunk");
879    } else {
880      ret = store_item(it, comm, c);
881
882#ifdef ENABLE_DTRACE
883      uint64_t cas = ITEM_get_cas(it);
884      switch (c->cmd) {
885      case NREAD_ADD:
886          MEMCACHED_COMMAND_ADD(c->sfd, ITEM_key(it), it->nkey,
887                                (ret == 1) ? it->nbytes : -1, cas);
888          break;
889      case NREAD_REPLACE:
890          MEMCACHED_COMMAND_REPLACE(c->sfd, ITEM_key(it), it->nkey,
891                                    (ret == 1) ? it->nbytes : -1, cas);
892          break;
893      case NREAD_APPEND:
894          MEMCACHED_COMMAND_APPEND(c->sfd, ITEM_key(it), it->nkey,
895                                   (ret == 1) ? it->nbytes : -1, cas);
896          break;
897      case NREAD_PREPEND:
898          MEMCACHED_COMMAND_PREPEND(c->sfd, ITEM_key(it), it->nkey,
899                                    (ret == 1) ? it->nbytes : -1, cas);
900          break;
901      case NREAD_SET:
902          MEMCACHED_COMMAND_SET(c->sfd, ITEM_key(it), it->nkey,
903                                (ret == 1) ? it->nbytes : -1, cas);
904          break;
905      case NREAD_CAS:
906          MEMCACHED_COMMAND_CAS(c->sfd, ITEM_key(it), it->nkey, it->nbytes,
907                                cas);
908          break;
909      }
910#endif
911
912      switch (ret) {
913      case STORED:
914          out_string(c, "STORED");
915          break;
916      case EXISTS:
917          out_string(c, "EXISTS");
918          break;
919      case NOT_FOUND:
920          out_string(c, "NOT_FOUND");
921          break;
922      case NOT_STORED:
923          out_string(c, "NOT_STORED");
924          break;
925      default:
926          out_string(c, "SERVER_ERROR Unhandled storage type.");
927      }
928
929    }
930
931    item_remove(c->item);       /* release the c->item reference */
932    c->item = 0;
933}
934
935/**
936 * get a pointer to the start of the request struct for the current command
937 */
938void* binary_get_request(conn *c) {
939    char *ret = c->rcurr;
940    ret -= (sizeof(c->binary_header) + c->binary_header.request.keylen +
941            c->binary_header.request.extlen);
942
943    cb_assert(ret >= c->rbuf);
944    return ret;
945}
946
947/**
948 * get a pointer to the key in this request
949 */
950char* binary_get_key(conn *c) {
951    return c->rcurr - (c->binary_header.request.keylen);
952}
953
954static void add_bin_header(conn *c, uint16_t err, uint8_t hdr_len, uint16_t key_len, uint32_t body_len) {
955    protocol_binary_response_header* header;
956
957    cb_assert(c);
958
959    c->msgcurr = 0;
960    c->msgused = 0;
961    c->iovused = 0;
962    if (add_msghdr(c) != 0) {
963        /* XXX:  out_string is inappropriate here */
964        out_string(c, "SERVER_ERROR out of memory");
965        return;
966    }
967
968    header = (protocol_binary_response_header *)c->wbuf;
969
970    header->response.magic = (uint8_t)PROTOCOL_BINARY_RES;
971    header->response.opcode = c->binary_header.request.opcode;
972    header->response.keylen = (uint16_t)htons(key_len);
973
974    header->response.extlen = (uint8_t)hdr_len;
975    header->response.datatype = (uint8_t)PROTOCOL_BINARY_RAW_BYTES;
976    header->response.status = (uint16_t)htons(err);
977
978    header->response.bodylen = htonl(body_len);
979    header->response.opaque = c->opaque;
980    header->response.cas = mc_swap64(c->cas);
981
982    if (settings.verbose > 1) {
983        moxi_log_write(">%d Writing bin response:\n", c->sfd);
984        cproxy_dump_header(c->sfd, (char *) header->bytes);
985    }
986
987    add_iov(c, c->wbuf, sizeof(header->response));
988}
989
990void write_bin_error(conn *c, protocol_binary_response_status err, int swallow) {
991    const char *errstr = "Unknown error";
992    size_t len;
993
994    switch (err) {
995    case PROTOCOL_BINARY_RESPONSE_ENOMEM:
996        errstr = "Out of memory";
997        break;
998    case PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND:
999        errstr = "Unknown command";
1000        break;
1001    case PROTOCOL_BINARY_RESPONSE_KEY_ENOENT:
1002        errstr = "Not found";
1003        break;
1004    case PROTOCOL_BINARY_RESPONSE_EINVAL:
1005        errstr = "Invalid arguments";
1006        break;
1007    case PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS:
1008        errstr = "Data exists for key.";
1009        break;
1010    case PROTOCOL_BINARY_RESPONSE_E2BIG:
1011        errstr = "Too large.";
1012        break;
1013    case PROTOCOL_BINARY_RESPONSE_DELTA_BADVAL:
1014        errstr = "Non-numeric server-side value for incr or decr";
1015        break;
1016    case PROTOCOL_BINARY_RESPONSE_NOT_STORED:
1017        errstr = "Not stored.";
1018        break;
1019    case PROTOCOL_BINARY_RESPONSE_AUTH_ERROR:
1020        errstr = "Auth failure";
1021        break;
1022    case PROTOCOL_BINARY_RESPONSE_AUTH_CONTINUE:
1023        errstr = "Auth continue";
1024        break;
1025    case PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET:
1026        errstr = "Not my vbucket";
1027        break;
1028    case PROTOCOL_BINARY_RESPONSE_NOT_SUPPORTED:
1029        errstr = "Not supported";
1030        break;
1031    case PROTOCOL_BINARY_RESPONSE_EINTERNAL:
1032        errstr = "Internal error";
1033        break;
1034    case PROTOCOL_BINARY_RESPONSE_EBUSY:
1035        errstr = "System is busy";
1036        break;
1037    case PROTOCOL_BINARY_RESPONSE_ETMPFAIL:
1038        errstr = "Temporary failure";
1039        break;
1040    default:
1041        cb_assert(false);
1042        errstr = "UNHANDLED ERROR";
1043        moxi_log_write(">%d UNHANDLED ERROR: %d\n", c->sfd, err);
1044    }
1045
1046    if (settings.verbose > 1) {
1047        moxi_log_write(">%d Writing an error: %d %s\n", c->sfd, (int) err, errstr);
1048    }
1049
1050    len = strlen(errstr);
1051    add_bin_header(c, err, 0, 0, (uint32_t)len);
1052    if (len > 0) {
1053        add_iov(c, errstr, (uint32_t)len);
1054    }
1055    conn_set_state(c, conn_mwrite);
1056    if(swallow > 0) {
1057        c->sbytes = swallow;
1058        c->write_and_go = conn_swallow;
1059    } else {
1060        c->write_and_go = conn_new_cmd;
1061    }
1062}
1063
1064/* Form and send a response to a command over the binary protocol */
1065void write_bin_response(conn *c, void *d, int hlen, int keylen, int dlen) {
1066    if (!c->noreply || c->cmd == PROTOCOL_BINARY_CMD_GET ||
1067        c->cmd == PROTOCOL_BINARY_CMD_GETK ||
1068        c->cmd == PROTOCOL_BINARY_CMD_GETL) {
1069        add_bin_header(c, 0, hlen, keylen, dlen);
1070        if(dlen > 0) {
1071            add_iov(c, d, dlen);
1072        }
1073        conn_set_state(c, conn_mwrite);
1074        c->write_and_go = conn_new_cmd;
1075    } else {
1076        conn_set_state(c, conn_new_cmd);
1077    }
1078}
1079
/*
 * Byte swap a 64-bit number.
 *
 * When ENDIAN_LITTLE is defined the eight bytes of `in` are returned in
 * reversed order; otherwise `in` is returned unchanged.  Applying the
 * function twice yields the original value in either configuration.
 */
uint64_t mc_swap64(uint64_t in) {
#ifdef ENDIAN_LITTLE
    /* Little endian, flip the bytes around until someone makes a faster/better
     * way to do this.  The accumulator is unsigned on purpose: shifting a
     * signed value into (or past) the sign bit is undefined behaviour. */
    uint64_t rv = 0;
    int i;

    for (i = 0; i < 8; i++) {
        rv = (rv << 8) | (in & 0xff);
        in >>= 8;
    }
    return rv;
#else
    /* big-endian machines don't need byte swapping */
    return in;
#endif
}
1097
1098static void complete_incr_bin(conn *c) {
1099    item *it;
1100    char *key;
1101    size_t nkey;
1102
1103    protocol_binary_response_incr* rsp = (protocol_binary_response_incr*)c->wbuf;
1104    protocol_binary_request_incr* req = binary_get_request(c);
1105
1106    cb_assert(c != NULL);
1107    cb_assert(c->wsize >= (int) sizeof(*rsp));
1108
1109    /* fix byteorder in the request */
1110    req->message.body.delta = mc_swap64(req->message.body.delta);
1111    req->message.body.initial = mc_swap64(req->message.body.initial);
1112    req->message.body.expiration = ntohl(req->message.body.expiration);
1113    key = binary_get_key(c);
1114    nkey = c->binary_header.request.keylen;
1115
1116    if (settings.verbose) {
1117        size_t i;
1118        moxi_log_write("incr ");
1119
1120        for (i = 0; i < nkey; i++) {
1121            moxi_log_write("%c", key[i]);
1122        }
1123        moxi_log_write(" %lld, %llu, %d\n",
1124                (long long)req->message.body.delta,
1125                (long long)req->message.body.initial,
1126                req->message.body.expiration);
1127    }
1128
1129    it = item_get(key, nkey);
1130    if (it && (c->binary_header.request.cas == 0 ||
1131               c->binary_header.request.cas == ITEM_get_cas(it))) {
1132        /* Weird magic in add_delta forces me to pad here */
1133        char tmpbuf[INCR_MAX_STORAGE_LEN];
1134        protocol_binary_response_status st = PROTOCOL_BINARY_RESPONSE_SUCCESS;
1135
1136        switch(add_delta(c, it, c->cmd == PROTOCOL_BINARY_CMD_INCREMENT,
1137                         req->message.body.delta, tmpbuf)) {
1138        case OK:
1139            break;
1140        case NON_NUMERIC:
1141            st = PROTOCOL_BINARY_RESPONSE_DELTA_BADVAL;
1142            break;
1143        case EOM:
1144            st = PROTOCOL_BINARY_RESPONSE_ENOMEM;
1145            break;
1146        }
1147
1148        if (st != PROTOCOL_BINARY_RESPONSE_SUCCESS) {
1149            write_bin_error(c, st, 0);
1150        } else {
1151            rsp->message.body.value = mc_swap64(strtoull(tmpbuf, NULL, 10));
1152            c->cas = ITEM_get_cas(it);
1153            write_bin_response(c, &rsp->message.body, 0, 0,
1154                               sizeof(rsp->message.body.value));
1155        }
1156
1157        item_remove(it);         /* release our reference */
1158    } else if (!it && req->message.body.expiration != 0xffffffff) {
1159        /* Save some room for the response */
1160        rsp->message.body.value = mc_swap64(req->message.body.initial);
1161        it = item_alloc(key, nkey, 0, realtime(req->message.body.expiration),
1162                        INCR_MAX_STORAGE_LEN);
1163
1164        if (it != NULL) {
1165            snprintf(ITEM_data(it), INCR_MAX_STORAGE_LEN, "%llu",
1166                     (unsigned long long)req->message.body.initial);
1167
1168            if (store_item(it, NREAD_SET, c)) {
1169                c->cas = ITEM_get_cas(it);
1170                write_bin_response(c, &rsp->message.body, 0, 0, sizeof(rsp->message.body.value));
1171            } else {
1172                write_bin_error(c, PROTOCOL_BINARY_RESPONSE_NOT_STORED, 0);
1173            }
1174            item_remove(it);         /* release our reference */
1175        } else {
1176            write_bin_error(c, PROTOCOL_BINARY_RESPONSE_ENOMEM, 0);
1177        }
1178    } else if (it) {
1179        /* incorrect CAS */
1180        item_remove(it);         /* release our reference */
1181        write_bin_error(c, PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS, 0);
1182    } else {
1183
1184        cb_mutex_enter(&c->thread->stats.mutex);
1185        if (c->cmd == PROTOCOL_BINARY_CMD_INCREMENT) {
1186            c->thread->stats.incr_misses++;
1187        } else {
1188            c->thread->stats.decr_misses++;
1189        }
1190        cb_mutex_exit(&c->thread->stats.mutex);
1191
1192        write_bin_error(c, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, 0);
1193    }
1194}
1195
1196static void complete_update_bin(conn *c) {
1197    protocol_binary_response_status eno = PROTOCOL_BINARY_RESPONSE_EINVAL;
1198    enum store_item_type ret = NOT_STORED;
1199    cb_assert(c != NULL);
1200
1201    item *it = c->item;
1202
1203    cb_mutex_enter(&c->thread->stats.mutex);
1204    c->thread->stats.slab_stats[it->slabs_clsid].set_cmds++;
1205    cb_mutex_exit(&c->thread->stats.mutex);
1206
1207    /* We don't actually receive the trailing two characters in the bin
1208     * protocol, so we're going to just set them here */
1209    *(ITEM_data(it) + it->nbytes - 2) = '\r';
1210    *(ITEM_data(it) + it->nbytes - 1) = '\n';
1211
1212    ret = store_item(it, c->cmd, c);
1213
1214#ifdef ENABLE_DTRACE
1215    uint64_t cas = ITEM_get_cas(it);
1216    switch (c->cmd) {
1217    case NREAD_ADD:
1218        MEMCACHED_COMMAND_ADD(c->sfd, ITEM_key(it), it->nkey,
1219                              (ret == STORED) ? it->nbytes : -1, cas);
1220        break;
1221    case NREAD_REPLACE:
1222        MEMCACHED_COMMAND_REPLACE(c->sfd, ITEM_key(it), it->nkey,
1223                                  (ret == STORED) ? it->nbytes : -1, cas);
1224        break;
1225    case NREAD_APPEND:
1226        MEMCACHED_COMMAND_APPEND(c->sfd, ITEM_key(it), it->nkey,
1227                                 (ret == STORED) ? it->nbytes : -1, cas);
1228        break;
1229    case NREAD_PREPEND:
1230        MEMCACHED_COMMAND_PREPEND(c->sfd, ITEM_key(it), it->nkey,
1231                                 (ret == STORED) ? it->nbytes : -1, cas);
1232        break;
1233    case NREAD_SET:
1234        MEMCACHED_COMMAND_SET(c->sfd, ITEM_key(it), it->nkey,
1235                              (ret == STORED) ? it->nbytes : -1, cas);
1236        break;
1237    }
1238#endif
1239
1240    switch (ret) {
1241    case STORED:
1242        /* Stored */
1243        write_bin_response(c, NULL, 0, 0, 0);
1244        break;
1245    case EXISTS:
1246        write_bin_error(c, PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS, 0);
1247        break;
1248    case NOT_FOUND:
1249        write_bin_error(c, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, 0);
1250        break;
1251    case NOT_STORED:
1252        if (c->cmd == NREAD_ADD) {
1253            eno = PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS;
1254        } else if(c->cmd == NREAD_REPLACE) {
1255            eno = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
1256        } else {
1257            eno = PROTOCOL_BINARY_RESPONSE_NOT_STORED;
1258        }
1259        write_bin_error(c, eno, 0);
1260    }
1261
1262    item_remove(c->item);       /* release the c->item reference */
1263    c->item = 0;
1264}
1265
1266static void process_bin_get(conn *c) {
1267    item *it;
1268
1269    protocol_binary_response_get* rsp = (protocol_binary_response_get*)c->wbuf;
1270    char* key = binary_get_key(c);
1271    size_t nkey = c->binary_header.request.keylen;
1272
1273    if (settings.verbose) {
1274        size_t ii;
1275        moxi_log_write("<%d GET ", c->sfd);
1276        for (ii = 0; ii < nkey; ++ii) {
1277            moxi_log_write("%c", key[ii]);
1278        }
1279        moxi_log_write("\n");
1280    }
1281
1282    it = item_get(key, nkey);
1283    if (it) {
1284        /* the length has two unnecessary bytes ("\r\n") */
1285        uint16_t keylen = 0;
1286        uint32_t bodylen = sizeof(rsp->message.body) + (it->nbytes - 2);
1287
1288        cb_mutex_enter(&c->thread->stats.mutex);
1289        c->thread->stats.get_cmds++;
1290        c->thread->stats.slab_stats[it->slabs_clsid].get_hits++;
1291        cb_mutex_exit(&c->thread->stats.mutex);
1292
1293        MEMCACHED_COMMAND_GET(c->sfd, ITEM_key(it), it->nkey,
1294                              it->nbytes, ITEM_get_cas(it));
1295
1296        if (c->cmd == PROTOCOL_BINARY_CMD_GETK ||
1297            c->cmd == PROTOCOL_BINARY_CMD_GETL) {
1298            bodylen += (uint32_t)nkey;
1299            keylen = (uint16_t)nkey;
1300        }
1301        add_bin_header(c, 0, sizeof(rsp->message.body), keylen, bodylen);
1302        rsp->message.header.response.cas = mc_swap64(ITEM_get_cas(it));
1303
1304        /* add the flags */
1305        rsp->message.body.flags = htonl(strtoul(ITEM_suffix(it), NULL, 10));
1306        add_iov(c, &rsp->message.body, sizeof(rsp->message.body));
1307
1308        if (c->cmd == PROTOCOL_BINARY_CMD_GETK ||
1309            c->cmd == PROTOCOL_BINARY_CMD_GETL) {
1310            add_iov(c, ITEM_key(it), (uint32_t)nkey);
1311        }
1312
1313        /* Add the data minus the CRLF */
1314        add_iov(c, ITEM_data(it), it->nbytes - 2);
1315        conn_set_state(c, conn_mwrite);
1316        /* Remember this command so we can garbage collect it later */
1317        c->item = it;
1318    } else {
1319        cb_mutex_enter(&c->thread->stats.mutex);
1320        c->thread->stats.get_cmds++;
1321        c->thread->stats.get_misses++;
1322        cb_mutex_exit(&c->thread->stats.mutex);
1323
1324        MEMCACHED_COMMAND_GET(c->sfd, key, nkey, -1, 0);
1325
1326        if (c->noreply) {
1327            conn_set_state(c, conn_new_cmd);
1328        } else {
1329            if (c->cmd == PROTOCOL_BINARY_CMD_GETK ||
1330                c->cmd == PROTOCOL_BINARY_CMD_GETL) {
1331                char *ofs = c->wbuf + sizeof(protocol_binary_response_header);
1332                add_bin_header(c, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT,
1333                        0, (uint16_t)nkey, (uint32_t)nkey);
1334                memcpy(ofs, key, nkey);
1335                add_iov(c, ofs, (int)nkey);
1336                conn_set_state(c, conn_mwrite);
1337            } else {
1338                write_bin_error(c, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, 0);
1339            }
1340        }
1341    }
1342
1343    if (settings.detail_enabled) {
1344        stats_prefix_record_get(key, nkey, NULL != it);
1345    }
1346}
1347
1348static void append_bin_stats(const char *key, const uint16_t klen,
1349                             const char *val, const uint32_t vlen,
1350                             conn *c) {
1351    char *buf = c->stats.buffer + c->stats.offset;
1352    uint32_t bodylen = klen + vlen;
1353    protocol_binary_response_header header = {
1354        .response = {
1355            .magic = (uint8_t)PROTOCOL_BINARY_RES,
1356            .opcode = PROTOCOL_BINARY_CMD_STAT,
1357            .keylen = (uint16_t)htons(klen),
1358            .datatype = (uint8_t)PROTOCOL_BINARY_RAW_BYTES,
1359            .bodylen = htonl(bodylen),
1360            .opaque = c->opaque
1361        }
1362    };
1363
1364    memcpy(buf, header.bytes, sizeof(header.response));
1365    buf += sizeof(header.response);
1366
1367    if (klen > 0) {
1368        memcpy(buf, key, klen);
1369        buf += klen;
1370
1371        if (vlen > 0) {
1372            memcpy(buf, val, vlen);
1373        }
1374    }
1375
1376    c->stats.offset += sizeof(header.response) + bodylen;
1377}
1378
1379static void append_ascii_stats(const char *key, const uint16_t klen,
1380                               const char *val, const uint32_t vlen,
1381                               conn *c) {
1382    char *pos = c->stats.buffer + c->stats.offset;
1383    uint32_t nbytes = 0;
1384    size_t remaining = c->stats.size - c->stats.offset;
1385    size_t room = remaining - 1;
1386
1387    if (klen == 0 && vlen == 0) {
1388        nbytes = snprintf(pos, room, "END\r\n");
1389    } else if (vlen == 0) {
1390        nbytes = snprintf(pos, room, "STAT %s\r\n", key);
1391    } else {
1392        nbytes = snprintf(pos, room, "STAT %s %s\r\n", key, val);
1393    }
1394
1395    c->stats.offset += nbytes;
1396}
1397
1398static bool grow_stats_buf(conn *c, size_t needed) {
1399    size_t nsize = c->stats.size;
1400    size_t available = nsize - c->stats.offset;
1401    bool rv = true;
1402
1403    /* Special case: No buffer -- need to allocate fresh */
1404    if (c->stats.buffer == NULL) {
1405        nsize = 1024;
1406        available = c->stats.size = c->stats.offset = 0;
1407    }
1408
1409    while (needed > available) {
1410        cb_assert(nsize > 0);
1411        nsize = nsize << 1;
1412        available = nsize - c->stats.offset;
1413    }
1414
1415    if (nsize != c->stats.size) {
1416        char *ptr = realloc(c->stats.buffer, nsize);
1417        if (ptr) {
1418            c->stats.buffer = ptr;
1419            c->stats.size = nsize;
1420        } else {
1421            rv = false;
1422        }
1423    }
1424
1425    return rv;
1426}
1427
1428static void append_stats(const char *key, const uint16_t klen,
1429                  const char *val, const uint32_t vlen,
1430                  const void *cookie)
1431{
1432    /* value without a key is invalid */
1433    if (klen == 0 && vlen > 0) {
1434        return ;
1435    }
1436
1437    conn *c = (conn*)cookie;
1438
1439    if (IS_BINARY(c->protocol)) {
1440        size_t needed = vlen + klen + sizeof(protocol_binary_response_header);
1441        if (!grow_stats_buf(c, needed)) {
1442            return ;
1443        }
1444        append_bin_stats(key, klen, val, vlen, c);
1445    } else {
1446        size_t needed = vlen + klen + 10; /* 10 == "STAT = \r\n" */
1447        if (!grow_stats_buf(c, needed)) {
1448            return ;
1449        }
1450        append_ascii_stats(key, klen, val, vlen, c);
1451    }
1452
1453    cb_assert(c->stats.offset <= c->stats.size);
1454}
1455
1456void process_bin_proxy_stats(conn *c) {
1457    struct proxy_stats_cmd_info psci = {
1458        .do_info = false,
1459        .do_settings = false,
1460        .do_behaviors = false,
1461        .do_frontcache = false,
1462        .do_keystats = false,
1463        .do_stats = true,
1464        .do_zeros = true
1465    };
1466
1467    /* proxy_stats_dump_proxy_main(&append_stats, c, &psci); */
1468    proxy_stats_dump_proxies(&append_stats, c, &psci);
1469
1470    /* Append termination package and start the transfer */
1471    append_stats(NULL, 0, NULL, 0, c);
1472    if (c->stats.buffer == NULL) {
1473        write_bin_error(c, PROTOCOL_BINARY_RESPONSE_ENOMEM, 0);
1474    } else {
1475        write_and_free(c, c->stats.buffer, c->stats.offset);
1476        c->stats.buffer = NULL;
1477    }
1478}
1479
1480static void process_bin_stat(conn *c) {
1481    char *subcommand = binary_get_key(c);
1482    size_t nkey = c->binary_header.request.keylen;
1483
1484    if (settings.verbose) {
1485        size_t ii;
1486        moxi_log_write("<%d STATS ", c->sfd);
1487        for (ii = 0; ii < nkey; ++ii) {
1488            moxi_log_write("%c", subcommand[ii]);
1489        }
1490        moxi_log_write("\n");
1491    }
1492
1493    if (nkey == 0) {
1494        /* request all statistics */
1495        server_stats(&append_stats, c, NULL);
1496        (void)get_stats(NULL, 0, &append_stats, c);
1497    } else if (strncmp(subcommand, "reset", 5) == 0) {
1498        stats_reset();
1499    } else if (strncmp(subcommand, "settings", 8) == 0) {
1500        process_stat_settings(&append_stats, c, NULL);
1501    } else if (strncmp(subcommand, "detail", 6) == 0) {
1502        char *subcmd_pos = subcommand + 6;
1503        if (strncmp(subcmd_pos, " dump", 5) == 0) {
1504            int len;
1505            char *dump_buf = stats_prefix_dump(&len);
1506            if (dump_buf == NULL || len <= 0) {
1507                write_bin_error(c, PROTOCOL_BINARY_RESPONSE_ENOMEM, 0);
1508                return ;
1509            } else {
1510                append_stats("detailed", (uint16_t)strlen("detailed"), dump_buf, len, c);
1511                free(dump_buf);
1512            }
1513        } else if (strncmp(subcmd_pos, " on", 3) == 0) {
1514            settings.detail_enabled = 1;
1515        } else if (strncmp(subcmd_pos, " off", 4) == 0) {
1516            settings.detail_enabled = 0;
1517        } else {
1518            write_bin_error(c, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, 0);
1519            return;
1520        }
1521    } else {
1522        if (get_stats(subcommand, (int)nkey, &append_stats, c)) {
1523            if (c->stats.buffer == NULL) {
1524                write_bin_error(c, PROTOCOL_BINARY_RESPONSE_ENOMEM, 0);
1525            } else {
1526                write_and_free(c, c->stats.buffer, c->stats.offset);
1527                c->stats.buffer = NULL;
1528            }
1529        } else {
1530            write_bin_error(c, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, 0);
1531        }
1532
1533        return;
1534    }
1535
1536    /* Append termination package and start the transfer */
1537    append_stats(NULL, 0, NULL, 0, c);
1538    if (c->stats.buffer == NULL) {
1539        write_bin_error(c, PROTOCOL_BINARY_RESPONSE_ENOMEM, 0);
1540    } else {
1541        write_and_free(c, c->stats.buffer, c->stats.offset);
1542        c->stats.buffer = NULL;
1543    }
1544}
1545
/*
 * Prepare the connection to read the key (plus `extra` bytes of extras)
 * that follows a binary request header.
 *
 * Records next_substate in c->substate, sets c->rlbytes to
 * c->keylen + extra, makes sure the read buffer can hold those bytes
 * behind the header (growing and/or repacking it when necessary), points
 * c->ritem just past the header and switches the connection to conn_nread.
 * If the buffer cannot be grown the connection is moved to conn_closing
 * and the function returns early.
 */
void bin_read_key(conn *c, enum bin_substates next_substate, int extra) {
    cb_assert(c);
    c->substate = next_substate;
    c->rlbytes = c->keylen + extra;

    /* Ok... do we have room for the extras and the key in the input buffer? */
    /* offset: distance from the start of rbuf to the byte after the header */
    ptrdiff_t offset = c->rcurr + sizeof(protocol_binary_request_header) - c->rbuf;
    if (c->rlbytes > c->rsize - offset) {
        int nsize = c->rsize;
        int size = c->rlbytes + sizeof(protocol_binary_request_header);

        /* double the capacity until header + payload fit */
        while (size > nsize) {
            nsize *= 2;
        }

        if (nsize != c->rsize) {
            if (settings.verbose) {
                moxi_log_write("%d: Need to grow buffer from %lu to %lu\n",
                        c->sfd, (unsigned long)c->rsize, (unsigned long)nsize);
            }
            /* keep the old pointer until realloc is known to have succeeded */
            char *newm = realloc(c->rbuf, nsize);
            if (newm == NULL) {
                if (settings.verbose) {
                    moxi_log_write("%d: Failed to grow buffer.. closing connection\n",
                            c->sfd);
                }
                conn_set_state(c, conn_closing);
                return;
            }

            c->rbuf= newm;
            /* rcurr should point to the same offset in the packet */
            c->rcurr = c->rbuf + offset - sizeof(protocol_binary_request_header);
            c->rsize = nsize;
        }
        /* move the pending bytes to the front so the whole buffer is usable */
        if (c->rbuf != c->rcurr) {
            memmove(c->rbuf, c->rcurr, c->rbytes);
            c->rcurr = c->rbuf;
            if (settings.verbose) {
                moxi_log_write("%d: Repack input buffer\n", c->sfd);
            }
        }
    }

    /* preserve the header in the buffer.. */
    c->ritem = c->rcurr + sizeof(protocol_binary_request_header);
    conn_set_state(c, conn_nread);
}
1594
1595void process_bin_noreply(conn *c) {
1596    cb_assert(c);
1597    c->noreply = true;
1598    switch (c->binary_header.request.opcode) {
1599    case PROTOCOL_BINARY_CMD_SETQ:
1600        c->cmd = PROTOCOL_BINARY_CMD_SET;
1601        break;
1602    case PROTOCOL_BINARY_CMD_ADDQ:
1603        c->cmd = PROTOCOL_BINARY_CMD_ADD;
1604        break;
1605    case PROTOCOL_BINARY_CMD_REPLACEQ:
1606        c->cmd = PROTOCOL_BINARY_CMD_REPLACE;
1607        break;
1608    case PROTOCOL_BINARY_CMD_DELETEQ:
1609        c->cmd = PROTOCOL_BINARY_CMD_DELETE;
1610        break;
1611    case PROTOCOL_BINARY_CMD_INCREMENTQ:
1612        c->cmd = PROTOCOL_BINARY_CMD_INCREMENT;
1613        break;
1614    case PROTOCOL_BINARY_CMD_DECREMENTQ:
1615        c->cmd = PROTOCOL_BINARY_CMD_DECREMENT;
1616        break;
1617    case PROTOCOL_BINARY_CMD_QUITQ:
1618        c->cmd = PROTOCOL_BINARY_CMD_QUIT;
1619        break;
1620    case PROTOCOL_BINARY_CMD_FLUSHQ:
1621        c->cmd = PROTOCOL_BINARY_CMD_FLUSH;
1622        break;
1623    case PROTOCOL_BINARY_CMD_APPENDQ:
1624        c->cmd = PROTOCOL_BINARY_CMD_APPEND;
1625        break;
1626    case PROTOCOL_BINARY_CMD_PREPENDQ:
1627        c->cmd = PROTOCOL_BINARY_CMD_PREPEND;
1628        break;
1629    case PROTOCOL_BINARY_CMD_GETQ:
1630        c->cmd = PROTOCOL_BINARY_CMD_GET;
1631        break;
1632    case PROTOCOL_BINARY_CMD_GETKQ:
1633        c->cmd = PROTOCOL_BINARY_CMD_GETK;
1634        break;
1635    default:
1636        c->noreply = false;
1637    }
1638}
1639
/*
 * Validate the header of a binary request and start processing it.
 *
 * The extras, key and body lengths are taken from c->binary_header.  A
 * request whose extras + key exceed its body, or whose lengths do not fit
 * the command selected by c->cmd, is answered with EINVAL and the
 * connection is closed after the reply has been written.  Valid requests
 * are either answered directly (VERSION, NOOP, QUIT) or handed to
 * bin_read_key() with the substate that names the next processing step.
 * Commands not listed here get UNKNOWN_COMMAND and their body is swallowed.
 */
void dispatch_bin_command(conn *c) {
    int protocol_error = 0;

    uint32_t extlen = c->binary_header.request.extlen;
    uint32_t keylen = c->binary_header.request.keylen;
    uint32_t bodylen = c->binary_header.request.bodylen;

    /* the body must be large enough to contain the extras and the key */
    if ((extlen + keylen) > bodylen) {
        /* Just write an error message and disconnect the client */
        write_bin_error(c, PROTOCOL_BINARY_RESPONSE_EINVAL, 0);
        if (settings.verbose) {
            moxi_log_write("Protocol error (opcode %02x), close connection %d\n",
                           c->binary_header.request.opcode,
                           c->sfd);
        }
        c->write_and_go = conn_closing;
        return;
    }

    MEMCACHED_PROCESS_COMMAND_START(c->sfd, c->rcurr, c->rbytes);

    /* map quiet (*Q) opcodes onto their regular command and set c->noreply */
    process_bin_noreply(c);

    switch (c->cmd) {
        case PROTOCOL_BINARY_CMD_VERSION:
            /* no extras, key or body allowed */
            if (extlen == 0 && keylen == 0 && bodylen == 0) {
                write_bin_response(c, VERSION, 0, 0, (int)strlen(VERSION));
            } else {
                protocol_error = 1;
            }
            break;
        case PROTOCOL_BINARY_CMD_FLUSH:
            /* optional 4 bytes of extras, nothing else */
            if (keylen == 0 && bodylen == extlen && (extlen == 0 || extlen == 4)) {
                bin_read_key(c, bin_read_flush_exptime, extlen);
            } else {
                protocol_error = 1;
            }
            break;
        case PROTOCOL_BINARY_CMD_NOOP:
            if (extlen == 0 && keylen == 0 && bodylen == 0) {
                write_bin_response(c, NULL, 0, 0, 0);
            } else {
                protocol_error = 1;
            }
            break;
        case PROTOCOL_BINARY_CMD_SET: /* FALLTHROUGH */
        case PROTOCOL_BINARY_CMD_ADD: /* FALLTHROUGH */
        case PROTOCOL_BINARY_CMD_REPLACE:
            /* 8 bytes of extras, a key, and a body holding at least both */
            if (extlen == 8 && keylen != 0 && bodylen >= (keylen + 8)) {
                bin_read_key(c, bin_reading_set_header, 8);
            } else {
                protocol_error = 1;
            }
            break;
        case PROTOCOL_BINARY_CMD_GETQ:  /* FALLTHROUGH */
        case PROTOCOL_BINARY_CMD_GET:   /* FALLTHROUGH */
        case PROTOCOL_BINARY_CMD_GETKQ: /* FALLTHROUGH */
        case PROTOCOL_BINARY_CMD_GETK:
        case PROTOCOL_BINARY_CMD_GETL:
            /* key only */
            if (extlen == 0 && bodylen == keylen && keylen > 0) {
                bin_read_key(c, bin_reading_get_key, 0);
            } else {
                protocol_error = 1;
            }
            break;
        case PROTOCOL_BINARY_CMD_DELETE:
            /* key only */
            if (keylen > 0 && extlen == 0 && bodylen == keylen) {
                bin_read_key(c, bin_reading_del_header, extlen);
            } else {
                protocol_error = 1;
            }
            break;
        case PROTOCOL_BINARY_CMD_INCREMENT:
        case PROTOCOL_BINARY_CMD_DECREMENT:
            /* 20 bytes of extras followed by the key, no value */
            if (keylen > 0 && extlen == 20 && bodylen == (keylen + extlen)) {
                bin_read_key(c, bin_reading_incr_header, 20);
            } else {
                protocol_error = 1;
            }
            break;
        case PROTOCOL_BINARY_CMD_APPEND:
        case PROTOCOL_BINARY_CMD_PREPEND:
            /* key without extras; the value length is not checked here */
            if (keylen > 0 && extlen == 0) {
                bin_read_key(c, bin_reading_set_header, 0);
            } else {
                protocol_error = 1;
            }
            break;
        case PROTOCOL_BINARY_CMD_STAT:
            /* the key is optional, extras are not allowed */
            if (extlen == 0) {
                bin_read_key(c, bin_reading_stat, 0);
            } else {
                protocol_error = 1;
            }
            break;
        case PROTOCOL_BINARY_CMD_QUIT:
            if (keylen == 0 && extlen == 0 && bodylen == 0) {
                write_bin_response(c, NULL, 0, 0, 0);
                c->write_and_go = conn_closing;
                /* quiet quit: nothing is written, close right away */
                if (c->noreply) {
                    conn_set_state(c, conn_closing);
                }
            } else {
                protocol_error = 1;
            }
            break;
        default:
            /* unknown command: reply and skip over the request body */
            write_bin_error(c, PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND, bodylen);
    }

    if (protocol_error) {
        /* Just write an error message and disconnect the client */
        write_bin_error(c, PROTOCOL_BINARY_RESPONSE_EINVAL, 0);
        if (settings.verbose) {
            moxi_log_write("Protocol error (opcode %02x), close connection %d\n",
                    c->binary_header.request.opcode, c->sfd);
        }
        c->write_and_go = conn_closing;
    }
}
1760
1761static void process_bin_update(conn *c) {
1762    char *key;
1763    int nkey;
1764    int vlen;
1765    item *it;
1766    protocol_binary_request_set* req = binary_get_request(c);
1767
1768    cb_assert(c != NULL);
1769
1770    key = binary_get_key(c);
1771    nkey = c->binary_header.request.keylen;
1772
1773    /* fix byteorder in the request */
1774    req->message.body.flags = ntohl(req->message.body.flags);
1775    req->message.body.expiration = ntohl(req->message.body.expiration);
1776
1777    vlen = c->binary_header.request.bodylen - (nkey + c->binary_header.request.extlen);
1778
1779    if (settings.verbose) {
1780        int ii;
1781        if (c->cmd == PROTOCOL_BINARY_CMD_ADD) {
1782            moxi_log_write("<%d ADD ", c->sfd);
1783        } else if (c->cmd == PROTOCOL_BINARY_CMD_SET) {
1784            moxi_log_write("<%d SET ", c->sfd);
1785        } else {
1786            moxi_log_write("<%d REPLACE ", c->sfd);
1787        }
1788        for (ii = 0; ii < nkey; ++ii) {
1789            moxi_log_write("%c", key[ii]);
1790        }
1791
1792        if (settings.verbose > 1) {
1793            moxi_log_write(" Value len is %d", vlen);
1794        }
1795        moxi_log_write("\n");
1796    }
1797
1798    if (settings.detail_enabled) {
1799        stats_prefix_record_set(key, nkey);
1800    }
1801
1802    it = item_alloc(key, nkey, req->message.body.flags,
1803            c->funcs->conn_realtime(req->message.body.expiration), vlen+2);
1804
1805    if (it == 0) {
1806        if (! item_size_ok(nkey, req->message.body.flags, vlen + 2)) {
1807            write_bin_error(c, PROTOCOL_BINARY_RESPONSE_E2BIG, vlen);
1808        } else {
1809            write_bin_error(c, PROTOCOL_BINARY_RESPONSE_ENOMEM, vlen);
1810        }
1811
1812        /* Avoid stale data persisting in cache because we failed alloc.
1813         * Unacceptable for SET. Anywhere else too? */
1814        if (c->cmd == PROTOCOL_BINARY_CMD_SET) {
1815            it = item_get(key, nkey);
1816            if (it) {
1817                item_unlink(it);
1818                item_remove(it);
1819            }
1820        }
1821
1822        /* swallow the data line */
1823        c->write_and_go = conn_swallow;
1824        return;
1825    }
1826
1827    ITEM_set_cas(it, c->binary_header.request.cas);
1828
1829    switch (c->cmd) {
1830        case PROTOCOL_BINARY_CMD_ADD:
1831            c->cmd = NREAD_ADD;
1832            break;
1833        case PROTOCOL_BINARY_CMD_SET:
1834            c->cmd = NREAD_SET;
1835            break;
1836        case PROTOCOL_BINARY_CMD_REPLACE:
1837            c->cmd = NREAD_REPLACE;
1838            break;
1839        default:
1840            cb_assert(0);
1841    }
1842
1843    if (ITEM_get_cas(it) != 0) {
1844        c->cmd = NREAD_CAS;
1845    }
1846
1847    c->item = it;
1848    c->ritem = ITEM_data(it);
1849    c->rlbytes = vlen;
1850    conn_set_state(c, conn_nread);
1851    c->substate = bin_read_set_value;
1852}
1853
1854static void process_bin_append_prepend(conn *c) {
1855    char *key;
1856    int nkey;
1857    int vlen;
1858    item *it;
1859
1860    cb_assert(c != NULL);
1861
1862    key = binary_get_key(c);
1863    nkey = c->binary_header.request.keylen;
1864    vlen = c->binary_header.request.bodylen - nkey;
1865
1866    if (settings.verbose > 1) {
1867        moxi_log_write("Value len is %d\n", vlen);
1868    }
1869
1870    if (settings.detail_enabled) {
1871        stats_prefix_record_set(key, nkey);
1872    }
1873
1874    it = item_alloc(key, nkey, 0, 0, vlen+2);
1875
1876    if (it == 0) {
1877        if (! item_size_ok(nkey, 0, vlen + 2)) {
1878            write_bin_error(c, PROTOCOL_BINARY_RESPONSE_E2BIG, vlen);
1879        } else {
1880            write_bin_error(c, PROTOCOL_BINARY_RESPONSE_ENOMEM, vlen);
1881        }
1882        /* swallow the data line */
1883        c->write_and_go = conn_swallow;
1884        return;
1885    }
1886
1887    ITEM_set_cas(it, c->binary_header.request.cas);
1888
1889    switch (c->cmd) {
1890        case PROTOCOL_BINARY_CMD_APPEND:
1891            c->cmd = NREAD_APPEND;
1892            break;
1893        case PROTOCOL_BINARY_CMD_PREPEND:
1894            c->cmd = NREAD_PREPEND;
1895            break;
1896        default:
1897            cb_assert(0);
1898    }
1899
1900    c->item = it;
1901    c->ritem = ITEM_data(it);
1902    c->rlbytes = vlen;
1903    conn_set_state(c, conn_nread);
1904    c->substate = bin_read_set_value;
1905}
1906
1907static void process_bin_flush(conn *c) {
1908    time_t exptime = 0;
1909    protocol_binary_request_flush* req = binary_get_request(c);
1910
1911    if (c->binary_header.request.extlen == sizeof(req->message.body)) {
1912        exptime = ntohl(req->message.body.expiration);
1913    }
1914
1915    set_current_time();
1916
1917    if (exptime > 0) {
1918        settings.oldest_live = c->funcs->conn_realtime(exptime) - 1;
1919    } else {
1920        settings.oldest_live = current_time - 1;
1921    }
1922    item_flush_expired();
1923
1924    cb_mutex_enter(&c->thread->stats.mutex);
1925    c->thread->stats.flush_cmds++;
1926    cb_mutex_exit(&c->thread->stats.mutex);
1927
1928    write_bin_response(c, NULL, 0, 0, 0);
1929}
1930
1931static void process_bin_delete(conn *c) {
1932    item *it;
1933
1934    protocol_binary_request_delete* req = binary_get_request(c);
1935
1936    char* key = binary_get_key(c);
1937    size_t nkey = c->binary_header.request.keylen;
1938
1939    cb_assert(c != NULL);
1940
1941    if (settings.verbose) {
1942        moxi_log_write("Deleting %s\n", key);
1943    }
1944
1945    if (settings.detail_enabled) {
1946        stats_prefix_record_delete(key, nkey);
1947    }
1948
1949    it = item_get(key, nkey);
1950    if (it) {
1951        uint64_t cas=mc_swap64(req->message.header.request.cas);
1952        if (cas == 0 || cas == ITEM_get_cas(it)) {
1953            MEMCACHED_COMMAND_DELETE(c->sfd, ITEM_key(it), it->nkey);
1954            item_unlink(it);
1955            write_bin_response(c, NULL, 0, 0, 0);
1956        } else {
1957            write_bin_error(c, PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS, 0);
1958        }
1959        item_remove(it);      /* release our reference */
1960    } else {
1961        write_bin_error(c, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, 0);
1962    }
1963}
1964
1965void complete_nread_binary(conn *c) {
1966    cb_assert(c != NULL);
1967    cb_assert(c->cmd >= 0);
1968
1969    switch(c->substate) {
1970    case bin_reading_set_header:
1971        if (c->cmd == PROTOCOL_BINARY_CMD_APPEND ||
1972                c->cmd == PROTOCOL_BINARY_CMD_PREPEND) {
1973            process_bin_append_prepend(c);
1974        } else {
1975            process_bin_update(c);
1976        }
1977        break;
1978    case bin_read_set_value:
1979        complete_update_bin(c);
1980        break;
1981    case bin_reading_get_key:
1982        process_bin_get(c);
1983        break;
1984    case bin_reading_stat:
1985        process_bin_stat(c);
1986        break;
1987    case bin_reading_del_header:
1988        process_bin_delete(c);
1989        break;
1990    case bin_reading_incr_header:
1991        complete_incr_bin(c);
1992        break;
1993    case bin_read_flush_exptime:
1994        process_bin_flush(c);
1995        break;
1996    default:
1997        moxi_log_write("Not handling substate %d\n", c->substate);
1998        cb_assert(0);
1999    }
2000}
2001
2002void reset_cmd_handler(conn *c) {
2003    c->cmd = -1;
2004    c->cmd_curr = -1;
2005    c->substate = bin_no_state;
2006    conn_cleanup(c);
2007    conn_shrink(c);
2008    if (c->rbytes > 0) {
2009        conn_set_state(c, conn_parse_cmd);
2010    } else {
2011        conn_set_state(c, conn_waiting);
2012    }
2013}
2014
2015void complete_nread(conn *c) {
2016    cb_assert(c != NULL);
2017    cb_assert(c->funcs != NULL);
2018
2019    if (IS_ASCII(c->protocol)) {
2020        c->funcs->conn_complete_nread_ascii(c);
2021    } else if (IS_BINARY(c->protocol)) {
2022        c->funcs->conn_complete_nread_binary(c);
2023    }
2024}
2025
/*
 * Stores an item in the cache according to the semantics of one of the set
 * commands. In threaded mode, this is protected by the cache lock.
 *
 * it    the freshly read item to store
 * comm  one of the NREAD_* store commands (add, replace, append, prepend,
 *       cas, or anything else, which is treated as a plain store)
 * c     connection issuing the store; its per-thread stats and c->cas are
 *       updated here
 *
 * Returns the state of storage: STORED, NOT_STORED, EXISTS (CAS mismatch) or
 * NOT_FOUND (CAS on a missing key).
 */
enum store_item_type do_store_item(item *it, int comm, conn *c) {
    char *key = ITEM_key(it);
    item *old_it = do_item_get(key, it->nkey);
    enum store_item_type stored = NOT_STORED;

    /* Only used by append/prepend: the combined old+new item. */
    item *new_it = NULL;
    int flags;

    if (old_it != NULL && comm == NREAD_ADD) {
        /* add only adds a nonexistent item, but promote to head of LRU */
        do_item_update(old_it);
    } else if (!old_it && (comm == NREAD_REPLACE
        || comm == NREAD_APPEND || comm == NREAD_PREPEND))
    {
        /* replace/append/prepend only act on an existing value; don't store */
    } else if (comm == NREAD_CAS) {
        /* validate cas operation */
        if(old_it == NULL) {
            /* LRU expired */
            stored = NOT_FOUND;
            cb_mutex_enter(&c->thread->stats.mutex);
            c->thread->stats.cas_misses++;
            cb_mutex_exit(&c->thread->stats.mutex);
        }
        else if (ITEM_get_cas(it) == ITEM_get_cas(old_it)) {
            /* cas validates */
            /* it and old_it may belong to different classes. */
            /* I'm updating the stats for the one that's getting pushed out */
            cb_mutex_enter(&c->thread->stats.mutex);
            c->thread->stats.slab_stats[old_it->slabs_clsid].cas_hits++;
            cb_mutex_exit(&c->thread->stats.mutex);

            item_replace(old_it, it);
            stored = STORED;
        } else {
            cb_mutex_enter(&c->thread->stats.mutex);
            c->thread->stats.slab_stats[old_it->slabs_clsid].cas_badval++;
            cb_mutex_exit(&c->thread->stats.mutex);

            if(settings.verbose > 1) {
                moxi_log_write("CAS:  failure: expected %llu, got %llu\n",
                        (unsigned long long)ITEM_get_cas(old_it),
                        (unsigned long long)ITEM_get_cas(it));
            }
            stored = EXISTS;
        }
    } else {
        /*
         * Append - combine new and old record into single one. Here it's
         * atomic and thread-safe.
         *
         * old_it is known to be non-NULL for append/prepend at this point:
         * the missing-key case was filtered out by the branch above.
         */
        if (comm == NREAD_APPEND || comm == NREAD_PREPEND) {
            /*
             * Validate CAS
             */
            if (ITEM_get_cas(it) != 0) {
                /* CAS must be equal */
                if (ITEM_get_cas(it) != ITEM_get_cas(old_it)) {
                    stored = EXISTS;
                }
            }

            if (stored == NOT_STORED) {
                /* we have it and old_it here - alloc memory to hold both */
                /* the new item carries no flags - recover them from ITEM_suffix(old_it) */

                flags = (int) strtol(ITEM_suffix(old_it), (char **) NULL, 10);

                /* Both items include a trailing CRLF; the result keeps only one. */
                new_it = do_item_alloc(key, it->nkey, flags, old_it->exptime, it->nbytes + old_it->nbytes - 2 /* CRLF */);

                if (new_it == NULL) {
                    /* SERVER_ERROR out of memory */
                    /* Early exit: drop the reference taken by do_item_get(). */
                    if (old_it != NULL)
                        do_item_remove(old_it);

                    return NOT_STORED;
                }

                /* copy data from it and old_it to new_it */

                if (comm == NREAD_APPEND) {
                    /* old data first; the new data overwrites old's CRLF */
                    memcpy(ITEM_data(new_it), ITEM_data(old_it), old_it->nbytes);
                    memcpy(ITEM_data(new_it) + old_it->nbytes - 2 /* CRLF */, ITEM_data(it), it->nbytes);
                } else {
                    /* NREAD_PREPEND */
                    /* new data first; the old data overwrites new's CRLF */
                    memcpy(ITEM_data(new_it), ITEM_data(it), it->nbytes);
                    memcpy(ITEM_data(new_it) + it->nbytes - 2 /* CRLF */, ITEM_data(old_it), old_it->nbytes);
                }

                /* From here on the combined item is the one being stored. */
                it = new_it;
            }
        }

        if (stored == NOT_STORED) {
            if (old_it != NULL)
                item_replace(old_it, it);
            else
                do_item_link(it);

            c->cas = ITEM_get_cas(it);

            stored = STORED;
        }
    }

    if (old_it != NULL)
        do_item_remove(old_it);         /* release our reference */
    if (new_it != NULL)
        do_item_remove(new_it);         /* release the allocation reference */

    /* Report the CAS of whatever item ended up stored back to the caller. */
    if (stored == STORED) {
        c->cas = ITEM_get_cas(it);
    }

    return stored;
}
2148
/*
 * Indices into the token array filled in by tokenize_command().
 * SUBCOMMAND_TOKEN and KEY_TOKEN share index 1: the slot is read as a
 * subcommand by process_stat() and as a key by the get/update/arithmetic
 * handlers. COMMAND_TOKEN is presumably the command word itself (its use is
 * not visible in this part of the file).
 */
#define COMMAND_TOKEN 0
#define SUBCOMMAND_TOKEN 1
#define KEY_TOKEN 1

/* Capacity passed as max_tokens when a command line is tokenized. */
#define MAX_TOKENS 8
2154
2155/*
2156 * Tokenize the command string by replacing whitespace with '\0' and update
2157 * the token array tokens with pointer to start of each token and length.
2158 * Returns total number of tokens.  The last valid token is the terminal
2159 * token (value points to the first unprocessed character of the string and
2160 * length zero).
2161 *
2162 * Usage example:
2163 *
2164 *  while(tokenize_command(command, ncommand, tokens, max_tokens) > 0) {
2165 *      for(int ix = 0; tokens[ix].length != 0; ix++) {
2166 *          ...
2167 *      }
2168 *      ncommand = tokens[ix].value - command;
2169 *      command  = tokens[ix].value;
2170 *   }
2171 */
2172size_t tokenize_command(char *command, token_t *tokens, const size_t max_tokens) {
2173    char *s, *e;
2174    size_t ntokens = 0;
2175
2176    cb_assert(command != NULL && tokens != NULL && max_tokens > 1);
2177
2178    for (s = e = command; ntokens < max_tokens - 1; ++e) {
2179        if (*e == ' ') {
2180            if (s != e) {
2181                tokens[ntokens].value = s;
2182                tokens[ntokens].length = e - s;
2183                ntokens++;
2184                *e = '\0';
2185            }
2186            s = e + 1;
2187        }
2188        else if (*e == '\0') {
2189            if (s != e) {
2190                tokens[ntokens].value = s;
2191                tokens[ntokens].length = e - s;
2192                ntokens++;
2193            }
2194
2195            break; /* string end */
2196        }
2197    }
2198
2199    /*
2200     * If we scanned the whole string, the terminal value pointer is null,
2201     * otherwise it is the first unprocessed character.
2202     */
2203    tokens[ntokens].value =  *e == '\0' ? NULL : e;
2204    tokens[ntokens].length = 0;
2205    ntokens++;
2206
2207    return ntokens;
2208}
2209
2210/* set up a connection to write a buffer then free it, used for stats */
2211static void write_and_free(conn *c, char *buf, size_t bytes) {
2212    if (buf) {
2213        c->write_and_free = buf;
2214        c->wcurr = buf;
2215        c->wbytes = (int)bytes;
2216        conn_set_state(c, conn_write);
2217        c->write_and_go = conn_new_cmd;
2218    } else {
2219        out_string(c, "SERVER_ERROR out of memory writing stats");
2220    }
2221}
2222
2223void set_noreply_maybe(conn *c, token_t *tokens, size_t ntokens) {
2224    int noreply_index = (int)ntokens - 2;
2225
2226    cb_assert(noreply_index >= 0);
2227
2228    /*
2229      NOTE: this function is not the first place where we are going to
2230      send the reply.  We could send it instead from process_command()
2231      if the request line has wrong number of tokens.  However parsing
2232      malformed line for "noreply" option is not reliable anyway, so
2233      it can't be helped.
2234    */
2235    if (tokens[noreply_index].value
2236        && strcmp(tokens[noreply_index].value, "noreply") == 0) {
2237        c->noreply = true;
2238    }
2239}
2240
2241void append_stat(const char *name, ADD_STAT add_stats, void *c,
2242                 const char *fmt, ...) {
2243    char val_str[STAT_VAL_LEN];
2244    int vlen;
2245    va_list ap;
2246
2247    cb_assert(name);
2248    cb_assert(add_stats);
2249    cb_assert(c);
2250    cb_assert(fmt);
2251
2252    va_start(ap, fmt);
2253    vlen = vsnprintf(val_str, sizeof(val_str) - 1, fmt, ap);
2254    va_end(ap);
2255
2256    add_stats(name, (uint16_t)strlen(name), val_str, vlen, c);
2257}
2258
2259void append_prefix_stat(const char *prefix, const char *name, ADD_STAT add_stats, void *c,
2260                        const char *fmt, ...) {
2261    char val_str[STAT_VAL_LEN];
2262    int vlen;
2263    char *val_free = 0;
2264    char *val;
2265    va_list ap;
2266
2267    cb_assert(name);
2268    cb_assert(add_stats);
2269    cb_assert(c);
2270    cb_assert(fmt);
2271
2272    va_start(ap, fmt);
2273    vlen = vsnprintf(val_str, sizeof(val_str), fmt, ap);
2274    va_end(ap);
2275
2276    if (vlen > (int)sizeof(val_str)-1) {
2277        val_free = malloc(vlen+1);
2278        if (val_free != 0) {
2279            val = val_free;
2280            va_start(ap, fmt);
2281            vsnprintf(val_free, vlen+1, fmt, ap);
2282            va_end(ap);
2283        } else {
2284            val = val_str;
2285            vlen = sizeof(val_str)-1;
2286        }
2287    } else {
2288        val = val_str;
2289    }
2290
2291    if (prefix == NULL) {
2292        add_stats(name, (uint16_t)strlen(name), val, vlen, c);
2293    } else {
2294        char key_str[STAT_KEY_LEN];
2295        strcpy(key_str, prefix); strcat(key_str, name);
2296        add_stats(key_str, (uint16_t)strlen(key_str), val, vlen, c);
2297    }
2298
2299    free(val_free);
2300}
2301
2302static void process_stats_detail(conn *c, const char *command) {
2303    cb_assert(c != NULL);
2304
2305    if (strcmp(command, "on") == 0) {
2306        settings.detail_enabled = 1;
2307        out_string(c, "OK");
2308    }
2309    else if (strcmp(command, "off") == 0) {
2310        settings.detail_enabled = 0;
2311        out_string(c, "OK");
2312    }
2313    else if (strcmp(command, "dump") == 0) {
2314        int len;
2315        char *stats_dump = stats_prefix_dump(&len);
2316        write_and_free(c, stats_dump, len);
2317    }
2318    else {
2319        out_string(c, "CLIENT_ERROR usage: stats detail on|off|dump");
2320    }
2321}
2322
/*
 * return server specific stats only
 *
 * add_stats  callback that receives each stat
 * c          opaque cookie forwarded to the callback
 * prefix     optional name prefix (callers in this file pass NULL)
 *
 * Each APPEND_PREFIX_STAT line emits one stat; the macro is defined elsewhere
 * and presumably forwards to append_prefix_stat() using the add_stats, c and
 * prefix parameters by name -- confirm in the header. The emission order
 * below is the order in which stats are produced.
 */
void server_stats(ADD_STAT add_stats, void *c, const char *prefix) {
#ifndef _MSC_VER
    pid_t pid = getpid();
#endif
    /* Snapshot of the relative clock; also the basis for "time" below. */
    rel_time_t now = current_time;

    /* Local snapshots filled by the aggregate helpers before STATS_LOCK. */
    struct thread_stats thread_stats;
    threadlocal_stats_aggregate(&thread_stats);
    struct slab_stats slab_stats;
    slab_stats_aggregate(&thread_stats, &slab_stats);

#ifndef WIN32
    struct rusage usage;
    getrusage(RUSAGE_SELF, &usage);
#endif /* !WIN32 */

    STATS_LOCK();
#ifndef _MSC_VER
    /* NOTE(review): "%lu" is paired with a (long) argument here. */
    APPEND_PREFIX_STAT("pid", "%lu", (long)pid);
#endif
    APPEND_PREFIX_STAT("uptime", "%u", now);
    APPEND_PREFIX_STAT("time", "%ld", now + (long)process_started);
    APPEND_PREFIX_STAT("version", "%s", VERSION);
    APPEND_PREFIX_STAT("pointer_size", "%d", (int)(8 * sizeof(void *)));

#ifndef WIN32
    if (1) {
        /* CPU times are rendered as seconds.microseconds strings. */
        char rusage_buf[128];
        snprintf(rusage_buf, sizeof(rusage_buf),  "%ld.%06ld",
                 (long)usage.ru_utime.tv_sec, (long)usage.ru_utime.tv_usec);
        APPEND_PREFIX_STAT("rusage_user", "%s", rusage_buf);
        snprintf(rusage_buf, sizeof(rusage_buf),  "%ld.%06ld",
                 (long)usage.ru_stime.tv_sec, (long)usage.ru_stime.tv_usec);
        APPEND_PREFIX_STAT("rusage_system", "%s", rusage_buf);
    }
#endif

    /* Reported one lower than the counter; the reason is not visible here. */
    APPEND_PREFIX_STAT("curr_connections", "%u", stats.curr_conns - 1);
    APPEND_PREFIX_STAT("total_connections", "%u", stats.total_conns);
    APPEND_PREFIX_STAT("connection_structures", "%u", stats.conn_structs);
    /* Command/miss counters come from thread_stats, hit counters from slab_stats. */
    APPEND_PREFIX_STAT("cmd_get", "%llu", (unsigned long long)thread_stats.get_cmds);
    APPEND_PREFIX_STAT("cmd_set", "%llu", (unsigned long long)slab_stats.set_cmds);
    APPEND_PREFIX_STAT("cmd_flush", "%llu", (unsigned long long)thread_stats.flush_cmds);
    APPEND_PREFIX_STAT("get_hits", "%llu", (unsigned long long)slab_stats.get_hits);
    APPEND_PREFIX_STAT("get_misses", "%llu", (unsigned long long)thread_stats.get_misses);
    APPEND_PREFIX_STAT("delete_misses", "%llu", (unsigned long long)thread_stats.delete_misses);
    APPEND_PREFIX_STAT("delete_hits", "%llu", (unsigned long long)slab_stats.delete_hits);
    APPEND_PREFIX_STAT("incr_misses", "%llu", (unsigned long long)thread_stats.incr_misses);
    APPEND_PREFIX_STAT("incr_hits", "%llu", (unsigned long long)slab_stats.incr_hits);
    APPEND_PREFIX_STAT("decr_misses", "%llu", (unsigned long long)thread_stats.decr_misses);
    APPEND_PREFIX_STAT("decr_hits", "%llu", (unsigned long long)slab_stats.decr_hits);
    APPEND_PREFIX_STAT("cas_misses", "%llu", (unsigned long long)thread_stats.cas_misses);
    APPEND_PREFIX_STAT("cas_hits", "%llu", (unsigned long long)slab_stats.cas_hits);
    APPEND_PREFIX_STAT("cas_badval", "%llu", (unsigned long long)slab_stats.cas_badval);
    APPEND_PREFIX_STAT("bytes_read", "%llu", (unsigned long long)thread_stats.bytes_read);
    APPEND_PREFIX_STAT("bytes_written", "%llu", (unsigned long long)thread_stats.bytes_written);
    APPEND_PREFIX_STAT("limit_maxbytes", "%llu", (unsigned long long)settings.maxbytes);
    APPEND_PREFIX_STAT("accepting_conns", "%u", stats.accepting_conns);
    APPEND_PREFIX_STAT("listen_disabled_num", "%llu", (unsigned long long)stats.listen_disabled_num);
    APPEND_PREFIX_STAT("threads", "%d", settings.num_threads);
    APPEND_PREFIX_STAT("conn_yields", "%llu", (unsigned long long)thread_stats.conn_yields);

    STATS_UNLOCK();
}
2388
/*
 * Emits the current values of the global `settings` structure as stats.
 *
 * add_stats  callback that receives each stat
 * c          opaque cookie forwarded to the callback
 * prefix     optional name prefix (process_stat() passes NULL)
 *
 * APPEND_PREFIX_STAT is defined elsewhere; it presumably picks up add_stats,
 * c and prefix by name -- confirm in the header.
 */
void process_stat_settings(ADD_STAT add_stats, void *c, const char *prefix) {
    cb_assert(add_stats);
    /* maxbytes is narrowed to unsigned int for "%u" here. */
    APPEND_PREFIX_STAT("maxbytes", "%u", (unsigned int)settings.maxbytes);
    APPEND_PREFIX_STAT("maxconns", "%d", settings.maxconns);
    APPEND_PREFIX_STAT("tcpport", "%d", settings.port);
    APPEND_PREFIX_STAT("udpport", "%d", settings.udpport);
    /* Unset string settings are reported as the literal text "NULL". */
    APPEND_PREFIX_STAT("inter", "%s", settings.inter ? settings.inter : "NULL");
    APPEND_PREFIX_STAT("verbosity", "%d", settings.verbose);
    APPEND_PREFIX_STAT("oldest", "%lu", (unsigned long)settings.oldest_live);
    APPEND_PREFIX_STAT("evictions", "%s", settings.evict_to_free ? "on" : "off");
    APPEND_PREFIX_STAT("domain_socket", "%s",
                settings.socketpath ? settings.socketpath : "NULL");
    /* Printed in octal. */
    APPEND_PREFIX_STAT("umask", "%o", settings.access);
    APPEND_PREFIX_STAT("growth_factor", "%.2f", settings.factor);
    APPEND_PREFIX_STAT("chunk_size", "%d", settings.chunk_size);
    APPEND_PREFIX_STAT("num_threads", "%d", settings.num_threads);
    APPEND_PREFIX_STAT("stat_key_prefix", "%c", settings.prefix_delimiter);
    APPEND_PREFIX_STAT("detail_enabled", "%s",
                settings.detail_enabled ? "yes" : "no");
    APPEND_PREFIX_STAT("reqs_per_event", "%d", settings.reqs_per_event);
    APPEND_PREFIX_STAT("cas_enabled", "%s", settings.use_cas ? "yes" : "no");
    APPEND_PREFIX_STAT("tcp_backlog", "%d", settings.backlog);
    APPEND_PREFIX_STAT("binding_protocol", "%s",
                prot_text(settings.binding_protocol));
}
2414
2415static void process_stat(conn *c, token_t *tokens, const size_t ntokens) {
2416    const char *subcommand = tokens[SUBCOMMAND_TOKEN].value;
2417    cb_assert(c != NULL);
2418
2419    if (ntokens < 2) {
2420        out_string(c, "CLIENT_ERROR bad command line");
2421        return;
2422    }
2423
2424    if (ntokens == 2) {
2425        server_stats(&append_stats, c, NULL);
2426        (void)get_stats(NULL, 0, &append_stats, c);
2427    } else if (strcmp(subcommand, "reset") == 0) {
2428        stats_reset();
2429        out_string(c, "RESET");
2430        return ;
2431    } else if (strcmp(subcommand, "detail") == 0) {
2432        /* NOTE: how to tackle detail with binary? */
2433        if (ntokens < 4)
2434            process_stats_detail(c, "");  /* outputs the error message */
2435        else
2436            process_stats_detail(c, tokens[2].value);
2437        /* Output already generated */
2438        return ;
2439    } else if (strcmp(subcommand, "settings") == 0) {
2440        process_stat_settings(&append_stats, c, NULL);
2441    } else if (strcmp(subcommand, "cachedump") == 0) {
2442        char *buf;
2443        unsigned int bytes, id, limit = 0;
2444
2445        if (ntokens < 5) {
2446            out_string(c, "CLIENT_ERROR bad command line");
2447            return;
2448        }
2449
2450        if (!safe_strtoul(tokens[2].value, &id) ||
2451            !safe_strtoul(tokens[3].value, &limit)) {
2452            out_string(c, "CLIENT_ERROR bad command line format");
2453            return;
2454        }
2455
2456        buf = item_cachedump(id, limit, &bytes);
2457        write_and_free(c, buf, bytes);
2458        return ;
2459    } else {
2460        /* getting here means that the subcommand is either engine specific or
2461           is invalid. query the engine and see. */
2462        if (get_stats(subcommand, (int)strlen(subcommand), &append_stats, c)) {
2463            if (c->stats.buffer == NULL) {
2464                out_string(c, "SERVER_ERROR out of memory writing stats");
2465            } else {
2466                write_and_free(c, c->stats.buffer, c->stats.offset);
2467                c->stats.buffer = NULL;
2468            }
2469        } else {
2470            out_string(c, "ERROR");
2471        }
2472        return ;
2473    }
2474
2475    /* append terminator and start the transfer */
2476    append_stats(NULL, 0, NULL, 0, c);
2477
2478    if (c->stats.buffer == NULL) {
2479        out_string(c, "SERVER_ERROR out of memory writing stats");
2480    } else {
2481        write_and_free(c, c->stats.buffer, c->stats.offset);
2482        c->stats.buffer = NULL;
2483    }
2484}
2485
/*
 * ASCII "get" / "gets" handler.
 *
 * Walks the key tokens starting at tokens[KEY_TOKEN], looks each key up and,
 * for every hit, queues the pieces of a "VALUE ..." response on the
 * connection's iov list while remembering the item in c->ilist. When the
 * token array ends before the command line does, the remainder is
 * re-tokenized into the same array and the walk continues.
 *
 * c           connection issuing the command
 * tokens      token array for the command line (reused for re-tokenizing)
 * ntokens     token count; ntokens is overwritten here... shrug..
 * return_cas  true to also emit each item's CAS value (per-item suffix)
 *
 * Hit/miss/command counts are accumulated in locals and added to the
 * per-thread stats under the stats mutex on every exit path.
 */
static void process_get_command(conn *c, token_t *tokens, size_t ntokens, bool return_cas) {
    char *key;
    size_t nkey;
    /* i counts the hits queued so far (index into c->ilist / c->suffixlist) */
    int i = 0, sid = 0;
    item *it;
    token_t *key_token = &tokens[KEY_TOKEN];
    char *suffix;
    int stats_get_cmds   = 0;
    int stats_get_misses = 0;
    int stats_get_hits[MAX_NUMBER_OF_SLAB_CLASSES];
    cb_assert(c != NULL);

    memset(&stats_get_hits, 0, sizeof(stats_get_hits));

    do {
        /* A zero-length token is the terminal token of the current batch. */
        while(key_token->length != 0) {

            key = key_token->value;
            nkey = key_token->length;

            if(nkey > KEY_MAX_LENGTH) {
                /* Flush the counters gathered so far before bailing out. */
                cb_mutex_enter(&c->thread->stats.mutex);
                c->thread->stats.get_cmds   += stats_get_cmds;
                c->thread->stats.get_misses += stats_get_misses;
                for(sid = 0; sid < MAX_NUMBER_OF_SLAB_CLASSES; sid++) {
                    c->thread->stats.slab_stats[sid].get_hits += stats_get_hits[sid];
                }
                cb_mutex_exit(&c->thread->stats.mutex);
                out_string(c, "CLIENT_ERROR bad command line format");
                /*
                 * NOTE(review): this return happens before c->ileft is
                 * assigned below; confirm that items already placed in
                 * c->ilist by this call are released elsewhere.
                 */
                return;
            }

            stats_get_cmds++;
            it = item_get(key, nkey);
            if (settings.detail_enabled) {
                stats_prefix_record_get(key, nkey, NULL != it);
            }
            if (it) {
                /* Grow the item list (doubling) when it is full. */
                if (i >= c->isize) {
                    item **new_list = realloc(c->ilist, sizeof(item *) * c->isize * 2);
                    if (new_list) {
                        c->isize *= 2;
                        c->ilist = new_list;
                    } else {
                        /* leaves key_token->value non-NULL => error below */
                        item_remove(it);
                        break;
                    }
                }

                /*
                 * Construct the response. Each hit adds three elements to the
                 * outgoing data list:
                 *   "VALUE "
                 *   key
                 *   " " + flags + " " + data length + "\r\n" + data (with \r\n)
                 */

                if (return_cas)
                {
                  MEMCACHED_COMMAND_GET(c->sfd, ITEM_key(it), it->nkey,
                                        it->nbytes, ITEM_get_cas(it));
                  /* Goofy mid-flight realloc. */
                  if (i >= c->suffixsize) {
                    char **new_suffix_list = realloc(c->suffixlist,
                                           sizeof(char *) * c->suffixsize * 2);
                    if (new_suffix_list) {
                        c->suffixsize *= 2;
                        c->suffixlist  = new_suffix_list;
                    } else {
                        item_remove(it);
                        break;
                    }
                  }

                  suffix = cache_alloc(c->thread->suffix_cache);
                  if (suffix == NULL) {
                    cb_mutex_enter(&c->thread->stats.mutex);
                    c->thread->stats.get_cmds   += stats_get_cmds;
                    c->thread->stats.get_misses += stats_get_misses;
                    for(sid = 0; sid < MAX_NUMBER_OF_SLAB_CLASSES; sid++) {
                        c->thread->stats.slab_stats[sid].get_hits += stats_get_hits[sid];
                    }
                    cb_mutex_exit(&c->thread->stats.mutex);
                    out_string(c, "SERVER_ERROR out of memory making CAS suffix");
                    item_remove(it);
                    /*
                     * NOTE(review): as with the over-long key case, c->ileft
                     * and c->suffixleft are not updated on this path.
                     */
                    return;
                  }
                  *(c->suffixlist + i) = suffix;
                  /* The CAS variant replaces the item suffix's trailing CRLF
                     (hence nsuffix - 2) with " <cas>\r\n". */
                  int suffix_len = snprintf(suffix, SUFFIX_SIZE,
                                            " %llu\r\n",
                                            (unsigned long long)ITEM_get_cas(it));
                  if (add_iov(c, "VALUE ", 6) != 0 ||
                      add_iov(c, ITEM_key(it), it->nkey) != 0 ||
                      add_iov(c, ITEM_suffix(it), it->nsuffix - 2) != 0 ||
                      add_iov(c, suffix, suffix_len) != 0 ||
                      add_iov(c, ITEM_data(it), it->nbytes) != 0)
                      {
                          /*
                           * NOTE(review): suffix was stored at index i but i
                           * is not advanced on this path, so it is outside
                           * the c->suffixleft range set below -- confirm it
                           * is reclaimed.
                           */
                          item_remove(it);
                          break;
                      }
                }
                else
                {
                  MEMCACHED_COMMAND_GET(c->sfd, ITEM_key(it), it->nkey,
                                        it->nbytes, ITEM_get_cas(it));
                  /* Suffix and data are sent as a single iov of
                     nsuffix + nbytes bytes starting at ITEM_suffix(it). */
                  if (add_iov(c, "VALUE ", 6) != 0 ||
                      add_iov(c, ITEM_key(it), it->nkey) != 0 ||
                      add_iov(c, ITEM_suffix(it), it->nsuffix + it->nbytes) != 0)
                      {
                          item_remove(it);
                          break;
                      }
                }


                if (settings.verbose > 1)
                    moxi_log_write(">%d sending key %s\n", c->sfd, ITEM_key(it));

                /* item_get() has incremented it->refcount for us */
                stats_get_hits[it->slabs_clsid]++;
                item_update(it);
                *(c->ilist + i) = it;
                i++;

            } else {
                stats_get_misses++;
                MEMCACHED_COMMAND_GET(c->sfd, key, nkey, -1, 0);
            }

            key_token++;
        }

        /*
         * If the command string hasn't been fully processed, get the next set
         * of tokens.
         */
        if(key_token->value != NULL) {
            ntokens = tokenize_command(key_token->value, tokens, MAX_TOKENS);
            key_token = tokens;
        }

    } while(key_token->value != NULL);

    /* Publish how many items (and CAS suffixes) were queued by this call. */
    c->icurr = c->ilist;
    c->ileft = i;
    if (return_cas) {
        c->suffixcurr = c->suffixlist;
        c->suffixleft = i;
    }

    if (settings.verbose > 1)
        moxi_log_write(">%d END\n", c->sfd);

    /*
        If the loop was terminated because of out-of-memory, it is not
        reliable to add END\r\n to the buffer, because it might not end
        in \r\n. So we send SERVER_ERROR instead.
    */
    if (key_token->value != NULL || add_iov(c, "END\r\n", 5) != 0
        || (IS_UDP(c->transport) && build_udp_headers(c) != 0)) {
        out_string(c, "SERVER_ERROR out of memory writing get response");
    }
    else {
        conn_set_state(c, conn_mwrite);
        c->write_and_go = conn_new_cmd;
        c->msgcurr = 0;
    }

    /* Fold the locally gathered counters into the per-thread stats. */
    cb_mutex_enter(&c->thread->stats.mutex);
    c->thread->stats.get_cmds   += stats_get_cmds;
    c->thread->stats.get_misses += stats_get_misses;
    for(sid = 0; sid < MAX_NUMBER_OF_SLAB_CLASSES; sid++) {
        c->thread->stats.slab_stats[sid].get_hits += stats_get_hits[sid];
    }
    cb_mutex_exit(&c->thread->stats.mutex);

    return;
}
2665
2666void process_update_command(conn *c, token_t *tokens, const size_t ntokens, int comm, bool handle_cas) {
2667    char *key;
2668    size_t nkey;
2669    unsigned int flags;
2670    int32_t exptime_int = 0;
2671    time_t exptime;
2672    int vlen;
2673    uint64_t req_cas_id=0;
2674    item *it;
2675
2676    cb_assert(c != NULL);
2677
2678    set_noreply_maybe(c, tokens, ntokens);
2679
2680    if (tokens[KEY_TOKEN].length > KEY_MAX_LENGTH) {
2681        out_string(c, "CLIENT_ERROR bad command line format");
2682        return;
2683    }
2684
2685    key = tokens[KEY_TOKEN].value;
2686    nkey = tokens[KEY_TOKEN].length;
2687
2688    if (! (safe_strtoul(tokens[2].value, (uint32_t *)&flags)
2689           && safe_strtol(tokens[3].value, &exptime_int)
2690           && safe_strtol(tokens[4].value, (int32_t *)&vlen))) {
2691        out_string(c, "CLIENT_ERROR bad command line format");
2692        return;
2693    }
2694
2695    /* Ubuntu 8.04 breaks when I pass exptime to safe_strtol */
2696    exptime = exptime_int;
2697
2698    /* does cas value exist? */
2699    if (handle_cas) {
2700        if (!safe_strtoull(tokens[5].value, &req_cas_id)) {
2701            out_string(c, "CLIENT_ERROR bad command line format");
2702            return;
2703        }
2704    }
2705
2706    vlen += 2;
2707    if (vlen < 0 || vlen - 2 < 0) {
2708        out_string(c, "CLIENT_ERROR bad command line format");
2709        return;
2710    }
2711
2712    if (settings.detail_enabled) {
2713        stats_prefix_record_set(key, nkey);
2714    }
2715
2716    it = item_alloc(key, nkey, flags, c->funcs->conn_realtime(exptime), vlen);
2717
2718    if (it == 0) {
2719        if (! item_size_ok(nkey, flags, vlen))
2720            out_string(c, "SERVER_ERROR object too large for cache");
2721        else
2722            out_string(c, "SERVER_ERROR out of memory storing object");
2723        /* swallow the data line */
2724        c->write_and_go = conn_swallow;
2725        c->sbytes = vlen;
2726
2727        /* Avoid stale data persisting in cache because we failed alloc.
2728         * Unacceptable for SET. Anywhere else too? */
2729        if (comm == NREAD_SET) {
2730            it = item_get(key, nkey);
2731            if (it) {
2732                item_unlink(it);
2733                item_remove(it);
2734            }
2735        }
2736
2737        return;
2738    }
2739    ITEM_set_cas(it, req_cas_id);
2740
2741    c->item = it;
2742    c->ritem = ITEM_data(it);
2743    c->rlbytes = it->nbytes;
2744    c->cmd = comm;
2745    conn_set_state(c, conn_nread);
2746}
2747
2748static void process_arithmetic_command(conn *c, token_t *tokens, const size_t ntokens, const bool incr) {
2749    char temp[INCR_MAX_STORAGE_LEN];
2750    item *it;
2751    uint64_t delta;
2752    char *key;
2753    size_t nkey;
2754
2755    cb_assert(c != NULL);
2756
2757    set_noreply_maybe(c, tokens, ntokens);
2758
2759    if (tokens[KEY_TOKEN].length > KEY_MAX_LENGTH) {
2760        out_string(c, "CLIENT_ERROR bad command line format");
2761        return;
2762    }
2763
2764    key = tokens[KEY_TOKEN].value;
2765    nkey = tokens[KEY_TOKEN].length;
2766
2767    if (!safe_strtoull(tokens[2].value, &delta)) {
2768        out_string(c, "CLIENT_ERROR invalid numeric delta argument");
2769        return;
2770    }
2771
2772    it = item_get(key, nkey);
2773    if (!it) {
2774        cb_mutex_enter(&c->thread->stats.mutex);
2775        if (incr) {
2776            c->thread->stats.incr_misses++;
2777        } else {
2778            c->thread->stats.decr_misses++;
2779        }
2780        cb_mutex_exit(&c->thread->stats.mutex);
2781
2782        out_string(c, "NOT_FOUND");
2783        return;
2784    }
2785
2786    switch(add_delta(c, it, incr, delta, temp)) {
2787    case OK:
2788        out_string(c, temp);
2789        break;
2790    case NON_NUMERIC:
2791        out_string(c, "CLIENT_ERROR cannot increment or decrement non-numeric value");
2792        break;
2793    case EOM:
2794        out_string(c, "SERVER_ERROR out of memory");
2795        break;
2796    }
2797    item_remove(it);         /* release our reference */
2798}
2799
2800/*
2801 * adds a delta value to a numeric item.
2802 *
2803 * c     connection requesting the operation
2804 * it    item to adjust
2805 * incr  true to increment value, false to decrement
2806 * delta amount to adjust value by
2807 * buf   buffer for response string
2808 *
2809 * returns a response string to send back to the client.
2810 */
2811enum delta_result_type do_add_delta(conn *c, item *it, const bool incr,
2812                                    const int64_t delta, char *buf) {
2813    char *ptr;
2814    int64_t value;
2815    int res;
2816
2817    ptr = ITEM_data(it);
2818
2819    if (!safe_strtoull(ptr, (uint64_t *)&value)) {
2820        return NON_NUMERIC;
2821    }
2822
2823    if (incr) {
2824        value += delta;
2825        MEMCACHED_COMMAND_INCR(c->sfd, ITEM_key(it), it->nkey, value);
2826    } else {
2827        if(delta > value) {
2828            value = 0;
2829        } else {
2830            value -= delta;
2831        }
2832        MEMCACHED_COMMAND_DECR(c->sfd, ITEM_key(it), it->nkey, value);
2833    }
2834
2835    cb_mutex_enter(&c->thread->stats.mutex);
2836    if (incr) {
2837        c->thread->stats.slab_stats[it->slabs_clsid].incr_hits++;
2838    } else {
2839        c->thread->stats.slab_stats[it->slabs_clsid].decr_hits++;
2840    }
2841    cb_mutex_exit(&c->thread->stats.mutex);
2842
2843    snprintf(buf, INCR_MAX_STORAGE_LEN, "%llu", (unsigned long long)value);
2844    res = (int)strlen(buf);
2845    if (res + 2 > it->nbytes) { /* need to realloc */
2846        item *new_it;
2847        new_it = do_item_alloc(ITEM_key(it), it->nkey, atoi(ITEM_suffix(it) + 1), it->exptime, res + 2 );
2848        if (new_it == 0) {
2849            return EOM;
2850        }
2851        memcpy(ITEM_data(new_it), buf, res);
2852        memcpy(ITEM_data(new_it) + res, "\r\n", 2);
2853        item_replace(it, new_it);
2854        do_item_remove(new_it);       /* release our reference */
2855    } else { /* replace in-place */
2856        /* When changing the value without replacing the item, we
2857           need to update the CAS on the existing item. */
2858        ITEM_set_cas(it, (settings.use_cas) ? get_cas_id() : 0);
2859
2860        memcpy(ITEM_data(it), buf, res);
2861        memset(ITEM_data(it) + res, ' ', it->nbytes - res - 2);
2862    }
2863
2864    return OK;
2865}
2866
2867static void process_delete_command(conn *c, token_t *tokens, const size_t ntokens) {
2868    char *key;
2869    size_t nkey;
2870    item *it;
2871
2872    cb_assert(c != NULL);
2873
2874    set_noreply_maybe(c, tokens, ntokens);
2875
2876    key = tokens[KEY_TOKEN].value;
2877    nkey = tokens[KEY_TOKEN].length;
2878
2879    if(nkey > KEY_MAX_LENGTH) {
2880        out_string(c, "CLIENT_ERROR bad command line format");
2881        return;
2882    }
2883
2884    if (settings.detail_enabled) {
2885        stats_prefix_record_delete(key, nkey);
2886    }
2887
2888    it = item_get(key, nkey);
2889    if (it) {
2890        MEMCACHED_COMMAND_DELETE(c->sfd, ITEM_key(it), it->nkey);
2891
2892        cb_mutex_enter(&c->thread->stats.mutex);
2893        c->thread->stats.slab_stats[it->slabs_clsid].delete_hits++;
2894        cb_mutex_exit(&c->thread->stats.mutex);
2895
2896        item_unlink(it);
2897        item_remove(it);      /* release our reference */
2898        out_string(c, "DELETED");
2899    } else {
2900        cb_mutex_enter(&c->thread->stats.mutex);
2901        c->thread->stats.delete_misses++;
2902        cb_mutex_exit(&c->thread->stats.mutex);
2903
2904        out_string(c, "NOT_FOUND");
2905    }
2906}
2907
2908void process_verbosity_command(conn *c, token_t *tokens, const size_t ntokens) {
2909    unsigned int level;
2910
2911    cb_assert(c != NULL);
2912
2913    set_noreply_maybe(c, tokens, ntokens);
2914    if (c->noreply && ntokens == 3) {
2915        /* "verbosity noreply" is not according to the correct syntax */
2916        c->noreply = false;
2917        out_string(c, "ERROR");
2918        return;
2919    }
2920
2921    if (safe_strtoul(tokens[1].value, &level)) {
2922        settings.verbose = level > MAX_VERBOSITY_LEVEL ? MAX_VERBOSITY_LEVEL : level;
2923        out_string(c, "OK");
2924    } else {
2925        out_string(c, "ERROR");
2926    }
2927}
2928
2929void process_command(conn *c, char *command) {
2930
2931    token_t tokens[MAX_TOKENS];
2932    size_t ntokens;
2933    int comm;
2934
2935    cb_assert(c != NULL);
2936
2937    MEMCACHED_PROCESS_COMMAND_START(c->sfd, c->rcurr, c->rbytes);
2938
2939    if (settings.verbose > 1)
2940        moxi_log_write("<%d %s\n", c->sfd, command);
2941
2942    /*
2943     * for commands set/add/replace, we build an item and read the data
2944     * directly into it, then continue in nread_complete().
2945     */
2946
2947    c->msgcurr = 0;
2948    c->msgused = 0;
2949    c->iovused = 0;
2950    if (add_msghdr(c) != 0) {
2951        out_string(c, "SERVER_ERROR out of memory preparing response");
2952        return;
2953    }
2954
2955    ntokens = tokenize_command(command, tokens, MAX_TOKENS);
2956    if (ntokens >= 3 &&
2957        ((strcmp(tokens[COMMAND_TOKEN].value, "get") == 0) ||
2958         (strcmp(tokens[COMMAND_TOKEN].value, "bget") == 0))) {
2959
2960        process_get_command(c, tokens, ntokens, false);
2961
2962    } else if ((ntokens == 6 || ntokens == 7) &&
2963               ((strcmp(tokens[COMMAND_TOKEN].value, "add") == 0 && (comm = NREAD_ADD)) ||
2964                (strcmp(tokens[COMMAND_TOKEN].value, "set") == 0 && (comm = NREAD_SET)) ||
2965                (strcmp(tokens[COMMAND_TOKEN].value, "replace") == 0 && (comm = NREAD_REPLACE)) ||
2966                (strcmp(tokens[COMMAND_TOKEN].value, "prepend") == 0 && (comm = NREAD_PREPEND)) ||
2967                (strcmp(tokens[COMMAND_TOKEN].value, "append") == 0 && (comm = NREAD_APPEND)) )) {
2968
2969        process_update_command(c, tokens, ntokens, comm, false);
2970
2971    } else if ((ntokens == 7 || ntokens == 8) && (strcmp(tokens[COMMAND_TOKEN].value, "cas") == 0 && (comm = NREAD_CAS))) {
2972
2973        process_update_command(c, tokens, ntokens, comm, true);
2974
2975    } else if ((ntokens == 4 || ntokens == 5) && (strcmp(tokens[COMMAND_TOKEN].value, "incr") == 0)) {
2976
2977        process_arithmetic_command(c, tokens, ntokens, 1);
2978
2979    } else if (ntokens >= 3 && (strcmp(tokens[COMMAND_TOKEN].value, "gets") == 0)) {
2980
2981        process_get_command(c, tokens, ntokens, true);
2982
2983    } else if ((ntokens == 4 || ntokens == 5) && (strcmp(tokens[COMMAND_TOKEN].value, "decr") == 0)) {
2984
2985        process_arithmetic_command(c, tokens, ntokens, 0);
2986
2987    } else if (ntokens >= 3 && ntokens <= 4 && (strcmp(tokens[COMMAND_TOKEN].value, "delete") == 0)) {
2988
2989        process_delete_command(c, tokens, ntokens);
2990
2991    } else if (ntokens >= 2 && (strcmp(tokens[COMMAND_TOKEN].value, "stats") == 0)) {
2992
2993        process_stat(c, tokens, ntokens);
2994
2995    } else if (ntokens >= 2 && ntokens <= 4 && (strcmp(tokens[COMMAND_TOKEN].value, "flush_all") == 0)) {
2996        time_t exptime = 0;
2997        set_current_time();
2998
2999        set_noreply_maybe(c, tokens, ntokens);
3000
3001        cb_mutex_enter(&c->thread->stats.mutex);
3002        c->thread->stats.flush_cmds++;
3003        cb_mutex_exit(&c->thread->stats.mutex);
3004
3005        if(ntokens == (c->noreply ? 3 : 2)) {
3006            settings.oldest_live = current_time - 1;
3007            item_flush_expired();
3008            out_string(c, "OK");
3009            return;
3010        }
3011
3012        exptime = strtol(tokens[1].value, NULL, 10);
3013        if(errno == ERANGE) {
3014            out_string(c, "CLIENT_ERROR bad command line format");
3015            return;
3016        }
3017
3018        /*
3019          If exptime is zero realtime() would return zero too, and
3020          realtime(exptime) - 1 would overflow to the max unsigned
3021          value.  So we process exptime == 0 the same way we do when
3022          no delay is given at all.
3023        */
3024        if (exptime > 0)
3025            settings.oldest_live = c->funcs->conn_realtime(exptime) - 1;
3026        else /* exptime == 0 */
3027            settings.oldest_live = current_time - 1;
3028        item_flush_expired();
3029        out_string(c, "OK");
3030        return;
3031
3032    } else if (ntokens == 2 && (strcmp(tokens[COMMAND_TOKEN].value, "version") == 0)) {
3033
3034        out_string(c, "VERSION " VERSION);
3035
3036    } else if (ntokens == 2 && (strcmp(tokens[COMMAND_TOKEN].value, "quit") == 0)) {
3037
3038        conn_set_state(c, conn_closing);
3039
3040    } else if (ntokens == 5 && (strcmp(tokens[COMMAND_TOKEN].value, "slabs") == 0 &&
3041                                strcmp(tokens[COMMAND_TOKEN + 1].value, "reassign") == 0)) {
3042#ifdef ALLOW_SLABS_REASSIGN
3043
3044        int src, dst, rv;
3045
3046        src = strtol(tokens[2].value, NULL, 10);
3047        dst  = strtol(tokens[3].value, NULL, 10);
3048
3049        if(errno == ERANGE) {
3050            out_string(c, "CLIENT_ERROR bad command line format");
3051            return;
3052        }
3053
3054        rv = slabs_reassign(src, dst);
3055        if (rv == 1) {
3056            out_string(c, "DONE");
3057            return;
3058        }
3059        if (rv == 0) {
3060            out_string(c, "CANT");
3061            return;
3062        }
3063        if (rv == -1) {
3064            out_string(c, "BUSY");
3065            return;
3066        }
3067#else
3068        out_string(c, "CLIENT_ERROR Slab reassignment not supported");
3069#endif
3070    } else if ((ntokens == 3 || ntokens == 4) && (strcmp(tokens[COMMAND_TOKEN].value, "verbosity") == 0)) {
3071        process_verbosity_command(c, tokens, ntokens);
3072    } else {
3073        out_string(c, "ERROR");
3074    }
3075    return;
3076}
3077
3078void process_stats_proxy_command(conn *c, token_t *tokens, const size_t ntokens) {
3079    if (ntokens == 4 && strcmp(tokens[2].value, "reset") == 0) {
3080        proxy_td *ptd = c->extra;
3081        if (ptd != NULL) {
3082            proxy_stats_reset(ptd->proxy->main);
3083        }
3084
3085        out_string(c, "OK");
3086        return;
3087    }
3088
3089    if (ntokens == 4 && strcmp(tokens[2].value, "timings") == 0) {
3090        proxy_stats_dump_timings(&append_stats, c);
3091    } else if (ntokens == 4 && strcmp(tokens[2].value, "config") == 0) {
3092        proxy_stats_dump_config(&append_stats, c);
3093    } else {
3094        bool do_all = (ntokens == 3 || strcmp(tokens[2].value, "all") == 0);
3095        struct proxy_stats_cmd_info psci = {
3096            .do_info       = (do_all || strcmp(tokens[2].value, "info") == 0),
3097            .do_settings   = (do_all || strcmp(tokens[2].value, "settings") == 0),
3098            .do_behaviors  = (do_all || strcmp(tokens[2].value, "behaviors") == 0),
3099            .do_frontcache = (do_all || strcmp(tokens[2].value, "frontcache") == 0),
3100            .do_keystats   = (do_all || strcmp(tokens[2].value, "keystats") == 0),
3101            .do_stats      = (do_all || strcmp(tokens[2].value, "stats") == 0),
3102            .do_zeros      = (do_all || ntokens == 4)
3103        };
3104
3105        if (psci.do_info) {
3106            proxy_stats_dump_basic(&append_stats, c, "basic:");
3107        }
3108
3109        if (psci.do_settings) {
3110            process_stat_settings(&append_stats, c, "memcached:settings:");
3111        }
3112
3113        if (psci.do_stats) {
3114            server_stats(&append_stats, c, "memcached:stats:" );
3115        }
3116
3117        proxy_stats_dump_proxy_main(&append_stats, c, &psci);
3118
3119        proxy_stats_dump_proxies(&append_stats, c, &psci);
3120    }
3121
3122    /* append terminator and start the transfer */
3123    append_stats(NULL, 0, NULL, 0, c);
3124
3125    if (c->stats.buffer == NULL) {
3126        out_string(c, "SERVER_ERROR out of memory writing stats");
3127    } else {
3128        write_and_free(c, c->stats.buffer, c->stats.offset);
3129        c->stats.buffer = NULL;
3130    }
3131}
3132
3133/*
3134 * if we have a complete line in the buffer, process it.
3135 */
3136int try_read_command(conn *c) {
3137    cb_assert(c != NULL);
3138    cb_assert(c->rcurr <= (c->rbuf + c->rsize));
3139    cb_assert(c->rbytes > 0);
3140
3141    if (IS_NEGOTIATING(c->protocol) || c->transport == udp_transport)  {
3142        if ((unsigned char)c->rbuf[0] == (unsigned char)PROTOCOL_BINARY_REQ) {
3143            c->protocol = IS_PROXY(c->protocol) ? proxy_upstream_binary_prot : binary_prot;
3144        } else {
3145            c->protocol = IS_PROXY(c->protocol) ? proxy_upstream_ascii_prot : ascii_prot;
3146        }
3147
3148        if (settings.verbose) {
3149            moxi_log_write("%d: Client using the %s protocol\n", c->sfd,
3150                    prot_text(c->protocol));
3151        }
3152    }
3153
3154    if (IS_BINARY(c->protocol)) {
3155        /* Do we have the complete packet header? */
3156        if (c->rbytes < (int)sizeof(c->binary_header)) {
3157            /* need more data! */
3158            return 0;
3159        } else {
3160#ifdef NEED_ALIGN
3161            if (((long)(c->rcurr)) % 8 != 0) {
3162                /* must realign input buffer */
3163                memmove(c->rbuf, c->rcurr, c->rbytes);
3164                c->rcurr = c->rbuf;
3165                if (settings.verbose) {
3166                    moxi_log_write("%d: Realign input buffer\n", c->sfd);
3167                }
3168            }
3169#endif
3170            protocol_binary_request_header* req;
3171            req = (protocol_binary_request_header*)c->rcurr;
3172
3173            if (settings.verbose > 1) {
3174                /* Dump the packet before we convert it to host order */
3175                moxi_log_write("<%d Read binary protocol data:\n", c->sfd);
3176                cproxy_dump_header(c->sfd, (char *) req->bytes);
3177            }
3178
3179            c->binary_header = *req;
3180            c->binary_header.request.keylen = ntohs(req->request.keylen);
3181            c->binary_header.request.bodylen = ntohl(req->request.bodylen);
3182            c->binary_header.request.cas = mc_swap64(req->request.cas);
3183
3184            if (c->binary_header.request.magic != c->funcs->conn_binary_command_magic) {
3185                if (settings.verbose) {
3186                    moxi_log_write("Invalid magic:  %x\n",
3187                            c->binary_header.request.magic);
3188                }
3189                conn_set_state(c, conn_closing);
3190                return -1;
3191            }
3192
3193            /*
3194             * disconnect the client if it tries to send illegal packets
3195             * or packets that's too big. The max limit is 20Mb, so
3196             * disconnect the client if it sends > 21.
3197             */
3198            if (c->binary_header.request.bodylen > (21 * 1024 * 1024)) {
3199                if (settings.verbose) {
3200                    moxi_log_write("Packet too big: 0x%x\n",
3201                            c->binary_header.request.bodylen);
3202                }
3203                conn_set_state(c, conn_closing);
3204                return -1;
3205            }
3206
3207            uint32_t body = c->binary_header.request.keylen;
3208            body += c->binary_header.request.extlen;
3209            if (body > c->binary_header.request.bodylen) {
3210                if (settings.verbose) {
3211                    moxi_log_write("Content of packet bigger than encoded body: 0x%x > 0x%x\n",
3212                                   body, c->binary_header.request.bodylen);
3213                }
3214                conn_set_state(c, conn_closing);
3215                return -1;
3216            }
3217
3218            c->msgcurr = 0;
3219            c->msgused = 0;
3220            c->iovused = 0;
3221            if (add_msghdr(c) != 0) {
3222                out_string(c, "SERVER_ERROR out of memory");
3223                return 0;
3224            }
3225
3226            c->cmd = c->binary_header.request.opcode;
3227            c->keylen = c->binary_header.request.keylen;
3228            c->opaque = c->binary_header.request.opaque;
3229            /* clear the returned cas value */
3230            c->cas = 0;
3231
3232            c->funcs->conn_process_binary_command(c);
3233
3234            c->rbytes -= sizeof(c->binary_header);
3235            c->rcurr += sizeof(c->binary_header);
3236        }
3237    } else {
3238        char *el, *cont;
3239
3240        if (c->rbytes == 0)
3241            return 0;
3242        el = memchr(c->rcurr, '\n', c->rbytes);
3243        if (!el)
3244            return 0;
3245        cont = el + 1;
3246        if ((el - c->rcurr) > 1 && *(el - 1) == '\r') {
3247            el--;
3248        }
3249        *el = '\0';
3250
3251        cb_assert(cont <= (c->rcurr + c->rbytes));
3252
3253        c->funcs->conn_process_ascii_command(c, c->rcurr);
3254
3255        c->rbytes -= (int)(cont - c->rcurr);
3256        c->rcurr = cont;
3257
3258        cb_assert(c->rcurr <= (c->rbuf + c->rsize));
3259    }
3260
3261    return 1;
3262}
3263
3264/*
3265 * read a UDP request.
3266 */
3267static enum try_read_result try_read_udp(conn *c) {
3268    int res;
3269
3270    cb_assert(c != NULL);
3271
3272    c->request_addr_size = sizeof(c->request_addr);
3273    res = recvfrom(c->sfd, c->rbuf, c->rsize,
3274                   0, &c->request_addr, &c->request_addr_size);
3275    if (res > 8) {
3276        unsigned char *buf = (unsigned char *)c->rbuf;
3277
3278        cb_mutex_enter(&c->thread->stats.mutex);
3279        c->thread->stats.bytes_read += res;
3280        cb_mutex_exit(&c->thread->stats.mutex);
3281
3282        add_bytes_read(c, res);
3283
3284        /* Beginning of UDP packet is the request ID; save it. */
3285        c->request_id = buf[0] * 256 + buf[1];
3286
3287        /* If this is a multi-packet request, drop it. */
3288        if (buf[4] != 0 || buf[5] != 1) {
3289            out_string(c, "SERVER_ERROR multi-packet request not supported");
3290            return READ_NO_DATA_RECEIVED;
3291        }
3292
3293        /* Don't care about any of the rest of the header. */
3294        res -= 8;
3295        memmove(c->rbuf, c->rbuf + 8, res);
3296
3297        c->rbytes += res;
3298        c->rcurr = c->rbuf;
3299        return READ_DATA_RECEIVED;
3300    }
3301    return READ_NO_DATA_RECEIVED;
3302}
3303
3304/*
3305 * read from network as much as we can, handle buffer overflow and connection
3306 * close.
3307 * before reading, move the remaining incomplete fragment of a command
3308 * (if any) to the beginning of the buffer.
3309 * @return enum try_read_result
3310 */
3311static enum try_read_result try_read_network(conn *c) {
3312    enum try_read_result gotdata = READ_NO_DATA_RECEIVED;
3313    int res;
3314
3315    cb_assert(c != NULL);
3316
3317    if (c->rcurr != c->rbuf) {
3318        if (c->rbytes != 0) /* otherwise there's nothing to copy */
3319            memmove(c->rbuf, c->rcurr, c->rbytes);
3320        c->rcurr = c->rbuf;
3321    }
3322
3323    while (1) {
3324        if (c->rbytes >= c->rsize) {
3325            char *new_rbuf = realloc(c->rbuf, c->rsize * 2);
3326            if (!new_rbuf) {
3327                if (settings.verbose > 0)
3328                    moxi_log_write("Couldn't realloc input buffer\n");
3329                c->rbytes = 0; /* ignore what we read */
3330                out_string(c, "SERVER_ERROR out of memory reading request");
3331                c->write_and_go = conn_closing;
3332                return READ_MEMORY_ERROR;
3333            }
3334            c->rcurr = c->rbuf = new_rbuf;
3335            c->rsize *= 2;
3336        }
3337
3338        int avail = c->rsize - c->rbytes;
3339        cb_assert(avail > 0);
3340        res = recv(c->sfd, c->rbuf + c->rbytes, avail, 0);
3341        if (res > 0) {
3342            cb_mutex_enter(&c->thread->stats.mutex);
3343            c->thread->stats.bytes_read += res;
3344            cb_mutex_exit(&c->thread->stats.mutex);
3345
3346            add_bytes_read(c, res);
3347
3348            gotdata = READ_DATA_RECEIVED;
3349            c->rbytes += res;
3350            if (res == avail) {
3351                continue;
3352            } else {
3353                break;
3354            }
3355        }
3356        if (res == 0) {
3357            return READ_ERROR;
3358        }
3359        if (res == -1) {
3360            if (is_blocking(errno)) {
3361                break;
3362            }
3363            return READ_ERROR;
3364        }
3365    }
3366    return gotdata;
3367}
3368
3369bool update_event_real(conn *c, const int new_flags, const char *update_diag) {
3370    return update_event_timed_real(c, new_flags, NULL, update_diag);
3371}
3372
3373bool update_event_timed_real(conn *c, const int new_flags, struct timeval *timeout, const char *update_diag) {
3374    cb_assert(c != NULL);
3375
3376    struct event_base *base = c->event.ev_base;
3377    if (c->ev_flags == new_flags && timeout == NULL)
3378        return true;
3379
3380    c->update_diag = update_diag;
3381
3382    if (event_del(&c->event) == -1)
3383        return false;
3384    c->ev_flags = new_flags;
3385    if (new_flags == 0 && timeout == NULL)
3386        return true;
3387    event_set(&c->event, c->sfd, new_flags, event_handler, (void *)c);
3388    event_base_set(base, &c->event);
3389    if (event_add(&c->event, timeout) == -1)
3390        return false;
3391    return true;
3392}
3393
3394/*
3395 * Sets whether we are listening for new connections or not.
3396 */
3397void do_accept_new_conns(const bool do_accept) {
3398    conn *next;
3399
3400    for (next = listen_conn; next; next = next->next) {
3401        if (do_accept) {
3402            update_event(next, EV_READ | EV_PERSIST);
3403            if (listen(next->sfd, settings.backlog) != 0) {
3404                perror("listen");
3405            }
3406        }
3407        else {
3408            update_event(next, 0);
3409            if (listen(next->sfd, 0) != 0) {
3410                perror("listen");
3411            }
3412        }
3413    }
3414
3415    if (do_accept) {
3416        STATS_LOCK();
3417        stats.accepting_conns = true;
3418        STATS_UNLOCK();
3419    } else {
3420        STATS_LOCK();
3421        stats.accepting_conns = false;
3422        stats.listen_disabled_num++;
3423        STATS_UNLOCK();
3424    }
3425}
3426
3427/*
3428 * Transmit the next chunk of data from our list of msgbuf structures.
3429 *
3430 * Returns:
3431 *   TRANSMIT_COMPLETE   All done writing.
3432 *   TRANSMIT_INCOMPLETE More data remaining to write.
3433 *   TRANSMIT_SOFT_ERROR Can't write any more right now.
3434 *   TRANSMIT_HARD_ERROR Can't write (c->state is set to conn_closing)
3435 */
3436static enum transmit_result transmit(conn *c) {
3437    cb_assert(c != NULL);
3438
3439    if (c->msgcurr < c->msgused &&
3440            c->msglist[c->msgcurr].msg_iovlen == 0) {
3441        /* Finished writing the current msg; advance to the next. */
3442        c->msgcurr++;
3443    }
3444    if (c->msgcurr < c->msgused) {
3445        ssize_t res;
3446        struct msghdr *m = &c->msglist[c->msgcurr];
3447#ifdef WIN32
3448        DWORD error;
3449#else
3450        int error;
3451#endif
3452
3453        res = sendmsg(c->sfd, m, 0);
3454#ifdef WIN32
3455        error = WSAGetLastError();
3456#else
3457        error = errno;
3458#endif
3459        if (res > 0) {
3460            cb_mutex_enter(&c->thread->stats.mutex);
3461            c->thread->stats.bytes_written += res;
3462            cb_mutex_exit(&c->thread->stats.mutex);
3463
3464            /* We've written some of the data. Remove the completed
3465               iovec entries from the list of pending writes. */
3466            while (m->msg_iovlen > 0 && res >= (ssize_t)(m->msg_iov->iov_len)) {
3467                res -= (ssize_t)m->msg_iov->iov_len;
3468                m->msg_iovlen--;
3469                m->msg_iov++;
3470            }
3471
3472            /* Might have written just part of the last iovec entry;
3473               adjust it so the next write will do the rest. */
3474            if (res > 0) {
3475                m->msg_iov->iov_base = (caddr_t)m->msg_iov->iov_base + res;
3476                m->msg_iov->iov_len -= res;
3477            }
3478            return TRANSMIT_INCOMPLETE;
3479        }
3480
3481        if (res == -1 && is_blocking(error)) {
3482            if (!update_event(c, EV_WRITE | EV_PERSIST)) {
3483                if (settings.verbose > 0)
3484                    moxi_log_write("Couldn't update event\n");
3485                conn_set_state(c, conn_closing);
3486                return TRANSMIT_HARD_ERROR;
3487            }
3488            return TRANSMIT_SOFT_ERROR;
3489        }
3490        /* if res == 0 or res == -1 and error is not EAGAIN or EWOULDBLOCK,
3491           we have a real error, on which we close the connection */
3492        if (settings.verbose > 0)
3493            perror("Failed to write, and not due to blocking");
3494
3495        if (IS_UDP(c->transport))
3496            conn_set_state(c, conn_read);
3497        else
3498            conn_set_state(c, conn_closing);
3499        return TRANSMIT_HARD_ERROR;
3500    } else {
3501        return TRANSMIT_COMPLETE;
3502    }
3503}
3504
3505void drive_machine(conn *c) {
3506    bool stop = false;
3507    SOCKET sfd;
3508    socklen_t addrlen;
3509    struct sockaddr_storage addr;
3510    int nreqs = settings.reqs_per_event;
3511    int res;
3512#ifdef WIN32
3513    DWORD error;
3514#else
3515    int error;
3516#endif
3517
3518    cb_assert(c != NULL);
3519
3520    while (!stop) {
3521        if (settings.verbose > 2) {
3522            moxi_log_write("%d: drive_machine %s\n",
3523                    c->sfd, state_text(c->state));
3524        }
3525
3526        switch(c->state) {
3527        case conn_listening:
3528            addrlen = sizeof(addr);
3529            if ((sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen)) == INVALID_SOCKET) {
3530#ifdef WIN32
3531                error = WSAGetLastError();
3532#else
3533                error = errno;
3534#endif
3535                if (is_blocking(error)) {
3536                    /* these are transient, so don't log anything */
3537                    stop = true;
3538                } else if (is_emfile(error)) {
3539                    if (settings.verbose > 0)
3540                        moxi_log_write("Too many open connections\n");
3541                    accept_new_conns(false);
3542                    stop = true;
3543                } else {
3544                    perror("accept()");
3545                    stop = true;
3546                }
3547                break;
3548            }
3549            if (evutil_make_socket_nonblocking(sfd) == -1) {
3550                perror("setting O_NONBLOCK");
3551                closesocket(sfd);
3552                break;
3553            }
3554
3555            dispatch_conn_new(sfd, conn_new_cmd, EV_READ | EV_PERSIST,
3556                              DATA_BUFFER_SIZE,
3557                              c->protocol,
3558                              tcp_transport,
3559                              c->funcs, c->extra);
3560            stop = true;
3561            break;
3562
3563        case conn_connecting:
3564            if (c->funcs->conn_connect != NULL) {
3565                if (c->funcs->conn_connect(c) == true) {
3566                    stop = true;
3567                }
3568            } else {
3569                conn_set_state(c, conn_closing);
3570                update_event(c, 0);
3571            }
3572            break;
3573
3574        case conn_waiting:
3575            if (!update_event(c, EV_READ | EV_PERSIST)) {
3576                if (settings.verbose > 0)
3577                    moxi_log_write("Couldn't update event\n");
3578                conn_set_state(c, conn_closing);
3579                break;
3580            }
3581
3582            conn_set_state(c, conn_read);
3583            stop = true;
3584            break;
3585
3586        case conn_read:
3587            res = IS_UDP(c->transport) ? try_read_udp(c) : try_read_network(c);
3588
3589            switch (res) {
3590            case READ_NO_DATA_RECEIVED:
3591                conn_set_state(c, conn_waiting);
3592                break;
3593            case READ_DATA_RECEIVED:
3594                conn_set_state(c, conn_parse_cmd);
3595                break;
3596            case READ_ERROR:
3597                conn_set_state(c, conn_closing);
3598                break;
3599            case READ_MEMORY_ERROR: /* Failed to allocate more memory */
3600                /* State already set by try_read_network */
3601                break;
3602            }
3603            break;
3604
3605        case conn_parse_cmd:
3606            if (try_read_command(c) == 0) {
3607                /* wee need more data! */
3608                conn_set_state(c, conn_waiting);
3609            }
3610
3611            break;
3612
3613        case conn_new_cmd:
3614            /* Only process nreqs at a time to avoid starving other
3615               connections */
3616
3617            --nreqs;
3618            if (IS_DOWNSTREAM(c->protocol) || nreqs >= 0) {
3619                reset_cmd_handler(c);
3620            } else {
3621                cb_mutex_enter(&c->thread->stats.mutex);
3622                c->thread->stats.conn_yields++;
3623                cb_mutex_exit(&c->thread->stats.mutex);
3624                if (c->rbytes > 0) {
3625                    /* We have already read in data into the input buffer,
3626                       so libevent will most likely not signal read events
3627                       on the socket (unless more data is available. As a
3628                       hack we should just put in a request to write data,
3629                       because that should be possible ;-)
3630                    */
3631                    if (!update_event(c, EV_WRITE | EV_PERSIST)) {
3632                        if (settings.verbose > 0) {
3633                            moxi_log_write("Couldn't update event\n");
3634                        }
3635
3636                        conn_set_state(c, conn_closing);
3637                    }
3638                }
3639                stop = true;
3640            }
3641            break;
3642
3643        case conn_nread:
3644            cb_assert(c->rlbytes >= 0);
3645            if (c->rlbytes == 0) {
3646                complete_nread(c);
3647                break;
3648            }
3649            /* first check if we have leftovers in the conn_read buffer */
3650            if (c->rbytes > 0) {
3651                int tocopy = c->rbytes > c->rlbytes ? c->rlbytes : c->rbytes;
3652                if (c->ritem != c->rcurr) {
3653                    memmove(c->ritem, c->rcurr, tocopy);
3654                }
3655                c->ritem += tocopy;
3656                c->rlbytes -= tocopy;
3657                c->rcurr += tocopy;
3658                c->rbytes -= tocopy;
3659                if (c->rlbytes == 0) {
3660                    break;
3661                }
3662            }
3663
3664            /*  now try reading from the socket */
3665            res = recv(c->sfd, c->ritem, c->rlbytes, 0);
3666#ifdef WIN32
3667            error = WSAGetLastError();
3668#else
3669            error = errno;
3670#endif
3671
3672            if (res > 0) {
3673                add_bytes_read(c, res);
3674
3675                if (c->rcurr == c->ritem) {
3676                    c->rcurr += res;
3677                }
3678                c->ritem += res;
3679                c->rlbytes -= res;
3680                break;
3681            }
3682            if (res == 0) { /* end of stream */
3683                conn_set_state(c, conn_closing);
3684                break;
3685            }
3686            if (res == -1 && is_blocking(error)) {
3687                if (!update_event(c, EV_READ | EV_PERSIST)) {
3688                    if (settings.verbose > 0)
3689                        moxi_log_write( "Couldn't update event\n");
3690                    conn_set_state(c, conn_closing);
3691                    break;
3692                }
3693                stop = true;
3694                break;
3695            }
3696            /* otherwise we have a real error, on which we close the connection */
3697            if (!is_closed_conn(error) && settings.verbose > 0) {
3698                moxi_log_write("Failed to read, and not due to blocking:\n"
3699                        "errno: %d %s \n"
3700                        "rcurr=%lx ritem=%lx rbuf=%lx rlbytes=%d rsize=%d\n",
3701                        errno, strerror(errno),
3702                        (long)c->rcurr, (long)c->ritem, (long)c->rbuf,
3703                        (int)c->rlbytes, (int)c->rsize);
3704            }
3705            conn_set_state(c, conn_closing);
3706            break;
3707
3708        case conn_swallow:
3709            /* we are reading sbytes and throwing them away */
3710            if (c->sbytes == 0) {
3711                conn_set_state(c, conn_new_cmd);
3712                break;
3713            }
3714
3715            /* first check if we have leftovers in the conn_read buffer */
3716            if (c->rbytes > 0) {
3717                int tocopy = c->rbytes > c->sbytes ? c->sbytes : c->rbytes;
3718                c->sbytes -= tocopy;
3719                c->rcurr += tocopy;
3720                c->rbytes -= tocopy;
3721                break;
3722            }
3723
3724            /*  now try reading from the socket */
3725            res = recv(c->sfd, c->rbuf, c->rsize > c->sbytes ? c->sbytes : c->rsize, 0);
3726#ifdef WIN32
3727            error = WSAGetLastError();
3728#else
3729            error = errno;
3730#endif
3731
3732            if (res > 0) {
3733                cb_mutex_enter(&c->thread->stats.mutex);
3734                c->thread->stats.bytes_read += res;
3735                cb_mutex_exit(&c->thread->stats.mutex);
3736                add_bytes_read(c, res);
3737                c->sbytes -= res;
3738                break;
3739            }
3740            if (res == 0) { /* end of stream */
3741                conn_set_state(c, conn_closing);
3742                break;
3743            }
3744            if (res == -1 && is_blocking(error)) {
3745                if (!update_event(c, EV_READ | EV_PERSIST)) {
3746                    if (settings.verbose > 0)
3747                        moxi_log_write("Couldn't update event\n");
3748                    conn_set_state(c, conn_closing);
3749                    break;
3750                }
3751                stop = true;
3752                break;
3753            }
3754            /* otherwise we have a real error, on which we close the connection */
3755            if (!is_closed_conn(error) && settings.verbose > 0) {
3756                moxi_log_write("Failed to read, and not due to blocking\n");
3757            }
3758            conn_set_state(c, conn_closing);
3759            break;
3760
3761        case conn_write:
3762            /*
3763             * We want to write out a simple response. If we haven't already,
3764             * assemble it into a msgbuf list (this will be a single-entry
3765             * list for TCP or a two-entry list for UDP).
3766             */
3767            if (c->iovused == 0 || (IS_UDP(c->transport) && c->iovused == 1)) {
3768                if (add_iov(c, c->wcurr, c->wbytes) != 0) {
3769                    if (settings.verbose > 0)
3770                        moxi_log_write("Couldn't build response\n");
3771                    conn_set_state(c, conn_closing);
3772                    break;
3773                }
3774            }
3775
3776            /* fall through... */
3777
3778        case conn_mwrite:
3779          if (IS_UDP(c->transport) && c->msgcurr == 0 && build_udp_headers(c) != 0) {
3780            if (settings.verbose > 0)
3781              moxi_log_write("Failed to build UDP headers\n");
3782            conn_set_state(c, conn_closing);
3783            break;
3784          }
3785            switch (transmit(c)) {
3786            case TRANSMIT_COMPLETE:
3787                if (c->state == conn_mwrite) {
3788                    while (c->ileft > 0) {
3789                        item *it = *(c->icurr);
3790                        cb_assert((it->it_flags & ITEM_SLABBED) == 0);
3791                        item_remove(it);
3792                        c->icurr++;
3793                        c->ileft--;
3794                    }
3795                    while (c->suffixleft > 0) {
3796                        char *suffix = *(c->suffixcurr);
3797                        cache_free(c->thread->suffix_cache, suffix);
3798                        c->suffixcurr++;
3799                        c->suffixleft--;
3800                    }
3801                    conn_set_state(c, c->write_and_go);
3802                } else if (c->state == conn_write) {
3803                    if (c->write_and_free) {
3804                        free(c->write_and_free);
3805                        c->write_and_free = 0;
3806                    }
3807                    conn_set_state(c, c->write_and_go);
3808                } else {
3809                    if (settings.verbose > 0)
3810                        moxi_log_write("Unexpected state %d\n", c->state);
3811                    conn_set_state(c, conn_closing);
3812                }
3813                break;
3814
3815            case TRANSMIT_INCOMPLETE:
3816            case TRANSMIT_HARD_ERROR:
3817                break;                   /* Continue in state machine. */
3818
3819            case TRANSMIT_SOFT_ERROR:
3820                stop = true;
3821                break;
3822            }
3823            break;
3824
3825        case conn_pause:
3826            /* In case whoever put us into conn_pause didn't clear out */
3827            /* libevent registration, do so now. */
3828
3829            update_event(c, 0);
3830
3831            if (c->funcs->conn_pause != NULL)
3832                c->funcs->conn_pause(c);
3833
3834            stop = true;
3835            break;
3836
3837        case conn_closing:
3838            if (c->funcs->conn_close != NULL)
3839                c->funcs->conn_close(c);
3840
3841            if (IS_UDP(c->transport))
3842                conn_cleanup(c);
3843            else
3844                conn_close(c);
3845            stop = true;
3846            break;
3847
3848        case conn_max_state:
3849            cb_assert(false);
3850            break;
3851        }
3852    }
3853
3854    return;
3855}
3856
3857void add_bytes_read(conn *c, int bytes_read) {
3858    cb_assert(c != NULL);
3859    cb_mutex_enter(&c->thread->stats.mutex);
3860    c->thread->stats.bytes_read += bytes_read;
3861    cb_mutex_exit(&c->thread->stats.mutex);
3862}
3863
3864static void event_handler(evutil_socket_t fd, short which, void *arg) {
3865    conn *c;
3866
3867    c = (conn *)arg;
3868    cb_assert(c != NULL);
3869
3870    c->which = which;
3871
3872    /* sanity */
3873    if (fd != c->sfd) {
3874        if (settings.verbose > 0)
3875            moxi_log_write("Catastrophic: event fd doesn't match conn fd!\n");
3876        conn_close(c);
3877        return;
3878    }
3879
3880    c->update_diag = "working";
3881
3882    drive_machine(c);
3883
3884    /* wait for next event */
3885    return;
3886}
3887
3888static SOCKET new_socket(struct addrinfo *ai) {
3889    SOCKET sfd;
3890
3891    if ((sfd = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol)) == INVALID_SOCKET) {
3892        return INVALID_SOCKET;
3893    }
3894
3895    if (evutil_make_socket_nonblocking(sfd) == -1) {
3896        perror("setting O_NONBLOCK");
3897        closesocket(sfd);
3898        return INVALID_SOCKET;
3899    }
3900    return sfd;
3901}
3902
3903
3904/*
3905 * Sets a socket's send buffer size to the maximum allowed by the system.
3906 */
3907static void maximize_sndbuf(const SOCKET sfd) {
3908    socklen_t intsize = sizeof(int);
3909    int last_good = 0;
3910    int