xref: /5.5.2/moxi/src/cproxy_protocol_a.c (revision d0366df5)
1/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2#include "src/config.h"
3#include <stdio.h>
4#include <string.h>
5#include <errno.h>
6#include <platform/cbassert.h>
7#include "memcached.h"
8#include "cproxy.h"
9#include "work.h"
10#include "log.h"
11
12/* Internal declarations. */
13
14#define COMMAND_TOKEN 0
15#define MAX_TOKENS    8
16
17#define MAX_HOSTNAME_LEN 200
18#define MAX_PORT_LEN     8
19
20void cproxy_process_upstream_ascii(conn *c, char *line) {
21    cb_assert(c != NULL);
22    cb_assert(c->next == NULL);
23    cb_assert(c->extra != NULL);
24    cb_assert(c->cmd == -1);
25    cb_assert(c->item == NULL);
26    cb_assert(line != NULL);
27    cb_assert(line == c->rcurr);
28    cb_assert(IS_ASCII(c->protocol));
29    cb_assert(IS_PROXY(c->protocol));
30
31    if (settings.verbose > 2) {
32        moxi_log_write("<%d cproxy_process_upstream_ascii %s\n",
33                       c->sfd, line);
34    }
35
36    /* Snapshot rcurr, because the caller, try_read_command(), changes it. */
37
38    c->cmd_curr       = -1;
39    c->cmd_start      = c->rcurr;
40    c->cmd_start_time = msec_current_time;
41    c->cmd_retries    = 0;
42
43    proxy_td *ptd = c->extra;
44    cb_assert(ptd != NULL);
45
46    /* For commands set/add/replace, we build an item and read the data
47     * directly into it, then continue in nread_complete().
48     */
49    if (!cproxy_prep_conn_for_write(c)) {
50        ptd->stats.stats.err_upstream_write_prep++;
51        conn_set_state(c, conn_closing);
52        return;
53    }
54
55    bool mcmux_command = false;
56    bool self_command = false;
57
58    /* Check for proxy pattern - A:host:port or B:host:port */
59    if (true == settings.enable_mcmux_mode &&
60        ((*line == 'A' || *line == 'B') && *(line + 1) == ':')) {
61        mcmux_command = true;
62    } else if (true == settings.enable_mcmux_mode) {
63        self_command = true;
64    }
65
66    c->peer_protocol = 0;
67    c->peer_host = NULL;
68    c->peer_port = 0;
69
70    if (mcmux_command) {
71        char *peer_port = NULL;
72        int i = 0;
73
74        c->peer_protocol = (*line == 'A') ?
75            proxy_downstream_ascii_prot :
76            proxy_downstream_binary_prot;
77        line += 2;
78        c->peer_host = line;
79
80        while (*line != ' ' && *line != '\0' &&
81               *line != ':' && ++i < MAX_HOSTNAME_LEN) {
82            line++;
83        }
84
85        if (*line == '\0' || line - c->peer_host <= 0) {
86            out_string(c, "ERROR");
87            moxi_log_write("Malformed request line");
88            return;
89        }
90        *line = '\0';
91        line++;
92        peer_port = line;
93        i = 0;
94
95        while (*line != ' ' && *line != '\0' && ++i <= MAX_PORT_LEN) {
96            line++;
97        }
98
99        if (*line == '\0' || line - peer_port <= 0) {
100            out_string(c, "ERROR");
101            moxi_log_write("Malformed request line");
102            return;
103        }
104
105        c->peer_port = atoi(peer_port);
106
107        *line++ = '\0';
108        c->cmd_start = line;
109    }
110
111    int     cmd_len = 0;
112    token_t tokens[MAX_TOKENS];
113    size_t  ntokens = scan_tokens(line, tokens, MAX_TOKENS, &cmd_len);
114    char   *cmd     = tokens[COMMAND_TOKEN].value;
115    int     cmdx    = -1;
116    int     cmd_st  = STATS_CMD_TYPE_REGULAR;
117    int     comm;
118
119#define SEEN(cmd_id, is_cas, cmd_len)                           \
120    cmd_st = c->noreply ?                                       \
121        STATS_CMD_TYPE_QUIET : STATS_CMD_TYPE_REGULAR;          \
122    ptd->stats.stats_cmd[cmd_st][cmd_id].seen++;                \
123    ptd->stats.stats_cmd[cmd_st][cmd_id].read_bytes += cmd_len; \
124    if (is_cas) {                                               \
125        ptd->stats.stats_cmd[cmd_st][cmd_id].cas++;             \
126    }
127
128    if (ntokens >= 3 &&
129        (false == self_command) &&
130        (strncmp(cmd, "get", 3) == 0)) {
131        if (cmd[3] == 'l') {
132            c->cmd_curr = PROTOCOL_BINARY_CMD_GETL;
133        } else if (ntokens == 3) {
134            /* Single-key get/gets optimization. */
135
136            c->cmd_curr = PROTOCOL_BINARY_CMD_GETK;
137        } else {
138            c->cmd_curr = PROTOCOL_BINARY_CMD_GETKQ;
139        }
140
141        /* Handles get and gets. */
142
143        cproxy_pause_upstream_for_downstream(ptd, c);
144
145        /* The cmd_len from scan_tokens might not include */
146        /* all the keys, so cmd_len might not == strlen(command). */
147        /* Handle read_bytes during multiget broadcast. */
148
149        if (cmd[3] == 'l') {
150            SEEN(STATS_CMD_GETL, true, 0);
151        } else {
152            SEEN(STATS_CMD_GET, cmd[3] == 's', 0);
153        }
154
155    } else if ((ntokens == 6 || ntokens == 7) &&
156                (false == self_command) &&
157               ((strncmp(cmd, "add", 3) == 0 &&
158                 (comm = NREAD_ADD) &&
159                 (cmdx = STATS_CMD_ADD) &&
160                 (c->cmd_curr = PROTOCOL_BINARY_CMD_ADD)) ||
161                (strncmp(cmd, "set", 3) == 0 &&
162                 (comm = NREAD_SET) &&
163                 (cmdx = STATS_CMD_SET) &&
164                 (c->cmd_curr = PROTOCOL_BINARY_CMD_SET)) ||
165                (strncmp(cmd, "replace", 7) == 0 &&
166                 (comm = NREAD_REPLACE) &&
167                 (cmdx = STATS_CMD_REPLACE) &&
168                 (c->cmd_curr = PROTOCOL_BINARY_CMD_REPLACE)) ||
169                (strncmp(cmd, "prepend", 7) == 0 &&
170                 (comm = NREAD_PREPEND) &&
171                 (cmdx = STATS_CMD_PREPEND) &&
172                 (c->cmd_curr = PROTOCOL_BINARY_CMD_PREPEND)) ||
173                (strncmp(cmd, "append", 6) == 0 &&
174                 (comm = NREAD_APPEND) &&
175                 (cmdx = STATS_CMD_APPEND) &&
176                 (c->cmd_curr = PROTOCOL_BINARY_CMD_APPEND)))) {
177        cb_assert(c->item == NULL);
178        c->item = NULL;
179
180        process_update_command(c, tokens, ntokens, comm, false);
181
182        if (cmdx >= 0) {
183            item *it = c->item;
184            if (it != NULL) {
185                SEEN(cmdx, false, cmd_len + it->nbytes);
186            } else {
187                SEEN(cmdx, false, cmd_len);
188                ptd->stats.stats_cmd[cmd_st][cmdx].misses++;
189            }
190        }
191
192    } else if ((ntokens == 7 || ntokens == 8) &&
193               (false == self_command) &&
194               (strncmp(cmd, "cas", 3) == 0 &&
195                (comm = NREAD_CAS) &&
196                (c->cmd_curr = PROTOCOL_BINARY_CMD_SET))) {
197        cb_assert(c->item == NULL);
198        c->item = NULL;
199
200        process_update_command(c, tokens, ntokens, comm, true);
201
202        item *it = c->item;
203        if (it != NULL) {
204            SEEN(STATS_CMD_CAS, true, cmd_len + it->nbytes);
205        } else {
206            SEEN(STATS_CMD_CAS, true, cmd_len);
207            ptd->stats.stats_cmd[cmd_st][STATS_CMD_CAS].misses++;
208        }
209
210    } else if ((ntokens == 4 || ntokens == 5) &&
211               (false == self_command) &&
212               (strncmp(cmd, "incr", 4) == 0) &&
213               (c->cmd_curr = PROTOCOL_BINARY_CMD_INCREMENT)) {
214        set_noreply_maybe(c, tokens, ntokens);
215        cproxy_pause_upstream_for_downstream(ptd, c);
216
217        SEEN(STATS_CMD_INCR, false, cmd_len);
218
219    } else if ((ntokens == 4 || ntokens == 5) &&
220               (false == self_command) &&
221               (strncmp(cmd, "decr", 4) == 0) &&
222               (c->cmd_curr = PROTOCOL_BINARY_CMD_DECREMENT)) {
223        set_noreply_maybe(c, tokens, ntokens);
224        cproxy_pause_upstream_for_downstream(ptd, c);
225
226        SEEN(STATS_CMD_DECR, false, cmd_len);
227
228    } else if (ntokens >= 3 && ntokens <= 4 &&
229               (false == self_command) &&
230               (strncmp(cmd, "delete", 6) == 0) &&
231               (c->cmd_curr = PROTOCOL_BINARY_CMD_DELETE)) {
232        set_noreply_maybe(c, tokens, ntokens);
233        cproxy_pause_upstream_for_downstream(ptd, c);
234
235        SEEN(STATS_CMD_DELETE, false, cmd_len);
236
237    } else if (ntokens >= 2 && ntokens <= 4 &&
238               (false == self_command) &&
239               (strncmp(cmd, "flush_all", 9) == 0) &&
240               (c->cmd_curr = PROTOCOL_BINARY_CMD_FLUSH)) {
241        set_noreply_maybe(c, tokens, ntokens);
242        cproxy_pause_upstream_for_downstream(ptd, c);
243
244        SEEN(STATS_CMD_FLUSH_ALL, false, cmd_len);
245
246    } else if (ntokens >= 3 && ntokens <= 4 &&
247               (strncmp(cmd, "stats proxy", 10) == 0)) {
248
249        process_stats_proxy_command(c, tokens, ntokens);
250
251        SEEN(STATS_CMD_STATS, false, cmd_len);
252
253    } else if (ntokens == 3 &&
254               (false == self_command) &&
255               (strcmp(cmd, "stats reset") == 0) &&
256               (c->cmd_curr = PROTOCOL_BINARY_CMD_STAT)) {
257        cproxy_pause_upstream_for_downstream(ptd, c);
258
259        SEEN(STATS_CMD_STATS_RESET, false, cmd_len);
260
261    } else if (ntokens == 2 &&
262               (false == self_command) &&
263               (strcmp(cmd, "stats") == 0) &&
264               (c->cmd_curr = PROTOCOL_BINARY_CMD_STAT)) {
265        /* Even though we've coded to handle advanced stats */
266        /* like stats cachedump, prevent those here to avoid */
267        /* locking downstream servers. */
268
269        cproxy_pause_upstream_for_downstream(ptd, c);
270
271        SEEN(STATS_CMD_STATS, false, cmd_len);
272
273    } else if (ntokens == 2 &&
274               (true == mcmux_command) &&
275               (strncmp(cmd, "version", 7) == 0) &&
276               (c->cmd_curr = PROTOCOL_BINARY_CMD_VERSION)) {
277        /* downstream version command */
278        cproxy_pause_upstream_for_downstream(ptd, c);
279
280        SEEN(STATS_CMD_VERSION, false, cmd_len);
281
282    } else if (ntokens == 2 &&
283               (strncmp(cmd, "version", 7) == 0)) {
284        out_string(c, "VERSION " VERSION);
285
286        SEEN(STATS_CMD_VERSION, false, cmd_len);
287
288    } else if ((ntokens == 3 || ntokens == 4) &&
289               (strncmp(cmd, "verbosity", 9) == 0)) {
290        process_verbosity_command(c, tokens, ntokens);
291
292        SEEN(STATS_CMD_VERBOSITY, false, cmd_len);
293
294    } else if (ntokens == 2 &&
295               (strncmp(cmd, "quit", 4) == 0)) {
296        conn_set_state(c, conn_closing);
297
298        SEEN(STATS_CMD_QUIT, false, cmd_len);
299
300    } else if (ntokens == 4 &&
301               (strncmp(cmd, "unl", 3) == 0) &&
302               (false == self_command) &&
303               (c->cmd_curr = PROTOCOL_BINARY_CMD_UNL)) {
304        cproxy_pause_upstream_for_downstream(ptd, c);
305
306        SEEN(STATS_CMD_UNL, false, cmd_len);
307
308    } else if (ntokens == 4 && /* Ex: "touch <key> <expiration>" */
309               (false == self_command) &&
310               (strncmp(cmd, "touch", 5) == 0) &&
311               (c->cmd_curr = PROTOCOL_BINARY_CMD_TOUCH)) {
312        cproxy_pause_upstream_for_downstream(ptd, c);
313
314        /* TODO: SEEN(STATS_CMD_TOUCH, false, cmd_len); */
315
316    } else {
317        out_string(c, "ERROR");
318
319        SEEN(STATS_CMD_ERROR, false, cmd_len);
320    }
321}
322
323/* We get here after reading the value in set/add/replace
324 * commands. The command has been stored in c->cmd, and
325 * the item is ready in c->item.
326 */
327void cproxy_process_upstream_ascii_nread(conn *c) {
328    cb_assert(c != NULL);
329    cb_assert(c->next == NULL);
330
331    item *it = c->item;
332
333    cb_assert(it != NULL);
334
335    /* pthread_mutex_lock(&c->thread->stats.mutex); */
336    /* c->thread->stats.slab_stats[it->slabs_clsid].set_cmds++; */
337    /* pthread_mutex_unlock(&c->thread->stats.mutex); */
338
339    if (strncmp(ITEM_data(it) + it->nbytes - 2, "\r\n", 2) == 0) {
340        proxy_td *ptd = c->extra;
341
342        cb_assert(ptd != NULL);
343
344        cproxy_pause_upstream_for_downstream(ptd, c);
345    } else {
346        out_string(c, "CLIENT_ERROR bad data chunk");
347    }
348}
349
350/**
351 * @param cas_emit  1: emit CAS.
352 *                  0: do not emit CAS.
353 *                 -1: data driven.
354 */
355void cproxy_upstream_ascii_item_response(item *it, conn *uc,
356                                         int cas_emit) {
357    cb_assert(it != NULL);
358    cb_assert(uc != NULL);
359    cb_assert(uc->state == conn_pause);
360    cb_assert(uc->funcs != NULL);
361    cb_assert(IS_ASCII(uc->protocol));
362    cb_assert(IS_PROXY(uc->protocol));
363
364    if (settings.verbose > 2) {
365        char key[KEY_MAX_LENGTH + 10];
366        cb_assert(it->nkey <= KEY_MAX_LENGTH);
367        memcpy(key, ITEM_key(it), it->nkey);
368        key[it->nkey] = '\0';
369
370        moxi_log_write("<%d cproxy ascii item response, key %s\n",
371                       uc->sfd, key);
372    }
373
374    if (strncmp(ITEM_data(it) + it->nbytes - 2, "\r\n", 2) == 0) {
375        /* TODO: Need to clean up half-written add_iov()'s. */
376        /*       Consider closing the upstream_conns? */
377
378        uint64_t cas = ITEM_get_cas(it);
379        if ((cas_emit == 0) ||
380            (cas_emit < 0 &&
381             cas == CPROXY_NOT_CAS)) {
382            if (add_conn_item(uc, it)) {
383                it->refcount++;
384
385                if (add_iov(uc, "VALUE ", 6) == 0 &&
386                    add_iov(uc, ITEM_key(it), it->nkey) == 0 &&
387                    add_iov(uc, ITEM_suffix(it),
388                            it->nsuffix + it->nbytes) == 0) {
389                    if (settings.verbose > 2) {
390                        moxi_log_write("<%d cproxy ascii item response success\n",
391                                       uc->sfd);
392                    }
393                }
394            }
395        } else {
396            char *suffix = add_conn_suffix(uc);
397            if (suffix != NULL) {
398                sprintf(suffix, " %llu\r\n", (unsigned long long) cas);
399
400                if (add_conn_item(uc, it)) {
401                    it->refcount++;
402
403                    if (add_iov(uc, "VALUE ", 6) == 0 &&
404                        add_iov(uc, ITEM_key(it), it->nkey) == 0 &&
405                        add_iov(uc, ITEM_suffix(it),
406                                it->nsuffix - 2) == 0 &&
407                        add_iov(uc, suffix, strlen(suffix)) == 0 &&
408                        add_iov(uc, ITEM_data(it), it->nbytes) == 0) {
409                        if (settings.verbose > 2) {
410                            moxi_log_write("<%d cproxy ascii item response ok\n",
411                                    uc->sfd);
412                        }
413                    }
414                }
415            }
416        }
417    } else {
418        if (settings.verbose > 1) {
419            moxi_log_write("ERROR: unexpected downstream data block");
420        }
421    }
422}
423
424/**
425 * When we're sending an ascii response line back upstream to
426 * an ascii protocol client, keep the front_cache sync'ed.
427 */
428void cproxy_del_front_cache_key_ascii_response(downstream *d,
429                                               char *response,
430                                               char *command) {
431    cb_assert(d);
432    cb_assert(d->ptd);
433    cb_assert(d->ptd->proxy);
434    cb_assert(response);
435
436    if (!mcache_started(&d->ptd->proxy->front_cache)) {
437        return;
438    }
439
440    /* TODO: Not sure if we need all these checks, or just */
441    /* clear the cache item no matter what. */
442
443    if (strncmp(response, "DELETED", 7) == 0 ||
444        strncmp(response, "STORED", 6) == 0 ||
445        strncmp(response, "EXISTS", 6) == 0 ||
446        strncmp(response, "NOT_FOUND", 9) == 0 ||
447        strncmp(response, "NOT_STORED", 10) == 0 ||
448        strncmp(response, "ERROR", 5) == 0 ||
449        strncmp(response, "SERVER_ERROR", 12) == 0 ||
450        (response[0] == '-') ||
451        (response[0] >= '0' && response[0] <= '9')) {
452        cproxy_del_front_cache_key_ascii(d, command);
453    }
454}
455
456void cproxy_del_front_cache_key_ascii(downstream *d,
457                                      char *command) {
458    cb_assert(d);
459    cb_assert(d->ptd);
460    cb_assert(d->ptd->proxy);
461
462    if (d->ptd->behavior_pool.base.front_cache_lifespan == 0) {
463        return;
464    }
465
466    if (mcache_started(&d->ptd->proxy->front_cache)) {
467        char *spc = strchr(command, ' ');
468        if (spc != NULL) {
469            char *key = spc + 1;
470            int   key_len = skey_len(key);
471
472            cproxy_front_cache_delete(d->ptd, key, key_len);
473        }
474    }
475}
476
477/**
478 * Depending on our configuration, we can optimize SET's
479 * on certain keys by making them fire-and-forget and
480 * immediately transmitting a success response to the
481 * upstream client.
482 */
483bool cproxy_optimize_set_ascii(downstream *d, conn *uc,
484                               char *key, int key_len) {
485    cb_assert(d);
486    cb_assert(d->ptd);
487    cb_assert(d->ptd->proxy);
488    cb_assert(uc);
489    cb_assert(uc->next == NULL);
490
491    if (d->ptd->behavior_pool.base.optimize_set[0] == '\0') {
492        return false;
493    }
494
495    if (matcher_check(&d->ptd->proxy->optimize_set_matcher,
496                      key, key_len, false)) {
497        d->upstream_conn = NULL;
498        d->upstream_suffix = NULL;
499        d->upstream_suffix_len = 0;
500        d->upstream_status = PROTOCOL_BINARY_RESPONSE_SUCCESS;
501        d->upstream_retry = 0;
502        d->target_host_ident = NULL;
503
504        out_string(uc, "STORED");
505
506        if (!update_event(uc, EV_WRITE | EV_PERSIST)) {
507            if (settings.verbose > 1) {
508                moxi_log_write("ERROR: Can't update upstream write event\n");
509            }
510
511            d->ptd->stats.stats.err_oom++;
512            cproxy_close_conn(uc);
513        }
514
515        return true;
516    }
517
518    return false;
519}
520
521void cproxy_process_downstream_ascii(conn *c, char *line) {
522    downstream *d = c->extra;
523    cb_assert(d != NULL);
524    cb_assert(d->upstream_conn != NULL);
525
526    if (IS_ASCII(d->upstream_conn->protocol)) {
527        cproxy_process_a2a_downstream(c, line);
528    } else {
529        cb_assert(false); /* TODO: b2a. */
530    }
531}
532
533void cproxy_process_downstream_ascii_nread(conn *c) {
534    downstream *d = c->extra;
535    cb_assert(d != NULL);
536    cb_assert(d->upstream_conn != NULL);
537
538    if (IS_ASCII(d->upstream_conn->protocol)) {
539        cproxy_process_a2a_downstream_nread(c);
540    } else {
541        cb_assert(false); /* TODO: b2a. */
542    }
543}
544
545bool cproxy_is_broadcast_cmd(int cmd) {
546    return (cmd == PROTOCOL_BINARY_CMD_FLUSH ||
547            cmd == PROTOCOL_BINARY_CMD_STAT || /* In a2x translation. */
548            cmd == PROTOCOL_BINARY_CMD_NOOP ||
549            cmd == PROTOCOL_BINARY_CMD_GETKQ);
550}
551
552bool ascii_scan_key(char *line, char **key, int *key_len) {
553    char *curr = line;
554
555    while (*curr != '\0' &&
556           *curr == ' ') { /* Scan to start of cmd. */
557        curr++;
558    }
559
560    while (*curr != '\0' &&
561           *curr != ' ') { /* Scan to end of cmd. */
562        curr++;
563    }
564
565    while (*curr != '\0' &&
566           *curr == ' ') { /* Scan to start of key. */
567        curr++;
568    }
569
570    *key = curr;
571
572    while (*curr != '\0' &&
573           *curr != ' ') { /* Scan to end of key. */
574        curr++;
575    }
576
577    *key_len = (int) (curr - *key);
578
579    return *key_len > 0;
580}
581
582void cproxy_ascii_broadcast_suffix(downstream *d) {
583    conn *uc = d->upstream_conn;
584    if (uc != NULL &&
585        uc->noreply == false) {
586        if (uc->cmd_curr == PROTOCOL_BINARY_CMD_FLUSH) {
587            d->upstream_suffix = "OK\r\n";
588        } else {
589            d->upstream_suffix = "END\r\n";
590        }
591
592        d->upstream_suffix_len = 0;
593        d->upstream_status = PROTOCOL_BINARY_RESPONSE_SUCCESS;
594        d->upstream_retry = 0;
595        d->target_host_ident = NULL;
596    }
597}
598