xref: /5.5.2/moxi/src/cproxy_multiget.c (revision d0366df5)
1/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2#include "src/config.h"
3#include <stdio.h>
4#include <stdlib.h>
5#include <string.h>
6#include <errno.h>
7#include <platform/cbassert.h>
8#include "memcached.h"
9#include "cproxy.h"
10#include "log.h"
11
12/* Callback to g_hash_table_foreach that frees the multiget_entry list.
13 */
14void multiget_foreach_free(const void *key,
15                           const void *value,
16                           void *user_data) {
17    downstream *d;
18    proxy_td *ptd;
19    proxy_stats_cmd *psc_get_key;
20    int length = 0;
21    multiget_entry *entry;
22
23    (void)key;
24    d = user_data;
25    cb_assert(d);
26
27    ptd = d->ptd;
28    cb_assert(ptd);
29
30    psc_get_key = &ptd->stats.stats_cmd[STATS_CMD_TYPE_REGULAR][STATS_CMD_GET_KEY];
31    entry = (multiget_entry*)value;
32
33    while (entry != NULL) {
34        multiget_entry *curr = entry;
35        if (entry->hits == 0) {
36            psc_get_key->misses++;
37        }
38
39        /* TODO: Update key-level stats misses. */
40
41        entry = entry->next;
42        curr->upstream_conn = NULL;
43        curr->next          = NULL;
44        free(curr);
45
46        length++;
47    }
48
49    /* TODO: Track key-level multiget squashes (length > 1). */
50}
51
52/* Callback to g_hash_table_foreach that clears out multiget_entries
53 * which have the given upstream conn (passed as user_data).
54 */
55void multiget_remove_upstream(const void *key,
56                              const void *value,
57                              void *user_data) {
58    multiget_entry *entry;
59    conn *uc;
60
61    (void)key;
62    entry = (multiget_entry *) value;
63    cb_assert(entry != NULL);
64
65    uc = user_data;
66    cb_assert(uc != NULL);
67
68    while (entry != NULL) {
69        /* Just clear the slots, because glib hash table API */
70        /* doesn't allow for key/value modifications during iteration. */
71
72        if (entry->upstream_conn == uc) {
73            entry->upstream_conn = NULL;
74            entry->opaque = 0;
75        }
76
77        entry = entry ->next;
78    }
79}
80
81bool multiget_ascii_downstream(downstream *d, conn *uc,
82                               int (*emit_start)(conn *c, char *cmd, int cmd_len),
83                               int (*emit_skey)(conn *c, char *skey, int skey_len, int vbucket, int key_index),
84                               int (*emit_end)(conn *c),
85                               mcache *front_cache) {
86    int i;
87    proxy_td *ptd;
88    proxy_stats_cmd *psc_get;
89    proxy_stats_cmd *psc_get_key;
90    int nwrite = 0;
91    int nconns;
92    uint64_t msec_current_time_snapshot;
93    int   uc_num = 0;
94    conn *uc_cur;
95
96    cb_assert(d != NULL);
97    cb_assert(d->downstream_conns != NULL);
98    cb_assert(uc != NULL);
99    cb_assert(uc->noreply == false);
100
101    ptd = d->ptd;
102    cb_assert(ptd != NULL);
103
104    psc_get = &ptd->stats.stats_cmd[STATS_CMD_TYPE_REGULAR][STATS_CMD_GET];
105    psc_get_key = &ptd->stats.stats_cmd[STATS_CMD_TYPE_REGULAR][STATS_CMD_GET_KEY];
106    nconns = mcs_server_count(&d->mst);
107
108    for (i = 0; i < nconns; i++) {
109        if (d->downstream_conns[i] != NULL &&
110            d->downstream_conns[i] != NULL_CONN &&
111            cproxy_prep_conn_for_write(d->downstream_conns[i]) == false) {
112            d->ptd->stats.stats.err_downstream_write_prep++;
113            cproxy_close_conn(d->downstream_conns[i]);
114            return false;
115        }
116    }
117
118    /* Snapshot the volatile only once. */
119    msec_current_time_snapshot = msec_current_time;
120    uc_cur = uc;
121
122    while (uc_cur != NULL) {
123        char *command;
124        char *space;
125        int cmd_len;
126        int cas_emit;
127
128        cb_assert(uc_cur->cmd == -1);
129        cb_assert(uc_cur->item == NULL);
130        cb_assert(uc_cur->state == conn_pause);
131        cb_assert(IS_ASCII(uc_cur->protocol));
132        cb_assert(IS_PROXY(uc_cur->protocol));
133
134        command = uc_cur->cmd_start;
135        cb_assert(command != NULL);
136
137        while (*command != '\0' && *command == ' ') {
138            command++;
139        }
140
141        space = strchr(command, ' ');
142        cb_assert(space > command);
143
144        cmd_len = space - command;
145        cb_assert(cmd_len == 3 || cmd_len == 4); /* Either get or gets. */
146
147        cas_emit = (command[3] == 's');
148
149        if (settings.verbose > 1) {
150            moxi_log_write("%d: forward multiget %s (%d %d)\n",
151                    uc_cur->sfd, command, cmd_len, uc_num);
152        }
153
154        while (space != NULL) {
155            char *key = space + 1;
156            char *next_space = strchr(key, ' ');
157            int   key_len;
158            bool  key_last;
159
160            if (next_space != NULL) {
161                key_len = next_space - key;
162                key_last = false;
163            } else {
164                key_len = strlen(key);
165                key_last = true;
166
167                /* We've reached the last key. */
168
169                psc_get->read_bytes += (key - command + key_len);
170            }
171
172            /* This key_len check helps skip consecutive spaces. */
173
174            if (key_len > 0) {
175                int vbucket = -1;
176                conn *c;
177                bool do_key_stats;
178
179                ptd->stats.stats.tot_multiget_keys++;
180
181                psc_get_key->seen++;
182                psc_get_key->read_bytes += key_len;
183
184                /* Update key-based statistics. */
185
186                do_key_stats =
187                    matcher_check(&ptd->key_stats_matcher,
188                                  key, key_len, false) == true &&
189                    matcher_check(&ptd->key_stats_unmatcher,
190                                  key, key_len, false) == false;
191
192                if (do_key_stats) {
193                    touch_key_stats(ptd, key, key_len,
194                                    msec_current_time_snapshot,
195                                    STATS_CMD_TYPE_REGULAR,
196                                    STATS_CMD_GET_KEY,
197                                    1, 0, 0,
198                                    key_len, 0);
199                }
200
201                /* Handle a front cache hit by queuing response. */
202
203                /* Note, front cache stats are part of mcache. */
204
205                if (!cas_emit) {
206                    item *it = NULL;
207
208                    if (front_cache != NULL &&
209                        cproxy_front_cache_key(ptd, key, key_len) == true) {
210                        it = mcache_get(front_cache, key, key_len,
211                                        msec_current_time_snapshot);
212                    }
213
214                    if (it != NULL) {
215                        cb_assert(it->nkey == key_len);
216                        cb_assert(strncmp(ITEM_key(it), key, it->nkey) == 0);
217
218                        cproxy_upstream_ascii_item_response(it, uc_cur, 0);
219
220                        psc_get_key->hits++;
221                        psc_get_key->write_bytes += it->nbytes;
222
223                        if (do_key_stats) {
224                            touch_key_stats(ptd, key, key_len,
225                                            msec_current_time_snapshot,
226                                            STATS_CMD_TYPE_REGULAR,
227                                            STATS_CMD_GET_KEY,
228                                            0, 1, 0,
229                                            0, it->nbytes);
230                        }
231
232                        /* The refcount was inc'ed by mcache_get() for us. */
233
234                        item_remove(it);
235
236                        goto loop_next;
237                    }
238                }
239
240                c = cproxy_find_downstream_conn_ex(d, key, key_len,
241                                                   NULL, &vbucket);
242                if (c != NULL) {
243                    bool first_request = true;
244
245                    /* If there's more than one key, create a de-duplication map. */
246                    /* This is used to handle not-my-vbucket errors */
247                    /* where any later retry attempts should avoid */
248                    /* retrying already successfully attempted keys. */
249
250                    /* Previously, we used to only have a map when there was more than */
251                    /* one upstream conn. */
252
253                    if (key_last == false &&
254                        d->multiget == NULL) {
255                        d->multiget = genhash_init(128, skeyhash_ops);
256                        if (settings.verbose > 1) {
257                            moxi_log_write("%d: cproxy multiget hash table new\n", uc->sfd);
258                        }
259                    }
260
261                    /* See if we've already requested this key via */
262                    /* the multiget hash table, in order to */
263                    /* de-duplicate repeated keys. */
264
265                    if (d->multiget != NULL) {
266                        multiget_entry *entry;
267                        if (settings.verbose > 2) {
268                            char key_buf[KEY_MAX_LENGTH + 10];
269                            cb_assert(key_len <= KEY_MAX_LENGTH);
270                            memcpy(key_buf, key, key_len);
271                            key_buf[key_len] = '\0';
272
273                            moxi_log_write("<%d multiget_ascii_downstream '%s' %d %d %d\n",
274                                    c->sfd, key_buf, vbucket, (int) (key - command), key_len);
275                        }
276
277                        /* TODO: Use Trond's allocator here. */
278
279                        entry = calloc(1, sizeof(multiget_entry));
280                        if (entry != NULL) {
281                            entry->upstream_conn = uc_cur;
282                            entry->opaque = 0;
283                            entry->hits = 0;
284                            entry->next = genhash_find(d->multiget, key);
285
286                            genhash_update(d->multiget, key, entry);
287
288                            if (entry->next != NULL) {
289                                first_request = false;
290                            }
291                        } else {
292                            /* TODO: Handle out of multiget entry memory. */
293                        }
294                    }
295
296                    if (first_request) {
297                        cb_assert(c->item == NULL);
298                        cb_assert(c->state == conn_pause);
299                        cb_assert(IS_PROXY(c->protocol));
300                        cb_assert(c->ilist != NULL);
301                        cb_assert(c->isize > 0);
302
303                        if (c->msgused <= 1 &&
304                            c->msgbytes <= 0) {
305                            emit_start(c, command, cmd_len);
306                        }
307
308                        /* Provide the preceding space as optimization */
309                        /* for ascii-to-ascii configuration. */
310
311                        emit_skey(c, key - 1, key_len + 1, vbucket, key - command);
312                    } else {
313                        ptd->stats.stats.tot_multiget_keys_dedupe++;
314
315                        if (settings.verbose > 1) {
316                            char buf[KEY_MAX_LENGTH + 10];
317                            memcpy(buf, key, key_len);
318                            buf[key_len] = '\0';
319
320                            moxi_log_write("%d cproxy multiget dedpue: %s\n",
321                                    uc_cur->sfd, buf);
322                        }
323                    }
324                } else {
325                    /* TODO: Handle when downstream conn is down. */
326                }
327            }
328
329        loop_next:
330            space = next_space;
331        }
332
333        uc_num++;
334        uc_cur = uc_cur->next;
335    }
336
337    for (i = 0; i < nconns; i++) {
338        conn *c = d->downstream_conns[i];
339        if (c != NULL &&
340            c != NULL_CONN &&
341            (c->msgused > 1 ||
342             c->msgbytes > 0)) {
343            emit_end(c);
344
345            conn_set_state(c, conn_mwrite);
346            c->write_and_go = conn_new_cmd;
347
348            if (update_event(c, EV_WRITE | EV_PERSIST)) {
349                nwrite++;
350
351                if (uc->noreply) {
352                    c->write_and_go = conn_pause;
353                }
354            } else {
355                if (settings.verbose > 1) {
356                    moxi_log_write("Couldn't update cproxy write event\n");
357                }
358
359                d->ptd->stats.stats.err_oom++;
360                cproxy_close_conn(c);
361            }
362        }
363    }
364
365    if (settings.verbose > 1) {
366        moxi_log_write("forward multiget nwrite %d out of %d\n",
367                nwrite, nconns);
368    }
369
370    d->downstream_used_start = nwrite;
371    d->downstream_used       = nwrite;
372
373    if (cproxy_dettach_if_noreply(d, uc) == false) {
374        d->upstream_suffix = "END\r\n";
375        d->upstream_suffix_len = 0;
376        d->upstream_status = PROTOCOL_BINARY_RESPONSE_SUCCESS;
377        d->upstream_retry = 0;
378        d->target_host_ident = NULL;
379
380        cproxy_start_downstream_timeout(d, NULL);
381    }
382
383    return nwrite > 0;
384}
385
386void multiget_ascii_downstream_response(downstream *d, item *it) {
387    proxy_td *ptd;
388    proxy_stats_cmd *psc_get_key;
389    proxy *p;
390
391    cb_assert(d);
392    cb_assert(it);
393    cb_assert(it->nkey > 0);
394    cb_assert(ITEM_key(it) != NULL);
395
396    ptd = d->ptd;
397    cb_assert(ptd);
398
399    psc_get_key = &ptd->stats.stats_cmd[STATS_CMD_TYPE_REGULAR][STATS_CMD_GET_KEY];
400
401    p = ptd->proxy;
402    cb_assert(p);
403
404    if (cproxy_front_cache_key(ptd, ITEM_key(it), it->nkey) == true) {
405        uint32_t front_cache_lifespan =
406            ptd->behavior_pool.base.front_cache_lifespan;
407
408        mcache_set(&p->front_cache, it,
409                   front_cache_lifespan + msec_current_time,
410                   true, false);
411    }
412
413    if (d->multiget != NULL) {
414        /* The ITEM_key is not NULL or space terminated. */
415        multiget_entry *entry_first;
416        char key_buf[KEY_MAX_LENGTH + 10];
417
418        cb_assert(it->nkey <= KEY_MAX_LENGTH);
419        memcpy(key_buf, ITEM_key(it), it->nkey);
420        key_buf[it->nkey] = '\0';
421        entry_first = genhash_find(d->multiget, key_buf);
422
423        if (entry_first != NULL) {
424            multiget_entry *entry = entry_first;
425            entry_first->hits++;
426
427            while (entry != NULL) {
428                /* The upstream might have been closed mid-request. */
429
430                /* TODO: Revisit the -1 cas_emit parameter. */
431
432                conn *uc = entry->upstream_conn;
433                if (uc != NULL) {
434                    cproxy_upstream_ascii_item_response(it, uc, -1);
435
436                    psc_get_key->hits++;
437                    psc_get_key->write_bytes += it->nbytes;
438
439                    if (matcher_check(&ptd->key_stats_matcher,
440                                      ITEM_key(it), it->nkey, false) == true &&
441                        matcher_check(&ptd->key_stats_unmatcher,
442                                      ITEM_key(it), it->nkey, false) == false) {
443                        touch_key_stats(ptd, ITEM_key(it), it->nkey,
444                                        msec_current_time,
445                                        STATS_CMD_TYPE_REGULAR,
446                                        STATS_CMD_GET_KEY,
447                                        0, 1, 0,
448                                        0, it->nbytes);
449                    }
450
451                    if (entry != entry_first) {
452                        ptd->stats.stats.tot_multiget_bytes_dedupe += it->nbytes;
453                    }
454                }
455
456                entry = entry->next;
457            }
458        }
459    } else {
460        /* TODO: We're not tracking miss stats in the simple case. */
461        /* Do we always need to use a multiget hashtable? */
462        /* Or, perhaps misses equals number of requests - number of hits. */
463
464        conn *uc = d->upstream_conn;
465        while (uc != NULL) {
466            /* TODO: Revisit the -1 cas_emit parameter. */
467
468            cproxy_upstream_ascii_item_response(it, uc, -1);
469
470            psc_get_key->hits++;
471            psc_get_key->write_bytes += it->nbytes;
472
473            if (matcher_check(&ptd->key_stats_matcher,
474                              ITEM_key(it), it->nkey, false) == true &&
475                matcher_check(&ptd->key_stats_unmatcher,
476                              ITEM_key(it), it->nkey, false) == false) {
477                touch_key_stats(ptd, ITEM_key(it), it->nkey,
478                                msec_current_time,
479                                STATS_CMD_TYPE_REGULAR,
480                                STATS_CMD_GET_KEY,
481                                0, 1, 0,
482                                0, it->nbytes);
483            }
484
485            uc = uc->next;
486        }
487    }
488}
489