xref: /6.0.3/moxi/src/cproxy_protocol_b.c (revision 1298cfed)
1/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2#include "src/config.h"
3#include <stdio.h>
4#include <stdlib.h>
5#include <string.h>
6#include <errno.h>
7#include <platform/cbassert.h>
8#include "memcached.h"
9#include "cproxy.h"
10#include "work.h"
11#include "log.h"
12
13static void cproxy_sasl_plain_auth(conn *c, char *req_bytes);
14
15void cproxy_process_upstream_binary(conn *c) {
16    cb_assert(c != NULL);
17    cb_assert(c->cmd >= 0);
18    cb_assert(c->next == NULL);
19    cb_assert(c->item == NULL);
20    cb_assert(IS_BINARY(c->protocol));
21    cb_assert(IS_PROXY(c->protocol));
22
23    proxy_td *ptd = c->extra;
24    cb_assert(ptd != NULL);
25
26    if (!cproxy_prep_conn_for_write(c)) {
27        ptd->stats.stats.err_upstream_write_prep++;
28        conn_set_state(c, conn_closing);
29        return;
30    }
31
32    c->cmd_curr       = -1;
33    c->cmd_start      = NULL;
34    c->cmd_start_time = msec_current_time;
35    c->cmd_retries    = 0;
36
37    int      extlen  = c->binary_header.request.extlen;
38    int      keylen  = c->binary_header.request.keylen;
39    uint32_t bodylen = c->binary_header.request.bodylen;
40
41    cb_assert(bodylen >= (uint32_t) keylen + extlen);
42
43    if (settings.verbose > 2) {
44        moxi_log_write("<%d cproxy_process_upstream_binary %x %d %d %u\n",
45                c->sfd, c->cmd, extlen, keylen, bodylen);
46    }
47
48    process_bin_noreply(c); /* Map quiet c->cmd values into non-quiet. */
49
50    if (c->cmd == PROTOCOL_BINARY_CMD_VERSION ||
51        c->cmd == PROTOCOL_BINARY_CMD_QUIT) {
52        dispatch_bin_command(c);
53        return;
54    }
55
56    /* Alloc an item and continue with an rest-of-body nread if */
57    /* necessary.  The item will hold the entire request message */
58    /* (the header + body). */
59
60    char *ikey    = "u";
61    int   ikeylen = 1;
62
63    c->item = item_alloc(ikey, ikeylen, 0, 0,
64                         sizeof(c->binary_header) + bodylen);
65    if (c->item != NULL) {
66        item *it = c->item;
67        void *rb = c->rcurr;
68
69        cb_assert(it->refcount == 1);
70
71        memcpy(ITEM_data(it), rb, sizeof(c->binary_header));
72
73        if (bodylen > 0) {
74            c->ritem = ITEM_data(it) + sizeof(c->binary_header);
75            c->rlbytes = bodylen;
76            c->substate = bin_read_set_value;
77
78            conn_set_state(c, conn_nread);
79        } else {
80            /* Since we have no body bytes, we can go immediately to */
81            /* the nread completed processing step. */
82
83            if (c->binary_header.request.opcode == PROTOCOL_BINARY_CMD_SASL_LIST_MECHS) {
84                /* TODO: One day handle more than just PLAIN sasl auth. */
85
86                write_bin_response(c, "PLAIN", 0, 0, strlen("PLAIN"));
87                return;
88            }
89
90            cproxy_pause_upstream_for_downstream(ptd, c);
91        }
92    } else {
93        if (settings.verbose > 2) {
94            moxi_log_write("<%d cproxy_process_upstream_binary OOM\n",
95                           c->sfd);
96        }
97        ptd->stats.stats.err_oom++;
98        cproxy_close_conn(c);
99    }
100}
101
102/* We get here after reading the header+body into an item.
103 */
104void cproxy_process_upstream_binary_nread(conn *c) {
105    cb_assert(c != NULL);
106    cb_assert(c->cmd >= 0);
107    cb_assert(c->next == NULL);
108    cb_assert(c->cmd_start == NULL);
109    cb_assert(IS_BINARY(c->protocol));
110    cb_assert(IS_PROXY(c->protocol));
111
112    protocol_binary_request_header *header =
113        (protocol_binary_request_header *) &c->binary_header;
114
115    int      extlen  = header->request.extlen;
116    int      keylen  = header->request.keylen;
117    uint32_t bodylen = header->request.bodylen;
118
119    if (settings.verbose > 2) {
120        moxi_log_write("<%d cproxy_process_upstream_binary_nread %x %d %d %u\n",
121                c->sfd, c->cmd, extlen, keylen, bodylen);
122    }
123
124    /* pthread_mutex_lock(&c->thread->stats.mutex); */
125    /* c->thread->stats.slab_stats[it->slabs_clsid].set_cmds++; */
126    /* pthread_mutex_unlock(&c->thread->stats.mutex); */
127
128    proxy_td *ptd = c->extra;
129    cb_assert(ptd != NULL);
130
131    if (header->request.opcode == PROTOCOL_BINARY_CMD_SASL_AUTH) {
132        item *it = c->item;
133        cb_assert(it);
134
135        cproxy_sasl_plain_auth(c, (char *) ITEM_data(it));
136        return;
137    }
138
139    if (header->request.opcode == PROTOCOL_BINARY_CMD_SASL_STEP) {
140        write_bin_error(c, PROTOCOL_BINARY_RESPONSE_AUTH_ERROR, 0);
141        return;
142    }
143
144    if (c->binary_header.request.opcode == PROTOCOL_BINARY_CMD_STAT) {
145        char *subcommand = binary_get_key(c);
146        size_t nkey = c->binary_header.request.keylen;
147        if (nkey == 13 && memcmp(subcommand, "proxy buckets", 13) == 0) {
148            process_bin_proxy_stats(c);
149            return;
150        }
151    }
152
153    if (c->noreply) {
154        if (settings.verbose > 2) {
155            moxi_log_write("<%d cproxy_process_upstream_binary_nread "
156                    "corking quiet command %x %d\n",
157                    c->sfd, c->cmd, (c->corked != NULL));
158        }
159
160        /* TODO: We currently don't support binary FLUSHQ. */
161
162        /* Rather than having the downstream connections get */
163        /* into a wonky state, prevent it. */
164
165        if (header->request.opcode == PROTOCOL_BINARY_CMD_FLUSHQ) {
166            /* Note: don't use cproxy_close_conn(c), as it goes */
167            /* through the drive_machine() loop again. */
168
169            /* cproxy_close_conn(c); */
170
171            conn_set_state(c, conn_closing);
172
173            return;
174        }
175
176        /* Hold onto or 'cork' all the binary quiet commands */
177        /* until there's a later non-quiet command. */
178
179        if (cproxy_binary_cork_cmd(c)) {
180            conn_set_state(c, conn_new_cmd);
181        } else {
182            ptd->stats.stats.err_oom++;
183            cproxy_close_conn(c);
184        }
185
186        return;
187    }
188
189    cb_assert(c->item == NULL || ((item *) c->item)->refcount == 1);
190
191    cproxy_pause_upstream_for_downstream(ptd, c);
192}
193
194static int bin_cmd_append(bin_cmd **head, bin_cmd *bc) {
195    cb_assert(head != NULL);
196    cb_assert(bc != NULL);
197
198    bin_cmd *tail = *head;
199
200    int n = 1;
201
202    while (tail != NULL) {
203        n++;
204        if (tail->next == NULL) {
205            tail->next = bc;
206            return n;
207        }
208        tail = tail->next;
209    }
210
211    *head = bc;
212
213    return n; /* Returns number of items in list. */
214}
215
216bool cproxy_binary_cork_cmd(conn *c) {
217    /* Save the quiet binary command for later uncorking. */
218
219    cb_assert(c != NULL);
220    cb_assert(c->item != NULL);
221
222    bin_cmd *bc = calloc(1, sizeof(bin_cmd));
223    if (bc != NULL) {
224        /* Transferred the item refcount from c->item to the bin_cmd. */
225
226        bc->request_item = c->item;
227        c->item = NULL;
228
229        int ncorked = bin_cmd_append(&c->corked, bc);
230
231        if (settings.verbose > 2) {
232            moxi_log_write("%d: cproxy_binary_cork_cmd, ncorked %d %d\n",
233                    c->sfd, ncorked, (c->corked != NULL));
234        }
235
236        return true;
237    }
238
239    if (settings.verbose > 2) {
240        moxi_log_write("%d: cproxy_binary_cork_cmd failed\n",
241                c->sfd);
242    }
243
244    return false;
245}
246
247void cproxy_binary_uncork_cmds(downstream *d, conn *uc) {
248    cb_assert(d != NULL);
249    cb_assert(uc != NULL);
250
251    if (settings.verbose > 2) {
252        moxi_log_write("%d: cproxy_binary_uncork_cmds\n",
253                uc->sfd);
254    }
255
256    int n = 0;
257
258    while (uc->corked != NULL) {
259        bin_cmd *next = uc->corked->next;
260
261        item *it = uc->corked->request_item;
262        if (it != NULL) {
263            b2b_forward_item(uc, d, it);
264            n++;
265        }
266
267        if (uc->corked->request_item != NULL) {
268            item_remove(uc->corked->request_item);
269        }
270
271        if (uc->corked->response_item != NULL) {
272            item_remove(uc->corked->response_item);
273        }
274
275        free(uc->corked);
276        uc->corked = next;
277    }
278
279    if (settings.verbose > 2) {
280        moxi_log_write("%d: cproxy_binary_uncork_cmds, uncorked %d\n",
281                uc->sfd, n);
282    }
283}
284
285void cproxy_process_downstream_binary(conn *c) {
286    downstream *d = c->extra;
287    cb_assert(d != NULL);
288    cb_assert(d->upstream_conn != NULL);
289
290    if (IS_ASCII(d->upstream_conn->protocol)) {
291        cproxy_process_a2b_downstream(c);
292    } else {
293        cproxy_process_b2b_downstream(c);
294    }
295}
296
297void cproxy_process_downstream_binary_nread(conn *c) {
298    downstream *d = c->extra;
299    cb_assert(d != NULL);
300    cb_assert(d->upstream_conn != NULL);
301
302    if (IS_ASCII(d->upstream_conn->protocol)) {
303        cproxy_process_a2b_downstream_nread(c);
304    } else {
305        cproxy_process_b2b_downstream_nread(c);
306    }
307}
308
309void cproxy_dump_header(SOCKET prefix, char *bb) {
310    if (settings.verbose > 2) {
311        char buf[200];
312
313        int prefix_len = snprintf(buf, sizeof(buf), "%d   ", (int)prefix);
314        int start = prefix_len;
315        size_t ii;
316
317        for (ii = 0; ii < sizeof(protocol_binary_request_header); ++ii) {
318            if (ii > 0 && ii % 4 == 0) {
319                buf[start] = '\n';
320                buf[start + 1] = '\0';
321                moxi_log_write(buf);
322
323                start = prefix_len;
324            }
325
326            start += snprintf(buf + start, sizeof(buf) - start,
327                              " 0x%02x", (unsigned char) bb[ii]);
328        }
329
330        buf[start] = '\n';
331        buf[start + 1] = '\0';
332
333        moxi_log_write(buf);
334    }
335}
336
337bool cproxy_binary_ignore_reply(conn *c, protocol_binary_response_header *header, item *it) {
338    if (c->noreply &&
339        OPAQUE_IGNORE_REPLY == ntohl(header->response.opaque)) {
340        /* Handle when the client sent an ascii noreply command, */
341        /* and we now need to eat the binary error responses. */
342        /* So, drop the current response (should be an error response) */
343        /* and go to read the next response message. */
344
345        if (settings.verbose > 2) {
346            moxi_log_write("<%d cproxy_process_a2b_downstream_response OPAQUE_IGNORE_REPLY, "
347                    "cmd: %x, status: %x, ignoring reply\n",
348                    c->sfd, header->response.opcode, header->response.status);
349        }
350
351        conn_set_state(c, conn_new_cmd);
352
353        if (it != NULL) {
354            item_remove(it);
355        }
356
357        return true;
358    }
359
360    return false;
361}
362
363static void cproxy_sasl_plain_auth(conn *c, char *req_bytes) {
364    protocol_binary_request_header *req;
365    char *key;
366    int keylen;
367    int bodylen;
368    char *clientin;
369    unsigned int clientinlen;
370
371    proxy_td *ptd = c->extra;
372    cb_assert(ptd != NULL);
373    cb_assert(ptd->proxy != NULL);
374    cb_assert(ptd->proxy->main != NULL);
375
376    /* Authenticate an upstream connection. */
377
378    req = (protocol_binary_request_header *) req_bytes;
379
380    key = ((char *) req) + sizeof(*req) + req->request.extlen;
381    keylen = ntohs(req->request.keylen);
382    bodylen = ntohl(req->request.bodylen);
383
384    /* The key is the sasl mech. */
385
386    if (keylen != 5 ||
387        memcmp(key, "PLAIN", 5) != 0) { /* 5 == strlen("PLAIN"). */
388        write_bin_error(c, PROTOCOL_BINARY_RESPONSE_AUTH_ERROR, 0);
389        return;
390    }
391
392    clientin = key + keylen;
393    clientinlen = bodylen - keylen - req->request.extlen;
394
395    /* The clientin string looks like "[authzid]\0username\0password". */
396
397    while (clientinlen > 0 && clientin[0] != '\0') {
398        /* Skip authzid. */
399
400        clientin++;
401        clientinlen--;
402    }
403
404    if (clientinlen > 2 && clientinlen < 128 && clientin[0] == '\0') {
405        const char *username = clientin + 1;
406        char        password[256];
407
408        int uslen = strlen(username);
409        int pwlen = clientinlen - 2 - uslen;
410
411        if (pwlen < (int) sizeof(password)) {
412            proxy *p;
413            memcpy(password, clientin + 2 + uslen, pwlen);
414            password[pwlen] = '\0';
415
416            p = cproxy_find_proxy_by_auth(ptd->proxy->main,
417                                                 username, password);
418            if (p != NULL) {
419                proxy_td *ptd_target = cproxy_find_thread_data(p, cb_thread_self());
420                if (ptd_target != NULL) {
421                    c->extra = ptd_target;
422
423                    write_bin_response(c, "Authenticated", 0, 0,
424                                       strlen("Authenticated"));
425
426                    if (settings.verbose > 2) {
427                        moxi_log_write("<%d sasl authenticated for %s\n",
428                                       c->sfd, username);
429                    }
430
431                    return;
432                } else {
433                    if (settings.verbose > 2) {
434                        moxi_log_write("<%d sasl auth failed on ptd for %s\n",
435                                       c->sfd, username);
436                    }
437                }
438            } else {
439                if (settings.verbose > 2) {
440                    moxi_log_write("<%d sasl auth failed for %s (%d)\n",
441                                   c->sfd, username, pwlen);
442                }
443            }
444        } else {
445            if (settings.verbose > 2) {
446                moxi_log_write("<%d sasl auth failed for %s with empty password\n",
447                               c->sfd, username);
448            }
449        }
450    } else {
451        if (settings.verbose > 2) {
452            moxi_log_write("<%d sasl auth failed with malformed PLAIN data\n",
453                           c->sfd);
454        }
455    }
456
457    /* TODO: If authentication failed, we should consider */
458    /* reassigning the connection to the NULL_BUCKET. */
459
460    write_bin_error(c, PROTOCOL_BINARY_RESPONSE_AUTH_ERROR, 0);
461}
462
463void *cproxy_make_bin_header(conn *c, uint8_t magic) {
464    protocol_binary_response_header *rh =
465        (protocol_binary_response_header *) add_conn_suffix(c);
466    if (rh != NULL) {
467        memset(rh, 0, sizeof(protocol_binary_response_header));
468        rh->response.magic = magic;
469        rh->response.datatype = (uint8_t) PROTOCOL_BINARY_RAW_BYTES;
470        return rh;
471    }
472    return NULL;
473}
474
475protocol_binary_response_header *
476cproxy_make_bin_error(conn *c, uint16_t status) {
477    protocol_binary_response_header *rh =
478        cproxy_make_bin_header(c, PROTOCOL_BINARY_RES);
479    if (rh != NULL) {
480        rh->response.opcode = c->binary_header.request.opcode;
481        rh->response.status = (uint16_t) htons(status);
482        rh->response.opaque = c->opaque;
483        return rh;
484    }
485    return NULL;
486}
487