xref: /5.5.2/moxi/src/cproxy_protocol_a2a.c (revision d0366df5)
1/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
#include "src/config.h"
#include <stdio.h>
#include <string.h>
#include <errno.h>
#include <limits.h>
#include <platform/cbassert.h>
#include "memcached.h"
#include "cproxy.h"
#include "work.h"
#include "log.h"
11
/* Internal declarations. */

/* Index of the key within a line tokenized by scan_tokens(): token 0 is */
/* the verb (e.g. "VALUE" or the command name) and token 1 the key. */
#define KEY_TOKEN  1
/* Capacity of the token_t arrays handed to scan_tokens() in this file. */
#define MAX_TOKENS 8

/* Callbacks passed to multiget_ascii_downstream() to build an ascii */
/* downstream multi-get request on conn c. Respectively, they append */
/* the command text, one space-prefixed key (skey), and the final CRLF. */
int a2a_multiget_start(conn *c, char *cmd, int cmd_len);
int a2a_multiget_skey(conn *c, char *skey, int skey_len, int vbucket, int key_index);
int a2a_multiget_end(conn *c);
20
/* One-time initialization hook for the ascii-to-ascii (a2a) proxy
 * protocol. There is currently nothing to set up.
 *
 * Defined with an explicit (void) parameter list so the definition is
 * a real prototype rather than an old-style, unchecked declaration.
 */
void cproxy_init_a2a(void) {
    /* Nothing right now. */
}
24
25void cproxy_process_a2a_downstream(conn *c, char *line) {
26    cb_assert(c != NULL);
27    cb_assert(c->next == NULL);
28    cb_assert(c->extra != NULL);
29    cb_assert(c->cmd == -1);
30    cb_assert(c->item == NULL);
31    cb_assert(line != NULL);
32    cb_assert(line == c->rcurr);
33    cb_assert(IS_ASCII(c->protocol));
34    cb_assert(IS_PROXY(c->protocol));
35
36    if (settings.verbose > 1) {
37        moxi_log_write("<%d cproxy_process_a2a_downstream %s\n",
38                c->sfd, line);
39    }
40
41    downstream *d = c->extra;
42
43    cb_assert(d != NULL);
44    cb_assert(d->ptd != NULL);
45    cb_assert(d->ptd->proxy != NULL);
46
47    if (strncmp(line, "VALUE ", 6) == 0) {
48        token_t      tokens[MAX_TOKENS];
49        size_t       ntokens;
50        unsigned int flags;
51        int          clen = 0;
52        int          vlen;
53        uint64_t     cas = CPROXY_NOT_CAS;
54
55        ntokens = scan_tokens(line, tokens, MAX_TOKENS, &clen);
56        if (ntokens >= 5 && /* Accounts for extra termimation token. */
57            ntokens <= 6 &&
58            tokens[KEY_TOKEN].length <= KEY_MAX_LENGTH &&
59            safe_strtoul(tokens[2].value, (uint32_t *) &flags) &&
60            safe_strtoul(tokens[3].value, (uint32_t *) &vlen)) {
61            char  *key  = tokens[KEY_TOKEN].value;
62            size_t nkey = tokens[KEY_TOKEN].length;
63
64            item *it = item_alloc(key, nkey, flags, 0, vlen + 2);
65            if (it != NULL) {
66                if (ntokens == 5 ||
67                    safe_strtoull(tokens[4].value, &cas)) {
68                    ITEM_set_cas(it, cas);
69
70                    c->item = it;
71                    c->ritem = ITEM_data(it);
72                    c->rlbytes = it->nbytes;
73                    c->cmd = -1;
74
75                    conn_set_state(c, conn_nread);
76
77                    return; /* Success. */
78                } else {
79                    if (settings.verbose > 1) {
80                        moxi_log_write("cproxy could not parse cas\n");
81                    }
82                }
83            } else {
84                if (settings.verbose > 1) {
85                    moxi_log_write("cproxy could not item_alloc size %u\n",
86                            vlen + 2);
87                }
88            }
89
90            if (it != NULL) {
91                item_remove(it);
92            }
93
94            it = NULL;
95
96            c->sbytes = vlen + 2; /* Number of bytes to swallow. */
97
98            conn_set_state(c, conn_swallow);
99
100            /* Note, eventually, we'll see an END later. */
101        } else {
102            /* We don't know how much to swallow, so close the downstream. */
103            /* The conn_closing should release the downstream, */
104            /* which should write a suffix/error to the upstream. */
105
106            conn_set_state(c, conn_closing);
107        }
108    } else if (strncmp(line, "END", 3) == 0) {
109        conn_set_state(c, conn_pause);
110    } else if (strncmp(line, "OK", 2) == 0) {
111        conn_set_state(c, conn_pause);
112
113        /* TODO: Handle flush_all's expiration parameter against */
114        /* the front_cache. */
115
116        /* TODO: We flush the front_cache too often, inefficiently */
117        /* on every downstream flush_all OK response, rather than */
118        /* on just the last flush_all OK response. */
119
120        conn *uc = d->upstream_conn;
121        if (uc != NULL &&
122            uc->cmd_curr == PROTOCOL_BINARY_CMD_FLUSH) {
123            mcache_flush_all(&d->ptd->proxy->front_cache, 0);
124        }
125    } else if (strncmp(line, "STAT ", 5) == 0 ||
126               strncmp(line, "ITEM ", 5) == 0 ||
127               strncmp(line, "PREFIX ", 7) == 0) {
128        cb_assert(d->merger != NULL);
129
130        conn *uc = d->upstream_conn;
131        if (uc != NULL) {
132            cb_assert(uc->next == NULL);
133
134            if (protocol_stats_merge_line(d->merger, line) == false) {
135                /* Forward the line as-is if we couldn't merge it. */
136
137                int nline = strlen(line);
138
139                item *it = item_alloc("s", 1, 0, 0, nline + 2);
140                if (it != NULL) {
141                    strncpy(ITEM_data(it), line, nline);
142                    strncpy(ITEM_data(it) + nline, "\r\n", 2);
143
144                    if (add_conn_item(uc, it)) {
145                        add_iov(uc, ITEM_data(it), nline + 2);
146
147                        it = NULL;
148                    }
149
150                    if (it != NULL) {
151                        item_remove(it);
152                    }
153                }
154            }
155        }
156
157        conn_set_state(c, conn_new_cmd);
158    } else if (strncmp(line, "LOCK_ERROR", 10) == 0) {
159        d->upstream_suffix = "LOCK_ERROR\r\n";
160        d->upstream_suffix_len = 0;
161        d->upstream_status = PROTOCOL_BINARY_RESPONSE_ETMPFAIL;
162        d->upstream_retry = 0;
163        d->target_host_ident = NULL;
164
165        conn_set_state(c, conn_pause);
166    } else if (strncmp(line, "NOT_FOUND", 9) == 0) {
167        d->upstream_suffix = "NOT_FOUND\r\n";
168        d->upstream_suffix_len = 0;
169        d->upstream_retry = 0;
170        d->target_host_ident = NULL;
171
172        conn_set_state(c, conn_pause);
173    } else {
174        conn_set_state(c, conn_pause);
175
176        /* The upstream conn might be NULL when closed already */
177        /* or while handling a noreply. */
178
179        conn *uc = d->upstream_conn;
180        if (uc != NULL) {
181            cb_assert(uc->next == NULL);
182
183            out_string(uc, line);
184
185            if (!update_event(uc, EV_WRITE | EV_PERSIST)) {
186                if (settings.verbose > 1) {
187                    moxi_log_write("Can't update upstream write event\n");
188                }
189
190                d->ptd->stats.stats.err_oom++;
191                cproxy_close_conn(uc);
192            }
193
194            cproxy_del_front_cache_key_ascii_response(d, line,
195                                                      uc->cmd_start);
196        }
197    }
198}
199
200/* We get here after reading the value in a VALUE reply.
201 * The item is ready in c->item.
202 */
203void cproxy_process_a2a_downstream_nread(conn *c) {
204    cb_assert(c != NULL);
205
206    if (settings.verbose > 1) {
207        moxi_log_write("<%d cproxy_process_a2a_downstream_nread %d %d\n",
208                c->sfd, c->ileft, c->isize);
209    }
210
211    downstream *d = c->extra;
212    cb_assert(d != NULL);
213
214    item *it = c->item;
215    cb_assert(it != NULL);
216
217    /* Clear c->item because we either move it to the upstream or */
218    /* item_remove() it on error. */
219
220    c->item = NULL;
221
222    conn_set_state(c, conn_new_cmd);
223
224    /* pthread_mutex_lock(&c->thread->stats.mutex); */
225    /* c->thread->stats.slab_stats[it->slabs_clsid].set_cmds++; */
226    /* pthread_mutex_unlock(&c->thread->stats.mutex); */
227
228    multiget_ascii_downstream_response(d, it);
229
230    item_remove(it);
231}
232
233/* Do the actual work of forwarding the command from an
234 * upstream ascii conn to its assigned ascii downstream.
235 */
236bool cproxy_forward_a2a_downstream(downstream *d) {
237    cb_assert(d != NULL);
238
239    conn *uc = d->upstream_conn;
240
241    cb_assert(uc != NULL);
242    cb_assert(uc->state == conn_pause);
243    cb_assert(uc->cmd_start != NULL);
244    cb_assert(uc->thread != NULL);
245    cb_assert(uc->thread->base != NULL);
246    cb_assert(IS_ASCII(uc->protocol));
247    cb_assert(IS_PROXY(uc->protocol));
248
249    int server_index = -1;
250
251    if (cproxy_is_broadcast_cmd(uc->cmd_curr) == true) {
252        cproxy_ascii_broadcast_suffix(d);
253    } else {
254        char *key = NULL;
255        int   key_len = 0;
256
257        if (ascii_scan_key(uc->cmd_start, &key, &key_len) &&
258            key != NULL &&
259            key_len > 0) {
260            server_index = cproxy_server_index(d, key, key_len, NULL);
261            if (server_index < 0) {
262                return false;
263            }
264        }
265    }
266
267    int nc = cproxy_connect_downstream(d, uc->thread, server_index);
268    if (nc == -1) {
269        return true;
270    }
271
272    if (nc > 0) {
273        cb_assert(d->downstream_conns != NULL);
274
275        if (d->usec_start == 0 &&
276            d->ptd->behavior_pool.base.time_stats) {
277            d->usec_start = usec_now();
278        }
279
280        if (uc->cmd == -1) {
281            return cproxy_forward_a2a_simple_downstream(d, uc->cmd_start, uc);
282        } else {
283            return cproxy_forward_a2a_item_downstream(d, uc->cmd, uc->item, uc);
284        }
285    }
286
287    if (settings.verbose > 2) {
288        moxi_log_write("%d: cproxy_forward_a2a_downstream connect failed\n",
289                uc->sfd);
290    }
291
292    return false;
293}
294
295/* Forward a simple one-liner command downstream.
296 * For example, get, incr/decr, delete, etc.
297 * The response, though, might be a simple line or
298 * multiple VALUE+END lines.
299 */
300bool cproxy_forward_a2a_simple_downstream(downstream *d,
301                                          char *command, conn *uc) {
302    cb_assert(d != NULL);
303    cb_assert(d->ptd != NULL);
304    cb_assert(d->ptd->proxy != NULL);
305    cb_assert(d->downstream_conns != NULL);
306    cb_assert(command != NULL);
307    cb_assert(uc != NULL);
308    cb_assert(uc->item == NULL);
309    cb_assert(uc->cmd_curr != (protocol_binary_command) -1);
310    cb_assert(d->multiget == NULL);
311    cb_assert(d->merger == NULL);
312
313    /* Handles get and gets. */
314
315    if (uc->cmd_curr == PROTOCOL_BINARY_CMD_GETK ||
316        uc->cmd_curr == PROTOCOL_BINARY_CMD_GETKQ ||
317        uc->cmd_curr == PROTOCOL_BINARY_CMD_GETL) {
318        /* Only use front_cache for 'get', not for 'gets'. */
319
320        mcache *front_cache =
321            (command[3] == ' ') ? &d->ptd->proxy->front_cache : NULL;
322
323        return multiget_ascii_downstream(d, uc,
324                                         a2a_multiget_start,
325                                         a2a_multiget_skey,
326                                         a2a_multiget_end,
327                                         front_cache);
328    }
329
330    cb_assert(uc->next == NULL);
331
332    if (uc->cmd_curr == PROTOCOL_BINARY_CMD_FLUSH) {
333        return cproxy_broadcast_a2a_downstream(d, command, uc,
334                                               "OK\r\n");
335    }
336
337    if (uc->cmd_curr == PROTOCOL_BINARY_CMD_STAT) {
338        if (strncmp(command + 5, " reset", 6) == 0) {
339            return cproxy_broadcast_a2a_downstream(d, command, uc,
340                                                   "RESET\r\n");
341        }
342
343        if (cproxy_broadcast_a2a_downstream(d, command, uc,
344                                            "END\r\n")) {
345            d->merger = genhash_init(512, skeyhash_ops);
346            return true;
347        } else {
348            return false;
349        }
350    }
351
352    /* TODO: Inefficient repeated scan_tokens. */
353
354    int      cmd_len = 0;
355    token_t  tokens[MAX_TOKENS];
356    size_t   ntokens = scan_tokens(command, tokens, MAX_TOKENS, &cmd_len);
357    char    *key     = tokens[KEY_TOKEN].value;
358    int      key_len = tokens[KEY_TOKEN].length;
359
360    if (ntokens <= 1) { /* This was checked long ago, while parsing */
361        cb_assert(false);  /* the upstream conn. */
362        return false;
363    }
364
365    /* Assuming we're already connected to downstream. */
366
367    if (!strcmp(command, "version")) {
368        /* fake key for version command handling */
369        key = "v";
370        key_len = 1;
371    }
372
373    conn *c = cproxy_find_downstream_conn(d, key, key_len, NULL);
374    if (c != NULL) {
375
376        if (cproxy_prep_conn_for_write(c)) {
377            cb_assert(c->state == conn_pause);
378
379            out_string(c, command);
380
381            if (settings.verbose > 1) {
382                moxi_log_write("forwarding to %d, noreply %d\n",
383                        c->sfd, uc->noreply);
384            }
385
386            if (update_event(c, EV_WRITE | EV_PERSIST)) {
387                d->downstream_used_start = 1;
388                d->downstream_used       = 1;
389
390                if (cproxy_dettach_if_noreply(d, uc) == false) {
391                    cproxy_start_downstream_timeout(d, c);
392                } else {
393                    c->write_and_go = conn_pause;
394
395                    /* Do mcache_delete() here only during a noreply, */
396                    /* otherwise for with-reply requests, we could */
397                    /* be in a race with other clients repopulating */
398                    /* the front_cache.  For with-reply requests, we */
399                    /* clear the front_cache when we get a success reply. */
400
401                    cproxy_front_cache_delete(d->ptd, key, key_len);
402                }
403
404                return true;
405            }
406
407            if (settings.verbose > 1) {
408                moxi_log_write("Couldn't update cproxy write event\n");
409            }
410
411            d->ptd->stats.stats.err_oom++;
412            cproxy_close_conn(c);
413        } else {
414            d->ptd->stats.stats.err_downstream_write_prep++;
415            cproxy_close_conn(c);
416        }
417    }
418
419    return false;
420}
421
422int a2a_multiget_start(conn *c, char *cmd, int cmd_len) {
423    return add_iov(c, cmd, cmd_len);
424}
425
426/* An skey is a space prefixed key string.
427 */
428int a2a_multiget_skey(conn *c, char *skey, int skey_length, int vbucket, int key_index) {
429    (void)vbucket;
430    (void)key_index;
431    return add_iov(c, skey, skey_length);
432}
433
434int a2a_multiget_end(conn *c) {
435    return add_iov(c, "\r\n", 2);
436}
437
438/* Used for broadcast commands, like flush_all or stats.
439 */
440bool cproxy_broadcast_a2a_downstream(downstream *d,
441                                     char *command,
442                                     conn *uc,
443                                     char *suffix) {
444    int nwrite = 0;
445    int nconns;
446    int i;
447
448    cb_assert(d != NULL);
449    cb_assert(d->ptd != NULL);
450    cb_assert(d->ptd->proxy != NULL);
451    cb_assert(d->downstream_conns != NULL);
452    cb_assert(d->downstream_used_start == 0);
453    cb_assert(d->downstream_used == 0);
454    cb_assert(command != NULL);
455    cb_assert(uc != NULL);
456    cb_assert(uc->next == NULL);
457    cb_assert(uc->item == NULL);
458
459    nconns = mcs_server_count(&d->mst);
460    for (i = 0; i < nconns; i++) {
461        conn *c = d->downstream_conns[i];
462        if (c != NULL &&
463            c != NULL_CONN) {
464            if (cproxy_prep_conn_for_write(c)) {
465                cb_assert(c->state == conn_pause);
466
467                out_string(c, command);
468
469                if (update_event(c, EV_WRITE | EV_PERSIST)) {
470                    nwrite++;
471
472                    if (uc->noreply) {
473                        c->write_and_go = conn_pause;
474                    }
475                } else {
476                    if (settings.verbose > 1) {
477                        moxi_log_write("Update cproxy write event failed\n");
478                    }
479
480                    d->ptd->stats.stats.err_oom++;
481                    cproxy_close_conn(c);
482                }
483            } else {
484                d->ptd->stats.stats.err_downstream_write_prep++;
485                cproxy_close_conn(c);
486            }
487        }
488    }
489
490    if (settings.verbose > 1) {
491        moxi_log_write("%d: a2a broadcast nwrite %d out of %d\n",
492                uc->sfd, nwrite, nconns);
493    }
494
495    if (nwrite > 0) {
496        d->downstream_used_start = nwrite;
497        d->downstream_used       = nwrite;
498
499        if (cproxy_dettach_if_noreply(d, uc) == false) {
500            d->upstream_suffix = suffix;
501            d->upstream_suffix_len = 0;
502            d->upstream_status = PROTOCOL_BINARY_RESPONSE_SUCCESS;
503            d->upstream_retry = 0;
504            d->target_host_ident = NULL;
505
506            cproxy_start_downstream_timeout(d, NULL);
507        } else {
508            /* TODO: Handle flush_all's expiration parameter against */
509            /* the front_cache. */
510
511            if (strncmp(command, "flush_all", 9) == 0) {
512                mcache_flush_all(&d->ptd->proxy->front_cache, 0);
513            }
514        }
515
516        return true;
517    }
518
519    return false;
520}
521
522/* Forward an upstream command that came with item data,
523 * like set/add/replace/etc.
524 */
525bool cproxy_forward_a2a_item_downstream(downstream *d, short cmd,
526                                        item *it, conn *uc) {
527    conn *c;
528    cb_assert(d != NULL);
529    cb_assert(d->ptd != NULL);
530    cb_assert(d->ptd->proxy != NULL);
531    cb_assert(d->downstream_conns != NULL);
532    cb_assert(it != NULL);
533    cb_assert(uc != NULL);
534    cb_assert(uc->next == NULL);
535
536    /* Assuming we're already connected to downstream. */
537    c = cproxy_find_downstream_conn(d, ITEM_key(it), it->nkey, NULL);
538    if (c != NULL) {
539
540        if (cproxy_prep_conn_for_write(c)) {
541            char *verb;
542            char *str_flags;
543            char *str_length;
544            int len_flags;
545            int len_length;
546            char *str_exptime;
547            char *str_cas;
548
549            cb_assert(c->state == conn_pause);
550
551            verb = nread_text(cmd);
552            cb_assert(verb != NULL);
553
554            str_flags = ITEM_suffix(it);
555            str_length = strchr(str_flags + 1, ' ');
556            len_flags = str_length - str_flags;
557            len_length = it->nsuffix - len_flags - 2;
558            str_exptime = add_conn_suffix(c);
559            str_cas = (cmd == NREAD_CAS ? add_conn_suffix(c) : NULL);
560
561            if (str_flags != NULL &&
562                str_length != NULL &&
563                len_flags > 1 &&
564                len_length > 1 &&
565                str_exptime != NULL &&
566                (cmd != NREAD_CAS ||
567                 str_cas != NULL)) {
568                sprintf(str_exptime, " %u", it->exptime);
569
570                if (str_cas != NULL) {
571                    sprintf(str_cas, " %llu",
572                            (unsigned long long) ITEM_get_cas(it));
573                }
574
575                if (add_iov(c, verb, strlen(verb)) == 0 &&
576                    add_iov(c, ITEM_key(it), it->nkey) == 0 &&
577                    add_iov(c, str_flags, len_flags) == 0 &&
578                    add_iov(c, str_exptime, strlen(str_exptime)) == 0 &&
579                    add_iov(c, str_length, len_length) == 0 &&
580                    (str_cas == NULL ||
581                     add_iov(c, str_cas, strlen(str_cas)) == 0) &&
582                    (uc->noreply == false ||
583                     add_iov(c, " noreply", 8) == 0) &&
584                    add_iov(c, ITEM_data(it) - 2, it->nbytes + 2) == 0) {
585                    conn_set_state(c, conn_mwrite);
586                    c->write_and_go = conn_new_cmd;
587
588                    if (update_event(c, EV_WRITE | EV_PERSIST)) {
589                        d->downstream_used_start = 1;
590                        d->downstream_used       = 1;
591
592                        if (cproxy_dettach_if_noreply(d, uc) == false) {
593                            cproxy_start_downstream_timeout(d, c);
594
595                            /* During a synchronous (with-reply) SET, */
596                            /* handle fire-&-forget SET optimization. */
597
598                            if (cmd == NREAD_SET &&
599                                cproxy_optimize_set_ascii(d, uc,
600                                                          ITEM_key(it),
601                                                          it->nkey)) {
602                                d->ptd->stats.stats.tot_optimize_sets++;
603                            }
604                        } else {
605                            c->write_and_go = conn_pause;
606
607                            cproxy_front_cache_delete(d->ptd,
608                                                      ITEM_key(it), it->nkey);
609                        }
610
611                        return true;
612                    }
613                }
614
615                d->ptd->stats.stats.err_oom++;
616                cproxy_close_conn(c);
617            } else {
618                /* TODO: Handle this weird error case. */
619            }
620        } else {
621            d->ptd->stats.stats.err_downstream_write_prep++;
622            cproxy_close_conn(c);
623        }
624
625        if (settings.verbose > 1) {
626            moxi_log_write("Proxy item write out of memory");
627        }
628    }
629
630    return false;
631}
632
633