xref: /6.0.3/moxi/src/cproxy_protocol_a2b.c (revision d0366df5)
1/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2#include "src/config.h"
3#include <stdio.h>
4#include <stdlib.h>
5#include <string.h>
6#include <errno.h>
7#include <platform/cbassert.h>
8#include <math.h>
9#include "memcached.h"
10#include "cproxy.h"
11#include "work.h"
12#include "log.h"
13
14/* Internal declarations. */
15
/* Template binary NOOP request. It is zero-initialized here, and
 * cproxy_init_a2b() zeroes it again before filling in the magic,
 * opcode and datatype header fields. */
static protocol_binary_request_noop req_noop = {
    .bytes = {0}
};
19
/* Positions within a tokenized ascii command: token 0 is the command
 * name and, for the specs below that take a key, token 1 is the key. */
#define CMD_TOKEN  0
#define KEY_TOKEN  1
/* Capacity of A2BSpec.tokens, passed to scan_tokens() as the token limit. */
#define MAX_TOKENS 9
23
24/* A2B means ascii-to-binary (or, ascii upstream and binary downstream). */
25
/* Describes how one ascii command maps onto a binary protocol request.
 * The line/cmd/cmdq/size/broadcast fields come from the static a2b_specs
 * table; tokens, ntokens, noreply_allowed and num_optional are computed
 * by cproxy_init_a2b().
 */
struct A2BSpec {
    char *line;           /* Ascii syntax template, e.g. "delete <key> [noreply]". */

    protocol_binary_command cmd;  /* Opcode used when a reply is wanted. */
    protocol_binary_command cmdq; /* Opcode used for noreply; -1 in the table when unset. */

    int     size;         /* Number of bytes in request header. */
    token_t tokens[MAX_TOKENS]; /* Result of scan_tokens() over line. */
    int     ntokens;      /* Return value of scan_tokens() for line. */
    bool    noreply_allowed; /* True when the second-to-last token is "[noreply]". */
    int     num_optional; /* Number of optional arguments in cmd. */
    bool    broadcast;    /* True if cmd does scatter/gather. */
};
39
40/* The a2b_specs are immutable after init. */
41
42/* The arguments are carefully named with unique first characters. */
43
/* Table of supported ascii commands, terminated by an entry whose
 * .line is 0.
 *
 * a2b_fill_request_token() dispatches on the character that follows the
 * opening '<' or '[' of each argument name, which is why the argument
 * names need distinct first letters.
 *
 * cproxy_init_a2b() stores each entry into a2b_spec_map keyed by .cmd,
 * walking the table in order. Where two entries share a .cmd (set and
 * cas both use PROTOCOL_BINARY_CMD_SET), the later entry is the one
 * left in the map.
 *
 * A .cmdq of -1 marks a command with no quiet (noreply) opcode.
 */
struct A2BSpec a2b_specs[] = {
    { .line = "set <key> <flags> <exptime> <bytes> [noreply]",
      .cmd  = PROTOCOL_BINARY_CMD_SET,
      .cmdq = PROTOCOL_BINARY_CMD_SETQ,
      /* The size should be... */
      /*   sizeof(protocol_binary_request_header) [24 bytes] + */
      /*   sizeof(protocol_binary_request_set.message.body) [8 bytes] */
      .size = sizeof(protocol_binary_request_header) + 8
    },
    { .line = "add <key> <flags> <exptime> <bytes> [noreply]",
      .cmd  = PROTOCOL_BINARY_CMD_ADD,
      .cmdq = PROTOCOL_BINARY_CMD_ADDQ,
      /* The size should be... */
      /*   sizeof(protocol_binary_request_header) [24 bytes] + */
      /*   sizeof(protocol_binary_request_add.message.body) [8 bytes] */
      .size = sizeof(protocol_binary_request_header) + 8
    },
    { .line = "replace <key> <flags> <exptime> <bytes> [noreply]",
      .cmd  = PROTOCOL_BINARY_CMD_REPLACE,
      .cmdq = PROTOCOL_BINARY_CMD_REPLACEQ,
      /* The size should be... */
      /*   sizeof(protocol_binary_request_header) [24 bytes] + */
      /*   sizeof(protocol_binary_request_replace.message.body) [8 bytes] */
      .size = sizeof(protocol_binary_request_header) + 8
    },
    { .line = "append <key> <skip_flags> <skip_exptime> <bytes> [noreply]",
      .cmd  = PROTOCOL_BINARY_CMD_APPEND,
      .cmdq = PROTOCOL_BINARY_CMD_APPENDQ,
      .size = sizeof(protocol_binary_request_append)
    },
    { .line = "prepend <key> <skip_flags> <skip_exptime> <bytes> [noreply]" ,
      .cmd  = PROTOCOL_BINARY_CMD_PREPEND,
      .cmdq = PROTOCOL_BINARY_CMD_PREPENDQ,
      .size = sizeof(protocol_binary_request_prepend)
    },
    /* Shares the SET/SETQ opcodes with the "set" entry above. */
    { .line = "cas <key> <flags> <exptime> <bytes> <cas> [noreply]",
      .cmd  = PROTOCOL_BINARY_CMD_SET,
      .cmdq = PROTOCOL_BINARY_CMD_SETQ,
      /* The size should be... */
      /*   sizeof(protocol_binary_request_header) [24 bytes] + */
      /*   sizeof(protocol_binary_request_set.message.body) [8 bytes] */
      .size = sizeof(protocol_binary_request_header) + 8
    },
    { .line = "delete <key> [noreply]",
      .cmd  = PROTOCOL_BINARY_CMD_DELETE,
      .cmdq = PROTOCOL_BINARY_CMD_DELETEQ,
      .size = sizeof(protocol_binary_request_delete)
    },
    { .line = "incr <key> <value> [noreply]",
      .cmd  = PROTOCOL_BINARY_CMD_INCREMENT,
      .cmdq = PROTOCOL_BINARY_CMD_INCREMENTQ,
      /* The size should be... */
      /*   sizeof(protocol_binary_request_header) [24 bytes] + */
      /*   sizeof(protocol_binary_request_incr.message.body) [20 bytes] */
      .size = sizeof(protocol_binary_request_header) + 20
    },
    { .line = "decr <key> <value> [noreply]",
      .cmd  = PROTOCOL_BINARY_CMD_DECREMENT,
      .cmdq = PROTOCOL_BINARY_CMD_DECREMENTQ,
      /* The size should be... */
      /*   sizeof(protocol_binary_request_header) [24 bytes] + */
      /*   sizeof(protocol_binary_request_decr.message.body) [20 bytes] */
      .size = sizeof(protocol_binary_request_header) + 20
    },
    { .line = "flush_all [xpiration] [noreply]", /* TODO: noreply tricky here. */
      .cmd  = PROTOCOL_BINARY_CMD_FLUSH,
      .cmdq = PROTOCOL_BINARY_CMD_FLUSHQ,
      /* The size should be... */
      /*   sizeof(protocol_binary_request_header) [24 bytes] + */
      /*   sizeof(protocol_binary_request_flush.message.body) [4 bytes] */
      .size = sizeof(protocol_binary_request_header) + 4,
      .broadcast = true
    },
    { .line = "get <key>*", /* Multi-key GET/GETS */
      .cmd  = PROTOCOL_BINARY_CMD_GETKQ,
      .cmdq = -1,
      .size = sizeof(protocol_binary_request_header)
    },
    { .line = "get <key>", /* Single-key GET/GETS. */
      .cmd  = PROTOCOL_BINARY_CMD_GETK,
      .cmdq = -1,
      .size = sizeof(protocol_binary_request_header)
    },
    { .line = "stats [args]*",
      .cmd  = PROTOCOL_BINARY_CMD_STAT,
      .cmdq = -1,
      .size = sizeof(protocol_binary_request_stats),
      .broadcast = true
    },
    { .line = "version",
      .cmd  = PROTOCOL_BINARY_CMD_VERSION,
      .cmdq = -1,
      .size = sizeof(protocol_binary_request_version)
    },
    { .line = "getl <key> <xpiration>", /* Single-key GETL. */
      .cmd  = PROTOCOL_BINARY_CMD_GETL,
      .cmdq = -1,
      /* Header plus 4 extra bytes, the same arithmetic as flush_all and touch. */
      .size = sizeof(protocol_binary_request_header) + 4
    },
    { .line = "unl <key> <cas>", /* Single-key UNL. */
      .cmd  = PROTOCOL_BINARY_CMD_UNL,
      .cmdq = -1,
      .size = sizeof(protocol_binary_request_header)
    },
    { .line = "touch <key> <xpiration>",
      .cmd  = PROTOCOL_BINARY_CMD_TOUCH,
      .cmdq = -1,
      /* The size should be... */
      /*   sizeof(protocol_binary_request_header) [24 bytes] + */
      /*   sizeof(protocol_binary_request_touch.message.body) [4 bytes] */
      .size = sizeof(protocol_binary_request_header) + 4,
    },
    { .line = 0 } /* NULL sentinel. */
};
158
/* These are immutable after init. */
/* Both are populated by cproxy_init_a2b(): the map has one slot per */
/* possible opcode value 0x00..0xff, and a2b_size_max is the largest */
/* A2BSpec.size found in a2b_specs. */

struct A2BSpec *a2b_spec_map[0x100] = {0}; /* Lookup table by A2BSpec->cmd. */
int             a2b_size_max = 0;          /* Max header + extra frame bytes. */
163
/* Forward declarations. a2b_fill_request(), a2b_fill_request_token() and */
/* a2b_process_downstream_response() are defined further down; the others */
/* are presumably defined later in this file or elsewhere (TODO confirm). */

/* Fills a binary request header from a tokenized ascii command. */
/* Returns the spec's request size on success, 0 on failure. */
int a2b_fill_request(short    cmd,
                     token_t *cmd_tokens,
                     int      cmd_ntokens,
                     bool     noreply,
                     protocol_binary_request_header *header,
                     uint8_t **out_key,
                     uint16_t *out_keylen,
                     uint8_t  *out_extlen);

/* Applies a single command token (index cur_token) to the request. */
/* Returns false when the token cannot be converted. */
bool a2b_fill_request_token(struct A2BSpec *spec,
                            int      cur_token,
                            token_t *cmd_tokens,
                            int      cmd_ntokens,
                            protocol_binary_request_header *header,
                            uint8_t **out_key,
                            uint16_t *out_keylen,
                            uint8_t  *out_extlen);

/* Handles a downstream response once it has been read completely. */
void a2b_process_downstream_response(conn *c);

/* NOTE(review): judging by the names, these are the start/per-key/end */
/* steps of a multi-key get; confirm against their definitions. */
int a2b_multiget_start(conn *c, char *cmd, int cmd_len);
int a2b_multiget_skey(conn *c, char *skey, int skey_len, int vbucket, int key_index);
int a2b_multiget_end(conn *c);

void a2b_set_opaque(conn *c, protocol_binary_request_header *header, bool noreply);

/* Called from a2b_process_downstream_response() on a NOT_MY_VBUCKET */
/* status; when it returns true the caller stops processing the response. */
bool a2b_not_my_vbucket(conn *uc, conn *c,
                        protocol_binary_response_header *header);
192
193void cproxy_init_a2b() {
194    int i = 0;
195
196    memset(&req_noop, 0, sizeof(req_noop));
197
198    req_noop.message.header.request.magic    = PROTOCOL_BINARY_REQ;
199    req_noop.message.header.request.opcode   = PROTOCOL_BINARY_CMD_NOOP;
200    req_noop.message.header.request.datatype = PROTOCOL_BINARY_RAW_BYTES;
201
202    /* Run through the a2b_specs to populate the a2b_spec_map. */
203
204    while (true) {
205        struct A2BSpec *spec = &a2b_specs[i];
206        int j;
207        int noreply_index;
208
209        if (spec->line == NULL) {
210            break;
211        }
212
213        spec->ntokens = scan_tokens(spec->line,
214                                    spec->tokens,
215                                    MAX_TOKENS, NULL);
216        cb_assert(spec->ntokens > 1);
217
218        noreply_index = spec->ntokens - 2;
219        if (spec->tokens[noreply_index].value &&
220            strcmp(spec->tokens[noreply_index].value,
221                   "[noreply]") == 0) {
222            spec->noreply_allowed = true;
223        } else {
224            spec->noreply_allowed = false;
225        }
226
227        spec->num_optional = 0;
228        for (j = 0; j < spec->ntokens; j++) {
229            if (spec->tokens[j].value &&
230                spec->tokens[j].value[0] == '[') {
231                spec->num_optional++;
232            }
233        }
234
235        if (a2b_size_max < spec->size) {
236            a2b_size_max = spec->size;
237        }
238
239        cb_assert(spec->cmd < (sizeof(a2b_spec_map) /
240                            sizeof(struct A2BSpec *)));
241
242        a2b_spec_map[spec->cmd] = spec;
243
244        i = i + 1;
245    }
246}
247
248int a2b_fill_request(short    cmd,
249                     token_t *cmd_tokens,
250                     int      cmd_ntokens,
251                     bool     noreply,
252                     protocol_binary_request_header *header,
253                     uint8_t **out_key,
254                     uint16_t *out_keylen,
255                     uint8_t  *out_extlen) {
256    struct A2BSpec *spec;
257    cb_assert(header);
258    cb_assert(cmd_tokens);
259    cb_assert(cmd_ntokens > 1);
260    cb_assert(cmd_tokens[CMD_TOKEN].value);
261    cb_assert(cmd_tokens[CMD_TOKEN].length > 0);
262    cb_assert(out_key);
263    cb_assert(out_keylen);
264    cb_assert(out_extlen);
265
266    spec = a2b_spec_map[cmd];
267    if (spec != NULL) {
268        if (cmd_ntokens >= (spec->ntokens - spec->num_optional) &&
269            cmd_ntokens <= (spec->ntokens)) {
270            int i;
271
272            header->request.magic = PROTOCOL_BINARY_REQ;
273
274            if (noreply) {
275                cb_assert(spec->cmd != (protocol_binary_command) -1);
276
277                header->request.opcode = spec->cmdq;
278                header->request.opaque = htonl(OPAQUE_IGNORE_REPLY);
279
280                if (settings.verbose > 2) {
281                    moxi_log_write("a2b_fill_request OPAQUE_IGNORE_REPLY, cmdq: %x\n",
282                            spec->cmdq);
283                }
284            } else {
285                header->request.opcode = spec->cmd;
286            }
287
288            /* Start at 1 to skip the CMD_TOKEN. */
289
290            for (i = 1; i < cmd_ntokens - 1; i++) {
291                if (a2b_fill_request_token(spec, i,
292                                           cmd_tokens, cmd_ntokens,
293                                           header,
294                                           out_key,
295                                           out_keylen,
296                                           out_extlen) == false) {
297                    return 0;
298                }
299            }
300
301            return spec->size; /* Success. */
302        }
303    } else {
304        if (settings.verbose > 2) {
305            moxi_log_write("a2b_fill_request unknown cmd: %x\n", cmd);
306        }
307    }
308
309    return 0;
310}
311
312bool a2b_fill_request_token(struct A2BSpec *spec,
313                            int      cur_token,
314                            token_t *cmd_tokens,
315                            int      cmd_ntokens,
316                            protocol_binary_request_header *header,
317                            uint8_t **out_key,
318                            uint16_t *out_keylen,
319                            uint8_t  *out_extlen) {
320
321    uint64_t delta;
322    char t;
323
324    (void)cmd_ntokens;
325    cb_assert(header);
326    cb_assert(spec);
327    cb_assert(spec->tokens);
328    cb_assert(spec->ntokens > 1);
329    cb_assert(spec->tokens[cur_token].value);
330    cb_assert(cur_token > 0);
331    cb_assert(cur_token < cmd_ntokens);
332    cb_assert(cur_token < spec->ntokens);
333
334    if (settings.verbose > 2) {
335        moxi_log_write("a2b_fill_request_token %s\n",
336                spec->tokens[cur_token].value);
337    }
338
339    t = spec->tokens[cur_token].value[1];
340    switch (t) {
341    case 'k': /* key */
342        cb_assert(out_key);
343        cb_assert(out_keylen);
344        *out_key    = (uint8_t *) cmd_tokens[cur_token].value;
345        *out_keylen = (uint16_t)  cmd_tokens[cur_token].length;
346        header->request.keylen =
347            htons((uint16_t) cmd_tokens[cur_token].length);
348        break;
349
350    case 'v': /* value (for incr/decr) */
351        delta = 0;
352        if (safe_strtoull(cmd_tokens[cur_token].value, &delta)) {
353            protocol_binary_request_incr *req;
354            cb_assert(out_extlen);
355
356            header->request.extlen   = *out_extlen = 20;
357            header->request.datatype = PROTOCOL_BINARY_RAW_BYTES;
358
359            req = (protocol_binary_request_incr *) header;
360
361            req->message.body.delta = htonll(delta);
362            req->message.body.initial = 0;
363            req->message.body.expiration = 0xffffffff;
364        } else {
365            /* TODO: Send back better error. */
366            return false;
367        }
368        break;
369
370    case 'x': { /* xpiration (for flush_all) */
371        int32_t exptime_int = 0;
372        time_t  exptime = 0;
373
374        if (safe_strtol(cmd_tokens[cur_token].value, &exptime_int)) {
375            /* Ubuntu 8.04 breaks when I pass exptime to safe_strtol */
376            protocol_binary_request_flush *req;
377            exptime = exptime_int;
378
379            header->request.extlen   = *out_extlen = 4;
380            header->request.datatype = PROTOCOL_BINARY_RAW_BYTES;
381
382            req = (protocol_binary_request_flush *) header;
383
384            req->message.body.expiration = htonl(exptime);
385        }
386        break;
387    }
388
389    case 'a': /* args (for stats) */
390        cb_assert(out_key);
391        cb_assert(out_keylen);
392        *out_key    = (uint8_t *) cmd_tokens[cur_token].value;
393        *out_keylen = (uint16_t)  cmd_tokens[cur_token].length;
394        header->request.keylen =
395            htons((uint16_t) cmd_tokens[cur_token].length);
396        break;
397
398   case 'c': { /* cas value for unl */
399        uint64_t cas = 0;
400        if (safe_strtoull(cmd_tokens[cur_token].value, &cas)) {
401            header->request.cas = cas;
402        }
403        break;
404   }
405
406    /* The noreply was handled in a2b_fill_request(). */
407
408    /* case 'n': // noreply */
409
410    /* The above are handled by looking at the item struct. */
411
412    /* case 'f': // FALLTHRU, flags */
413    /* case 'e': // FALLTHRU, exptime */
414    /* case 'b': // FALLTHRU, bytes */
415    /* case 's': // FALLTHRU, skip_xxx */
416    /* case 'c': // FALLTHRU, cas */
417
418    default:
419        break;
420    }
421
422    return true;
423}
424
425/* Called when we receive a binary response header from
426 * a downstream server, via try_read_command()/drive_machine().
427 */
428void cproxy_process_a2b_downstream(conn *c) {
429    protocol_binary_response_header *header;
430    int extlen;
431    int keylen;
432    uint32_t bodylen;
433
434    cb_assert(c != NULL);
435    cb_assert(c->cmd >= 0);
436    cb_assert(c->next == NULL);
437    cb_assert(c->item == NULL);
438    cb_assert(IS_BINARY(c->protocol));
439    cb_assert(IS_PROXY(c->protocol));
440
441    /* Snapshot rcurr, because the caller, try_read_command(), changes it. */
442
443    c->cmd_start = c->rcurr;
444
445    header = (protocol_binary_response_header *) &c->binary_header;
446    header->response.status = (uint16_t) ntohs(header->response.status);
447
448    cb_assert(header->response.magic == (uint8_t) PROTOCOL_BINARY_RES);
449    cb_assert(header->response.opcode == c->cmd);
450
451    process_bin_noreply(c); /* Map quiet c->cmd values into non-quiet. */
452
453    extlen = header->response.extlen;
454    keylen = header->response.keylen;
455    bodylen = header->response.bodylen;
456
457    cb_assert(bodylen >= (uint32_t) keylen + extlen);
458
459    /* Our approach is to read everything we can before */
460    /* getting into big switch/case statements for the */
461    /* actual processing. */
462
463    /* If status is non-zero (an err code), then bodylen should be small. */
464    /* If status is 0, then bodylen might be for a huge item during */
465    /* a GET family of response. */
466
467    /* If bodylen > extlen + keylen, then we should nread */
468    /* the ext+key and set ourselves up for a later item nread. */
469
470    /* We overload the meaning of the conn substates... */
471    /* - bin_reading_get_key means do nread for ext and key data. */
472    /* - bin_read_set_value means do nread for item data. */
473
474    if (settings.verbose > 2) {
475        moxi_log_write("<%d cproxy_process_a2b_downstream %x %d %d %u\n",
476                c->sfd, c->cmd, extlen, keylen, bodylen);
477    }
478
479    if (keylen > 0 || extlen > 0) {
480        /* One reason we reach here is during a */
481        /* GET/GETQ/GETK/GETKQ hit response, because extlen */
482        /* will be > 0 for the flags. */
483
484        /* Also, we reach here during a GETK miss response, since */
485        /* keylen will be > 0.  Oddly, a GETK miss response will have */
486        /* a non-zero status of PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, */
487        /* but won't have any extra error message string. */
488
489        /* Also, we reach here during a STAT response, with */
490        /* keylen > 0, extlen == 0, and bodylen == keylen. */
491
492        cb_assert(c->cmd == PROTOCOL_BINARY_CMD_GET ||
493               c->cmd == PROTOCOL_BINARY_CMD_GETK ||
494               c->cmd == PROTOCOL_BINARY_CMD_GETL ||
495               c->cmd == PROTOCOL_BINARY_CMD_STAT);
496
497        bin_read_key(c, bin_reading_get_key, extlen);
498    } else {
499        cb_assert(keylen == 0 && extlen == 0);
500
501        if (bodylen > 0) {
502            /* We reach here on error response, version response, */
503            /* or incr/decr responses, which all have only (relatively */
504            /* small) body bytes, and with no ext bytes and no key bytes. */
505
506            /* For example, error responses will have 0 keylen, */
507            /* 0 extlen, with an error message string for the body. */
508
509            /* We'll just reuse the key-reading code path, rather */
510            /* than allocating an item. */
511
512            cb_assert(header->response.status != 0 ||
513                   c->cmd == PROTOCOL_BINARY_CMD_VERSION ||
514                   c->cmd == PROTOCOL_BINARY_CMD_INCREMENT ||
515                   c->cmd == PROTOCOL_BINARY_CMD_DECREMENT ||
516                   c->cmd == PROTOCOL_BINARY_CMD_UNL);
517
518            bin_read_key(c, bin_reading_get_key, bodylen);
519        } else {
520            cb_assert(keylen == 0 && extlen == 0 && bodylen == 0);
521
522            /* We have the entire response in the header, */
523            /* such as due to a general success response, */
524            /* including a no-op response. */
525
526            a2b_process_downstream_response(c);
527        }
528    }
529}
530
531/* We reach here after nread'ing a ext+key or item.
532 */
533void cproxy_process_a2b_downstream_nread(conn *c) {
534    downstream *d;
535    protocol_binary_response_header *header;
536    int extlen;
537    int keylen;
538    uint32_t bodylen;
539
540    cb_assert(c != NULL);
541    cb_assert(c->cmd >= 0);
542    cb_assert(c->next == NULL);
543    cb_assert(c->cmd_start != NULL);
544    cb_assert(IS_BINARY(c->protocol));
545    cb_assert(IS_PROXY(c->protocol));
546
547    d = c->extra;
548    cb_assert(d);
549
550    header = (protocol_binary_response_header *) &c->binary_header;
551    extlen = header->response.extlen;
552    keylen = header->response.keylen;
553    bodylen = header->response.bodylen;
554
555    if (settings.verbose > 2) {
556        moxi_log_write("<%d cproxy_process_a2b_downstream_nread %d %d, cmd %x %d %d\n",
557                c->sfd, c->ileft, c->isize, c->cmd, c->substate,
558                header->response.status);
559    }
560
561    if (c->substate == bin_reading_get_key &&
562        header->response.status == 0 &&
563        (c->cmd == PROTOCOL_BINARY_CMD_GET ||
564         c->cmd == PROTOCOL_BINARY_CMD_GETK ||
565         c->cmd == PROTOCOL_BINARY_CMD_STAT ||
566         c->cmd == PROTOCOL_BINARY_CMD_GETL)) {
567
568        item *it;
569        char *key;
570        int vlen;
571        int flags = 0;
572
573        if (settings.verbose > 2) {
574            moxi_log_write("<%d cproxy_process_a2b_downstream_nread %d %d %x get/getk/stat\n",
575                    c->sfd, c->ileft, c->isize, c->cmd);
576        }
577
578        cb_assert(c->item == NULL);
579
580        /* Alloc an item and continue with an item nread. */
581        /* We item_alloc() even if vlen is 0, so that later */
582        /* code can assume an item exists. */
583
584        key = binary_get_key(c);
585        vlen = bodylen - (keylen + extlen);
586
587        cb_assert(key);
588        cb_assert(vlen >= 0);
589
590        if (c->cmd == PROTOCOL_BINARY_CMD_GET ||
591            c->cmd == PROTOCOL_BINARY_CMD_GETK ||
592            c->cmd == PROTOCOL_BINARY_CMD_GETL) {
593            protocol_binary_response_get *response_get =
594                (protocol_binary_response_get *) binary_get_request(c);
595
596            cb_assert(extlen == sizeof(response_get->message.body));
597
598            flags = ntohl(response_get->message.body.flags);
599        }
600
601        it = item_alloc(key, keylen, flags, 0, vlen + 2);
602        if (it != NULL) {
603            uint64_t cas = CPROXY_NOT_CAS;
604            conn *uc;
605
606            c->item = it;
607            c->ritem = ITEM_data(it);
608            c->rlbytes = vlen;
609            c->substate = bin_read_set_value;
610
611            uc = d->upstream_conn;
612            if (uc != NULL &&
613                uc->cmd_start != NULL &&
614                (strncmp(uc->cmd_start, "gets ", 5) == 0 ||
615                 strncmp(uc->cmd_start, "getl ", 5) == 0)) {
616                cas = header->response.cas;
617            }
618
619            ITEM_set_cas(it, cas);
620
621            conn_set_state(c, conn_nread);
622        } else {
623            d->ptd->stats.stats.err_oom++;
624            cproxy_close_conn(c);
625        }
626    } else {
627        a2b_process_downstream_response(c);
628    }
629}
630
631static void a2b_out_error(conn *uc, uint16_t status) {
632    switch (status) {
633    case PROTOCOL_BINARY_RESPONSE_SUCCESS:
634        out_string(uc, "OK");
635        break;
636    case PROTOCOL_BINARY_RESPONSE_KEY_ENOENT:
637        out_string(uc, "NOT_FOUND");
638        break;
639    case PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS:
640        out_string(uc, "EXISTS");
641        break;
642    case PROTOCOL_BINARY_RESPONSE_E2BIG:
643        out_string(uc, "SERVER_ERROR a2b e2big");
644        break;
645    case PROTOCOL_BINARY_RESPONSE_EINVAL:
646        out_string(uc, "SERVER_ERROR a2b einval");
647        break;
648    case PROTOCOL_BINARY_RESPONSE_NOT_STORED:
649        out_string(uc, "NOT_STORED");
650        break;
651    case PROTOCOL_BINARY_RESPONSE_DELTA_BADVAL:
652        out_string(uc, "SERVER_ERROR a2b delta_badval");
653        break;
654    case PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET:
655        out_string(uc, "SERVER_ERROR a2b not_my_vbucket");
656        break;
657    case PROTOCOL_BINARY_RESPONSE_AUTH_ERROR:
658        out_string(uc, "SERVER_ERROR a2b auth_error");
659        break;
660    case PROTOCOL_BINARY_RESPONSE_AUTH_CONTINUE:
661        out_string(uc, "SERVER_ERROR a2b auth_continue");
662        break;
663    case PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND:
664        out_string(uc, "SERVER_ERROR a2b unknown");
665        break;
666    case PROTOCOL_BINARY_RESPONSE_ENOMEM:
667        out_string(uc, "SERVER_ERROR a2b out of memory");
668        break;
669    case PROTOCOL_BINARY_RESPONSE_NOT_SUPPORTED:
670        out_string(uc, "SERVER_ERROR a2b not supported");
671        break;
672    case PROTOCOL_BINARY_RESPONSE_EINTERNAL:
673        out_string(uc, "SERVER_ERROR a2b einternal");
674        break;
675    case PROTOCOL_BINARY_RESPONSE_EBUSY:
676        out_string(uc, "SERVER_ERROR a2b ebusy");
677        break;
678    case PROTOCOL_BINARY_RESPONSE_ETMPFAIL:
679        out_string(uc, "SERVER_ERROR temporary failure");
680        break;
681    default:
682        out_string(uc, "SERVER_ERROR a2b error");
683        break;
684    }
685}
686
687/* Invoked when we have read a complete downstream binary response,
688 * including header, ext, key, and item data, as appropriate.
689 */
690void a2b_process_downstream_response(conn *c) {
691    protocol_binary_response_header *header;
692    uint32_t extlen;
693    uint32_t keylen;
694    uint32_t bodylen;
695    uint16_t status;
696    downstream *d;
697    item *it;
698    conn *uc;
699
700    cb_assert(c != NULL);
701    cb_assert(c->cmd >= 0);
702    cb_assert(c->next == NULL);
703    cb_assert(c->cmd_start != NULL);
704    cb_assert(IS_BINARY(c->protocol));
705    cb_assert(IS_PROXY(c->protocol));
706
707    header = (protocol_binary_response_header *) &c->binary_header;
708
709    extlen = header->response.extlen;
710    keylen = header->response.keylen;
711    bodylen = header->response.bodylen;
712    status = header->response.status;
713
714    if (settings.verbose > 2) {
715        moxi_log_write("<%d cproxy_process_a2b_downstream_response, cmd: %x, item: %d, status: %d\n",
716                c->sfd, c->cmd, (c->item != NULL), status);
717    }
718
719    /* We reach here when we have the entire response, */
720    /* including header, ext, key, and possibly item data. */
721    /* Now we can get into big switch/case processing. */
722
723    d = c->extra;
724    cb_assert(d != NULL);
725    cb_assert(d->ptd != NULL);
726    cb_assert(d->ptd->proxy != NULL);
727
728    it = c->item;
729
730    /* Clear c->item because we either move it to the upstream or */
731    /* item_remove() it on error. */
732
733    c->item = NULL;
734
735    if (cproxy_binary_ignore_reply(c, header, it)) {
736        return;
737    }
738
739    uc = d->upstream_conn;
740
741    /* Handle not-my-vbucket error response. */
742
743    if (status == PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET) {
744        cb_assert(it == NULL);
745
746        if (a2b_not_my_vbucket(uc, c, header)) {
747            return;
748        }
749    }
750
751    switch (c->cmd) {
752    case PROTOCOL_BINARY_CMD_GETK:
753        if (settings.verbose > 2) {
754            moxi_log_write("%d: cproxy_process_a2b_downstream_response GETK "
755                    "noreply: %d\n", c->sfd, c->noreply);
756        }
757
758        if (c->noreply == false) {
759            /* Single-key GET/GETS. */
760
761            if (status == 0) {
762                cb_assert(it != NULL);
763                cb_assert(it->nbytes >= 2);
764                cb_assert(keylen > 0);
765                cb_assert(extlen > 0);
766
767                if (bodylen >= keylen + extlen) {
768                    *(ITEM_data(it) + it->nbytes - 2) = '\r';
769                    *(ITEM_data(it) + it->nbytes - 1) = '\n';
770
771                    multiget_ascii_downstream_response(d, it);
772                } else {
773                    cb_assert(false); /* TODO. */
774                }
775
776                item_remove(it);
777            }
778
779            conn_set_state(c, conn_pause);
780
781            cproxy_update_event_write(d, uc);
782
783            return;
784        }
785
786        /* Multi-key GET/GETS. */
787
788        /* We should keep processing for a non-quiet */
789        /* terminating response (NO-OP). */
790
791        conn_set_state(c, conn_new_cmd);
792
793        if (status != 0) {
794            cb_assert(it == NULL);
795
796            if (status == PROTOCOL_BINARY_RESPONSE_KEY_ENOENT) {
797                return; /* Swallow miss response. */
798            }
799
800            /* TODO: Handle error case.  Should we pause the conn */
801            /*       or keep looking for more responses? */
802
803            cb_assert(false);
804            return;
805        }
806
807        cb_assert(status == 0);
808        cb_assert(it != NULL);
809        cb_assert(it->nbytes >= 2);
810        cb_assert(keylen > 0);
811        cb_assert(extlen > 0);
812
813        if (bodylen >= keylen + extlen) {
814            *(ITEM_data(it) + it->nbytes - 2) = '\r';
815            *(ITEM_data(it) + it->nbytes - 1) = '\n';
816
817            multiget_ascii_downstream_response(d, it);
818        } else {
819            cb_assert(false); /* TODO. */
820        }
821
822        item_remove(it);
823        break;
824    case PROTOCOL_BINARY_CMD_GETL:
825        if (settings.verbose > 2) {
826            moxi_log_write("%d: cproxy_process_a2b_downstream_response GETL "
827                    "noreply: %d\n", c->sfd, c->noreply);
828        }
829
830        if (c->noreply == false) {
831            switch (status) {
832                case PROTOCOL_BINARY_RESPONSE_SUCCESS:
833                    cb_assert(it != NULL);
834                    cb_assert(it->nbytes >= 2);
835                    cb_assert(extlen > 0);
836
837                    if (bodylen >= keylen + extlen) {
838                        *(ITEM_data(it) + it->nbytes - 2) = '\r';
839                        *(ITEM_data(it) + it->nbytes - 1) = '\n';
840
841                        cproxy_upstream_ascii_item_response(it, uc, -1);
842                    } else {
843                        cb_assert(false); /* TODO. */
844                    }
845
846                    item_remove(it);
847                    break;
848
849                case PROTOCOL_BINARY_RESPONSE_ETMPFAIL:
850                case PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND:
851                    /*
852                     * currently membase does not send ETMPFAIL for
853                     * engine error code for ENGINE_TMPFAIL
854                     */
855                    d->upstream_suffix = "LOCK_ERROR\r\n";
856                    d->upstream_suffix_len = 0;
857                    d->upstream_status = status;
858                    d->upstream_retry = 0;
859                    d->target_host_ident = NULL;
860                    break;
861
862                case PROTOCOL_BINARY_RESPONSE_KEY_ENOENT:
863                    d->upstream_suffix = "NOT_FOUND\r\n";
864                    d->upstream_suffix_len = 0;
865                    d->upstream_status = status;
866                    d->upstream_retry = 0;
867                    d->target_host_ident = NULL;
868                    break;
869            }
870
871            conn_set_state(c, conn_pause);
872
873            cproxy_update_event_write(d, uc);
874
875            return;
876        }
877
878    case PROTOCOL_BINARY_CMD_FLUSH:
879        conn_set_state(c, conn_pause);
880
881        if (uc != NULL) {
882            if (status == PROTOCOL_BINARY_RESPONSE_NOT_SUPPORTED) {
883                if (d->upstream_suffix != NULL &&
884                    d->upstream_suffix_len == 0 &&
885                    strncmp(d->upstream_suffix, "OK\r\n", 4) == 0) {
886                    d->upstream_suffix = "SERVER_ERROR flush_all not supported\r\n";
887                }
888            }
889        }
890
891        /* TODO: Handle flush_all's expiration parameter against */
892        /* the front_cache. */
893
894        /* TODO: We flush the front_cache too often, inefficiently */
895        /* on every downstream FLUSH response, rather than on */
896        /* just the last FLUSH response. */
897
898        if (uc != NULL) {
899            mcache_flush_all(&d->ptd->proxy->front_cache, 0);
900        }
901        break;
902
903    case PROTOCOL_BINARY_CMD_NOOP:
904        conn_set_state(c, conn_pause);
905        break;
906
907    case PROTOCOL_BINARY_CMD_SET: /* FALLTHROUGH */
908    case PROTOCOL_BINARY_CMD_ADD:
909    case PROTOCOL_BINARY_CMD_REPLACE:
910    case PROTOCOL_BINARY_CMD_APPEND:
911    case PROTOCOL_BINARY_CMD_PREPEND:
912    case PROTOCOL_BINARY_CMD_TOUCH:
913        conn_set_state(c, conn_pause);
914
915        if (uc != NULL) {
916            cb_assert(uc->next == NULL);
917
918            switch (status) {
919            case PROTOCOL_BINARY_RESPONSE_SUCCESS:
920                out_string(uc, "STORED");
921                break;
922            case PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS:
923                if (c->cmd == PROTOCOL_BINARY_CMD_ADD) {
924                    out_string(uc, "NOT_STORED");
925                } else {
926                    out_string(uc, "EXISTS");
927                }
928                break;
929            case PROTOCOL_BINARY_RESPONSE_KEY_ENOENT:
930                if (c->cmd == PROTOCOL_BINARY_CMD_REPLACE) {
931                    out_string(uc, "NOT_STORED");
932                } else {
933                    out_string(uc, "NOT_FOUND");
934                }
935                break;
936            default:
937                a2b_out_error(uc, status);
938                break;
939            }
940
941            cproxy_del_front_cache_key_ascii(d, uc->cmd_start);
942
943            cproxy_update_event_write(d, uc);
944        }
945        break;
946
947    case PROTOCOL_BINARY_CMD_DELETE:
948        conn_set_state(c, conn_pause);
949
950        if (uc != NULL) {
951            cb_assert(uc->next == NULL);
952
953            switch (status) {
954            case 0:
955                out_string(uc, "DELETED");
956                break;
957            case PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS:
958                out_string(uc, "EXISTS");
959                break;
960            case PROTOCOL_BINARY_RESPONSE_KEY_ENOENT:
961            case PROTOCOL_BINARY_RESPONSE_NOT_STORED:
962            case PROTOCOL_BINARY_RESPONSE_ENOMEM: /* TODO. */
963            default:
964                out_string(uc, "NOT_FOUND");
965                break;
966            }
967
968            cproxy_del_front_cache_key_ascii(d, uc->cmd_start);
969
970            cproxy_update_event_write(d, uc);
971        }
972        break;
973
974    case PROTOCOL_BINARY_CMD_INCREMENT: /* FALLTHROUGH */
975    case PROTOCOL_BINARY_CMD_DECREMENT:
976        conn_set_state(c, conn_pause);
977
978        if (uc != NULL) {
979            protocol_binary_response_incr *response_incr;
980            cb_assert(uc->next == NULL);
981
982            /* TODO: Any weird alignment/padding issues on different */
983            /*       platforms in this cast to worry about here? */
984
985            response_incr = (protocol_binary_response_incr *) c->cmd_start;
986
987            switch (status) {
988            case 0: {
989                char *s = add_conn_suffix(uc);
990                if (s != NULL) {
991                    uint64_t v = mc_swap64(response_incr->message.body.value);
992                    sprintf(s, "%"PRIu64"", v);
993                    out_string(uc, s);
994                } else {
995                    d->ptd->stats.stats.err_oom++;
996                    cproxy_close_conn(uc);
997                }
998                break;
999            }
1000            case PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS: /* Due to CAS. */
1001                out_string(uc, "EXISTS");
1002                break;
1003            case PROTOCOL_BINARY_RESPONSE_KEY_ENOENT:
1004                out_string(uc, "NOT_FOUND");
1005                break;
1006            case PROTOCOL_BINARY_RESPONSE_NOT_STORED:
1007                out_string(uc, "NOT_STORED");
1008                break;
1009            case PROTOCOL_BINARY_RESPONSE_ENOMEM: /* TODO. */
1010            default:
1011                out_string(uc, "SERVER_ERROR a2b arith error");
1012                break;
1013            }
1014
1015            cproxy_del_front_cache_key_ascii(d, uc->cmd_start);
1016
1017            cproxy_update_event_write(d, uc);
1018        }
1019        break;
1020
1021    case PROTOCOL_BINARY_CMD_VERSION:
1022    case PROTOCOL_BINARY_CMD_UNL:
1023        conn_set_state(c, conn_pause);
1024
1025        if (uc != NULL) {
1026            cb_assert(uc->next == NULL);
1027
1028            if ((header->response.status == 0 ||
1029                 c->cmd == PROTOCOL_BINARY_CMD_UNL) &&
1030                header->response.keylen == 0 &&
1031                header->response.extlen == 0 &&
1032                header->response.bodylen > 0) {
1033                char *s = add_conn_suffix(uc);
1034                uint32_t buf_offset = 0;
1035
1036                if (s != NULL) {
1037                    /* TODO: Assuming bodylen is not that long. */
1038                    if (c->cmd == PROTOCOL_BINARY_CMD_VERSION) {
1039                        memcpy(s, "VERSION ", 8);
1040                        buf_offset = 8; /* sizeof "VERSION " */
1041                    }
1042                    memcpy(s + buf_offset,
1043                           c->cmd_start + sizeof(protocol_binary_response_version),
1044                           header->response.bodylen);
1045                    s[buf_offset + header->response.bodylen] = '\0';
1046                    out_string(uc, s);
1047                } else {
1048                    d->ptd->stats.stats.err_oom++;
1049                    cproxy_close_conn(uc);
1050                }
1051            } else {
1052                out_string(uc, "SERVER_ERROR");
1053            }
1054
1055            cproxy_update_event_write(d, uc);
1056        }
1057        break;
1058
1059    case PROTOCOL_BINARY_CMD_STAT:
1060        cb_assert(c->noreply == false);
1061
1062        if (keylen > 0) {
1063            cb_assert(it != NULL);      /* Holds the stat value. */
1064            cb_assert(it->nbytes >= 2); /* Note: ep-engine-to-mc_couch can return empty STAT val. */
1065            cb_assert(bodylen >= keylen);
1066            cb_assert(d->merger != NULL);
1067
1068            if (uc != NULL) {
1069                cb_assert(uc->next == NULL);
1070
1071                /* TODO: Handle ITEM and PREFIX. */
1072
1073                protocol_stats_merge_name_val(d->merger,
1074                                              "STAT", 4,
1075                                              ITEM_key(it), it->nkey,
1076                                              ITEM_data(it), it->nbytes - 2);
1077            }
1078
1079            item_remove(it);
1080            conn_set_state(c, conn_new_cmd);
1081        } else {
1082            /* Handle the stats terminator, which might have an error or */
1083            /* non-empty bodylen, for some implementations of memcached protocol. */
1084
1085            cb_assert(it == NULL);
1086            conn_set_state(c, conn_pause);
1087        }
1088        break;
1089
1090    case PROTOCOL_BINARY_CMD_QUIT:
1091    default:
1092        cb_assert(false); /* TODO: Handled unexpected responses. */
1093        break;
1094    }
1095}
1096
1097/* Do the actual work of forwarding the command from an
1098 * upstream ascii conn to its assigned binary downstream.
1099 */
1100bool cproxy_forward_a2b_downstream(downstream *d) {
1101    conn *uc;
1102    int server_index = -1;
1103    int nc;
1104
1105    cb_assert(d != NULL);
1106
1107    uc = d->upstream_conn;
1108
1109    cb_assert(uc != NULL);
1110    cb_assert(uc->state == conn_pause);
1111    cb_assert(uc->cmd_start != NULL);
1112    cb_assert(uc->thread != NULL);
1113    cb_assert(uc->thread->base != NULL);
1114    cb_assert(IS_ASCII(uc->protocol));
1115    cb_assert(IS_PROXY(uc->protocol));
1116
1117    if (cproxy_is_broadcast_cmd(uc->cmd_curr) == true) {
1118        cproxy_ascii_broadcast_suffix(d);
1119    } else {
1120        char *key = NULL;
1121        int   key_len = 0;
1122
1123        if (ascii_scan_key(uc->cmd_start, &key, &key_len) &&
1124            key != NULL &&
1125            key_len > 0) {
1126            server_index = cproxy_server_index(d, key, key_len, NULL);
1127            if (server_index < 0) {
1128                return false;
1129            }
1130        }
1131    }
1132
1133    nc = cproxy_connect_downstream(d, uc->thread, server_index);
1134    if (nc == -1) {
1135        return true;
1136    }
1137
1138    if (nc > 0) {
1139        cb_assert(d->downstream_conns != NULL);
1140
1141        if (d->usec_start == 0 &&
1142            d->ptd->behavior_pool.base.time_stats) {
1143            d->usec_start = usec_now();
1144        }
1145
1146        if (uc->cmd == -1) {
1147            return cproxy_forward_a2b_simple_downstream(d, uc->cmd_start, uc);
1148        } else {
1149            return cproxy_forward_a2b_item_downstream(d, uc->cmd, uc->item, uc);
1150        }
1151    }
1152
1153    return false;
1154}
1155
1156/* Forward a simple one-liner ascii command to a binary downstream.
1157 * For example, get, incr/decr, delete, etc.
1158 * The response, though, might be a simple line or
1159 * multiple VALUE+END lines.
1160 */
1161bool cproxy_forward_a2b_simple_downstream(downstream *d,
1162                                          char *command, conn *uc) {
1163    int vbucket = -1;
1164    bool local;
1165    conn *c;
1166    uint8_t *out_key = NULL;
1167    uint16_t out_keylen = 0;
1168    uint8_t out_extlen = 0;
1169    int cmd_len = 0;
1170    token_t tokens[MAX_TOKENS];
1171    size_t ntokens;
1172    char *key;
1173    int key_len;
1174
1175    cb_assert(d != NULL);
1176    cb_assert(d->ptd != NULL);
1177    cb_assert(d->ptd->proxy != NULL);
1178    cb_assert(d->downstream_conns != NULL);
1179    cb_assert(command != NULL);
1180    cb_assert(uc != NULL);
1181    cb_assert(uc->item == NULL);
1182    cb_assert(uc->cmd_curr != (protocol_binary_command) -1);
1183    cb_assert(d->merger == NULL);
1184
1185    /* Handles multi-key get and gets. */
1186
1187    if (uc->cmd_curr == PROTOCOL_BINARY_CMD_GETKQ) {
1188        /* Only use front_cache for 'get', not for 'gets'. */
1189        mcache *front_cache =
1190            (command[3] == ' ') ? &d->ptd->proxy->front_cache : NULL;
1191
1192        return multiget_ascii_downstream(d, uc,
1193                                         a2b_multiget_start,
1194                                         a2b_multiget_skey,
1195                                         a2b_multiget_end,
1196                                         front_cache);
1197    }
1198
1199    if (uc->cmd_curr == PROTOCOL_BINARY_CMD_GETK ||
1200        uc->cmd_curr == PROTOCOL_BINARY_CMD_GETL) {
1201        d->upstream_suffix = "END\r\n";
1202        d->upstream_suffix_len = 0;
1203        d->upstream_status = PROTOCOL_BINARY_RESPONSE_SUCCESS;
1204        d->upstream_retry = 0;
1205        d->target_host_ident = NULL;
1206    }
1207
1208    cb_assert(uc->next == NULL);
1209
1210    /* TODO: Inefficient repeated scan_tokens. */
1211
1212    ntokens = scan_tokens(command, tokens, MAX_TOKENS, &cmd_len);
1213    key= tokens[KEY_TOKEN].value;
1214    key_len = tokens[KEY_TOKEN].length;
1215
1216    if (ntokens <= 1) { /* This was checked long ago, while parsing */
1217        cb_assert(false);  /* the upstream conn. */
1218        return false;
1219    }
1220
1221    if (uc->cmd_curr == PROTOCOL_BINARY_CMD_FLUSH) {
1222        protocol_binary_request_flush req;
1223        protocol_binary_request_header *preq = (void*)&req;
1224        int size;
1225
1226        memset(&req, 0, sizeof(req));
1227        size = a2b_fill_request(uc->cmd_curr,
1228                                tokens, ntokens,
1229                                uc->noreply, preq,
1230                                &out_key,
1231                                &out_keylen,
1232                                &out_extlen);
1233        if (size > 0) {
1234            cb_assert(out_key == NULL);
1235            cb_assert(out_keylen == 0);
1236
1237            if (settings.verbose > 2) {
1238                moxi_log_write("a2b broadcast flush_all\n");
1239            }
1240
1241            if (out_extlen == 0) {
1242                preq->request.extlen   = out_extlen = 4;
1243                preq->request.datatype = PROTOCOL_BINARY_RAW_BYTES;
1244            }
1245
1246            return cproxy_broadcast_a2b_downstream(d, preq, size,
1247                                                   out_key,
1248                                                   out_keylen,
1249                                                   out_extlen, uc,
1250                                                   "OK\r\n");
1251        }
1252
1253        if (settings.verbose > 2) {
1254            moxi_log_write("a2b broadcast flush_all no size\n");
1255        }
1256
1257        return false;
1258    }
1259
1260    if (uc->cmd_curr == PROTOCOL_BINARY_CMD_STAT) {
1261        protocol_binary_request_stats req;
1262        protocol_binary_request_header *preq = (void*)&req;
1263        int size;
1264
1265        memset(&req, 0, sizeof(req));
1266        size = a2b_fill_request(uc->cmd_curr,
1267                                tokens, ntokens,
1268                                uc->noreply, preq,
1269                                &out_key,
1270                                &out_keylen,
1271                                &out_extlen);
1272        if (size > 0) {
1273            cb_assert(out_extlen == 0);
1274            cb_assert(uc->noreply == false);
1275
1276            if (settings.verbose > 2) {
1277                moxi_log_write("a2b broadcast %s\n", command);
1278            }
1279
1280            if (strncmp(command + 5, " reset", 6) == 0) {
1281                return cproxy_broadcast_a2b_downstream(d, preq, size,
1282                                                       out_key,
1283                                                       out_keylen,
1284                                                       out_extlen, uc,
1285                                                       "RESET\r\n");
1286            }
1287
1288            if (cproxy_broadcast_a2b_downstream(d, preq, size,
1289                                                out_key,
1290                                                out_keylen,
1291                                                out_extlen, uc,
1292                                                "END\r\n")) {
1293                d->merger = genhash_init(128, skeyhash_ops);
1294                return true;
1295            }
1296        }
1297
1298        return false;
1299    }
1300
1301    if (uc->cmd_curr == PROTOCOL_BINARY_CMD_VERSION) {
1302        /* Fake key so that we hash to some server. */
1303
1304        key     = "v";
1305        key_len = 1;
1306    }
1307
1308    /* Assuming we're already connected to downstream. */
1309    /* Handle all other simple commands. */
1310    c = cproxy_find_downstream_conn_ex(d, key, key_len,
1311                                       &local, &vbucket);
1312
1313    if (uc->cmd_curr == PROTOCOL_BINARY_CMD_VERSION) {
1314        key     = NULL;
1315        key_len = 0;
1316    }
1317
1318    if (c != NULL) {
1319        if (local) {
1320            uc->hit_local = true;
1321        }
1322
1323        if (cproxy_prep_conn_for_write(c)) {
1324            protocol_binary_request_header *header;
1325            int size;
1326
1327            cb_assert(c->state == conn_pause);
1328            cb_assert(c->wbuf);
1329            cb_assert(c->wsize >= a2b_size_max);
1330
1331            header = (protocol_binary_request_header *) c->wbuf;
1332            memset(header, 0, a2b_size_max);
1333            size = a2b_fill_request(uc->cmd_curr,
1334                                    tokens, ntokens,
1335                                    uc->noreply,
1336                                    header,
1337                                    &out_key,
1338                                    &out_keylen,
1339                                    &out_extlen);
1340            if (size > 0) {
1341                cb_assert(size <= a2b_size_max);
1342                cb_assert(key     == (char *) out_key);
1343                cb_assert(key_len == (int)    out_keylen);
1344                cb_assert(header->request.bodylen == 0);
1345
1346                if (vbucket >= 0) {
1347                    header->request.reserved = htons(vbucket);
1348                    header->request.opaque   = htonl(vbucket);
1349                }
1350
1351                header->request.bodylen =
1352                    htonl(out_keylen + out_extlen);
1353
1354                a2b_set_opaque(c, header, uc->noreply);
1355
1356                add_iov(c, header, size);
1357
1358                if (out_key != NULL &&
1359                    out_keylen > 0) {
1360                    add_iov(c, out_key, out_keylen);
1361                }
1362
1363                if (settings.verbose > 2) {
1364                    moxi_log_write("forwarding a2b to %d, cmd %x, noreply %d, vbucket %d\n",
1365                                   c->sfd, header->request.opcode, uc->noreply, vbucket);
1366
1367                    cproxy_dump_header(c->sfd, (char *) header);
1368                }
1369
1370                conn_set_state(c, conn_mwrite);
1371                c->write_and_go = conn_new_cmd;
1372
1373                if (update_event(c, EV_WRITE | EV_PERSIST)) {
1374                    d->downstream_used_start = 1;
1375                    d->downstream_used       = 1;
1376
1377                    if (cproxy_dettach_if_noreply(d, uc) == false) {
1378                        cproxy_start_downstream_timeout(d, c);
1379                    } else {
1380                        c->write_and_go = conn_pause;
1381
1382                        cproxy_front_cache_delete(d->ptd, key, key_len);
1383                    }
1384
1385                    return true;
1386                } else {
1387                    /* TODO: Error handling. */
1388
1389                    if (settings.verbose > 1) {
1390                        moxi_log_write("ERROR: Couldn't a2b update write event\n");
1391                    }
1392
1393                    if (d->upstream_suffix == NULL) {
1394                        d->upstream_suffix = "SERVER_ERROR a2b event oom\r\n";
1395                        d->upstream_suffix_len = 0;
1396                        d->upstream_status = PROTOCOL_BINARY_RESPONSE_ENOMEM;
1397                        d->upstream_retry = 0;
1398                        d->target_host_ident = NULL;
1399                    }
1400                }
1401            } else {
1402                /* TODO: Error handling. */
1403
1404                if (settings.verbose > 1) {
1405                    moxi_log_write("ERROR: Couldn't a2b fill request: %s (%x)\n",
1406                            command, uc->cmd_curr);
1407                }
1408
1409                if (d->upstream_suffix == NULL) {
1410                    d->upstream_suffix = "CLIENT_ERROR a2b parse request\r\n";
1411                    d->upstream_suffix_len = 0;
1412                    d->upstream_status = PROTOCOL_BINARY_RESPONSE_EINVAL;
1413                    d->upstream_retry = 0;
1414                    d->target_host_ident = NULL;
1415                }
1416            }
1417
1418            d->ptd->stats.stats.err_oom++;
1419            cproxy_close_conn(c);
1420        } else {
1421            d->ptd->stats.stats.err_downstream_write_prep++;
1422            cproxy_close_conn(c);
1423        }
1424    }
1425
1426    return false;
1427}
1428
1429int a2b_multiget_start(conn *c, char *cmd, int cmd_len) {
1430    (void)c;
1431    (void)cmd;
1432    (void)cmd_len;
1433    return 0; /* No-op. */
1434}
1435
1436/* An skey is a space prefixed key string.
1437 */
1438int a2b_multiget_skey(conn *c, char *skey, int skey_length, int vbucket, int key_index) {
1439    char *key     = skey + 1;
1440    int   key_len = skey_length - 1;
1441
1442    item *it = item_alloc("b", 1, 0, 0, sizeof(protocol_binary_request_get));
1443    if (it != NULL) {
1444        if (add_conn_item(c, it)) {
1445            protocol_binary_request_getk *req =
1446                (protocol_binary_request_getk *) ITEM_data(it);
1447
1448            memset(req, 0, sizeof(req->bytes));
1449
1450            req->message.header.request.magic  = PROTOCOL_BINARY_REQ;
1451            req->message.header.request.opcode = PROTOCOL_BINARY_CMD_GETKQ;
1452            req->message.header.request.keylen = htons((uint16_t) key_len);
1453            req->message.header.request.datatype = PROTOCOL_BINARY_RAW_BYTES;
1454            req->message.header.request.bodylen  = htonl(key_len);
1455            req->message.header.request.opaque   = htonl(key_index);
1456
1457            if (vbucket >= 0) {
1458                req->message.header.request.reserved = htons(vbucket);
1459
1460                if (settings.verbose > 2) {
1461                    char key_buf[KEY_MAX_LENGTH + 10];
1462                    cb_assert(key_len <= KEY_MAX_LENGTH);
1463                    memcpy(key_buf, key, key_len);
1464                    key_buf[key_len] = '\0';
1465
1466                    moxi_log_write("<%d a2b_multiget_skey '%s' %d %d\n",
1467                            c->sfd, key_buf, vbucket, key_index);
1468                }
1469            }
1470
1471            if (add_iov(c, ITEM_data(it), sizeof(req->bytes)) == 0 &&
1472                add_iov(c, key, key_len) == 0) {
1473                return 0; /* Success. */
1474            }
1475
1476            return -1;
1477        }
1478
1479        item_remove(it);
1480    }
1481
1482    return -1;
1483}
1484
1485int a2b_multiget_end(conn *c) {
1486    return add_iov(c, &req_noop.bytes, sizeof(req_noop.bytes));
1487}
1488
1489/* Used for broadcast commands, like flush_all or stats.
1490 */
1491bool cproxy_broadcast_a2b_downstream(downstream *d,
1492                                     protocol_binary_request_header *req,
1493                                     int req_size,
1494                                     uint8_t *key,
1495                                     uint16_t keylen,
1496                                     uint8_t  extlen,
1497                                     conn *uc,
1498                                     char *suffix) {
1499    int nwrite = 0;
1500    int nconns;
1501    int i;
1502
1503    cb_assert(d != NULL);
1504    cb_assert(d->ptd != NULL);
1505    cb_assert(d->ptd->proxy != NULL);
1506    cb_assert(d->downstream_conns != NULL);
1507    cb_assert(d->downstream_used_start == 0);
1508    cb_assert(d->downstream_used == 0);
1509    cb_assert(req != NULL);
1510    cb_assert(req_size >= (int) sizeof(req));
1511    cb_assert(req->request.bodylen == 0);
1512    cb_assert(uc != NULL);
1513    cb_assert(uc->next == NULL);
1514    cb_assert(uc->item == NULL);
1515
1516    req->request.bodylen = htonl(keylen + extlen);
1517
1518    nconns = mcs_server_count(&d->mst);
1519    for (i = 0; i < nconns; i++) {
1520        conn *c = d->downstream_conns[i];
1521        if (c != NULL &&
1522            c != NULL_CONN) {
1523            if (cproxy_prep_conn_for_write(c)) {
1524                cb_assert(c->state == conn_pause);
1525                cb_assert(c->wbuf);
1526                cb_assert(c->wsize >= req_size);
1527
1528                memcpy(c->wbuf, req, req_size);
1529
1530                add_iov(c, c->wbuf, req_size);
1531
1532                if (key != NULL &&
1533                    keylen > 0) {
1534                    add_iov(c, key, keylen);
1535                }
1536
1537                conn_set_state(c, conn_mwrite);
1538                c->write_and_go = conn_new_cmd;
1539
1540                if (update_event(c, EV_WRITE | EV_PERSIST)) {
1541                    nwrite++;
1542
1543                    if (uc->noreply) {
1544                        c->write_and_go = conn_pause;
1545                    }
1546                } else {
1547                    if (settings.verbose > 1) {
1548                        moxi_log_write("ERROR: Update cproxy write event failed\n");
1549                    }
1550
1551                    d->ptd->stats.stats.err_oom++;
1552                    cproxy_close_conn(c);
1553                }
1554            } else {
1555                if (settings.verbose > 1) {
1556                    moxi_log_write("ERROR: a2b broadcast prep conn failed\n");
1557                }
1558
1559                d->ptd->stats.stats.err_downstream_write_prep++;
1560                cproxy_close_conn(c);
1561            }
1562        }
1563    }
1564
1565    if (settings.verbose > 2) {
1566        moxi_log_write("%d: a2b broadcast nwrite %d out of %d\n",
1567                uc->sfd, nwrite, nconns);
1568    }
1569
1570    if (nwrite > 0) {
1571        d->downstream_used_start = nwrite;
1572        d->downstream_used       = nwrite;
1573
1574        if (cproxy_dettach_if_noreply(d, uc) == false) {
1575            d->upstream_suffix = suffix;
1576            d->upstream_suffix_len = 0;
1577            d->upstream_status = PROTOCOL_BINARY_RESPONSE_SUCCESS;
1578            d->upstream_retry = 0;
1579            d->target_host_ident = NULL;
1580
1581            cproxy_start_downstream_timeout(d, NULL);
1582        } else {
1583            /* TODO: Handle flush_all's expiration parameter against */
1584            /* the front_cache. */
1585
1586            if (req->request.opcode == PROTOCOL_BINARY_CMD_FLUSH ||
1587                req->request.opcode == PROTOCOL_BINARY_CMD_FLUSHQ) {
1588                mcache_flush_all(&d->ptd->proxy->front_cache, 0);
1589            }
1590        }
1591
1592        return true;
1593    }
1594
1595    return false;
1596}
1597
1598/* Forward an upstream command that came with item data,
1599 * like set/add/replace/etc.
1600 */
1601bool cproxy_forward_a2b_item_downstream(downstream *d, short cmd,
1602                                        item *it, conn *uc) {
1603
1604    int  vbucket = -1;
1605    bool local;
1606    conn *c;
1607
1608    cb_assert(d != NULL);
1609    cb_assert(d->ptd != NULL);
1610    cb_assert(d->ptd->proxy != NULL);
1611    cb_assert(d->downstream_conns != NULL);
1612    cb_assert(it != NULL);
1613    cb_assert(it->nbytes >= 2);
1614    cb_assert(uc != NULL);
1615    cb_assert(uc->next == NULL);
1616    cb_assert(cmd > 0);
1617
1618    /* Assuming we're already connected to downstream. */
1619
1620    c = cproxy_find_downstream_conn_ex(d, ITEM_key(it), it->nkey,
1621                                       &local, &vbucket);
1622    if (c != NULL) {
1623        if (local) {
1624            uc->hit_local = true;
1625        }
1626        if (cproxy_prep_conn_for_write(c)) {
1627            uint8_t  extlen;
1628            uint32_t hdrlen;
1629            item *it_hdr;
1630
1631            if (settings.verbose > 2) {
1632                moxi_log_write("%d: a2b_item_forward, state: %s\n",
1633                               c->sfd, state_text(c->state));
1634            }
1635
1636            cb_assert(c->state == conn_pause);
1637
1638            extlen = (cmd == NREAD_APPEND || cmd == NREAD_PREPEND) ? 0 : 8;
1639            hdrlen = sizeof(protocol_binary_request_header) +
1640                extlen;
1641
1642            it_hdr = item_alloc("i", 1, 0, 0, hdrlen);
1643            if (it_hdr != NULL) {
1644                if (add_conn_item(c, it_hdr)) {
1645                    protocol_binary_request_header *req =
1646                        (protocol_binary_request_header *) ITEM_data(it_hdr);
1647
1648                    memset(req, 0, hdrlen);
1649
1650                    req->request.magic    = PROTOCOL_BINARY_REQ;
1651                    req->request.datatype = PROTOCOL_BINARY_RAW_BYTES;
1652                    req->request.keylen   = htons((uint16_t) it->nkey);
1653                    req->request.extlen   = extlen;
1654
1655                    if (vbucket >= 0) {
1656                        /* We also put the vbucket id into the opaque, */
1657                        /* so we can have it later for not-my-vbucket */
1658                        /* error handling. */
1659
1660                        req->request.reserved = htons(vbucket);
1661                        req->request.opaque   = htonl(vbucket);
1662                    }
1663
1664                    switch (cmd) {
1665                    case NREAD_SET:
1666                        req->request.opcode =
1667                            uc->noreply ?
1668                            PROTOCOL_BINARY_CMD_SETQ :
1669                            PROTOCOL_BINARY_CMD_SET;
1670                        break;
1671                    case NREAD_CAS: {
1672                        uint64_t cas = ITEM_get_cas(it);
1673                        req->request.cas = mc_swap64(cas);
1674                        req->request.opcode =
1675                            uc->noreply ?
1676                            PROTOCOL_BINARY_CMD_SETQ :
1677                            PROTOCOL_BINARY_CMD_SET;
1678                        break;
1679                    }
1680                    case NREAD_ADD:
1681                        req->request.opcode =
1682                            uc->noreply ?
1683                            PROTOCOL_BINARY_CMD_ADDQ :
1684                            PROTOCOL_BINARY_CMD_ADD;
1685                        break;
1686                    case NREAD_REPLACE:
1687                        req->request.opcode =
1688                            uc->noreply ?
1689                            PROTOCOL_BINARY_CMD_REPLACEQ :
1690                            PROTOCOL_BINARY_CMD_REPLACE;
1691                        break;
1692                    case NREAD_APPEND:
1693                        req->request.opcode =
1694                            uc->noreply ?
1695                            PROTOCOL_BINARY_CMD_APPENDQ :
1696                            PROTOCOL_BINARY_CMD_APPEND;
1697                        break;
1698                    case NREAD_PREPEND:
1699                        req->request.opcode =
1700                            uc->noreply ?
1701                            PROTOCOL_BINARY_CMD_PREPENDQ :
1702                            PROTOCOL_BINARY_CMD_PREPEND;
1703                        break;
1704                    default:
1705                        cb_assert(false); /* TODO. */
1706                        break;
1707                    }
1708
1709                    a2b_set_opaque(c, req, uc->noreply);
1710
1711                    if (cmd != NREAD_APPEND &&
1712                        cmd != NREAD_PREPEND) {
1713                        protocol_binary_request_set *req_set =
1714                            (protocol_binary_request_set *) req;
1715
1716                        req_set->message.body.flags =
1717                            htonl(strtoul(ITEM_suffix(it), NULL, 10));
1718
1719                        req_set->message.body.expiration =
1720                            htonl(it->exptime);
1721                    }
1722
1723                    req->request.bodylen =
1724                        htonl(it->nkey + (it->nbytes - 2) + extlen);
1725
1726                    if (add_iov(c, ITEM_data(it_hdr), hdrlen) == 0 &&
1727                        add_iov(c, ITEM_key(it),  it->nkey) == 0 &&
1728                        add_iov(c, ITEM_data(it), it->nbytes - 2) == 0) {
1729                        conn_set_state(c, conn_mwrite);
1730                        c->write_and_go = conn_new_cmd;
1731
1732                        if (update_event(c, EV_WRITE | EV_PERSIST)) {
1733                            d->downstream_used_start = 1;
1734                            d->downstream_used       = 1;
1735
1736                            if (cproxy_dettach_if_noreply(d, uc) == false) {
1737                                cproxy_start_downstream_timeout(d, c);
1738
1739                                if (cmd == NREAD_SET &&
1740                                    cproxy_optimize_set_ascii(d, uc,
1741                                                              ITEM_key(it),
1742                                                              it->nkey)) {
1743                                    d->ptd->stats.stats.tot_optimize_sets++;
1744                                }
1745                            } else {
1746                                c->write_and_go = conn_pause;
1747
1748                                /* TODO: At this point, the item key string is */
1749                                /* not '\0' or space terminated, which is */
1750                                /* required by the mcache API. */
1751                                /* Be sure to config front_cache to be off */
1752                                /* for binary protocol downstreams. */
1753
1754                                /* mcache_delete(&d->ptd->proxy->front_cache, */
1755                                /*               ITEM_key(it), it->nkey); */
1756                            }
1757
1758                            return true;
1759                        }
1760                    }
1761                }
1762
1763                item_remove(it_hdr);
1764            }
1765
1766            d->ptd->stats.stats.err_oom++;
1767            cproxy_close_conn(c);
1768        } else {
1769            d->ptd->stats.stats.err_downstream_write_prep++;
1770            cproxy_close_conn(c);
1771        }
1772    }
1773
1774    return false;
1775}
1776
1777void a2b_set_opaque(conn *c, protocol_binary_request_header *header,
1778                    bool noreply) {
1779    if (noreply) {
1780        /* Set a magic opaque value during quiet commands that tells us later */
1781        /* that we can ignore the downstream's error response messge, */
1782        /* since the upstream ascii client doesn't want it. */
1783
1784        header->request.opaque = htonl(OPAQUE_IGNORE_REPLY);
1785
1786        if (settings.verbose > 2) {
1787            moxi_log_write("%d: a2b_set_opaque OPAQUE_IGNORE_REPLY, cmdq: %x\n",
1788                           c->sfd, header->request.opcode);
1789        }
1790    }
1791}
1792
1793bool a2b_not_my_vbucket(conn *uc, conn *c,
1794                        protocol_binary_response_header *header) {
1795    downstream *d = c->extra;
1796    cb_assert(d != NULL);
1797    cb_assert(d->ptd != NULL);
1798
1799    if (settings.verbose > 2) {
1800        moxi_log_write("<%d a2b_not_my_vbucket, "
1801                       "cmd: %x %d\n",
1802                       c->sfd, header->response.opcode, uc != NULL);
1803    }
1804
1805    if ((c->cmd != PROTOCOL_BINARY_CMD_GETK &&
1806         c->cmd != PROTOCOL_BINARY_CMD_GETL) ||
1807        c->noreply == false) {
1808        int max_retries;
1809        int vbucket;
1810        int sindex;
1811
1812        /* For non-multi-key GET commands, enqueue a retry after */
1813        /* informing the vbucket map.  This includes single-key GET's. */
1814
1815        if (uc == NULL) {
1816            /* If the client went away, though, don't retry. */
1817
1818            conn_set_state(c, conn_pause);
1819            return true;
1820        }
1821
1822        vbucket = ntohl(header->response.opaque);
1823        sindex = downstream_conn_index(d, c);
1824
1825        if (settings.verbose > 2) {
1826            moxi_log_write("<%d a2b_not_my_vbucket, "
1827                           "cmd: %x not multi-key get, sindex %d, vbucket %d, retries %d\n",
1828                           c->sfd, header->response.opcode, sindex, vbucket, uc->cmd_retries);
1829        }
1830
1831        mcs_server_invalid_vbucket(&d->mst, sindex, vbucket);
1832
1833        /* As long as the upstream is still open and we haven't */
1834        /* retried too many times already. */
1835
1836        max_retries = cproxy_max_retries(d);
1837
1838        if (uc->cmd_retries < max_retries) {
1839            uc->cmd_retries++;
1840
1841            d->upstream_retry++;
1842            d->ptd->stats.stats.tot_retry_vbucket++;
1843
1844            conn_set_state(c, conn_pause);
1845            return true;
1846        }
1847
1848        if (settings.verbose > 2) {
1849            moxi_log_write("%d: a2b_not_my_vbucket, "
1850                           "cmd: %x skipping retry %d >= %d\n",
1851                           c->sfd, header->response.opcode, uc->cmd_retries,
1852                           max_retries);
1853        }
1854
1855        return false;
1856    } else {
1857        int key_index;
1858        char *key;
1859        int key_len;
1860        char key_buf[KEY_MAX_LENGTH + 10];
1861        int vbucket = -1;
1862        int sindex;
1863
1864        /* Handle ascii multi-GET commands by awaiting all NOOP's from */
1865        /* downstream servers, eating the NOOP's, and retrying with */
1866        /* the same multiget de-duplication map, which might be partially */
1867        /* filled in already. */
1868
1869        if (uc == NULL) {
1870            /* If the client went away, though, don't retry, */
1871            /* but keep looking for that NOOP. */
1872
1873            conn_set_state(c, conn_new_cmd);
1874            return true;
1875        }
1876
1877        cb_assert(uc->cmd_start != NULL);
1878        cb_assert(header->response.opaque != 0);
1879
1880        key_index = ntohl(header->response.opaque);
1881        key = uc->cmd_start + key_index;
1882        key_len = skey_len(key);
1883
1884        /* The key is not NULL or space terminated. */
1885
1886        cb_assert(key_len <= KEY_MAX_LENGTH);
1887        memcpy(key_buf, key, key_len);
1888        key_buf[key_len] = '\0';
1889
1890        sindex = downstream_conn_index(d, c);
1891
1892        mcs_key_hash(&d->mst, key_buf, key_len, &vbucket);
1893
1894        if (settings.verbose > 2) {
1895            moxi_log_write("<%d a2b_not_my_vbucket, "
1896                           "cmd: %x get/getk '%s' %d retry %d, sindex %d, vbucket %d\n",
1897                           c->sfd, header->response.opcode, key_buf, key_len,
1898                           d->upstream_retry + 1, sindex, vbucket);
1899        }
1900
1901        mcs_server_invalid_vbucket(&d->mst, sindex, vbucket);
1902
1903        /* Update the de-duplication map, removing the key, so that */
1904        /* we'll reattempt another request for the key during the */
1905        /* retry. */
1906
1907        if (d->multiget != NULL) {
1908            multiget_entry *entry = genhash_find(d->multiget, key_buf);
1909
1910            if (settings.verbose > 2) {
1911                moxi_log_write("<%d a2b_not_my_vbucket, "
1912                               "cmd: %x get/getk '%s' %d retry: %d, entry: %d, vbucket %d "
1913                               "deleting multiget entry\n",
1914                               c->sfd, header->response.opcode, key_buf, key_len,
1915                               d->upstream_retry + 1, entry != NULL, vbucket);
1916            }
1917
1918            genhash_delete(d->multiget, key_buf);
1919
1920            while (entry != NULL) {
1921                multiget_entry *curr = entry;
1922                entry = entry->next;
1923                free(curr);
1924            }
1925        } else {
1926            if (settings.verbose > 2) {
1927                moxi_log_write("<%d a2b_not_my_vbucket no dedupe map, "
1928                               "cmd: %x get/getk '%s' %d retry: %d, vbucket %d\n",
1929                               c->sfd, header->response.opcode, key_buf, key_len,
1930                               d->upstream_retry + 1, vbucket);
1931            }
1932        }
1933
1934        /* Signal that we need to retry, where this counter is */
1935        /* later checked after all NOOP's from downstreams are */
1936        /* received. */
1937
1938        d->upstream_retry++;
1939        d->ptd->stats.stats.tot_retry_vbucket++;
1940
1941        conn_set_state(c, conn_new_cmd);
1942        return true;
1943    }
1944}
1945