xref: /6.0.3/moxi/src/memcached.c (revision 33c5c06a)
1/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *  memcached - memory caching daemon
4 *
5 *       http://www.danga.com/memcached/
6 *
7 *  Copyright 2003 Danga Interactive, Inc.  All rights reserved.
8 *
9 *  Use and distribution licensed under the BSD license.  See
10 *  the LICENSE file for full text.
11 *
12 *  Authors:
13 *      Anatoly Vorobey <mellon@pobox.com>
14 *      Brad Fitzpatrick <brad@danga.com>
15 */
16#include "memcached.h"
17#include <sys/stat.h>
18#include <signal.h>
19#include <ctype.h>
20#include <stdarg.h>
21
22#include <fcntl.h>
23#include <errno.h>
24#include <stdlib.h>
25#include <stdio.h>
26#include <string.h>
27#include <time.h>
28#include <assert.h>
29#include <limits.h>
30#include <stddef.h>
31#include <getopt.h>
32
33#include "cproxy.h"
34#include "agent.h"
35#include "stdin_check.h"
36#include "log.h"
37
38int IS_UDP(enum network_transport protocol) {
39    return protocol == udp_transport;
40}
41
/*
 * Socket error classifiers.
 *
 * Each helper takes an error code (a Winsock code on WIN32, an errno value
 * elsewhere) and returns nonzero when the code belongs to the named
 * category, zero otherwise.
 */
#ifdef WIN32
/* The operation would have blocked. */
static int is_blocking(DWORD dw) {
    return (dw == WSAEWOULDBLOCK);
}
/* The process ran out of file/socket descriptors. */
static int is_emfile(DWORD dw) {
    return (dw == WSAEMFILE);
}
/*
 * The peer is gone: the socket is not connected or the connection was reset.
 * Both codes must be compared against dw; a bare constant on the right of
 * "||" is always true.
 */
static int is_closed_conn(DWORD dw) {
    return (dw == WSAENOTCONN || dw == WSAECONNRESET);
}
/* The address is already in use. */
static int is_addrinuse(DWORD dw) {
    return (dw == WSAEADDRINUSE);
}
#else
/* The operation would have blocked. */
static int is_blocking(int dw) {
    return (dw == EAGAIN || dw == EWOULDBLOCK);
}

/* The process ran out of file descriptors. */
static int is_emfile(int dw) {
    return (dw == EMFILE);
}

/* The peer is gone: the socket is not connected or the connection was reset. */
static int is_closed_conn(int dw) {
    return (dw == ENOTCONN || dw == ECONNRESET);
}

/* The address is already in use. */
static int is_addrinuse(int dw) {
    return (dw == EADDRINUSE);
}
#endif
72
/*
 * forward declarations
 */
static SOCKET new_socket(struct addrinfo *ai);

/* Result codes returned by try_read_network() and try_read_udp(). */
enum try_read_result {
    READ_DATA_RECEIVED,
    READ_NO_DATA_RECEIVED,
    READ_ERROR,            /** an error occurred (on the socket) (or client closed connection) */
    READ_MEMORY_ERROR      /** failed to allocate more memory */
};

static enum try_read_result try_read_network(conn *c);
static enum try_read_result try_read_udp(conn *c);

/* stats */
static void stats_init(void);

/* defaults */
static void settings_init(void);

/* event handling, network IO */
static void event_handler(evutil_socket_t fd, short which, void *arg);
static void conn_close(conn *c);
static void conn_init(void);
static void write_and_free(conn *c, char *buf, size_t bytes);

/* time handling */
static rel_time_t realtime(const time_t exptime);
static void set_current_time(void);  /* update the global variable holding
                              global 32-bit seconds-since-start time
                              (to avoid 64 bit time_t) */

static void conn_free(conn *c);
107
/** exported globals **/
struct stats stats;         /* server counters; initialised by stats_init() */
struct settings settings;   /* runtime configuration; defaults come from settings_init() */
time_t process_started;     /* when the process was started (stats_init() backdates it by 2 seconds) */
conn *listen_conn = NULL;

/** file scope variables **/
static struct event_base *main_base;

/* Result codes returned by transmit(). */
enum transmit_result {
    TRANSMIT_COMPLETE,   /** All done writing. */
    TRANSMIT_INCOMPLETE, /** More data remaining to write. */
    TRANSMIT_SOFT_ERROR, /** Can't write any more right now. */
    TRANSMIT_HARD_ERROR  /** Can't write (c->state is set to conn_closing) */
};

static enum transmit_result transmit(conn *c);
125
/*
 * Default callback table for connections. conn_new() installs it on a
 * connection whenever the caller passes a NULL conn_funcs pointer.
 * Entries left NULL mean "no callback for this hook".
 */
conn_funcs conn_funcs_default = {
    .conn_init                   = NULL,
    .conn_close                  = NULL,
    .conn_connect                = NULL,
    .conn_process_ascii_command  = process_command,
    .conn_process_binary_command = dispatch_bin_command,
    .conn_complete_nread_ascii   = complete_nread_ascii,
    .conn_complete_nread_binary  = complete_nread_binary,
    .conn_pause                  = NULL,
    .conn_realtime               = realtime,
    .conn_binary_command_magic   = PROTOCOL_BINARY_REQ,
    .conn_state_change           = NULL
};
139
/* main_check() is only declared when the build defines MAIN_CHECK. */
#ifdef MAIN_CHECK
int main_check(int argc, char **argv);
#endif

/* global logger handle */
moxi_log *ml;
146
147/*
148 * given time value that's either unix time or delta from current unix time, return
149 * unix time. Use the fact that delta can't exceed one month (and real time value can't
150 * be that low).
151 */
152static rel_time_t realtime(const time_t exptime) {
153    /* no. of seconds in 30 days - largest possible delta exptime */
154
155    if (exptime == 0) return 0; /* 0 means never expire */
156
157    if (exptime > REALTIME_MAXDELTA) {
158        /* if item expiration is at/before the server started, give it an
159           expiration time of 1 second after the server started.
160           (because 0 means don't expire).  without this, we'd
161           underflow and wrap around to some large value way in the
162           future, effectively making items expiring in the past
163           really expiring never */
164        if (exptime <= process_started)
165            return (rel_time_t)1;
166        return (rel_time_t)(exptime - process_started);
167    } else {
168        return (rel_time_t)(exptime + current_time);
169    }
170}
171
172static void stats_init(void) {
173    stats.curr_items = stats.total_items = stats.curr_conns = stats.total_conns = stats.conn_structs = 0;
174    stats.get_cmds = stats.set_cmds = stats.get_hits = stats.get_misses = stats.evictions = 0;
175    stats.curr_bytes = stats.listen_disabled_num = 0;
176    stats.accepting_conns = true; /* assuming we start in this state. */
177
178    /* make the time we started always be 2 seconds before we really
179       did, so time(0) - time.started is never zero.  if so, things
180       like 'settings.oldest_live' which act as booleans as well as
181       values are now false in boolean context... */
182    process_started = time(0) - 2;
183    stats_prefix_init();
184}
185
186static void stats_reset(void) {
187    STATS_LOCK();
188    stats.total_items = stats.total_conns = 0;
189    stats.evictions = 0;
190    stats.listen_disabled_num = 0;
191    stats_prefix_clear();
192    STATS_UNLOCK();
193    threadlocal_stats_reset();
194    item_stats_reset();
195}
196
197static void settings_init(void) {
198    settings.use_cas = true;
199    settings.access = 0700;
200    settings.port = UNSPECIFIED;
201    settings.udpport = UNSPECIFIED;
202    /* By default this string should be NULL for getaddrinfo() */
203    settings.inter = NULL;
204    settings.maxbytes = 64 * 1024 * 1024; /* default is 64MB */
205    settings.maxconns = 1024;         /* to limit connections-related memory to about 5MB */
206    settings.verbose = 0;
207    settings.oldest_live = 0;
208    settings.evict_to_free = 1;       /* push old items out of cache when memory runs out */
209    settings.socketpath = NULL;       /* by default, not using a unix socket */
210    settings.factor = 1.25;
211    settings.chunk_size = 48;         /* space for a modest key and value */
212    settings.num_threads = 4 + 1;     /* N workers + 1 dispatcher */
213    settings.prefix_delimiter = ':';
214    settings.detail_enabled = 0;
215    settings.reqs_per_event = 20;
216    settings.backlog = 1024;
217    settings.binding_protocol = negotiating_prot;
218}
219
220/*
221 * Adds a message header to a connection.
222 *
223 * Returns 0 on success, -1 on out-of-memory.
224 */
225int add_msghdr(conn *c) {
226    struct msghdr *msg;
227
228    assert(c != NULL);
229
230    if (c->msgsize == c->msgused) {
231        msg = realloc(c->msglist, c->msgsize * 2 * sizeof(struct msghdr));
232        if (! msg)
233            return -1;
234        c->msglist = msg;
235        c->msgsize *= 2;
236    }
237
238    msg = c->msglist + c->msgused;
239
240    /* this wipes msg_iovlen, msg_control, msg_controllen, and
241       msg_flags, the last 3 of which aren't defined on solaris: */
242    memset(msg, 0, sizeof(struct msghdr));
243
244    msg->msg_iov = &c->iov[c->iovused];
245
246    if (c->request_addr_size > 0) {
247        msg->msg_name = &c->request_addr;
248        msg->msg_namelen = c->request_addr_size;
249    }
250
251    c->msgbytes = 0;
252    c->msgused++;
253
254    if (IS_UDP(c->transport)) {
255        /* Leave room for the UDP header, which we'll fill in later. */
256        return add_iov(c, NULL, UDP_HEADER_SIZE);
257    }
258
259    return 0;
260}
261
262
/*
 * Free list management for connections.
 */

static conn **freeconns;    /* array of recycled connections; allocated by conn_init() */
static size_t freetotal;    /* capacity of freeconns, in entries */
static size_t freecurr;     /* number of entries of freeconns currently in use */
/* Lock for connection freelist */
static cb_mutex_t conn_lock;
272
273static void conn_init(void) {
274    freetotal = 200;
275    freecurr = 0;
276    cb_mutex_initialize(&conn_lock);
277    if ((freeconns = calloc(freetotal, sizeof(conn *))) == NULL) {
278        moxi_log_write("Failed to allocate connection structures\n");
279    }
280    return;
281}
282
283/*
284 * Returns a connection from the freelist, if any.
285 */
286conn *conn_from_freelist() {
287    conn *c;
288
289    cb_mutex_enter(&conn_lock);
290    if (freecurr > 0) {
291        c = freeconns[--freecurr];
292    } else {
293        c = NULL;
294    }
295    cb_mutex_exit(&conn_lock);
296
297    return c;
298}
299
300/*
301 * Adds a connection to the freelist. 0 = success.
302 */
303bool conn_add_to_freelist(conn *c) {
304    bool ret = true;
305    cb_mutex_enter(&conn_lock);
306    if (freecurr < freetotal) {
307        freeconns[freecurr++] = c;
308        ret = false;
309    } else {
310        /* try to enlarge free connections array */
311        size_t newsize = freetotal * 2;
312        conn **new_freeconns = realloc(freeconns, sizeof(conn *) * newsize);
313        if (new_freeconns) {
314            freetotal = newsize;
315            freeconns = new_freeconns;
316            freeconns[freecurr++] = c;
317            ret = false;
318        }
319    }
320    cb_mutex_exit(&conn_lock);
321    return ret;
322}
323
324static const char *prot_text(enum protocol prot) {
325    char *rv = "unknown";
326    switch(prot) {
327        case ascii_prot:
328            rv = "ascii";
329            break;
330        case binary_prot:
331            rv = "binary";
332            break;
333        case proxy_upstream_ascii_prot:
334            rv = "proxy-upstream-ascii";
335            break;
336        case proxy_upstream_binary_prot:
337            rv = "proxy-upstream-binary";
338            break;
339        case proxy_downstream_ascii_prot:
340            rv = "proxy-downstream-ascii";
341            break;
342        case proxy_downstream_binary_prot:
343            rv = "proxy-downstream-binary";
344            break;
345        case negotiating_proxy_prot:
346            rv = "auto-negotiate-proxy";
347        case negotiating_prot:
348            rv = "auto-negotiate";
349            break;
350    }
351    return rv;
352}
353
354conn *conn_new(const SOCKET sfd, enum conn_states init_state,
355               const int event_flags,
356               const int read_buffer_size,
357               enum network_transport transport,
358               struct event_base *base,
359               conn_funcs *funcs, void *extra) {
360    conn *c = conn_from_freelist();
361
362    if (NULL == c) {
363        if (!(c = (conn *)calloc(1, sizeof(conn)))) {
364            moxi_log_write("calloc()\n");
365            return NULL;
366        }
367        MEMCACHED_CONN_CREATE(c);
368
369        c->rbuf = c->wbuf = 0;
370        c->ilist = 0;
371        c->suffixlist = 0;
372        c->iov = 0;
373        c->msglist = 0;
374        c->hdrbuf = 0;
375
376        c->rsize = read_buffer_size;
377        c->wsize = DATA_BUFFER_SIZE;
378        c->isize = ITEM_LIST_INITIAL;
379        c->suffixsize = SUFFIX_LIST_INITIAL;
380        c->iovsize = IOV_LIST_INITIAL;
381        c->msgsize = MSG_LIST_INITIAL;
382        c->hdrsize = 0;
383
384        c->rbuf = (char *)malloc((size_t)c->rsize);
385        c->wbuf = (char *)malloc((size_t)c->wsize);
386        c->ilist = (item **)malloc(sizeof(item *) * c->isize);
387        c->suffixlist = (char **)malloc(sizeof(char *) * c->suffixsize);
388        c->iov = (struct iovec *)malloc(sizeof(struct iovec) * c->iovsize);
389        c->msglist = (struct msghdr *)malloc(sizeof(struct msghdr) * c->msgsize);
390
391        if (c->rbuf == 0 || c->wbuf == 0 || c->ilist == 0 || c->iov == 0 ||
392                c->msglist == 0 || c->suffixlist == 0) {
393            conn_free(c);
394            moxi_log_write("malloc()\n");
395            return NULL;
396        }
397
398        STATS_LOCK();
399        stats.conn_structs++;
400        STATS_UNLOCK();
401    }
402
403    c->transport = transport;
404    c->protocol = settings.binding_protocol;
405
406    /* unix socket mode doesn't need this, so zeroed out.  but why
407     * is this done for every command?  presumably for UDP
408     * mode.  */
409    if (!settings.socketpath) {
410        c->request_addr_size = sizeof(c->request_addr);
411    } else {
412        c->request_addr_size = 0;
413    }
414
415    if (settings.verbose > 1) {
416        if (init_state == conn_listening) {
417            moxi_log_write("<%d server listening (%s)\n", sfd,
418                prot_text(c->protocol));
419        } else if (IS_UDP(transport)) {
420            moxi_log_write("<%d server listening (udp)\n", sfd);
421        } else if (IS_NEGOTIATING(c->protocol)) {
422            moxi_log_write("<%d new auto-negotiating client connection\n",
423                    sfd);
424        } else if (c->protocol == ascii_prot) {
425            moxi_log_write("<%d new ascii client connection.\n", sfd);
426        } else if (c->protocol == binary_prot) {
427            moxi_log_write("<%d new binary client connection.\n", sfd);
428        } else {
429            moxi_log_write("<%d new %s client connection\n",
430                    sfd, prot_text(c->protocol));
431        }
432    }
433
434    c->sfd = sfd;
435    c->state = init_state;
436    c->rlbytes = 0;
437    c->cmd = -1;
438    c->rbytes = c->wbytes = 0;
439    c->wcurr = c->wbuf;
440    c->rcurr = c->rbuf;
441    c->ritem = 0;
442    c->icurr = c->ilist;
443    c->suffixcurr = c->suffixlist;
444    c->ileft = 0;
445    c->suffixleft = 0;
446    c->iovused = 0;
447    c->msgcurr = 0;
448    c->msgused = 0;
449
450    c->write_and_go = init_state;
451    c->write_and_free = 0;
452    c->item = 0;
453
454    c->noreply = false;
455
456    c->funcs = funcs;
457    if (c->funcs == NULL) {
458        c->funcs = &conn_funcs_default;
459        if (settings.verbose > 1)
460            moxi_log_write( "<%d initialized conn_funcs to default\n", sfd);
461    }
462
463    c->cmd_curr = -1;
464    c->cmd_start = NULL;
465    c->cmd_start_time = 0;
466    c->cmd_retries = 0;
467    c->corked = NULL;
468    c->host_ident = NULL;
469    c->peer_host = NULL;
470    c->peer_protocol = 0;
471    c->peer_port = 0;
472    c->update_diag = NULL;
473
474    c->extra = extra;
475
476    event_set(&c->event, sfd, event_flags, event_handler, (void *)c);
477    event_base_set(base, &c->event);
478    c->ev_flags = event_flags;
479
480    if (event_add(&c->event, 0) == -1) {
481        event_del(&c->event);
482        conn_free(c);
483        perror("event_add\n");
484        return NULL;
485    }
486
487    if (c->funcs->conn_init != NULL &&
488        c->funcs->conn_init(c) == false) {
489        event_del(&c->event);
490        conn_free(c);
491        return NULL;
492    }
493
494    STATS_LOCK();
495    stats.curr_conns++;
496    stats.total_conns++;
497    STATS_UNLOCK();
498
499    MEMCACHED_CONN_ALLOCATE(c->sfd);
500
501    return c;
502}
503
504static void conn_cleanup(conn *c) {
505    assert(c != NULL);
506
507    if (c->item) {
508        item_remove(c->item);
509        c->item = 0;
510    }
511
512    if (c->ileft != 0) {
513        for (; c->ileft > 0; c->ileft--, c->icurr++) {
514            item_remove(*(c->icurr));
515        }
516    }
517
518    if (c->suffixleft != 0) {
519        for (; c->suffixleft > 0; c->suffixleft--, c->suffixcurr++) {
520            cache_free(c->thread->suffix_cache, *(c->suffixcurr));
521        }
522    }
523
524    if (c->write_and_free) {
525        free(c->write_and_free);
526        c->write_and_free = 0;
527    }
528}
529
530/*
531 * Frees a connection.
532 */
533void conn_free(conn *c) {
534    if (c) {
535        MEMCACHED_CONN_DESTROY(c);
536        if (c->hdrbuf)
537            free(c->hdrbuf);
538        if (c->msglist)
539            free(c->msglist);
540        if (c->rbuf)
541            free(c->rbuf);
542        if (c->wbuf)
543            free(c->wbuf);
544        if (c->ilist)
545            free(c->ilist);
546        if (c->suffixlist)
547            free(c->suffixlist);
548        if (c->iov)
549            free(c->iov);
550        if (c->host_ident)
551            free(c->host_ident);
552
553        while (c->corked != NULL) {
554            bin_cmd *bc = c->corked;
555            c->corked = c->corked->next;
556            if (bc->request_item != NULL) {
557                item_remove(bc->request_item);
558            }
559            if (bc->response_item != NULL) {
560                item_remove(bc->response_item);
561            }
562            free(bc);
563        }
564
565        free(c);
566    }
567}
568
569static void conn_close(conn *c) {
570    assert(c != NULL);
571
572    /* delete the event, the socket and the conn */
573    event_del(&c->event);
574
575    if (settings.verbose > 1)
576        moxi_log_write("<%d connection closed.\n", c->sfd);
577
578    MEMCACHED_CONN_RELEASE(c->sfd);
579    closesocket(c->sfd);
580    accept_new_conns(true);
581    conn_cleanup(c);
582
583    /* if the connection has big buffers, just free it */
584    if (c->rsize > READ_BUFFER_HIGHWAT || conn_add_to_freelist(c)) {
585        conn_free(c);
586    }
587
588    STATS_LOCK();
589    stats.curr_conns--;
590    STATS_UNLOCK();
591
592    return;
593}
594
595/*
596 * Shrinks a connection's buffers if they're too big.  This prevents
597 * periodic large "get" requests from permanently chewing lots of server
598 * memory.
599 *
600 * This should only be called in between requests since it can wipe output
601 * buffers!
602 */
603static void conn_shrink(conn *c) {
604    assert(c != NULL);
605
606    if (IS_UDP(c->transport))
607        return;
608
609    if (c->rsize > READ_BUFFER_HIGHWAT && c->rbytes < DATA_BUFFER_SIZE) {
610        char *newbuf;
611
612        if (c->rcurr != c->rbuf)
613            memmove(c->rbuf, c->rcurr, (size_t)c->rbytes);
614
615        newbuf = (char *)realloc((void *)c->rbuf, DATA_BUFFER_SIZE);
616
617        if (newbuf) {
618            c->rbuf = newbuf;
619            c->rsize = DATA_BUFFER_SIZE;
620        }
621        /* TODO check other branch... */
622        c->rcurr = c->rbuf;
623    }
624
625    if (c->isize > ITEM_LIST_HIGHWAT) {
626        item **newbuf = (item**) realloc((void *)c->ilist, ITEM_LIST_INITIAL * sizeof(c->ilist[0]));
627        if (newbuf) {
628            c->ilist = newbuf;
629            c->isize = ITEM_LIST_INITIAL;
630        }
631    /* TODO check error condition? */
632    }
633
634    if (c->msgsize > MSG_LIST_HIGHWAT) {
635        struct msghdr *newbuf = (struct msghdr *) realloc((void *)c->msglist, MSG_LIST_INITIAL * sizeof(c->msglist[0]));
636        if (newbuf) {
637            c->msglist = newbuf;
638            c->msgsize = MSG_LIST_INITIAL;
639        }
640    /* TODO check error condition? */
641    }
642
643    if (c->iovsize > IOV_LIST_HIGHWAT) {
644        struct iovec *newbuf = (struct iovec *) realloc((void *)c->iov, IOV_LIST_INITIAL * sizeof(c->iov[0]));
645        if (newbuf) {
646            c->iov = newbuf;
647            c->iovsize = IOV_LIST_INITIAL;
648        }
649    /* TODO check return value */
650    }
651}
652
653/**
654 * Convert a state name to a human readable form.
655 */
656const char *state_text(enum conn_states state) {
657    const char* const statenames[] = { "conn_listening",
658                                       "conn_new_cmd",
659                                       "conn_waiting",
660                                       "conn_read",
661                                       "conn_parse_cmd",
662                                       "conn_write",
663                                       "conn_nread",
664                                       "conn_swallow",
665                                       "conn_closing",
666                                       "conn_mwrite",
667                                       "conn_pause",
668                                       "conn_connecting" };
669    return statenames[state];
670}
671
672/*
673 * Sets a connection's current state in the state machine. Any special
674 * processing that needs to happen on certain state transitions can
675 * happen here.
676 */
677void conn_set_state(conn *c, enum conn_states state) {
678    assert(c != NULL);
679    assert(state < conn_max_state);
680
681    if (state != c->state) {
682        if (c->funcs != NULL &&
683            c->funcs->conn_state_change != NULL) {
684            c->funcs->conn_state_change(c, state);
685        }
686
687        if (settings.verbose > 2) {
688            moxi_log_write("%d: going from %s to %s\n",
689                    c->sfd, state_text(c->state),
690                    state_text(state));
691        }
692
693        c->state = state;
694
695        if (state == conn_write || state == conn_mwrite) {
696            MEMCACHED_PROCESS_COMMAND_END(c->sfd, c->wbuf, c->wbytes);
697        }
698    }
699}
700
701/*
702 * Ensures that there is room for another struct iovec in a connection's
703 * iov list.
704 *
705 * Returns 0 on success, -1 on out-of-memory.
706 */
707int ensure_iov_space(conn *c) {
708    assert(c != NULL);
709
710    if (c->iovused >= c->iovsize) {
711        int i, iovnum;
712        struct iovec *new_iov = (struct iovec *)realloc(c->iov,
713                                (c->iovsize * 2) * sizeof(struct iovec));
714        if (! new_iov)
715            return -1;
716        c->iov = new_iov;
717        c->iovsize *= 2;
718
719        /* Point all the msghdr structures at the new list. */
720        for (i = 0, iovnum = 0; i < c->msgused; i++) {
721            c->msglist[i].msg_iov = &c->iov[iovnum];
722            iovnum += c->msglist[i].msg_iovlen;
723        }
724    }
725
726    return 0;
727}
728
729
730/*
731 * Adds data to the list of pending data that will be written out to a
732 * connection.
733 *
734 * Returns 0 on success, -1 on out-of-memory.
735 */
736int add_iov(conn *c, const void *buf, int len) {
737    struct msghdr *m;
738    int leftover;
739    bool limit_to_mtu;
740
741    assert(c != NULL);
742    assert(c->msgused > 0);
743
744    do {
745        m = &c->msglist[c->msgused - 1];
746
747        /*
748         * Limit UDP packets, and the first payloads of TCP replies, to
749         * UDP_MAX_PAYLOAD_SIZE bytes.
750         */
751        limit_to_mtu = IS_UDP(c->transport) || (1 == c->msgused);
752
753        /* We may need to start a new msghdr if this one is full. */
754        if (m->msg_iovlen == IOV_MAX ||
755            (limit_to_mtu && c->msgbytes >= UDP_MAX_PAYLOAD_SIZE)) {
756            add_msghdr(c);
757            m = &c->msglist[c->msgused - 1];
758        }
759
760        if (ensure_iov_space(c) != 0)
761            return -1;
762
763        /* If the fragment is too big to fit in the datagram, split it up */
764        if (limit_to_mtu && len + c->msgbytes > UDP_MAX_PAYLOAD_SIZE) {
765            leftover = len + c->msgbytes - UDP_MAX_PAYLOAD_SIZE;
766            len -= leftover;
767        } else {
768            leftover = 0;
769        }
770
771        m = &c->msglist[c->msgused - 1];
772        m->msg_iov[m->msg_iovlen].iov_base = (void *)buf;
773        m->msg_iov[m->msg_iovlen].iov_len = len;
774
775        c->msgbytes += len;
776        c->iovused++;
777        m->msg_iovlen++;
778
779        buf = ((char *)buf) + len;
780        len = leftover;
781    } while (leftover > 0);
782
783    return 0;
784}
785
786
787/*
788 * Constructs a set of UDP headers and attaches them to the outgoing messages.
789 */
790static int build_udp_headers(conn *c) {
791    int i;
792    unsigned char *hdr;
793
794    assert(c != NULL);
795
796    if (c->msgused > c->hdrsize) {
797        void *new_hdrbuf;
798        if (c->hdrbuf)
799            new_hdrbuf = realloc(c->hdrbuf, c->msgused * 2 * UDP_HEADER_SIZE);
800        else
801            new_hdrbuf = malloc(c->msgused * 2 * UDP_HEADER_SIZE);
802        if (! new_hdrbuf)
803            return -1;
804        c->hdrbuf = (unsigned char *)new_hdrbuf;
805        c->hdrsize = c->msgused * 2;
806    }
807
808    hdr = c->hdrbuf;
809    for (i = 0; i < c->msgused; i++) {
810        c->msglist[i].msg_iov[0].iov_base = (void*)hdr;
811        c->msglist[i].msg_iov[0].iov_len = UDP_HEADER_SIZE;
812        *hdr++ = c->request_id / 256;
813        *hdr++ = c->request_id % 256;
814        *hdr++ = i / 256;
815        *hdr++ = i % 256;
816        *hdr++ = c->msgused / 256;
817        *hdr++ = c->msgused % 256;
818        *hdr++ = 0;
819        *hdr++ = 0;
820        assert((void *) hdr == (caddr_t)c->msglist[i].msg_iov[0].iov_base + UDP_HEADER_SIZE);
821    }
822
823    return 0;
824}
825
826
827void out_string(conn *c, const char *str) {
828    int len;
829
830    assert(c != NULL);
831
832    if (c->noreply) {
833        if (settings.verbose > 1)
834            moxi_log_write(">%d NOREPLY %s\n", c->sfd, str);
835        c->noreply = false;
836        conn_set_state(c, conn_new_cmd);
837        return;
838    }
839
840    if (settings.verbose > 1)
841        moxi_log_write(">%d %s\n", c->sfd, str);
842
843    len = (int)strlen(str);
844    if ((len + 2) > c->wsize) {
845        /* ought to be always enough. just fail for simplicity */
846        str = "SERVER_ERROR output line too long";
847        len = (int)strlen(str);
848    }
849
850    memcpy(c->wbuf, str, len);
851    memcpy(c->wbuf + len, "\r\n", 2);
852    c->wbytes = len + 2;
853    c->wcurr = c->wbuf;
854
855    conn_set_state(c, conn_write);
856    c->write_and_go = conn_new_cmd;
857    return;
858}
859
860/*
861 * we get here after reading the value in set/add/replace commands. The command
862 * has been stored in c->cmd, and the item is ready in c->item.
863 */
864void complete_nread_ascii(conn *c) {
865    assert(c != NULL);
866
867    item *it = c->item;
868    int comm = c->cmd;
869    enum store_item_type ret;
870
871    cb_mutex_enter(&c->thread->stats.mutex);
872    c->thread->stats.slab_stats[it->slabs_clsid].set_cmds++;
873    cb_mutex_exit(&c->thread->stats.mutex);
874
875    if (strncmp(ITEM_data(it) + it->nbytes - 2, "\r\n", 2) != 0) {
876        out_string(c, "CLIENT_ERROR bad data chunk");
877    } else {
878      ret = store_item(it, comm, c);
879
880#ifdef ENABLE_DTRACE
881      uint64_t cas = ITEM_get_cas(it);
882      switch (c->cmd) {
883      case NREAD_ADD:
884          MEMCACHED_COMMAND_ADD(c->sfd, ITEM_key(it), it->nkey,
885                                (ret == 1) ? it->nbytes : -1, cas);
886          break;
887      case NREAD_REPLACE:
888          MEMCACHED_COMMAND_REPLACE(c->sfd, ITEM_key(it), it->nkey,
889                                    (ret == 1) ? it->nbytes : -1, cas);
890          break;
891      case NREAD_APPEND:
892          MEMCACHED_COMMAND_APPEND(c->sfd, ITEM_key(it), it->nkey,
893                                   (ret == 1) ? it->nbytes : -1, cas);
894          break;
895      case NREAD_PREPEND:
896          MEMCACHED_COMMAND_PREPEND(c->sfd, ITEM_key(it), it->nkey,
897                                    (ret == 1) ? it->nbytes : -1, cas);
898          break;
899      case NREAD_SET:
900          MEMCACHED_COMMAND_SET(c->sfd, ITEM_key(it), it->nkey,
901                                (ret == 1) ? it->nbytes : -1, cas);
902          break;
903      case NREAD_CAS:
904          MEMCACHED_COMMAND_CAS(c->sfd, ITEM_key(it), it->nkey, it->nbytes,
905                                cas);
906          break;
907      }
908#endif
909
910      switch (ret) {
911      case STORED:
912          out_string(c, "STORED");
913          break;
914      case EXISTS:
915          out_string(c, "EXISTS");
916          break;
917      case NOT_FOUND:
918          out_string(c, "NOT_FOUND");
919          break;
920      case NOT_STORED:
921          out_string(c, "NOT_STORED");
922          break;
923      default:
924          out_string(c, "SERVER_ERROR Unhandled storage type.");
925      }
926
927    }
928
929    item_remove(c->item);       /* release the c->item reference */
930    c->item = 0;
931}
932
933/**
934 * get a pointer to the start of the request struct for the current command
935 */
936void* binary_get_request(conn *c) {
937    char *ret = c->rcurr;
938    ret -= (sizeof(c->binary_header) + c->binary_header.request.keylen +
939            c->binary_header.request.extlen);
940
941    assert(ret >= c->rbuf);
942    return ret;
943}
944
945/**
946 * get a pointer to the key in this request
947 */
948char* binary_get_key(conn *c) {
949    return c->rcurr - (c->binary_header.request.keylen);
950}
951
952static void add_bin_header(conn *c, uint16_t err, uint8_t hdr_len, uint16_t key_len, uint32_t body_len) {
953    protocol_binary_response_header* header;
954
955    assert(c);
956
957    c->msgcurr = 0;
958    c->msgused = 0;
959    c->iovused = 0;
960    if (add_msghdr(c) != 0) {
961        /* XXX:  out_string is inappropriate here */
962        out_string(c, "SERVER_ERROR out of memory");
963        return;
964    }
965
966    header = (protocol_binary_response_header *)c->wbuf;
967
968    header->response.magic = (uint8_t)PROTOCOL_BINARY_RES;
969    header->response.opcode = c->binary_header.request.opcode;
970    header->response.keylen = (uint16_t)htons(key_len);
971
972    header->response.extlen = (uint8_t)hdr_len;
973    header->response.datatype = (uint8_t)PROTOCOL_BINARY_RAW_BYTES;
974    header->response.status = (uint16_t)htons(err);
975
976    header->response.bodylen = htonl(body_len);
977    header->response.opaque = c->opaque;
978    header->response.cas = mc_swap64(c->cas);
979
980    if (settings.verbose > 1) {
981        moxi_log_write(">%d Writing bin response:\n", c->sfd);
982        cproxy_dump_header(c->sfd, (char *) header->bytes);
983    }
984
985    add_iov(c, c->wbuf, sizeof(header->response));
986}
987
988void write_bin_error(conn *c, protocol_binary_response_status err, int swallow) {
989    const char *errstr = "Unknown error";
990    size_t len;
991
992    switch (err) {
993    case PROTOCOL_BINARY_RESPONSE_ENOMEM:
994        errstr = "Out of memory";
995        break;
996    case PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND:
997        errstr = "Unknown command";
998        break;
999    case PROTOCOL_BINARY_RESPONSE_KEY_ENOENT:
1000        errstr = "Not found";
1001        break;
1002    case PROTOCOL_BINARY_RESPONSE_EINVAL:
1003        errstr = "Invalid arguments";
1004        break;
1005    case PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS:
1006        errstr = "Data exists for key.";
1007        break;
1008    case PROTOCOL_BINARY_RESPONSE_E2BIG:
1009        errstr = "Too large.";
1010        break;
1011    case PROTOCOL_BINARY_RESPONSE_DELTA_BADVAL:
1012        errstr = "Non-numeric server-side value for incr or decr";
1013        break;
1014    case PROTOCOL_BINARY_RESPONSE_NOT_STORED:
1015        errstr = "Not stored.";
1016        break;
1017    case PROTOCOL_BINARY_RESPONSE_AUTH_ERROR:
1018        errstr = "Auth failure";
1019        break;
1020    case PROTOCOL_BINARY_RESPONSE_AUTH_CONTINUE:
1021        errstr = "Auth continue";
1022        break;
1023    case PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET:
1024        errstr = "Not my vbucket";
1025        break;
1026    case PROTOCOL_BINARY_RESPONSE_NOT_SUPPORTED:
1027        errstr = "Not supported";
1028        break;
1029    case PROTOCOL_BINARY_RESPONSE_EINTERNAL:
1030        errstr = "Internal error";
1031        break;
1032    case PROTOCOL_BINARY_RESPONSE_EBUSY:
1033        errstr = "System is busy";
1034        break;
1035    case PROTOCOL_BINARY_RESPONSE_ETMPFAIL:
1036        errstr = "Temporary failure";
1037        break;
1038    default:
1039        assert(false);
1040        errstr = "UNHANDLED ERROR";
1041        moxi_log_write(">%d UNHANDLED ERROR: %d\n", c->sfd, err);
1042    }
1043
1044    if (settings.verbose > 1) {
1045        moxi_log_write(">%d Writing an error: %d %s\n", c->sfd, (int) err, errstr);
1046    }
1047
1048    len = strlen(errstr);
1049    add_bin_header(c, err, 0, 0, (uint32_t)len);
1050    if (len > 0) {
1051        add_iov(c, errstr, (uint32_t)len);
1052    }
1053    conn_set_state(c, conn_mwrite);
1054    if(swallow > 0) {
1055        c->sbytes = swallow;
1056        c->write_and_go = conn_swallow;
1057    } else {
1058        c->write_and_go = conn_new_cmd;
1059    }
1060}
1061
1062/* Form and send a response to a command over the binary protocol */
1063void write_bin_response(conn *c, void *d, int hlen, int keylen, int dlen) {
1064    if (!c->noreply || c->cmd == PROTOCOL_BINARY_CMD_GET ||
1065        c->cmd == PROTOCOL_BINARY_CMD_GETK ||
1066        c->cmd == PROTOCOL_BINARY_CMD_GETL) {
1067        add_bin_header(c, 0, hlen, keylen, dlen);
1068        if(dlen > 0) {
1069            add_iov(c, d, dlen);
1070        }
1071        conn_set_state(c, conn_mwrite);
1072        c->write_and_go = conn_new_cmd;
1073    } else {
1074        conn_set_state(c, conn_new_cmd);
1075    }
1076}
1077
/*
 * Byte swap a 64-bit number.
 *
 * When ENDIAN_LITTLE is defined the eight bytes of `in` are reversed;
 * otherwise the value is returned unchanged.
 */
uint64_t mc_swap64(uint64_t in) {
#ifdef ENDIAN_LITTLE
    /* Little endian: flip the bytes around one at a time.  The accumulator
     * is unsigned so that the left shift is always well defined (shifting a
     * negative signed value is undefined behaviour). */
    uint64_t rv = 0;
    int i;

    for (i = 0; i < 8; i++) {
        rv = (rv << 8) | (in & 0xff);
        in >>= 8;
    }
    return rv;
#else
    /* big-endian machines don't need byte swapping */
    return in;
#endif
}
1095
1096static void complete_incr_bin(conn *c) {
1097    item *it;
1098    char *key;
1099    size_t nkey;
1100
1101    protocol_binary_response_incr* rsp = (protocol_binary_response_incr*)c->wbuf;
1102    protocol_binary_request_incr* req = binary_get_request(c);
1103
1104    assert(c != NULL);
1105    assert(c->wsize >= (int) sizeof(*rsp));
1106
1107    /* fix byteorder in the request */
1108    req->message.body.delta = mc_swap64(req->message.body.delta);
1109    req->message.body.initial = mc_swap64(req->message.body.initial);
1110    req->message.body.expiration = ntohl(req->message.body.expiration);
1111    key = binary_get_key(c);
1112    nkey = c->binary_header.request.keylen;
1113
1114    if (settings.verbose) {
1115        size_t i;
1116        moxi_log_write("incr ");
1117
1118        for (i = 0; i < nkey; i++) {
1119            moxi_log_write("%c", key[i]);
1120        }
1121        moxi_log_write(" %lld, %llu, %d\n",
1122                (long long)req->message.body.delta,
1123                (long long)req->message.body.initial,
1124                req->message.body.expiration);
1125    }
1126
1127    it = item_get(key, nkey);
1128    if (it && (c->binary_header.request.cas == 0 ||
1129               c->binary_header.request.cas == ITEM_get_cas(it))) {
1130        /* Weird magic in add_delta forces me to pad here */
1131        char tmpbuf[INCR_MAX_STORAGE_LEN];
1132        protocol_binary_response_status st = PROTOCOL_BINARY_RESPONSE_SUCCESS;
1133
1134        switch(add_delta(c, it, c->cmd == PROTOCOL_BINARY_CMD_INCREMENT,
1135                         req->message.body.delta, tmpbuf)) {
1136        case OK:
1137            break;
1138        case NON_NUMERIC:
1139            st = PROTOCOL_BINARY_RESPONSE_DELTA_BADVAL;
1140            break;
1141        case EOM:
1142            st = PROTOCOL_BINARY_RESPONSE_ENOMEM;
1143            break;
1144        }
1145
1146        if (st != PROTOCOL_BINARY_RESPONSE_SUCCESS) {
1147            write_bin_error(c, st, 0);
1148        } else {
1149            rsp->message.body.value = mc_swap64(strtoull(tmpbuf, NULL, 10));
1150            c->cas = ITEM_get_cas(it);
1151            write_bin_response(c, &rsp->message.body, 0, 0,
1152                               sizeof(rsp->message.body.value));
1153        }
1154
1155        item_remove(it);         /* release our reference */
1156    } else if (!it && req->message.body.expiration != 0xffffffff) {
1157        /* Save some room for the response */
1158        rsp->message.body.value = mc_swap64(req->message.body.initial);
1159        it = item_alloc(key, nkey, 0, realtime(req->message.body.expiration),
1160                        INCR_MAX_STORAGE_LEN);
1161
1162        if (it != NULL) {
1163            snprintf(ITEM_data(it), INCR_MAX_STORAGE_LEN, "%llu",
1164                     (unsigned long long)req->message.body.initial);
1165
1166            if (store_item(it, NREAD_SET, c)) {
1167                c->cas = ITEM_get_cas(it);
1168                write_bin_response(c, &rsp->message.body, 0, 0, sizeof(rsp->message.body.value));
1169            } else {
1170                write_bin_error(c, PROTOCOL_BINARY_RESPONSE_NOT_STORED, 0);
1171            }
1172            item_remove(it);         /* release our reference */
1173        } else {
1174            write_bin_error(c, PROTOCOL_BINARY_RESPONSE_ENOMEM, 0);
1175        }
1176    } else if (it) {
1177        /* incorrect CAS */
1178        item_remove(it);         /* release our reference */
1179        write_bin_error(c, PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS, 0);
1180    } else {
1181
1182        cb_mutex_enter(&c->thread->stats.mutex);
1183        if (c->cmd == PROTOCOL_BINARY_CMD_INCREMENT) {
1184            c->thread->stats.incr_misses++;
1185        } else {
1186            c->thread->stats.decr_misses++;
1187        }
1188        cb_mutex_exit(&c->thread->stats.mutex);
1189
1190        write_bin_error(c, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, 0);
1191    }
1192}
1193
1194static void complete_update_bin(conn *c) {
1195    protocol_binary_response_status eno = PROTOCOL_BINARY_RESPONSE_EINVAL;
1196    enum store_item_type ret = NOT_STORED;
1197    assert(c != NULL);
1198
1199    item *it = c->item;
1200
1201    cb_mutex_enter(&c->thread->stats.mutex);
1202    c->thread->stats.slab_stats[it->slabs_clsid].set_cmds++;
1203    cb_mutex_exit(&c->thread->stats.mutex);
1204
1205    /* We don't actually receive the trailing two characters in the bin
1206     * protocol, so we're going to just set them here */
1207    *(ITEM_data(it) + it->nbytes - 2) = '\r';
1208    *(ITEM_data(it) + it->nbytes - 1) = '\n';
1209
1210    ret = store_item(it, c->cmd, c);
1211
1212#ifdef ENABLE_DTRACE
1213    uint64_t cas = ITEM_get_cas(it);
1214    switch (c->cmd) {
1215    case NREAD_ADD:
1216        MEMCACHED_COMMAND_ADD(c->sfd, ITEM_key(it), it->nkey,
1217                              (ret == STORED) ? it->nbytes : -1, cas);
1218        break;
1219    case NREAD_REPLACE:
1220        MEMCACHED_COMMAND_REPLACE(c->sfd, ITEM_key(it), it->nkey,
1221                                  (ret == STORED) ? it->nbytes : -1, cas);
1222        break;
1223    case NREAD_APPEND:
1224        MEMCACHED_COMMAND_APPEND(c->sfd, ITEM_key(it), it->nkey,
1225                                 (ret == STORED) ? it->nbytes : -1, cas);
1226        break;
1227    case NREAD_PREPEND:
1228        MEMCACHED_COMMAND_PREPEND(c->sfd, ITEM_key(it), it->nkey,
1229                                 (ret == STORED) ? it->nbytes : -1, cas);
1230        break;
1231    case NREAD_SET:
1232        MEMCACHED_COMMAND_SET(c->sfd, ITEM_key(it), it->nkey,
1233                              (ret == STORED) ? it->nbytes : -1, cas);
1234        break;
1235    }
1236#endif
1237
1238    switch (ret) {
1239    case STORED:
1240        /* Stored */
1241        write_bin_response(c, NULL, 0, 0, 0);
1242        break;
1243    case EXISTS:
1244        write_bin_error(c, PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS, 0);
1245        break;
1246    case NOT_FOUND:
1247        write_bin_error(c, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, 0);
1248        break;
1249    case NOT_STORED:
1250        if (c->cmd == NREAD_ADD) {
1251            eno = PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS;
1252        } else if(c->cmd == NREAD_REPLACE) {
1253            eno = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
1254        } else {
1255            eno = PROTOCOL_BINARY_RESPONSE_NOT_STORED;
1256        }
1257        write_bin_error(c, eno, 0);
1258    }
1259
1260    item_remove(c->item);       /* release the c->item reference */
1261    c->item = 0;
1262}
1263
1264static void process_bin_get(conn *c) {
1265    item *it;
1266
1267    protocol_binary_response_get* rsp = (protocol_binary_response_get*)c->wbuf;
1268    char* key = binary_get_key(c);
1269    size_t nkey = c->binary_header.request.keylen;
1270
1271    if (settings.verbose) {
1272        size_t ii;
1273        moxi_log_write("<%d GET ", c->sfd);
1274        for (ii = 0; ii < nkey; ++ii) {
1275            moxi_log_write("%c", key[ii]);
1276        }
1277        moxi_log_write("\n");
1278    }
1279
1280    it = item_get(key, nkey);
1281    if (it) {
1282        /* the length has two unnecessary bytes ("\r\n") */
1283        uint16_t keylen = 0;
1284        uint32_t bodylen = sizeof(rsp->message.body) + (it->nbytes - 2);
1285
1286        cb_mutex_enter(&c->thread->stats.mutex);
1287        c->thread->stats.get_cmds++;
1288        c->thread->stats.slab_stats[it->slabs_clsid].get_hits++;
1289        cb_mutex_exit(&c->thread->stats.mutex);
1290
1291        MEMCACHED_COMMAND_GET(c->sfd, ITEM_key(it), it->nkey,
1292                              it->nbytes, ITEM_get_cas(it));
1293
1294        if (c->cmd == PROTOCOL_BINARY_CMD_GETK ||
1295            c->cmd == PROTOCOL_BINARY_CMD_GETL) {
1296            bodylen += (uint32_t)nkey;
1297            keylen = (uint16_t)nkey;
1298        }
1299        add_bin_header(c, 0, sizeof(rsp->message.body), keylen, bodylen);
1300        rsp->message.header.response.cas = mc_swap64(ITEM_get_cas(it));
1301
1302        /* add the flags */
1303        rsp->message.body.flags = htonl(strtoul(ITEM_suffix(it), NULL, 10));
1304        add_iov(c, &rsp->message.body, sizeof(rsp->message.body));
1305
1306        if (c->cmd == PROTOCOL_BINARY_CMD_GETK ||
1307            c->cmd == PROTOCOL_BINARY_CMD_GETL) {
1308            add_iov(c, ITEM_key(it), (uint32_t)nkey);
1309        }
1310
1311        /* Add the data minus the CRLF */
1312        add_iov(c, ITEM_data(it), it->nbytes - 2);
1313        conn_set_state(c, conn_mwrite);
1314        /* Remember this command so we can garbage collect it later */
1315        c->item = it;
1316    } else {
1317        cb_mutex_enter(&c->thread->stats.mutex);
1318        c->thread->stats.get_cmds++;
1319        c->thread->stats.get_misses++;
1320        cb_mutex_exit(&c->thread->stats.mutex);
1321
1322        MEMCACHED_COMMAND_GET(c->sfd, key, nkey, -1, 0);
1323
1324        if (c->noreply) {
1325            conn_set_state(c, conn_new_cmd);
1326        } else {
1327            if (c->cmd == PROTOCOL_BINARY_CMD_GETK ||
1328                c->cmd == PROTOCOL_BINARY_CMD_GETL) {
1329                char *ofs = c->wbuf + sizeof(protocol_binary_response_header);
1330                add_bin_header(c, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT,
1331                        0, (uint16_t)nkey, (uint32_t)nkey);
1332                memcpy(ofs, key, nkey);
1333                add_iov(c, ofs, (int)nkey);
1334                conn_set_state(c, conn_mwrite);
1335            } else {
1336                write_bin_error(c, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, 0);
1337            }
1338        }
1339    }
1340
1341    if (settings.detail_enabled) {
1342        stats_prefix_record_get(key, nkey, NULL != it);
1343    }
1344}
1345
1346static void append_bin_stats(const char *key, const uint16_t klen,
1347                             const char *val, const uint32_t vlen,
1348                             conn *c) {
1349    char *buf = c->stats.buffer + c->stats.offset;
1350    uint32_t bodylen = klen + vlen;
1351    protocol_binary_response_header header = {
1352        .response = {
1353            .magic = (uint8_t)PROTOCOL_BINARY_RES,
1354            .opcode = PROTOCOL_BINARY_CMD_STAT,
1355            .keylen = (uint16_t)htons(klen),
1356            .datatype = (uint8_t)PROTOCOL_BINARY_RAW_BYTES,
1357            .bodylen = htonl(bodylen),
1358            .opaque = c->opaque
1359        }
1360    };
1361
1362    memcpy(buf, header.bytes, sizeof(header.response));
1363    buf += sizeof(header.response);
1364
1365    if (klen > 0) {
1366        memcpy(buf, key, klen);
1367        buf += klen;
1368
1369        if (vlen > 0) {
1370            memcpy(buf, val, vlen);
1371        }
1372    }
1373
1374    c->stats.offset += sizeof(header.response) + bodylen;
1375}
1376
1377static void append_ascii_stats(const char *key, const uint16_t klen,
1378                               const char *val, const uint32_t vlen,
1379                               conn *c) {
1380    char *pos = c->stats.buffer + c->stats.offset;
1381    uint32_t nbytes = 0;
1382    size_t remaining = c->stats.size - c->stats.offset;
1383    size_t room = remaining - 1;
1384
1385    if (klen == 0 && vlen == 0) {
1386        nbytes = snprintf(pos, room, "END\r\n");
1387    } else if (vlen == 0) {
1388        nbytes = snprintf(pos, room, "STAT %s\r\n", key);
1389    } else {
1390        nbytes = snprintf(pos, room, "STAT %s %s\r\n", key, val);
1391    }
1392
1393    c->stats.offset += nbytes;
1394}
1395
1396static bool grow_stats_buf(conn *c, size_t needed) {
1397    size_t nsize = c->stats.size;
1398    size_t available = nsize - c->stats.offset;
1399    bool rv = true;
1400
1401    /* Special case: No buffer -- need to allocate fresh */
1402    if (c->stats.buffer == NULL) {
1403        nsize = 1024;
1404        available = c->stats.size = c->stats.offset = 0;
1405    }
1406
1407    while (needed > available) {
1408        assert(nsize > 0);
1409        nsize = nsize << 1;
1410        available = nsize - c->stats.offset;
1411    }
1412
1413    if (nsize != c->stats.size) {
1414        char *ptr = realloc(c->stats.buffer, nsize);
1415        if (ptr) {
1416            c->stats.buffer = ptr;
1417            c->stats.size = nsize;
1418        } else {
1419            rv = false;
1420        }
1421    }
1422
1423    return rv;
1424}
1425
1426static void append_stats(const char *key, const uint16_t klen,
1427                  const char *val, const uint32_t vlen,
1428                  const void *cookie)
1429{
1430    /* value without a key is invalid */
1431    if (klen == 0 && vlen > 0) {
1432        return ;
1433    }
1434
1435    conn *c = (conn*)cookie;
1436
1437    if (IS_BINARY(c->protocol)) {
1438        size_t needed = vlen + klen + sizeof(protocol_binary_response_header);
1439        if (!grow_stats_buf(c, needed)) {
1440            return ;
1441        }
1442        append_bin_stats(key, klen, val, vlen, c);
1443    } else {
1444        size_t needed = vlen + klen + 10; /* 10 == "STAT = \r\n" */
1445        if (!grow_stats_buf(c, needed)) {
1446            return ;
1447        }
1448        append_ascii_stats(key, klen, val, vlen, c);
1449    }
1450
1451    assert(c->stats.offset <= c->stats.size);
1452}
1453
1454void process_bin_proxy_stats(conn *c) {
1455    struct proxy_stats_cmd_info psci = {
1456        .do_info = false,
1457        .do_settings = false,
1458        .do_behaviors = false,
1459        .do_frontcache = false,
1460        .do_keystats = false,
1461        .do_stats = true,
1462        .do_zeros = true
1463    };
1464
1465    /* proxy_stats_dump_proxy_main(&append_stats, c, &psci); */
1466    proxy_stats_dump_proxies(&append_stats, c, &psci);
1467
1468    /* Append termination package and start the transfer */
1469    append_stats(NULL, 0, NULL, 0, c);
1470    if (c->stats.buffer == NULL) {
1471        write_bin_error(c, PROTOCOL_BINARY_RESPONSE_ENOMEM, 0);
1472    } else {
1473        write_and_free(c, c->stats.buffer, c->stats.offset);
1474        c->stats.buffer = NULL;
1475    }
1476}
1477
1478static void process_bin_stat(conn *c) {
1479    char *subcommand = binary_get_key(c);
1480    size_t nkey = c->binary_header.request.keylen;
1481
1482    if (settings.verbose) {
1483        size_t ii;
1484        moxi_log_write("<%d STATS ", c->sfd);
1485        for (ii = 0; ii < nkey; ++ii) {
1486            moxi_log_write("%c", subcommand[ii]);
1487        }
1488        moxi_log_write("\n");
1489    }
1490
1491    if (nkey == 0) {
1492        /* request all statistics */
1493        server_stats(&append_stats, c, NULL);
1494        (void)get_stats(NULL, 0, &append_stats, c);
1495    } else if (strncmp(subcommand, "reset", 5) == 0) {
1496        stats_reset();
1497    } else if (strncmp(subcommand, "settings", 8) == 0) {
1498        process_stat_settings(&append_stats, c, NULL);
1499    } else if (strncmp(subcommand, "detail", 6) == 0) {
1500        char *subcmd_pos = subcommand + 6;
1501        if (strncmp(subcmd_pos, " dump", 5) == 0) {
1502            int len;
1503            char *dump_buf = stats_prefix_dump(&len);
1504            if (dump_buf == NULL || len <= 0) {
1505                write_bin_error(c, PROTOCOL_BINARY_RESPONSE_ENOMEM, 0);
1506                return ;
1507            } else {
1508                append_stats("detailed", (uint16_t)strlen("detailed"), dump_buf, len, c);
1509                free(dump_buf);
1510            }
1511        } else if (strncmp(subcmd_pos, " on", 3) == 0) {
1512            settings.detail_enabled = 1;
1513        } else if (strncmp(subcmd_pos, " off", 4) == 0) {
1514            settings.detail_enabled = 0;
1515        } else {
1516            write_bin_error(c, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, 0);
1517            return;
1518        }
1519    } else {
1520        if (get_stats(subcommand, (int)nkey, &append_stats, c)) {
1521            if (c->stats.buffer == NULL) {
1522                write_bin_error(c, PROTOCOL_BINARY_RESPONSE_ENOMEM, 0);
1523            } else {
1524                write_and_free(c, c->stats.buffer, c->stats.offset);
1525                c->stats.buffer = NULL;
1526            }
1527        } else {
1528            write_bin_error(c, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, 0);
1529        }
1530
1531        return;
1532    }
1533
1534    /* Append termination package and start the transfer */
1535    append_stats(NULL, 0, NULL, 0, c);
1536    if (c->stats.buffer == NULL) {
1537        write_bin_error(c, PROTOCOL_BINARY_RESPONSE_ENOMEM, 0);
1538    } else {
1539        write_and_free(c, c->stats.buffer, c->stats.offset);
1540        c->stats.buffer = NULL;
1541    }
1542}
1543
1544void bin_read_key(conn *c, enum bin_substates next_substate, int extra) {
1545    assert(c);
1546    c->substate = next_substate;
1547    c->rlbytes = c->keylen + extra;
1548
1549    /* Ok... do we have room for the extras and the key in the input buffer? */
1550    ptrdiff_t offset = c->rcurr + sizeof(protocol_binary_request_header) - c->rbuf;
1551    if (c->rlbytes > c->rsize - offset) {
1552        int nsize = c->rsize;
1553        int size = c->rlbytes + sizeof(protocol_binary_request_header);
1554
1555        while (size > nsize) {
1556            nsize *= 2;
1557        }
1558
1559        if (nsize != c->rsize) {
1560            if (settings.verbose) {
1561                moxi_log_write("%d: Need to grow buffer from %lu to %lu\n",
1562                        c->sfd, (unsigned long)c->rsize, (unsigned long)nsize);
1563            }
1564            char *newm = realloc(c->rbuf, nsize);
1565            if (newm == NULL) {
1566                if (settings.verbose) {
1567                    moxi_log_write("%d: Failed to grow buffer.. closing connection\n",
1568                            c->sfd);
1569                }
1570                conn_set_state(c, conn_closing);
1571                return;
1572            }
1573
1574            c->rbuf= newm;
1575            /* rcurr should point to the same offset in the packet */
1576            c->rcurr = c->rbuf + offset - sizeof(protocol_binary_request_header);
1577            c->rsize = nsize;
1578        }
1579        if (c->rbuf != c->rcurr) {
1580            memmove(c->rbuf, c->rcurr, c->rbytes);
1581            c->rcurr = c->rbuf;
1582            if (settings.verbose) {
1583                moxi_log_write("%d: Repack input buffer\n", c->sfd);
1584            }
1585        }
1586    }
1587
1588    /* preserve the header in the buffer.. */
1589    c->ritem = c->rcurr + sizeof(protocol_binary_request_header);
1590    conn_set_state(c, conn_nread);
1591}
1592
1593void process_bin_noreply(conn *c) {
1594    assert(c);
1595    c->noreply = true;
1596    switch (c->binary_header.request.opcode) {
1597    case PROTOCOL_BINARY_CMD_SETQ:
1598        c->cmd = PROTOCOL_BINARY_CMD_SET;
1599        break;
1600    case PROTOCOL_BINARY_CMD_ADDQ:
1601        c->cmd = PROTOCOL_BINARY_CMD_ADD;
1602        break;
1603    case PROTOCOL_BINARY_CMD_REPLACEQ:
1604        c->cmd = PROTOCOL_BINARY_CMD_REPLACE;
1605        break;
1606    case PROTOCOL_BINARY_CMD_DELETEQ:
1607        c->cmd = PROTOCOL_BINARY_CMD_DELETE;
1608        break;
1609    case PROTOCOL_BINARY_CMD_INCREMENTQ:
1610        c->cmd = PROTOCOL_BINARY_CMD_INCREMENT;
1611        break;
1612    case PROTOCOL_BINARY_CMD_DECREMENTQ:
1613        c->cmd = PROTOCOL_BINARY_CMD_DECREMENT;
1614        break;
1615    case PROTOCOL_BINARY_CMD_QUITQ:
1616        c->cmd = PROTOCOL_BINARY_CMD_QUIT;
1617        break;
1618    case PROTOCOL_BINARY_CMD_FLUSHQ:
1619        c->cmd = PROTOCOL_BINARY_CMD_FLUSH;
1620        break;
1621    case PROTOCOL_BINARY_CMD_APPENDQ:
1622        c->cmd = PROTOCOL_BINARY_CMD_APPEND;
1623        break;
1624    case PROTOCOL_BINARY_CMD_PREPENDQ:
1625        c->cmd = PROTOCOL_BINARY_CMD_PREPEND;
1626        break;
1627    case PROTOCOL_BINARY_CMD_GETQ:
1628        c->cmd = PROTOCOL_BINARY_CMD_GET;
1629        break;
1630    case PROTOCOL_BINARY_CMD_GETKQ:
1631        c->cmd = PROTOCOL_BINARY_CMD_GETK;
1632        break;
1633    default:
1634        c->noreply = false;
1635    }
1636}
1637
1638void dispatch_bin_command(conn *c) {
1639    int protocol_error = 0;
1640
1641    uint32_t extlen = c->binary_header.request.extlen;
1642    uint32_t keylen = c->binary_header.request.keylen;
1643    uint32_t bodylen = c->binary_header.request.bodylen;
1644
1645    MEMCACHED_PROCESS_COMMAND_START(c->sfd, c->rcurr, c->rbytes);
1646
1647    process_bin_noreply(c);
1648
1649    switch (c->cmd) {
1650        case PROTOCOL_BINARY_CMD_VERSION:
1651            if (extlen == 0 && keylen == 0 && bodylen == 0) {
1652                write_bin_response(c, VERSION, 0, 0, (int)strlen(VERSION));
1653            } else {
1654                protocol_error = 1;
1655            }
1656            break;
1657        case PROTOCOL_BINARY_CMD_FLUSH:
1658            if (keylen == 0 && bodylen == extlen && (extlen == 0 || extlen == 4)) {
1659                bin_read_key(c, bin_read_flush_exptime, extlen);
1660            } else {
1661                protocol_error = 1;
1662            }
1663            break;
1664        case PROTOCOL_BINARY_CMD_NOOP:
1665            if (extlen == 0 && keylen == 0 && bodylen == 0) {
1666                write_bin_response(c, NULL, 0, 0, 0);
1667            } else {
1668                protocol_error = 1;
1669            }
1670            break;
1671        case PROTOCOL_BINARY_CMD_SET: /* FALLTHROUGH */
1672        case PROTOCOL_BINARY_CMD_ADD: /* FALLTHROUGH */
1673        case PROTOCOL_BINARY_CMD_REPLACE:
1674            if (extlen == 8 && keylen != 0 && bodylen >= (keylen + 8)) {
1675                bin_read_key(c, bin_reading_set_header, 8);
1676            } else {
1677                protocol_error = 1;
1678            }
1679            break;
1680        case PROTOCOL_BINARY_CMD_GETQ:  /* FALLTHROUGH */
1681        case PROTOCOL_BINARY_CMD_GET:   /* FALLTHROUGH */
1682        case PROTOCOL_BINARY_CMD_GETKQ: /* FALLTHROUGH */
1683        case PROTOCOL_BINARY_CMD_GETK:
1684        case PROTOCOL_BINARY_CMD_GETL:
1685            if (extlen == 0 && bodylen == keylen && keylen > 0) {
1686                bin_read_key(c, bin_reading_get_key, 0);
1687            } else {
1688                protocol_error = 1;
1689            }
1690            break;
1691        case PROTOCOL_BINARY_CMD_DELETE:
1692            if (keylen > 0 && extlen == 0 && bodylen == keylen) {
1693                bin_read_key(c, bin_reading_del_header, extlen);
1694            } else {
1695                protocol_error = 1;
1696            }
1697            break;
1698        case PROTOCOL_BINARY_CMD_INCREMENT:
1699        case PROTOCOL_BINARY_CMD_DECREMENT:
1700            if (keylen > 0 && extlen == 20 && bodylen == (keylen + extlen)) {
1701                bin_read_key(c, bin_reading_incr_header, 20);
1702            } else {
1703                protocol_error = 1;
1704            }
1705            break;
1706        case PROTOCOL_BINARY_CMD_APPEND:
1707        case PROTOCOL_BINARY_CMD_PREPEND:
1708            if (keylen > 0 && extlen == 0) {
1709                bin_read_key(c, bin_reading_set_header, 0);
1710            } else {
1711                protocol_error = 1;
1712            }
1713            break;
1714        case PROTOCOL_BINARY_CMD_STAT:
1715            if (extlen == 0) {
1716                bin_read_key(c, bin_reading_stat, 0);
1717            } else {
1718                protocol_error = 1;
1719            }
1720            break;
1721        case PROTOCOL_BINARY_CMD_QUIT:
1722            if (keylen == 0 && extlen == 0 && bodylen == 0) {
1723                write_bin_response(c, NULL, 0, 0, 0);
1724                c->write_and_go = conn_closing;
1725                if (c->noreply) {
1726                    conn_set_state(c, conn_closing);
1727                }
1728            } else {
1729                protocol_error = 1;
1730            }
1731            break;
1732        default:
1733            write_bin_error(c, PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND, bodylen);
1734    }
1735
1736    if (protocol_error) {
1737        /* Just write an error message and disconnect the client */
1738        write_bin_error(c, PROTOCOL_BINARY_RESPONSE_EINVAL, 0);
1739        if (settings.verbose) {
1740            moxi_log_write("Protocol error (opcode %02x), close connection %d\n",
1741                    c->binary_header.request.opcode, c->sfd);
1742        }
1743        c->write_and_go = conn_closing;
1744    }
1745}
1746
1747static void process_bin_update(conn *c) {
1748    char *key;
1749    int nkey;
1750    int vlen;
1751    item *it;
1752    protocol_binary_request_set* req = binary_get_request(c);
1753
1754    assert(c != NULL);
1755
1756    key = binary_get_key(c);
1757    nkey = c->binary_header.request.keylen;
1758
1759    /* fix byteorder in the request */
1760    req->message.body.flags = ntohl(req->message.body.flags);
1761    req->message.body.expiration = ntohl(req->message.body.expiration);
1762
1763    vlen = c->binary_header.request.bodylen - (nkey + c->binary_header.request.extlen);
1764
1765    if (settings.verbose) {
1766        int ii;
1767        if (c->cmd == PROTOCOL_BINARY_CMD_ADD) {
1768            moxi_log_write("<%d ADD ", c->sfd);
1769        } else if (c->cmd == PROTOCOL_BINARY_CMD_SET) {
1770            moxi_log_write("<%d SET ", c->sfd);
1771        } else {
1772            moxi_log_write("<%d REPLACE ", c->sfd);
1773        }
1774        for (ii = 0; ii < nkey; ++ii) {
1775            moxi_log_write("%c", key[ii]);
1776        }
1777
1778        if (settings.verbose > 1) {
1779            moxi_log_write(" Value len is %d", vlen);
1780        }
1781        moxi_log_write("\n");
1782    }
1783
1784    if (settings.detail_enabled) {
1785        stats_prefix_record_set(key, nkey);
1786    }
1787
1788    it = item_alloc(key, nkey, req->message.body.flags,
1789            c->funcs->conn_realtime(req->message.body.expiration), vlen+2);
1790
1791    if (it == 0) {
1792        if (! item_size_ok(nkey, req->message.body.flags, vlen + 2)) {
1793            write_bin_error(c, PROTOCOL_BINARY_RESPONSE_E2BIG, vlen);
1794        } else {
1795            write_bin_error(c, PROTOCOL_BINARY_RESPONSE_ENOMEM, vlen);
1796        }
1797
1798        /* Avoid stale data persisting in cache because we failed alloc.
1799         * Unacceptable for SET. Anywhere else too? */
1800        if (c->cmd == PROTOCOL_BINARY_CMD_SET) {
1801            it = item_get(key, nkey);
1802            if (it) {
1803                item_unlink(it);
1804                item_remove(it);
1805            }
1806        }
1807
1808        /* swallow the data line */
1809        c->write_and_go = conn_swallow;
1810        return;
1811    }
1812
1813    ITEM_set_cas(it, c->binary_header.request.cas);
1814
1815    switch (c->cmd) {
1816        case PROTOCOL_BINARY_CMD_ADD:
1817            c->cmd = NREAD_ADD;
1818            break;
1819        case PROTOCOL_BINARY_CMD_SET:
1820            c->cmd = NREAD_SET;
1821            break;
1822        case PROTOCOL_BINARY_CMD_REPLACE:
1823            c->cmd = NREAD_REPLACE;
1824            break;
1825        default:
1826            assert(0);
1827    }
1828
1829    if (ITEM_get_cas(it) != 0) {
1830        c->cmd = NREAD_CAS;
1831    }
1832
1833    c->item = it;
1834    c->ritem = ITEM_data(it);
1835    c->rlbytes = vlen;
1836    conn_set_state(c, conn_nread);
1837    c->substate = bin_read_set_value;
1838}
1839
1840static void process_bin_append_prepend(conn *c) {
1841    char *key;
1842    int nkey;
1843    int vlen;
1844    item *it;
1845
1846    assert(c != NULL);
1847
1848    key = binary_get_key(c);
1849    nkey = c->binary_header.request.keylen;
1850    vlen = c->binary_header.request.bodylen - nkey;
1851
1852    if (settings.verbose > 1) {
1853        moxi_log_write("Value len is %d\n", vlen);
1854    }
1855
1856    if (settings.detail_enabled) {
1857        stats_prefix_record_set(key, nkey);
1858    }
1859
1860    it = item_alloc(key, nkey, 0, 0, vlen+2);
1861
1862    if (it == 0) {
1863        if (! item_size_ok(nkey, 0, vlen + 2)) {
1864            write_bin_error(c, PROTOCOL_BINARY_RESPONSE_E2BIG, vlen);
1865        } else {
1866            write_bin_error(c, PROTOCOL_BINARY_RESPONSE_ENOMEM, vlen);
1867        }
1868        /* swallow the data line */
1869        c->write_and_go = conn_swallow;
1870        return;
1871    }
1872
1873    ITEM_set_cas(it, c->binary_header.request.cas);
1874
1875    switch (c->cmd) {
1876        case PROTOCOL_BINARY_CMD_APPEND:
1877            c->cmd = NREAD_APPEND;
1878            break;
1879        case PROTOCOL_BINARY_CMD_PREPEND:
1880            c->cmd = NREAD_PREPEND;
1881            break;
1882        default:
1883            assert(0);
1884    }
1885
1886    c->item = it;
1887    c->ritem = ITEM_data(it);
1888    c->rlbytes = vlen;
1889    conn_set_state(c, conn_nread);
1890    c->substate = bin_read_set_value;
1891}
1892
1893static void process_bin_flush(conn *c) {
1894    time_t exptime = 0;
1895    protocol_binary_request_flush* req = binary_get_request(c);
1896
1897    if (c->binary_header.request.extlen == sizeof(req->message.body)) {
1898        exptime = ntohl(req->message.body.expiration);
1899    }
1900
1901    set_current_time();
1902
1903    if (exptime > 0) {
1904        settings.oldest_live = c->funcs->conn_realtime(exptime) - 1;
1905    } else {
1906        settings.oldest_live = current_time - 1;
1907    }
1908    item_flush_expired();
1909
1910    cb_mutex_enter(&c->thread->stats.mutex);
1911    c->thread->stats.flush_cmds++;
1912    cb_mutex_exit(&c->thread->stats.mutex);
1913
1914    write_bin_response(c, NULL, 0, 0, 0);
1915}
1916
1917static void process_bin_delete(conn *c) {
1918    item *it;
1919
1920    protocol_binary_request_delete* req = binary_get_request(c);
1921
1922    char* key = binary_get_key(c);
1923    size_t nkey = c->binary_header.request.keylen;
1924
1925    assert(c != NULL);
1926
1927    if (settings.verbose) {
1928        moxi_log_write("Deleting %s\n", key);
1929    }
1930
1931    if (settings.detail_enabled) {
1932        stats_prefix_record_delete(key, nkey);
1933    }
1934
1935    it = item_get(key, nkey);
1936    if (it) {
1937        uint64_t cas=mc_swap64(req->message.header.request.cas);
1938        if (cas == 0 || cas == ITEM_get_cas(it)) {
1939            MEMCACHED_COMMAND_DELETE(c->sfd, ITEM_key(it), it->nkey);
1940            item_unlink(it);
1941            write_bin_response(c, NULL, 0, 0, 0);
1942        } else {
1943            write_bin_error(c, PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS, 0);
1944        }
1945        item_remove(it);      /* release our reference */
1946    } else {
1947        write_bin_error(c, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, 0);
1948    }
1949}
1950
1951void complete_nread_binary(conn *c) {
1952    assert(c != NULL);
1953    assert(c->cmd >= 0);
1954
1955    switch(c->substate) {
1956    case bin_reading_set_header:
1957        if (c->cmd == PROTOCOL_BINARY_CMD_APPEND ||
1958                c->cmd == PROTOCOL_BINARY_CMD_PREPEND) {
1959            process_bin_append_prepend(c);
1960        } else {
1961            process_bin_update(c);
1962        }
1963        break;
1964    case bin_read_set_value:
1965        complete_update_bin(c);
1966        break;
1967    case bin_reading_get_key:
1968        process_bin_get(c);
1969        break;
1970    case bin_reading_stat:
1971        process_bin_stat(c);
1972        break;
1973    case bin_reading_del_header:
1974        process_bin_delete(c);
1975        break;
1976    case bin_reading_incr_header:
1977        complete_incr_bin(c);
1978        break;
1979    case bin_read_flush_exptime:
1980        process_bin_flush(c);
1981        break;
1982    default:
1983        moxi_log_write("Not handling substate %d\n", c->substate);
1984        assert(0);
1985    }
1986}
1987
1988void reset_cmd_handler(conn *c) {
1989    c->cmd = -1;
1990    c->cmd_curr = -1;
1991    c->substate = bin_no_state;
1992    conn_cleanup(c);
1993    conn_shrink(c);
1994    if (c->rbytes > 0) {
1995        conn_set_state(c, conn_parse_cmd);
1996    } else {
1997        conn_set_state(c, conn_waiting);
1998    }
1999}
2000
2001void complete_nread(conn *c) {
2002    assert(c != NULL);
2003    assert(c->funcs != NULL);
2004
2005    if (IS_ASCII(c->protocol)) {
2006        c->funcs->conn_complete_nread_ascii(c);
2007    } else if (IS_BINARY(c->protocol)) {
2008        c->funcs->conn_complete_nread_binary(c);
2009    }
2010}
2011
2012/*
2013 * Stores an item in the cache according to the semantics of one of the set
2014 * commands. In threaded mode, this is protected by the cache lock.
2015 *
2016 * Returns the state of storage.
2017 */
/*
 * Parameters:
 *   it    item carrying the new key/value (and, for CAS-style commands, the
 *         CAS value supplied by the client)
 *   comm  one of the NREAD_* command codes (ADD, REPLACE, APPEND, PREPEND,
 *         CAS, or anything else, which is treated as a plain store)
 *   c     requesting connection; its per-thread stats are updated and
 *         c->cas receives the CAS of the stored item
 *
 * Results produced below:
 *   STORED      the item (or the combined append/prepend item) was linked
 *   NOT_STORED  add on an existing key, replace/append/prepend on a missing
 *               key, or allocation failure while combining values
 *   NOT_FOUND   CAS command on a missing key
 *   EXISTS      CAS mismatch (CAS command, or append/prepend with a
 *               non-zero CAS)
 */
2018enum store_item_type do_store_item(item *it, int comm, conn *c) {
2019    char *key = ITEM_key(it);
2020    item *old_it = do_item_get(key, it->nkey);
2021    enum store_item_type stored = NOT_STORED;
2022
2023    item *new_it = NULL;
2024    int flags;
2025
2026    if (old_it != NULL && comm == NREAD_ADD) {
2027        /* add only adds a nonexistent item, but promote to head of LRU */
2028        do_item_update(old_it);
2029    } else if (!old_it && (comm == NREAD_REPLACE
2030        || comm == NREAD_APPEND || comm == NREAD_PREPEND))
2031    {
2032        /* replace only replaces an existing value; don't store */
2033    } else if (comm == NREAD_CAS) {
2034        /* validate cas operation */
2035        if(old_it == NULL) {
2036            /* LRU expired */
2037            stored = NOT_FOUND;
2038            cb_mutex_enter(&c->thread->stats.mutex);
2039            c->thread->stats.cas_misses++;
2040            cb_mutex_exit(&c->thread->stats.mutex);
2041        }
2042        else if (ITEM_get_cas(it) == ITEM_get_cas(old_it)) {
2043            /* cas validates */
2044            /* it and old_it may belong to different classes. */
2045            /* I'm updating the stats for the one that's getting pushed out */
2046            cb_mutex_enter(&c->thread->stats.mutex);
2047            c->thread->stats.slab_stats[old_it->slabs_clsid].cas_hits++;
2048            cb_mutex_exit(&c->thread->stats.mutex);
2049
2050            item_replace(old_it, it);
2051            stored = STORED;
2052        } else {
2053            cb_mutex_enter(&c->thread->stats.mutex);
2054            c->thread->stats.slab_stats[old_it->slabs_clsid].cas_badval++;
2055            cb_mutex_exit(&c->thread->stats.mutex);
2056
2057            if(settings.verbose > 1) {
2058                moxi_log_write("CAS:  failure: expected %llu, got %llu\n",
2059                        (unsigned long long)ITEM_get_cas(old_it),
2060                        (unsigned long long)ITEM_get_cas(it));
2061            }
2062            stored = EXISTS;
2063        }
2064    } else {
2065        /*
2066         * Append - combine new and old record into single one. Here it's
2067         * atomic and thread-safe.
2068         */
2069        if (comm == NREAD_APPEND || comm == NREAD_PREPEND) {
2070            /*
2071             * Validate CAS
2072             */
2073            if (ITEM_get_cas(it) != 0) {
2074                /* CAS must be equal */
2075                if (ITEM_get_cas(it) != ITEM_get_cas(old_it)) {
2076                    stored = EXISTS;
2077                }
2078            }
2079
2080            if (stored == NOT_STORED) {
2081                /* we have it and old_it here - alloc memory to hold both */
2082                /* flags were already lost - so recover them from ITEM_suffix(old_it) */
2083
2084                flags = (int) strtol(ITEM_suffix(old_it), (char **) NULL, 10);
2085
                /* both values carry a trailing CRLF; the combined item keeps only one */
2086                new_it = do_item_alloc(key, it->nkey, flags, old_it->exptime, it->nbytes + old_it->nbytes - 2 /* CRLF */);
2087
2088                if (new_it == NULL) {
2089                    /* SERVER_ERROR out of memory */
2090                    if (old_it != NULL)
2091                        do_item_remove(old_it);
2092
2093                    return NOT_STORED;
2094                }
2095
2096                /* copy data from it and old_it to new_it */
2097
                /* the second copy starts 2 bytes early so it overwrites the
                 * CRLF that ended the first value */
2098                if (comm == NREAD_APPEND) {
2099                    memcpy(ITEM_data(new_it), ITEM_data(old_it), old_it->nbytes);
2100                    memcpy(ITEM_data(new_it) + old_it->nbytes - 2 /* CRLF */, ITEM_data(it), it->nbytes);
2101                } else {
2102                    /* NREAD_PREPEND */
2103                    memcpy(ITEM_data(new_it), ITEM_data(it), it->nbytes);
2104                    memcpy(ITEM_data(new_it) + it->nbytes - 2 /* CRLF */, ITEM_data(old_it), old_it->nbytes);
2105                }
2106
                /* from here on the combined item is the one being stored */
2107                it = new_it;
2108            }
2109        }
2110
2111        if (stored == NOT_STORED) {
2112            if (old_it != NULL)
2113                item_replace(old_it, it);
2114            else
2115                do_item_link(it);
2116
2117            c->cas = ITEM_get_cas(it);
2118
2119            stored = STORED;
2120        }
2121    }
2122
2123    if (old_it != NULL)
2124        do_item_remove(old_it);         /* release our reference */
2125    if (new_it != NULL)
2126        do_item_remove(new_it);
2127
    /* report the CAS of whatever was stored back through the connection */
2128    if (stored == STORED) {
2129        c->cas = ITEM_get_cas(it);
2130    }
2131
2132    return stored;
2133}
2134
/*
 * Indices into the token array filled in by tokenize_command().
 * SUBCOMMAND_TOKEN and KEY_TOKEN intentionally share index 1: the token
 * right after the command is read as a subcommand by the stats handler and
 * as the key by the get/update/arithmetic handlers.
 */
2135#define COMMAND_TOKEN 0
2136#define SUBCOMMAND_TOKEN 1
2137#define KEY_TOKEN 1
2138
/* Capacity passed as max_tokens when a command line is (re-)tokenized;
 * one slot is always used by the terminal token. */
2139#define MAX_TOKENS 8
2140
2141/*
2142 * Tokenize the command string by replacing whitespace with '\0' and update
2143 * the token array tokens with pointer to start of each token and length.
2144 * Returns total number of tokens.  The last valid token is the terminal
2145 * token (value points to the first unprocessed character of the string and
2146 * length zero).
2147 *
2148 * Usage example:
2149 *
2150 *  while(tokenize_command(command, ncommand, tokens, max_tokens) > 0) {
2151 *      for(int ix = 0; tokens[ix].length != 0; ix++) {
2152 *          ...
2153 *      }
2154 *      ncommand = tokens[ix].value - command;
2155 *      command  = tokens[ix].value;
2156 *   }
2157 */
2158size_t tokenize_command(char *command, token_t *tokens, const size_t max_tokens) {
2159    char *s, *e;
2160    size_t ntokens = 0;
2161
2162    assert(command != NULL && tokens != NULL && max_tokens > 1);
2163
2164    for (s = e = command; ntokens < max_tokens - 1; ++e) {
2165        if (*e == ' ') {
2166            if (s != e) {
2167                tokens[ntokens].value = s;
2168                tokens[ntokens].length = e - s;
2169                ntokens++;
2170                *e = '\0';
2171            }
2172            s = e + 1;
2173        }
2174        else if (*e == '\0') {
2175            if (s != e) {
2176                tokens[ntokens].value = s;
2177                tokens[ntokens].length = e - s;
2178                ntokens++;
2179            }
2180
2181            break; /* string end */
2182        }
2183    }
2184
2185    /*
2186     * If we scanned the whole string, the terminal value pointer is null,
2187     * otherwise it is the first unprocessed character.
2188     */
2189    tokens[ntokens].value =  *e == '\0' ? NULL : e;
2190    tokens[ntokens].length = 0;
2191    ntokens++;
2192
2193    return ntokens;
2194}
2195
2196/* set up a connection to write a buffer then free it, used for stats */
2197static void write_and_free(conn *c, char *buf, size_t bytes) {
2198    if (buf) {
2199        c->write_and_free = buf;
2200        c->wcurr = buf;
2201        c->wbytes = (int)bytes;
2202        conn_set_state(c, conn_write);
2203        c->write_and_go = conn_new_cmd;
2204    } else {
2205        out_string(c, "SERVER_ERROR out of memory writing stats");
2206    }
2207}
2208
2209void set_noreply_maybe(conn *c, token_t *tokens, size_t ntokens) {
2210    int noreply_index = (int)ntokens - 2;
2211
2212    assert(noreply_index >= 0);
2213
2214    /*
2215      NOTE: this function is not the first place where we are going to
2216      send the reply.  We could send it instead from process_command()
2217      if the request line has wrong number of tokens.  However parsing
2218      malformed line for "noreply" option is not reliable anyway, so
2219      it can't be helped.
2220    */
2221    if (tokens[noreply_index].value
2222        && strcmp(tokens[noreply_index].value, "noreply") == 0) {
2223        c->noreply = true;
2224    }
2225}
2226
2227void append_stat(const char *name, ADD_STAT add_stats, void *c,
2228                 const char *fmt, ...) {
2229    char val_str[STAT_VAL_LEN];
2230    int vlen;
2231    va_list ap;
2232
2233    assert(name);
2234    assert(add_stats);
2235    assert(c);
2236    assert(fmt);
2237
2238    va_start(ap, fmt);
2239    vlen = vsnprintf(val_str, sizeof(val_str) - 1, fmt, ap);
2240    va_end(ap);
2241
2242    add_stats(name, (uint16_t)strlen(name), val_str, vlen, c);
2243}
2244
2245void append_prefix_stat(const char *prefix, const char *name, ADD_STAT add_stats, void *c,
2246                        const char *fmt, ...) {
2247    char val_str[STAT_VAL_LEN];
2248    int vlen;
2249    char *val_free = 0;
2250    char *val;
2251    va_list ap;
2252
2253    assert(name);
2254    assert(add_stats);
2255    assert(c);
2256    assert(fmt);
2257
2258    va_start(ap, fmt);
2259    vlen = vsnprintf(val_str, sizeof(val_str), fmt, ap);
2260    va_end(ap);
2261
2262    if (vlen > (int)sizeof(val_str)-1) {
2263        val_free = malloc(vlen+1);
2264        if (val_free != 0) {
2265            val = val_free;
2266            va_start(ap, fmt);
2267            vsnprintf(val_free, vlen+1, fmt, ap);
2268            va_end(ap);
2269        } else {
2270            val = val_str;
2271            vlen = sizeof(val_str)-1;
2272        }
2273    } else {
2274        val = val_str;
2275    }
2276
2277    if (prefix == NULL) {
2278        add_stats(name, (uint16_t)strlen(name), val, vlen, c);
2279    } else {
2280        char key_str[STAT_KEY_LEN];
2281        strcpy(key_str, prefix); strcat(key_str, name);
2282        add_stats(key_str, (uint16_t)strlen(key_str), val, vlen, c);
2283    }
2284
2285    free(val_free);
2286}
2287
2288static void process_stats_detail(conn *c, const char *command) {
2289    assert(c != NULL);
2290
2291    if (strcmp(command, "on") == 0) {
2292        settings.detail_enabled = 1;
2293        out_string(c, "OK");
2294    }
2295    else if (strcmp(command, "off") == 0) {
2296        settings.detail_enabled = 0;
2297        out_string(c, "OK");
2298    }
2299    else if (strcmp(command, "dump") == 0) {
2300        int len;
2301        char *stats_dump = stats_prefix_dump(&len);
2302        write_and_free(c, stats_dump, len);
2303    }
2304    else {
2305        out_string(c, "CLIENT_ERROR usage: stats detail on|off|dump");
2306    }
2307}
2308
2309/* return server specific stats only */
2310void server_stats(ADD_STAT add_stats, void *c, const char *prefix) {
2311#ifndef _MSC_VER
2312    pid_t pid = getpid();
2313#endif
2314    rel_time_t now = current_time;
2315
2316    struct thread_stats thread_stats;
2317    threadlocal_stats_aggregate(&thread_stats);
2318    struct slab_stats slab_stats;
2319    slab_stats_aggregate(&thread_stats, &slab_stats);
2320
2321#ifndef WIN32
2322    struct rusage usage;
2323    getrusage(RUSAGE_SELF, &usage);
2324#endif /* !WIN32 */
2325
2326    STATS_LOCK();
2327#ifndef _MSC_VER
2328    APPEND_PREFIX_STAT("pid", "%lu", (long)pid);
2329#endif
2330    APPEND_PREFIX_STAT("uptime", "%u", now);
2331    APPEND_PREFIX_STAT("time", "%ld", now + (long)process_started);
2332    APPEND_PREFIX_STAT("version", "%s", VERSION);
2333    APPEND_PREFIX_STAT("pointer_size", "%d", (int)(8 * sizeof(void *)));
2334
2335#ifndef WIN32
2336    if (1) {
2337        char rusage_buf[128];
2338        snprintf(rusage_buf, sizeof(rusage_buf),  "%ld.%06ld",
2339                 (long)usage.ru_utime.tv_sec, (long)usage.ru_utime.tv_usec);
2340        APPEND_PREFIX_STAT("rusage_user", "%s", rusage_buf);
2341        snprintf(rusage_buf, sizeof(rusage_buf),  "%ld.%06ld",
2342                 (long)usage.ru_stime.tv_sec, (long)usage.ru_stime.tv_usec);
2343        APPEND_PREFIX_STAT("rusage_system", "%s", rusage_buf);
2344    }
2345#endif
2346
2347    APPEND_PREFIX_STAT("curr_connections", "%u", stats.curr_conns - 1);
2348    APPEND_PREFIX_STAT("total_connections", "%u", stats.total_conns);
2349    APPEND_PREFIX_STAT("connection_structures", "%u", stats.conn_structs);
2350    APPEND_PREFIX_STAT("cmd_get", "%llu", (unsigned long long)thread_stats.get_cmds);
2351    APPEND_PREFIX_STAT("cmd_set", "%llu", (unsigned long long)slab_stats.set_cmds);
2352    APPEND_PREFIX_STAT("cmd_flush", "%llu", (unsigned long long)thread_stats.flush_cmds);
2353    APPEND_PREFIX_STAT("get_hits", "%llu", (unsigned long long)slab_stats.get_hits);
2354    APPEND_PREFIX_STAT("get_misses", "%llu", (unsigned long long)thread_stats.get_misses);
2355    APPEND_PREFIX_STAT("delete_misses", "%llu", (unsigned long long)thread_stats.delete_misses);
2356    APPEND_PREFIX_STAT("delete_hits", "%llu", (unsigned long long)slab_stats.delete_hits);
2357    APPEND_PREFIX_STAT("incr_misses", "%llu", (unsigned long long)thread_stats.incr_misses);
2358    APPEND_PREFIX_STAT("incr_hits", "%llu", (unsigned long long)slab_stats.incr_hits);
2359    APPEND_PREFIX_STAT("decr_misses", "%llu", (unsigned long long)thread_stats.decr_misses);
2360    APPEND_PREFIX_STAT("decr_hits", "%llu", (unsigned long long)slab_stats.decr_hits);
2361    APPEND_PREFIX_STAT("cas_misses", "%llu", (unsigned long long)thread_stats.cas_misses);
2362    APPEND_PREFIX_STAT("cas_hits", "%llu", (unsigned long long)slab_stats.cas_hits);
2363    APPEND_PREFIX_STAT("cas_badval", "%llu", (unsigned long long)slab_stats.cas_badval);
2364    APPEND_PREFIX_STAT("bytes_read", "%llu", (unsigned long long)thread_stats.bytes_read);
2365    APPEND_PREFIX_STAT("bytes_written", "%llu", (unsigned long long)thread_stats.bytes_written);
2366    APPEND_PREFIX_STAT("limit_maxbytes", "%llu", (unsigned long long)settings.maxbytes);
2367    APPEND_PREFIX_STAT("accepting_conns", "%u", stats.accepting_conns);
2368    APPEND_PREFIX_STAT("listen_disabled_num", "%llu", (unsigned long long)stats.listen_disabled_num);
2369    APPEND_PREFIX_STAT("threads", "%d", settings.num_threads);
2370    APPEND_PREFIX_STAT("conn_yields", "%llu", (unsigned long long)thread_stats.conn_yields);
2371
2372    STATS_UNLOCK();
2373}
2374
2375void process_stat_settings(ADD_STAT add_stats, void *c, const char *prefix) {
2376    assert(add_stats);
2377    APPEND_PREFIX_STAT("maxbytes", "%u", (unsigned int)settings.maxbytes);
2378    APPEND_PREFIX_STAT("maxconns", "%d", settings.maxconns);
2379    APPEND_PREFIX_STAT("tcpport", "%d", settings.port);
2380    APPEND_PREFIX_STAT("udpport", "%d", settings.udpport);
2381    APPEND_PREFIX_STAT("inter", "%s", settings.inter ? settings.inter : "NULL");
2382    APPEND_PREFIX_STAT("verbosity", "%d", settings.verbose);
2383    APPEND_PREFIX_STAT("oldest", "%lu", (unsigned long)settings.oldest_live);
2384    APPEND_PREFIX_STAT("evictions", "%s", settings.evict_to_free ? "on" : "off");
2385    APPEND_PREFIX_STAT("domain_socket", "%s",
2386                settings.socketpath ? settings.socketpath : "NULL");
2387    APPEND_PREFIX_STAT("umask", "%o", settings.access);
2388    APPEND_PREFIX_STAT("growth_factor", "%.2f", settings.factor);
2389    APPEND_PREFIX_STAT("chunk_size", "%d", settings.chunk_size);
2390    APPEND_PREFIX_STAT("num_threads", "%d", settings.num_threads);
2391    APPEND_PREFIX_STAT("stat_key_prefix", "%c", settings.prefix_delimiter);
2392    APPEND_PREFIX_STAT("detail_enabled", "%s",
2393                settings.detail_enabled ? "yes" : "no");
2394    APPEND_PREFIX_STAT("reqs_per_event", "%d", settings.reqs_per_event);
2395    APPEND_PREFIX_STAT("cas_enabled", "%s", settings.use_cas ? "yes" : "no");
2396    APPEND_PREFIX_STAT("tcp_backlog", "%d", settings.backlog);
2397    APPEND_PREFIX_STAT("binding_protocol", "%s",
2398                prot_text(settings.binding_protocol));
2399}
2400
2401static void process_stat(conn *c, token_t *tokens, const size_t ntokens) {
2402    const char *subcommand = tokens[SUBCOMMAND_TOKEN].value;
2403    assert(c != NULL);
2404
2405    if (ntokens < 2) {
2406        out_string(c, "CLIENT_ERROR bad command line");
2407        return;
2408    }
2409
2410    if (ntokens == 2) {
2411        server_stats(&append_stats, c, NULL);
2412        (void)get_stats(NULL, 0, &append_stats, c);
2413    } else if (strcmp(subcommand, "reset") == 0) {
2414        stats_reset();
2415        out_string(c, "RESET");
2416        return ;
2417    } else if (strcmp(subcommand, "detail") == 0) {
2418        /* NOTE: how to tackle detail with binary? */
2419        if (ntokens < 4)
2420            process_stats_detail(c, "");  /* outputs the error message */
2421        else
2422            process_stats_detail(c, tokens[2].value);
2423        /* Output already generated */
2424        return ;
2425    } else if (strcmp(subcommand, "settings") == 0) {
2426        process_stat_settings(&append_stats, c, NULL);
2427    } else if (strcmp(subcommand, "cachedump") == 0) {
2428        char *buf;
2429        unsigned int bytes, id, limit = 0;
2430
2431        if (ntokens < 5) {
2432            out_string(c, "CLIENT_ERROR bad command line");
2433            return;
2434        }
2435
2436        if (!safe_strtoul(tokens[2].value, &id) ||
2437            !safe_strtoul(tokens[3].value, &limit)) {
2438            out_string(c, "CLIENT_ERROR bad command line format");
2439            return;
2440        }
2441
2442        buf = item_cachedump(id, limit, &bytes);
2443        write_and_free(c, buf, bytes);
2444        return ;
2445    } else {
2446        /* getting here means that the subcommand is either engine specific or
2447           is invalid. query the engine and see. */
2448        if (get_stats(subcommand, (int)strlen(subcommand), &append_stats, c)) {
2449            if (c->stats.buffer == NULL) {
2450                out_string(c, "SERVER_ERROR out of memory writing stats");
2451            } else {
2452                write_and_free(c, c->stats.buffer, c->stats.offset);
2453                c->stats.buffer = NULL;
2454            }
2455        } else {
2456            out_string(c, "ERROR");
2457        }
2458        return ;
2459    }
2460
2461    /* append terminator and start the transfer */
2462    append_stats(NULL, 0, NULL, 0, c);
2463
2464    if (c->stats.buffer == NULL) {
2465        out_string(c, "SERVER_ERROR out of memory writing stats");
2466    } else {
2467        write_and_free(c, c->stats.buffer, c->stats.offset);
2468        c->stats.buffer = NULL;
2469    }
2470}
2471
2472/* ntokens is overwritten here... shrug.. */
2473static void process_get_command(conn *c, token_t *tokens, size_t ntokens, bool return_cas) {
2474    char *key;
2475    size_t nkey;
2476    int i = 0, sid = 0;
2477    item *it;
2478    token_t *key_token = &tokens[KEY_TOKEN];
2479    char *suffix;
2480    int stats_get_cmds   = 0;
2481    int stats_get_misses = 0;
2482    int stats_get_hits[MAX_NUMBER_OF_SLAB_CLASSES];
2483    assert(c != NULL);
2484
2485    memset(&stats_get_hits, 0, sizeof(stats_get_hits));
2486
2487    do {
2488        while(key_token->length != 0) {
2489
2490            key = key_token->value;
2491            nkey = key_token->length;
2492
2493            if(nkey > KEY_MAX_LENGTH) {
2494                cb_mutex_enter(&c->thread->stats.mutex);
2495                c->thread->stats.get_cmds   += stats_get_cmds;
2496                c->thread->stats.get_misses += stats_get_misses;
2497                for(sid = 0; sid < MAX_NUMBER_OF_SLAB_CLASSES; sid++) {
2498                    c->thread->stats.slab_stats[sid].get_hits += stats_get_hits[sid];
2499                }
2500                cb_mutex_exit(&c->thread->stats.mutex);
2501                out_string(c, "CLIENT_ERROR bad command line format");
2502                return;
2503            }
2504
2505            stats_get_cmds++;
2506            it = item_get(key, nkey);
2507            if (settings.detail_enabled) {
2508                stats_prefix_record_get(key, nkey, NULL != it);
2509            }
2510            if (it) {
2511                if (i >= c->isize) {
2512                    item **new_list = realloc(c->ilist, sizeof(item *) * c->isize * 2);
2513                    if (new_list) {
2514                        c->isize *= 2;
2515                        c->ilist = new_list;
2516                    } else {
2517                        item_remove(it);
2518                        break;
2519                    }
2520                }
2521
2522                /*
2523                 * Construct the response. Each hit adds three elements to the
2524                 * outgoing data list:
2525                 *   "VALUE "
2526                 *   key
2527                 *   " " + flags + " " + data length + "\r\n" + data (with \r\n)
2528                 */
2529
2530                if (return_cas)
2531                {
2532                  MEMCACHED_COMMAND_GET(c->sfd, ITEM_key(it), it->nkey,
2533                                        it->nbytes, ITEM_get_cas(it));
2534                  /* Goofy mid-flight realloc. */
2535                  if (i >= c->suffixsize) {
2536                    char **new_suffix_list = realloc(c->suffixlist,
2537                                           sizeof(char *) * c->suffixsize * 2);
2538                    if (new_suffix_list) {
2539                        c->suffixsize *= 2;
2540                        c->suffixlist  = new_suffix_list;
2541                    } else {
2542                        item_remove(it);
2543                        break;
2544                    }
2545                  }
2546
2547                  suffix = cache_alloc(c->thread->suffix_cache);
2548                  if (suffix == NULL) {
2549                    cb_mutex_enter(&c->thread->stats.mutex);
2550                    c->thread->stats.get_cmds   += stats_get_cmds;
2551                    c->thread->stats.get_misses += stats_get_misses;
2552                    for(sid = 0; sid < MAX_NUMBER_OF_SLAB_CLASSES; sid++) {
2553                        c->thread->stats.slab_stats[sid].get_hits += stats_get_hits[sid];
2554                    }
2555                    cb_mutex_exit(&c->thread->stats.mutex);
2556                    out_string(c, "SERVER_ERROR out of memory making CAS suffix");
2557                    item_remove(it);
2558                    return;
2559                  }
2560                  *(c->suffixlist + i) = suffix;
2561                  int suffix_len = snprintf(suffix, SUFFIX_SIZE,
2562                                            " %llu\r\n",
2563                                            (unsigned long long)ITEM_get_cas(it));
2564                  if (add_iov(c, "VALUE ", 6) != 0 ||
2565                      add_iov(c, ITEM_key(it), it->nkey) != 0 ||
2566                      add_iov(c, ITEM_suffix(it), it->nsuffix - 2) != 0 ||
2567                      add_iov(c, suffix, suffix_len) != 0 ||
2568                      add_iov(c, ITEM_data(it), it->nbytes) != 0)
2569                      {
2570                          item_remove(it);
2571                          break;
2572                      }
2573                }
2574                else
2575                {
2576                  MEMCACHED_COMMAND_GET(c->sfd, ITEM_key(it), it->nkey,
2577                                        it->nbytes, ITEM_get_cas(it));
2578                  if (add_iov(c, "VALUE ", 6) != 0 ||
2579                      add_iov(c, ITEM_key(it), it->nkey) != 0 ||
2580                      add_iov(c, ITEM_suffix(it), it->nsuffix + it->nbytes) != 0)
2581                      {
2582                          item_remove(it);
2583                          break;
2584                      }
2585                }
2586
2587
2588                if (settings.verbose > 1)
2589                    moxi_log_write(">%d sending key %s\n", c->sfd, ITEM_key(it));
2590
2591                /* item_get() has incremented it->refcount for us */
2592                stats_get_hits[it->slabs_clsid]++;
2593                item_update(it);
2594                *(c->ilist + i) = it;
2595                i++;
2596
2597            } else {
2598                stats_get_misses++;
2599                MEMCACHED_COMMAND_GET(c->sfd, key, nkey, -1, 0);
2600            }
2601
2602            key_token++;
2603        }
2604
2605        /*
2606         * If the command string hasn't been fully processed, get the next set
2607         * of tokens.
2608         */
2609        if(key_token->value != NULL) {
2610            ntokens = tokenize_command(key_token->value, tokens, MAX_TOKENS);
2611            key_token = tokens;
2612        }
2613
2614    } while(key_token->value != NULL);
2615
2616    c->icurr = c->ilist;
2617    c->ileft = i;
2618    if (return_cas) {
2619        c->suffixcurr = c->suffixlist;
2620        c->suffixleft = i;
2621    }
2622
2623    if (settings.verbose > 1)
2624        moxi_log_write(">%d END\n", c->sfd);
2625
2626    /*
2627        If the loop was terminated because of out-of-memory, it is not
2628        reliable to add END\r\n to the buffer, because it might not end
2629        in \r\n. So we send SERVER_ERROR instead.
2630    */
2631    if (key_token->value != NULL || add_iov(c, "END\r\n", 5) != 0
2632        || (IS_UDP(c->transport) && build_udp_headers(c) != 0)) {
2633        out_string(c, "SERVER_ERROR out of memory writing get response");
2634    }
2635    else {
2636        conn_set_state(c, conn_mwrite);
2637        c->write_and_go = conn_new_cmd;
2638        c->msgcurr = 0;
2639    }
2640
2641    cb_mutex_enter(&c->thread->stats.mutex);
2642    c->thread->stats.get_cmds   += stats_get_cmds;
2643    c->thread->stats.get_misses += stats_get_misses;
2644    for(sid = 0; sid < MAX_NUMBER_OF_SLAB_CLASSES; sid++) {
2645        c->thread->stats.slab_stats[sid].get_hits += stats_get_hits[sid];
2646    }
2647    cb_mutex_exit(&c->thread->stats.mutex);
2648
2649    return;
2650}
2651
2652void process_update_command(conn *c, token_t *tokens, const size_t ntokens, int comm, bool handle_cas) {
2653    char *key;
2654    size_t nkey;
2655    unsigned int flags;
2656    int32_t exptime_int = 0;
2657    time_t exptime;
2658    int vlen;
2659    uint64_t req_cas_id=0;
2660    item *it;
2661
2662    assert(c != NULL);
2663
2664    set_noreply_maybe(c, tokens, ntokens);
2665
2666    if (tokens[KEY_TOKEN].length > KEY_MAX_LENGTH) {
2667        out_string(c, "CLIENT_ERROR bad command line format");
2668        return;
2669    }
2670
2671    key = tokens[KEY_TOKEN].value;
2672    nkey = tokens[KEY_TOKEN].length;
2673
2674    if (! (safe_strtoul(tokens[2].value, (uint32_t *)&flags)
2675           && safe_strtol(tokens[3].value, &exptime_int)
2676           && safe_strtol(tokens[4].value, (int32_t *)&vlen))) {
2677        out_string(c, "CLIENT_ERROR bad command line format");
2678        return;
2679    }
2680
2681    /* Ubuntu 8.04 breaks when I pass exptime to safe_strtol */
2682    exptime = exptime_int;
2683
2684    /* does cas value exist? */
2685    if (handle_cas) {
2686        if (!safe_strtoull(tokens[5].value, &req_cas_id)) {
2687            out_string(c, "CLIENT_ERROR bad command line format");
2688            return;
2689        }
2690    }
2691
2692    vlen += 2;
2693    if (vlen < 0 || vlen - 2 < 0) {
2694        out_string(c, "CLIENT_ERROR bad command line format");
2695        return;
2696    }
2697
2698    if (settings.detail_enabled) {
2699        stats_prefix_record_set(key, nkey);
2700    }
2701
2702    it = item_alloc(key, nkey, flags, c->funcs->conn_realtime(exptime), vlen);
2703
2704    if (it == 0) {
2705        if (! item_size_ok(nkey, flags, vlen))
2706            out_string(c, "SERVER_ERROR object too large for cache");
2707        else
2708            out_string(c, "SERVER_ERROR out of memory storing object");
2709        /* swallow the data line */
2710        c->write_and_go = conn_swallow;
2711        c->sbytes = vlen;
2712
2713        /* Avoid stale data persisting in cache because we failed alloc.
2714         * Unacceptable for SET. Anywhere else too? */
2715        if (comm == NREAD_SET) {
2716            it = item_get(key, nkey);
2717            if (it) {
2718                item_unlink(it);
2719                item_remove(it);
2720            }
2721        }
2722
2723        return;
2724    }
2725    ITEM_set_cas(it, req_cas_id);
2726
2727    c->item = it;
2728    c->ritem = ITEM_data(it);
2729    c->rlbytes = it->nbytes;
2730    c->cmd = comm;
2731    conn_set_state(c, conn_nread);
2732}
2733
2734static void process_arithmetic_command(conn *c, token_t *tokens, const size_t ntokens, const bool incr) {
2735    char temp[INCR_MAX_STORAGE_LEN];
2736    item *it;
2737    uint64_t delta;
2738    char *key;
2739    size_t nkey;
2740
2741    assert(c != NULL);
2742
2743    set_noreply_maybe(c, tokens, ntokens);
2744
2745    if (tokens[KEY_TOKEN].length > KEY_MAX_LENGTH) {
2746        out_string(c, "CLIENT_ERROR bad command line format");
2747        return;
2748    }
2749
2750    key = tokens[KEY_TOKEN].value;
2751    nkey = tokens[KEY_TOKEN].length;
2752
2753    if (!safe_strtoull(tokens[2].value, &delta)) {
2754        out_string(c, "CLIENT_ERROR invalid numeric delta argument");
2755        return;
2756    }
2757
2758    it = item_get(key, nkey);
2759    if (!it) {
2760        cb_mutex_enter(&c->thread->stats.mutex);
2761        if (incr) {
2762            c->thread->stats.incr_misses++;
2763        } else {
2764            c->thread->stats.decr_misses++;
2765        }
2766        cb_mutex_exit(&c->thread->stats.mutex);
2767
2768        out_string(c, "NOT_FOUND");
2769        return;
2770    }
2771
2772    switch(add_delta(c, it, incr, delta, temp)) {
2773    case OK:
2774        out_string(c, temp);
2775        break;
2776    case NON_NUMERIC:
2777        out_string(c, "CLIENT_ERROR cannot increment or decrement non-numeric value");
2778        break;
2779    case EOM:
2780        out_string(c, "SERVER_ERROR out of memory");
2781        break;
2782    }
2783    item_remove(it);         /* release our reference */
2784}
2785
2786/*
2787 * adds a delta value to a numeric item.
2788 *
2789 * c     connection requesting the operation
2790 * it    item to adjust
2791 * incr  true to increment value, false to decrement
2792 * delta amount to adjust value by
2793 * buf   buffer for response string
2794 *
2795 * returns a response string to send back to the client.
2796 */
2797enum delta_result_type do_add_delta(conn *c, item *it, const bool incr,
2798                                    const int64_t delta, char *buf) {
2799    char *ptr;
2800    int64_t value;
2801    int res;
2802
2803    ptr = ITEM_data(it);
2804
2805    if (!safe_strtoull(ptr, (uint64_t *)&value)) {
2806        return NON_NUMERIC;
2807    }
2808
2809    if (incr) {
2810        value += delta;
2811        MEMCACHED_COMMAND_INCR(c->sfd, ITEM_key(it), it->nkey, value);
2812    } else {
2813        if(delta > value) {
2814            value = 0;
2815        } else {
2816            value -= delta;
2817        }
2818        MEMCACHED_COMMAND_DECR(c->sfd, ITEM_key(it), it->nkey, value);
2819    }
2820
2821    cb_mutex_enter(&c->thread->stats.mutex);
2822    if (incr) {
2823        c->thread->stats.slab_stats[it->slabs_clsid].incr_hits++;
2824    } else {
2825        c->thread->stats.slab_stats[it->slabs_clsid].decr_hits++;
2826    }
2827    cb_mutex_exit(&c->thread->stats.mutex);
2828
2829    snprintf(buf, INCR_MAX_STORAGE_LEN, "%llu", (unsigned long long)value);
2830    res = (int)strlen(buf);
2831    if (res + 2 > it->nbytes) { /* need to realloc */
2832        item *new_it;
2833        new_it = do_item_alloc(ITEM_key(it), it->nkey, atoi(ITEM_suffix(it) + 1), it->exptime, res + 2 );
2834        if (new_it == 0) {
2835            return EOM;
2836        }
2837        memcpy(ITEM_data(new_it), buf, res);
2838        memcpy(ITEM_data(new_it) + res, "\r\n", 2);
2839        item_replace(it, new_it);
2840        do_item_remove(new_it);       /* release our reference */
2841    } else { /* replace in-place */
2842        /* When changing the value without replacing the item, we
2843           need to update the CAS on the existing item. */
2844        ITEM_set_cas(it, (settings.use_cas) ? get_cas_id() : 0);
2845
2846        memcpy(ITEM_data(it), buf, res);
2847        memset(ITEM_data(it) + res, ' ', it->nbytes - res - 2);
2848    }
2849
2850    return OK;
2851}
2852
2853static void process_delete_command(conn *c, token_t *tokens, const size_t ntokens) {
2854    char *key;
2855    size_t nkey;
2856    item *it;
2857
2858    assert(c != NULL);
2859
2860    set_noreply_maybe(c, tokens, ntokens);
2861
2862    key = tokens[KEY_TOKEN].value;
2863    nkey = tokens[KEY_TOKEN].length;
2864
2865    if(nkey > KEY_MAX_LENGTH) {
2866        out_string(c, "CLIENT_ERROR bad command line format");
2867        return;
2868    }
2869
2870    if (settings.detail_enabled) {
2871        stats_prefix_record_delete(key, nkey);
2872    }
2873
2874    it = item_get(key, nkey);
2875    if (it) {
2876        MEMCACHED_COMMAND_DELETE(c->sfd, ITEM_key(it), it->nkey);
2877
2878        cb_mutex_enter(&c->thread->stats.mutex);
2879        c->thread->stats.slab_stats[it->slabs_clsid].delete_hits++;
2880        cb_mutex_exit(&c->thread->stats.mutex);
2881
2882        item_unlink(it);
2883        item_remove(it);      /* release our reference */
2884        out_string(c, "DELETED");
2885    } else {
2886        cb_mutex_enter(&c->thread->stats.mutex);
2887        c->thread->stats.delete_misses++;
2888        cb_mutex_exit(&c->thread->stats.mutex);
2889
2890        out_string(c, "NOT_FOUND");
2891    }
2892}
2893
2894void process_verbosity_command(conn *c, token_t *tokens, const size_t ntokens) {
2895    unsigned int level;
2896
2897    assert(c != NULL);
2898
2899    set_noreply_maybe(c, tokens, ntokens);
2900    if (c->noreply && ntokens == 3) {
2901        /* "verbosity noreply" is not according to the correct syntax */
2902        c->noreply = false;
2903        out_string(c, "ERROR");
2904        return;
2905    }
2906
2907    if (safe_strtoul(tokens[1].value, &level)) {
2908        settings.verbose = level > MAX_VERBOSITY_LEVEL ? MAX_VERBOSITY_LEVEL : level;
2909        out_string(c, "OK");
2910    } else {
2911        out_string(c, "ERROR");
2912    }
2913}
2914
2915void process_command(conn *c, char *command) {
2916
2917    token_t tokens[MAX_TOKENS];
2918    size_t ntokens;
2919    int comm;
2920
2921    assert(c != NULL);
2922
2923    MEMCACHED_PROCESS_COMMAND_START(c->sfd, c->rcurr, c->rbytes);
2924
2925    if (settings.verbose > 1)
2926        moxi_log_write("<%d %s\n", c->sfd, command);
2927
2928    /*
2929     * for commands set/add/replace, we build an item and read the data
2930     * directly into it, then continue in nread_complete().
2931     */
2932
2933    c->msgcurr = 0;
2934    c->msgused = 0;
2935    c->iovused = 0;
2936    if (add_msghdr(c) != 0) {
2937        out_string(c, "SERVER_ERROR out of memory preparing response");
2938        return;
2939    }
2940
2941    ntokens = tokenize_command(command, tokens, MAX_TOKENS);
2942    if (ntokens >= 3 &&
2943        ((strcmp(tokens[COMMAND_TOKEN].value, "get") == 0) ||
2944         (strcmp(tokens[COMMAND_TOKEN].value, "bget") == 0))) {
2945
2946        process_get_command(c, tokens, ntokens, false);
2947
2948    } else if ((ntokens == 6 || ntokens == 7) &&
2949               ((strcmp(tokens[COMMAND_TOKEN].value, "add") == 0 && (comm = NREAD_ADD)) ||
2950                (strcmp(tokens[COMMAND_TOKEN].value, "set") == 0 && (comm = NREAD_SET)) ||
2951                (strcmp(tokens[COMMAND_TOKEN].value, "replace") == 0 && (comm = NREAD_REPLACE)) ||
2952                (strcmp(tokens[COMMAND_TOKEN].value, "prepend") == 0 && (comm = NREAD_PREPEND)) ||
2953                (strcmp(tokens[COMMAND_TOKEN].value, "append") == 0 && (comm = NREAD_APPEND)) )) {
2954
2955        process_update_command(c, tokens, ntokens, comm, false);
2956
2957    } else if ((ntokens == 7 || ntokens == 8) && (strcmp(tokens[COMMAND_TOKEN].value, "cas") == 0 && (comm = NREAD_CAS))) {
2958
2959        process_update_command(c, tokens, ntokens, comm, true);
2960
2961    } else if ((ntokens == 4 || ntokens == 5) && (strcmp(tokens[COMMAND_TOKEN].value, "incr") == 0)) {
2962
2963        process_arithmetic_command(c, tokens, ntokens, 1);
2964
2965    } else if (ntokens >= 3 && (strcmp(tokens[COMMAND_TOKEN].value, "gets") == 0)) {
2966
2967        process_get_command(c, tokens, ntokens, true);
2968
2969    } else if ((ntokens == 4 || ntokens == 5) && (strcmp(tokens[COMMAND_TOKEN].value, "decr") == 0)) {
2970
2971        process_arithmetic_command(c, tokens, ntokens, 0);
2972
2973    } else if (ntokens >= 3 && ntokens <= 4 && (strcmp(tokens[COMMAND_TOKEN].value, "delete") == 0)) {
2974
2975        process_delete_command(c, tokens, ntokens);
2976
2977    } else if (ntokens >= 2 && (strcmp(tokens[COMMAND_TOKEN].value, "stats") == 0)) {
2978
2979        process_stat(c, tokens, ntokens);
2980
2981    } else if (ntokens >= 2 && ntokens <= 4 && (strcmp(tokens[COMMAND_TOKEN].value, "flush_all") == 0)) {
2982        time_t exptime = 0;
2983        set_current_time();
2984
2985        set_noreply_maybe(c, tokens, ntokens);
2986
2987        cb_mutex_enter(&c->thread->stats.mutex);
2988        c->thread->stats.flush_cmds++;
2989        cb_mutex_exit(&c->thread->stats.mutex);
2990
2991        if(ntokens == (c->noreply ? 3 : 2)) {
2992            settings.oldest_live = current_time - 1;
2993            item_flush_expired();
2994            out_string(c, "OK");
2995            return;
2996        }
2997
2998        exptime = strtol(tokens[1].value, NULL, 10);
2999        if(errno == ERANGE) {
3000            out_string(c, "CLIENT_ERROR bad command line format");
3001            return;
3002        }
3003
3004        /*
3005          If exptime is zero realtime() would return zero too, and
3006          realtime(exptime) - 1 would overflow to the max unsigned
3007          value.  So we process exptime == 0 the same way we do when
3008          no delay is given at all.
3009        */
3010        if (exptime > 0)
3011            settings.oldest_live = c->funcs->conn_realtime(exptime) - 1;
3012        else /* exptime == 0 */
3013            settings.oldest_live = current_time - 1;
3014        item_flush_expired();
3015        out_string(c, "OK");
3016        return;
3017
3018    } else if (ntokens == 2 && (strcmp(tokens[COMMAND_TOKEN].value, "version") == 0)) {
3019
3020        out_string(c, "VERSION " VERSION);
3021
3022    } else if (ntokens == 2 && (strcmp(tokens[COMMAND_TOKEN].value, "quit") == 0)) {
3023
3024        conn_set_state(c, conn_closing);
3025
3026    } else if (ntokens == 5 && (strcmp(tokens[COMMAND_TOKEN].value, "slabs") == 0 &&
3027                                strcmp(tokens[COMMAND_TOKEN + 1].value, "reassign") == 0)) {
3028#ifdef ALLOW_SLABS_REASSIGN
3029
3030        int src, dst, rv;
3031
3032        src = strtol(tokens[2].value, NULL, 10);
3033        dst  = strtol(tokens[3].value, NULL, 10);
3034
3035        if(errno == ERANGE) {
3036            out_string(c, "CLIENT_ERROR bad command line format");
3037            return;
3038        }
3039
3040        rv = slabs_reassign(src, dst);
3041        if (rv == 1) {
3042            out_string(c, "DONE");
3043            return;
3044        }
3045        if (rv == 0) {
3046            out_string(c, "CANT");
3047            return;
3048        }
3049        if (rv == -1) {
3050            out_string(c, "BUSY");
3051            return;
3052        }
3053#else
3054        out_string(c, "CLIENT_ERROR Slab reassignment not supported");
3055#endif
3056    } else if ((ntokens == 3 || ntokens == 4) && (strcmp(tokens[COMMAND_TOKEN].value, "verbosity") == 0)) {
3057        process_verbosity_command(c, tokens, ntokens);
3058    } else {
3059        out_string(c, "ERROR");
3060    }
3061    return;
3062}
3063
3064void process_stats_proxy_command(conn *c, token_t *tokens, const size_t ntokens) {
3065    if (ntokens == 4 && strcmp(tokens[2].value, "reset") == 0) {
3066        proxy_td *ptd = c->extra;
3067        if (ptd != NULL) {
3068            proxy_stats_reset(ptd->proxy->main);
3069        }
3070
3071        out_string(c, "OK");
3072        return;
3073    }
3074
3075    if (ntokens == 4 && strcmp(tokens[2].value, "timings") == 0) {
3076        proxy_stats_dump_timings(&append_stats, c);
3077    } else if (ntokens == 4 && strcmp(tokens[2].value, "config") == 0) {
3078        proxy_stats_dump_config(&append_stats, c);
3079    } else {
3080        bool do_all = (ntokens == 3 || strcmp(tokens[2].value, "all") == 0);
3081        struct proxy_stats_cmd_info psci = {
3082            .do_info       = (do_all || strcmp(tokens[2].value, "info") == 0),
3083            .do_settings   = (do_all || strcmp(tokens[2].value, "settings") == 0),
3084            .do_behaviors  = (do_all || strcmp(tokens[2].value, "behaviors") == 0),
3085            .do_frontcache = (do_all || strcmp(tokens[2].value, "frontcache") == 0),
3086            .do_keystats   = (do_all || strcmp(tokens[2].value, "keystats") == 0),
3087            .do_stats      = (do_all || strcmp(tokens[2].value, "stats") == 0),
3088            .do_zeros      = (do_all || ntokens == 4)
3089        };
3090
3091        if (psci.do_info) {
3092            proxy_stats_dump_basic(&append_stats, c, "basic:");
3093        }
3094
3095        if (psci.do_settings) {
3096            process_stat_settings(&append_stats, c, "memcached:settings:");
3097        }
3098
3099        if (psci.do_stats) {
3100            server_stats(&append_stats, c, "memcached:stats:" );
3101        }
3102
3103        proxy_stats_dump_proxy_main(&append_stats, c, &psci);
3104
3105        proxy_stats_dump_proxies(&append_stats, c, &psci);
3106    }
3107
3108    /* append terminator and start the transfer */
3109    append_stats(NULL, 0, NULL, 0, c);
3110
3111    if (c->stats.buffer == NULL) {
3112        out_string(c, "SERVER_ERROR out of memory writing stats");
3113    } else {
3114        write_and_free(c, c->stats.buffer, c->stats.offset);
3115        c->stats.buffer = NULL;
3116    }
3117}
3118
3119/*
3120 * if we have a complete line in the buffer, process it.
3121 */
3122int try_read_command(conn *c) {
3123    assert(c != NULL);
3124    assert(c->rcurr <= (c->rbuf + c->rsize));
3125    assert(c->rbytes > 0);
3126
3127    if (IS_NEGOTIATING(c->protocol) || c->transport == udp_transport)  {
3128        if ((unsigned char)c->rbuf[0] == (unsigned char)PROTOCOL_BINARY_REQ) {
3129            c->protocol = IS_PROXY(c->protocol) ? proxy_upstream_binary_prot : binary_prot;
3130        } else {
3131            c->protocol = IS_PROXY(c->protocol) ? proxy_upstream_ascii_prot : ascii_prot;
3132        }
3133
3134        if (settings.verbose) {
3135            moxi_log_write("%d: Client using the %s protocol\n", c->sfd,
3136                    prot_text(c->protocol));
3137        }
3138    }
3139
3140    if (IS_BINARY(c->protocol)) {
3141        /* Do we have the complete packet header? */
3142        if (c->rbytes < (int)sizeof(c->binary_header)) {
3143            /* need more data! */
3144            return 0;
3145        } else {
3146#ifdef NEED_ALIGN
3147            if (((long)(c->rcurr)) % 8 != 0) {
3148                /* must realign input buffer */
3149                memmove(c->rbuf, c->rcurr, c->rbytes);
3150                c->rcurr = c->rbuf;
3151                if (settings.verbose) {
3152                    moxi_log_write("%d: Realign input buffer\n", c->sfd);
3153                }
3154            }
3155#endif
3156            protocol_binary_request_header* req;
3157            req = (protocol_binary_request_header*)c->rcurr;
3158
3159            if (settings.verbose > 1) {
3160                /* Dump the packet before we convert it to host order */
3161                moxi_log_write("<%d Read binary protocol data:\n", c->sfd);
3162                cproxy_dump_header(c->sfd, (char *) req->bytes);
3163            }
3164
3165            c->binary_header = *req;
3166            c->binary_header.request.keylen = ntohs(req->request.keylen);
3167            c->binary_header.request.bodylen = ntohl(req->request.bodylen);
3168            c->binary_header.request.cas = mc_swap64(req->request.cas);
3169
3170            if (c->binary_header.request.magic != c->funcs->conn_binary_command_magic) {
3171                if (settings.verbose) {
3172                    moxi_log_write("Invalid magic:  %x\n",
3173                            c->binary_header.request.magic);
3174                }
3175                conn_set_state(c, conn_closing);
3176                return -1;
3177            }
3178
3179            c->msgcurr = 0;
3180            c->msgused = 0;
3181            c->iovused = 0;
3182            if (add_msghdr(c) != 0) {
3183                out_string(c, "SERVER_ERROR out of memory");
3184                return 0;
3185            }
3186
3187            c->cmd = c->binary_header.request.opcode;
3188            c->keylen = c->binary_header.request.keylen;
3189            c->opaque = c->binary_header.request.opaque;
3190            /* clear the returned cas value */
3191            c->cas = 0;
3192
3193            c->funcs->conn_process_binary_command(c);
3194
3195            c->rbytes -= sizeof(c->binary_header);
3196            c->rcurr += sizeof(c->binary_header);
3197        }
3198    } else {
3199        char *el, *cont;
3200
3201        if (c->rbytes == 0)
3202            return 0;
3203        el = memchr(c->rcurr, '\n', c->rbytes);
3204        if (!el)
3205            return 0;
3206        cont = el + 1;
3207        if ((el - c->rcurr) > 1 && *(el - 1) == '\r') {
3208            el--;
3209        }
3210        *el = '\0';
3211
3212        assert(cont <= (c->rcurr + c->rbytes));
3213
3214        c->funcs->conn_process_ascii_command(c, c->rcurr);
3215
3216        c->rbytes -= (int)(cont - c->rcurr);
3217        c->rcurr = cont;
3218
3219        assert(c->rcurr <= (c->rbuf + c->rsize));
3220    }
3221
3222    return 1;
3223}
3224
3225/*
3226 * read a UDP request.
3227 */
3228static enum try_read_result try_read_udp(conn *c) {
3229    int res;
3230
3231    assert(c != NULL);
3232
3233    c->request_addr_size = sizeof(c->request_addr);
3234    res = recvfrom(c->sfd, c->rbuf, c->rsize,
3235                   0, &c->request_addr, &c->request_addr_size);
3236    if (res > 8) {
3237        unsigned char *buf = (unsigned char *)c->rbuf;
3238
3239        cb_mutex_enter(&c->thread->stats.mutex);
3240        c->thread->stats.bytes_read += res;
3241        cb_mutex_exit(&c->thread->stats.mutex);
3242
3243        add_bytes_read(c, res);
3244
3245        /* Beginning of UDP packet is the request ID; save it. */
3246        c->request_id = buf[0] * 256 + buf[1];
3247
3248        /* If this is a multi-packet request, drop it. */
3249        if (buf[4] != 0 || buf[5] != 1) {
3250            out_string(c, "SERVER_ERROR multi-packet request not supported");
3251            return READ_NO_DATA_RECEIVED;
3252        }
3253
3254        /* Don't care about any of the rest of the header. */
3255        res -= 8;
3256        memmove(c->rbuf, c->rbuf + 8, res);
3257
3258        c->rbytes += res;
3259        c->rcurr = c->rbuf;
3260        return READ_DATA_RECEIVED;
3261    }
3262    return READ_NO_DATA_RECEIVED;
3263}
3264
3265/*
3266 * read from network as much as we can, handle buffer overflow and connection
3267 * close.
3268 * before reading, move the remaining incomplete fragment of a command
3269 * (if any) to the beginning of the buffer.
3270 * @return enum try_read_result
3271 */
3272static enum try_read_result try_read_network(conn *c) {
3273    enum try_read_result gotdata = READ_NO_DATA_RECEIVED;
3274    int res;
3275
3276    assert(c != NULL);
3277
3278    if (c->rcurr != c->rbuf) {
3279        if (c->rbytes != 0) /* otherwise there's nothing to copy */
3280            memmove(c->rbuf, c->rcurr, c->rbytes);
3281        c->rcurr = c->rbuf;
3282    }
3283
3284    while (1) {
3285        if (c->rbytes >= c->rsize) {
3286            char *new_rbuf = realloc(c->rbuf, c->rsize * 2);
3287            if (!new_rbuf) {
3288                if (settings.verbose > 0)
3289                    moxi_log_write("Couldn't realloc input buffer\n");
3290                c->rbytes = 0; /* ignore what we read */
3291                out_string(c, "SERVER_ERROR out of memory reading request");
3292                c->write_and_go = conn_closing;
3293                return READ_MEMORY_ERROR;
3294            }
3295            c->rcurr = c->rbuf = new_rbuf;
3296            c->rsize *= 2;
3297        }
3298
3299        int avail = c->rsize - c->rbytes;
3300        assert(avail > 0);
3301        res = recv(c->sfd, c->rbuf + c->rbytes, avail, 0);
3302        if (res > 0) {
3303            cb_mutex_enter(&c->thread->stats.mutex);
3304            c->thread->stats.bytes_read += res;
3305            cb_mutex_exit(&c->thread->stats.mutex);
3306
3307            add_bytes_read(c, res);
3308
3309            gotdata = READ_DATA_RECEIVED;
3310            c->rbytes += res;
3311            if (res == avail) {
3312                continue;
3313            } else {
3314                break;
3315            }
3316        }
3317        if (res == 0) {
3318            return READ_ERROR;
3319        }
3320        if (res == -1) {
3321            if (is_blocking(errno)) {
3322                break;
3323            }
3324            return READ_ERROR;
3325        }
3326    }
3327    return gotdata;
3328}
3329
3330bool update_event_real(conn *c, const int new_flags, const char *update_diag) {
3331    return update_event_timed_real(c, new_flags, NULL, update_diag);
3332}
3333
3334bool update_event_timed_real(conn *c, const int new_flags, struct timeval *timeout, const char *update_diag) {
3335    assert(c != NULL);
3336
3337    struct event_base *base = c->event.ev_base;
3338    if (c->ev_flags == new_flags && timeout == NULL)
3339        return true;
3340
3341    c->update_diag = update_diag;
3342
3343    if (event_del(&c->event) == -1)
3344        return false;
3345    c->ev_flags = new_flags;
3346    if (new_flags == 0 && timeout == NULL)
3347        return true;
3348    event_set(&c->event, c->sfd, new_flags, event_handler, (void *)c);
3349    event_base_set(base, &c->event);
3350    if (event_add(&c->event, timeout) == -1)
3351        return false;
3352    return true;
3353}
3354
3355/*
3356 * Sets whether we are listening for new connections or not.
3357 */
3358void do_accept_new_conns(const bool do_accept) {
3359    conn *next;
3360
3361    for (next = listen_conn; next; next = next->next) {
3362        if (do_accept) {
3363            update_event(next, EV_READ | EV_PERSIST);
3364            if (listen(next->sfd, settings.backlog) != 0) {
3365                perror("listen");
3366            }
3367        }
3368        else {
3369            update_event(next, 0);
3370            if (listen(next->sfd, 0) != 0) {
3371                perror("listen");
3372            }
3373        }
3374    }
3375
3376    if (do_accept) {
3377        STATS_LOCK();
3378        stats.accepting_conns = true;
3379        STATS_UNLOCK();
3380    } else {
3381        STATS_LOCK();
3382        stats.accepting_conns = false;
3383        stats.listen_disabled_num++;
3384        STATS_UNLOCK();
3385    }
3386}
3387
3388/*
3389 * Transmit the next chunk of data from our list of msgbuf structures.
3390 *
3391 * Returns:
3392 *   TRANSMIT_COMPLETE   All done writing.
3393 *   TRANSMIT_INCOMPLETE More data remaining to write.
3394 *   TRANSMIT_SOFT_ERROR Can't write any more right now.
3395 *   TRANSMIT_HARD_ERROR Can't write (c->state is set to conn_closing)
3396 */
3397static enum transmit_result transmit(conn *c) {
3398    assert(c != NULL);
3399
3400    if (c->msgcurr < c->msgused &&
3401            c->msglist[c->msgcurr].msg_iovlen == 0) {
3402        /* Finished writing the current msg; advance to the next. */
3403        c->msgcurr++;
3404    }
3405    if (c->msgcurr < c->msgused) {
3406        ssize_t res;
3407        struct msghdr *m = &c->msglist[c->msgcurr];
3408#ifdef WIN32
3409        DWORD error;
3410#else
3411        int error;
3412#endif
3413
3414        res = sendmsg(c->sfd, m, 0);
3415#ifdef WIN32
3416        error = WSAGetLastError();
3417#else
3418        error = errno;
3419#endif
3420        if (res > 0) {
3421            cb_mutex_enter(&c->thread->stats.mutex);
3422            c->thread->stats.bytes_written += res;
3423            cb_mutex_exit(&c->thread->stats.mutex);
3424
3425            /* We've written some of the data. Remove the completed
3426               iovec entries from the list of pending writes. */
3427            while (m->msg_iovlen > 0 && res >= (ssize_t)(m->msg_iov->iov_len)) {
3428                res -= (ssize_t)m->msg_iov->iov_len;
3429                m->msg_iovlen--;
3430                m->msg_iov++;
3431            }
3432
3433            /* Might have written just part of the last iovec entry;
3434               adjust it so the next write will do the rest. */
3435            if (res > 0) {
3436                m->msg_iov->iov_base = (caddr_t)m->msg_iov->iov_base + res;
3437                m->msg_iov->iov_len -= res;
3438            }
3439            return TRANSMIT_INCOMPLETE;
3440        }
3441
3442        if (res == -1 && is_blocking(error)) {
3443            if (!update_event(c, EV_WRITE | EV_PERSIST)) {
3444                if (settings.verbose > 0)
3445                    moxi_log_write("Couldn't update event\n");
3446                conn_set_state(c, conn_closing);
3447                return TRANSMIT_HARD_ERROR;
3448            }
3449            return TRANSMIT_SOFT_ERROR;
3450        }
3451        /* if res == 0 or res == -1 and error is not EAGAIN or EWOULDBLOCK,
3452           we have a real error, on which we close the connection */
3453        if (settings.verbose > 0)
3454            perror("Failed to write, and not due to blocking");
3455
3456        if (IS_UDP(c->transport))
3457            conn_set_state(c, conn_read);
3458        else
3459            conn_set_state(c, conn_closing);
3460        return TRANSMIT_HARD_ERROR;
3461    } else {
3462        return TRANSMIT_COMPLETE;
3463    }
3464}
3465
3466void drive_machine(conn *c) {
3467    bool stop = false;
3468    SOCKET sfd;
3469    socklen_t addrlen;
3470    struct sockaddr_storage addr;
3471    int nreqs = settings.reqs_per_event;
3472    int res;
3473#ifdef WIN32
3474    DWORD error;
3475#else
3476    int error;
3477#endif
3478
3479    assert(c != NULL);
3480
3481    while (!stop) {
3482        if (settings.verbose > 2) {
3483            moxi_log_write("%d: drive_machine %s\n",
3484                    c->sfd, state_text(c->state));
3485        }
3486
3487        switch(c->state) {
3488        case conn_listening:
3489            addrlen = sizeof(addr);
3490            if ((sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen)) == INVALID_SOCKET) {
3491#ifdef WIN32
3492                error = WSAGetLastError();
3493#else
3494                error = errno;
3495#endif
3496                if (is_blocking(error)) {
3497                    /* these are transient, so don't log anything */
3498                    stop = true;
3499                } else if (is_emfile(error)) {
3500                    if (settings.verbose > 0)
3501                        moxi_log_write("Too many open connections\n");
3502                    accept_new_conns(false);
3503                    stop = true;
3504                } else {
3505                    perror("accept()");
3506                    stop = true;
3507                }
3508                break;
3509            }
3510            if (evutil_make_socket_nonblocking(sfd) == -1) {
3511                perror("setting O_NONBLOCK");
3512                closesocket(sfd);
3513                break;
3514            }
3515
3516            dispatch_conn_new(sfd, conn_new_cmd, EV_READ | EV_PERSIST,
3517                              DATA_BUFFER_SIZE,
3518                              c->protocol,
3519                              tcp_transport,
3520                              c->funcs, c->extra);
3521            stop = true;
3522            break;
3523
3524        case conn_connecting:
3525            if (c->funcs->conn_connect != NULL) {
3526                if (c->funcs->conn_connect(c) == true) {
3527                    stop = true;
3528                }
3529            } else {
3530                conn_set_state(c, conn_closing);
3531                update_event(c, 0);
3532            }
3533            break;
3534
3535        case conn_waiting:
3536            if (!update_event(c, EV_READ | EV_PERSIST)) {
3537                if (settings.verbose > 0)
3538                    moxi_log_write("Couldn't update event\n");
3539                conn_set_state(c, conn_closing);
3540                break;
3541            }
3542
3543            conn_set_state(c, conn_read);
3544            stop = true;
3545            break;
3546
3547        case conn_read:
3548            res = IS_UDP(c->transport) ? try_read_udp(c) : try_read_network(c);
3549
3550            switch (res) {
3551            case READ_NO_DATA_RECEIVED:
3552                conn_set_state(c, conn_waiting);
3553                break;
3554            case READ_DATA_RECEIVED:
3555                conn_set_state(c, conn_parse_cmd);
3556                break;
3557            case READ_ERROR:
3558                conn_set_state(c, conn_closing);
3559                break;
3560            case READ_MEMORY_ERROR: /* Failed to allocate more memory */
3561                /* State already set by try_read_network */
3562                break;
3563            }
3564            break;
3565
3566        case conn_parse_cmd:
3567            if (try_read_command(c) == 0) {
3568                /* wee need more data! */
3569                conn_set_state(c, conn_waiting);
3570            }
3571
3572            break;
3573
3574        case conn_new_cmd:
3575            /* Only process nreqs at a time to avoid starving other
3576               connections */
3577
3578            --nreqs;
3579            if (IS_DOWNSTREAM(c->protocol) || nreqs >= 0) {
3580                reset_cmd_handler(c);
3581            } else {
3582                cb_mutex_enter(&c->thread->stats.mutex);
3583                c->thread->stats.conn_yields++;
3584                cb_mutex_exit(&c->thread->stats.mutex);
3585                if (c->rbytes > 0) {
3586                    /* We have already read in data into the input buffer,
3587                       so libevent will most likely not signal read events
3588                       on the socket (unless more data is available. As a
3589                       hack we should just put in a request to write data,
3590                       because that should be possible ;-)
3591                    */
3592                    if (!update_event(c, EV_WRITE | EV_PERSIST)) {
3593                        if (settings.verbose > 0) {
3594                            moxi_log_write("Couldn't update event\n");
3595                        }
3596
3597                        conn_set_state(c, conn_closing);
3598                    }
3599                }
3600                stop = true;
3601            }
3602            break;
3603
3604        case conn_nread:
3605            assert(c->rlbytes >= 0);
3606            if (c->rlbytes == 0) {
3607                complete_nread(c);
3608                break;
3609            }
3610            /* first check if we have leftovers in the conn_read buffer */
3611            if (c->rbytes > 0) {
3612                int tocopy = c->rbytes > c->rlbytes ? c->rlbytes : c->rbytes;
3613                if (c->ritem != c->rcurr) {
3614                    memmove(c->ritem, c->rcurr, tocopy);
3615                }
3616                c->ritem += tocopy;
3617                c->rlbytes -= tocopy;
3618                c->rcurr += tocopy;
3619                c->rbytes -= tocopy;
3620                if (c->rlbytes == 0) {
3621                    break;
3622                }
3623            }
3624
3625            /*  now try reading from the socket */
3626            res = recv(c->sfd, c->ritem, c->rlbytes, 0);
3627#ifdef WIN32
3628            error = WSAGetLastError();
3629#else
3630            error = errno;
3631#endif
3632
3633            if (res > 0) {
3634                add_bytes_read(c, res);
3635
3636                if (c->rcurr == c->ritem) {
3637                    c->rcurr += res;
3638                }
3639                c->ritem += res;
3640                c->rlbytes -= res;
3641                break;
3642            }
3643            if (res == 0) { /* end of stream */
3644                conn_set_state(c, conn_closing);
3645                break;
3646            }
3647            if (res == -1 && is_blocking(error)) {
3648                if (!update_event(c, EV_READ | EV_PERSIST)) {
3649                    if (settings.verbose > 0)
3650                        moxi_log_write( "Couldn't update event\n");
3651                    conn_set_state(c, conn_closing);
3652                    break;
3653                }
3654                stop = true;
3655                break;
3656            }
3657            /* otherwise we have a real error, on which we close the connection */
3658            if (!is_closed_conn(error) && settings.verbose > 0) {
3659                moxi_log_write("Failed to read, and not due to blocking:\n"
3660                        "errno: %d %s \n"
3661                        "rcurr=%lx ritem=%lx rbuf=%lx rlbytes=%d rsize=%d\n",
3662                        errno, strerror(errno),
3663                        (long)c->rcurr, (long)c->ritem, (long)c->rbuf,
3664                        (int)c->rlbytes, (int)c->rsize);
3665            }
3666            conn_set_state(c, conn_closing);
3667            break;
3668
3669        case conn_swallow:
3670            /* we are reading sbytes and throwing them away */
3671            if (c->sbytes == 0) {
3672                conn_set_state(c, conn_new_cmd);
3673                break;
3674            }
3675
3676            /* first check if we have leftovers in the conn_read buffer */
3677            if (c->rbytes > 0) {
3678                int tocopy = c->rbytes > c->sbytes ? c->sbytes : c->rbytes;
3679                c->sbytes -= tocopy;
3680                c->rcurr += tocopy;
3681                c->rbytes -= tocopy;
3682                break;
3683            }
3684
3685            /*  now try reading from the socket */
3686            res = recv(c->sfd, c->rbuf, c->rsize > c->sbytes ? c->sbytes : c->rsize, 0);
3687#ifdef WIN32
3688            error = WSAGetLastError();
3689#else
3690            error = errno;
3691#endif
3692
3693            if (res > 0) {
3694                cb_mutex_enter(&c->thread->stats.mutex);
3695                c->thread->stats.bytes_read += res;
3696                cb_mutex_exit(&c->thread->stats.mutex);
3697                add_bytes_read(c, res);
3698                c->sbytes -= res;
3699                break;
3700            }
3701            if (res == 0) { /* end of stream */
3702                conn_set_state(c, conn_closing);
3703                break;
3704            }
3705            if (res == -1 && is_blocking(error)) {
3706                if (!update_event(c, EV_READ | EV_PERSIST)) {
3707                    if (settings.verbose > 0)
3708                        moxi_log_write("Couldn't update event\n");
3709                    conn_set_state(c, conn_closing);
3710                    break;
3711                }
3712                stop = true;
3713                break;
3714            }
3715            /* otherwise we have a real error, on which we close the connection */
3716            if (!is_closed_conn(error) && settings.verbose > 0) {
3717                moxi_log_write("Failed to read, and not due to blocking\n");
3718            }
3719            conn_set_state(c, conn_closing);
3720            break;
3721
3722        case conn_write:
3723            /*
3724             * We want to write out a simple response. If we haven't already,
3725             * assemble it into a msgbuf list (this will be a single-entry
3726             * list for TCP or a two-entry list for UDP).
3727             */
3728            if (c->iovused == 0 || (IS_UDP(c->transport) && c->iovused == 1)) {
3729                if (add_iov(c, c->wcurr, c->wbytes) != 0) {
3730                    if (settings.verbose > 0)
3731                        moxi_log_write("Couldn't build response\n");
3732                    conn_set_state(c, conn_closing);
3733                    break;
3734                }
3735            }
3736
3737            /* fall through... */
3738
3739        case conn_mwrite:
3740          if (IS_UDP(c->transport) && c->msgcurr == 0 && build_udp_headers(c) != 0) {
3741            if (settings.verbose > 0)
3742              moxi_log_write("Failed to build UDP headers\n");
3743            conn_set_state(c, conn_closing);
3744            break;
3745          }
3746            switch (transmit(c)) {
3747            case TRANSMIT_COMPLETE:
3748                if (c->state == conn_mwrite) {
3749                    while (c->ileft > 0) {
3750                        item *it = *(c->icurr);
3751                        assert((it->it_flags & ITEM_SLABBED) == 0);
3752                        item_remove(it);
3753                        c->icurr++;
3754                        c->ileft--;
3755                    }
3756                    while (c->suffixleft > 0) {
3757                        char *suffix = *(c->suffixcurr);
3758                        cache_free(c->thread->suffix_cache, suffix);
3759                        c->suffixcurr++;
3760                        c->suffixleft--;
3761                    }
3762                    conn_set_state(c, c->write_and_go);
3763                } else if (c->state == conn_write) {
3764                    if (c->write_and_free) {
3765                        free(c->write_and_free);
3766                        c->write_and_free = 0;
3767                    }
3768                    conn_set_state(c, c->write_and_go);
3769                } else {
3770                    if (settings.verbose > 0)
3771                        moxi_log_write("Unexpected state %d\n", c->state);
3772                    conn_set_state(c, conn_closing);
3773                }
3774                break;
3775
3776            case TRANSMIT_INCOMPLETE:
3777            case TRANSMIT_HARD_ERROR:
3778                break;                   /* Continue in state machine. */
3779
3780            case TRANSMIT_SOFT_ERROR:
3781                stop = true;
3782                break;
3783            }
3784            break;
3785
3786        case conn_pause:
3787            /* In case whoever put us into conn_pause didn't clear out */
3788            /* libevent registration, do so now. */
3789
3790            update_event(c, 0);
3791
3792            if (c->funcs->conn_pause != NULL)
3793                c->funcs->conn_pause(c);
3794
3795            stop = true;
3796            break;
3797
3798        case conn_closing:
3799            if (c->funcs->conn_close != NULL)
3800                c->funcs->conn_close(c);
3801
3802            if (IS_UDP(c->transport))
3803                conn_cleanup(c);
3804            else
3805                conn_close(c);
3806            stop = true;
3807            break;
3808
3809        case conn_max_state:
3810            assert(false);
3811            break;
3812        }
3813    }
3814
3815    return;
3816}
3817
3818void add_bytes_read(conn *c, int bytes_read) {
3819    assert(c != NULL);
3820    cb_mutex_enter(&c->thread->stats.mutex);
3821    c->thread->stats.bytes_read += bytes_read;
3822    cb_mutex_exit(&c->thread->stats.mutex);
3823}
3824
3825static void event_handler(evutil_socket_t fd, short which, void *arg) {
3826    conn *c;
3827
3828    c = (conn *)arg;
3829    assert(c != NULL);
3830
3831    c->which = which;
3832
3833    /* sanity */
3834    if (fd != c->sfd) {
3835        if (settings.verbose > 0)
3836            moxi_log_write("Catastrophic: event fd doesn't match conn fd!\n");
3837        conn_close(c);
3838        return;
3839    }
3840
3841    c->update_diag = "working";
3842
3843    drive_machine(c);
3844
3845    /* wait for next event */
3846    return;
3847}
3848
3849static SOCKET new_socket(struct addrinfo *ai) {
3850    SOCKET sfd;
3851
3852    if ((sfd = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol)) == INVALID_SOCKET) {
3853        return INVALID_SOCKET;
3854    }
3855
3856    if (evutil_make_socket_nonblocking(sfd) == -1) {
3857        perror("setting O_NONBLOCK");
3858        closesocket(sfd);
3859        return INVALID_SOCKET;
3860    }
3861    return sfd;
3862}
3863
3864
3865/*
3866 * Sets a socket's send buffer size to the maximum allowed by the system.
3867 */
3868static void maximize_sndbuf(const SOCKET sfd) {
3869    socklen_t intsize = sizeof(int);
3870    int last_good = 0;
3871    int min, max, avg;
3872    int old_size;
3873
3874    /* Start with the default size. */
3875    if (getsockopt(sfd, SOL_SOCKET, SO_SNDBUF, (void*)&old_size, &intsize) != 0) {
3876        if (settings.verbose > 0)
3877            perror("getsockopt(SO_SNDBUF)");
3878        return;
3879    }
3880
3881    /* Binary-search for the real maximum. */
3882    min = old_size;
3883    max = MAX_SENDBUF_SIZE;
3884
3885    while (min <= max) {
3886        avg = ((unsigned int)(min + max)) / 2;
3887        if (setsockopt(sfd, SOL_SOCKET, SO_SNDBUF, (void *)&avg, intsize) == 0) {
3888            last_good = avg;
3889            min = avg + 1;
3890        } else {
3891            max = avg - 1;
3892        }
3893    }
3894
3895    if (settings.verbose > 1)
3896        moxi_log_write("<%d send buffer was %d, now %d\n", sfd, old_size, last_good);
3897}
3898
3899/**
3900 * Create a socket and bind it to a specific port number
3901 * @param port the port number to bind to
3902 * @param transport the transport protocol (TCP / UDP)
3903 * @param portnumber_file A filepointer to write the port numbers to
3904 *        when they are successfully added to the list of ports we
3905 *        listen on.
3906 */
3907int server_socket(int port, enum network_transport transport,
3908                  FILE *portnumber_file) {
3909    SOCKET sfd;
3910    struct linger ling = {0, 0};
3911    struct addrinfo *ai;
3912    struct addrinfo *next;
3913    struct