xref: /5.5.2/moxi/src/cproxy_stats.c (revision d0366df5)
1/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2#include "src/config.h"
3#include <stdio.h>
4#include <stdlib.h>
5#include <string.h>
6#include <errno.h>
7#include <platform/cbassert.h>
8#include "memcached.h"
9#include "cproxy.h"
10#include "work.h"
11#include "log.h"
12
13/* Protocol STATS command handling. */
14
15/* Special STATS value merging rules, instead of the */
16/* default to just sum the values.  Note the trailing space. */
17
18char *protocol_stats_keys_first =
19    "pid version libevent "
20    "ep_version ep_dbname ep_storage_type ep_flusher_state ep_warmup_thread ";
21
22char *protocol_stats_keys_smallest =
23    "uptime "
24    "time "
25    "pointer_size "
26    "limit_maxbytes "
27    "accepting_conns "
28    ":chunk_size "
29    ":chunk_per_page "
30    ":age "; /* TODO: Should age merge be largest, not smallest? */
31
32bool protocol_stats_merge_sum(char *v1, int v1len,
33                              char *v2, int v2len,
34                              char *out, int outlen);
35bool protocol_stats_merge_smallest(char *v1, int v1len,
36                                   char *v2, int v2len,
37                                   char *out, int outlen);
38
39int count_dot_pair(char *x, int xlen, char *y, int ylen);
40int count_dot(char *x, int len);
41
42/* Per-key stats. */
43
44static char *key_stats_key(void *it);
45static int key_stats_key_len(void *it);
46static int key_stats_len(void *it);
47static void *key_stats_get_next(void *it);
48static void key_stats_set_next(void *it, void *next);
49static void *key_stats_get_prev(void *it);
50static void key_stats_set_prev(void *it, void *prev);
51static uint64_t key_stats_get_exptime(void *it);
52static void key_stats_set_exptime(void *it, uint64_t exptime);
53
54mcache_funcs mcache_key_stats_funcs = {
55    .item_key         = key_stats_key,
56    .item_key_len     = key_stats_key_len,
57    .item_len         = key_stats_len,
58    .item_add_ref     = key_stats_add_ref,
59    .item_dec_ref     = key_stats_dec_ref,
60    .item_get_next    = key_stats_get_next,
61    .item_set_next    = key_stats_set_next,
62    .item_get_prev    = key_stats_get_prev,
63    .item_set_prev    = key_stats_set_prev,
64    .item_get_exptime = key_stats_get_exptime,
65    .item_set_exptime = key_stats_set_exptime
66};
67
68#define MAX_TOKENS     5
69#define PREFIX_TOKEN   0
70#define NAME_TOKEN     1
71#define VALUE_TOKEN    2
72#define MERGE_BUF_SIZE 300
73
74bool protocol_stats_merge_line(genhash_t *merger, char *line) {
75    int nline;
76    token_t tokens[MAX_TOKENS];
77    int ntokens;
78    char *name;
79    int name_len;
80
81
82    cb_assert(merger != NULL);
83    cb_assert(line != NULL);
84
85    nline = strlen(line); /* Ex: "STATS uptime 123455" */
86    if (nline <= 0 ||
87        nline >= MERGE_BUF_SIZE) {
88        return false;
89    }
90
91    ntokens = scan_tokens(line, tokens, MAX_TOKENS, NULL);
92
93    if (ntokens != 4) { /* 3 + 1 for the terminal token. */
94        return false;
95    }
96
97    name = tokens[NAME_TOKEN].value;
98    name_len = tokens[NAME_TOKEN].length;
99    if (name == NULL ||
100        name_len <= 0 ||
101        tokens[VALUE_TOKEN].value == NULL ||
102        tokens[VALUE_TOKEN].length <= 0 ||
103        tokens[VALUE_TOKEN].length >= MERGE_BUF_SIZE) {
104        return false;
105    }
106
107    return protocol_stats_merge_name_val(merger,
108                                         tokens[PREFIX_TOKEN].value,
109                                         tokens[PREFIX_TOKEN].length,
110                                         name, name_len,
111                                         tokens[VALUE_TOKEN].value,
112                                         tokens[VALUE_TOKEN].length);
113}
114
115/* TODO: The stats merge assumes an ascii upstream. */
116
117bool protocol_stats_merge_name_val(genhash_t *merger,
118                                   char *prefix,
119                                   int   prefix_len,
120                                   char *name,
121                                   int   name_len,
122                                   char *val,
123                                   int   val_len) {
124
125    char *key;
126    int key_len;
127
128    cb_assert(merger);
129    cb_assert(name);
130    cb_assert(val);
131
132    key = name + name_len - 1;     /* Key part for merge rule lookup. */
133    while (key >= name && *key != ':') { /* Scan for last colon. */
134        key--;
135    }
136    if (key < name) {
137        key = name;
138    }
139    key_len = name_len - (key - name);
140    if (key_len > 0 && key_len < MERGE_BUF_SIZE) {
141        char buf_name[MERGE_BUF_SIZE];
142        char buf_key[MERGE_BUF_SIZE];
143        char buf_val[MERGE_BUF_SIZE];
144        char *prev;
145        token_t prev_tokens[MAX_TOKENS];
146        size_t prev_ntokens;
147        bool ok;
148
149        strncpy(buf_name, name, name_len);
150        buf_name[name_len] = '\0';
151
152        prev = (char *) genhash_find(merger, buf_name);
153        if (prev == NULL) {
154            char *hval = malloc(prefix_len + 1 +
155                                name_len + 1 +
156                                val_len + 1);
157            if (hval != NULL) {
158                memcpy(hval, prefix, prefix_len);
159                hval[prefix_len] = ' ';
160
161                memcpy(hval + prefix_len + 1, name, name_len);
162                hval[prefix_len + 1 + name_len] = ' ';
163
164                memcpy(hval + prefix_len + 1 + name_len + 1, val, val_len);
165                hval[prefix_len + 1 + name_len + 1 + val_len] = '\0';
166
167                genhash_update(merger, hval + prefix_len + 1, hval);
168            }
169
170            return true;
171        }
172
173        strncpy(buf_key, key, key_len);
174        buf_key[key_len] = '\0';
175
176        if (strstr(protocol_stats_keys_first, buf_key) != NULL) {
177            return true;
178        }
179
180        prev_ntokens = scan_tokens(prev, prev_tokens, MAX_TOKENS, NULL);
181        if (prev_ntokens != 4) {
182            return true;
183        }
184
185        strncpy(buf_val, val, val_len);
186        buf_val[val_len] = '\0';
187
188        if (strstr(protocol_stats_keys_smallest, buf_key) != NULL) {
189            ok = protocol_stats_merge_smallest(prev_tokens[VALUE_TOKEN].value,
190                                               prev_tokens[VALUE_TOKEN].length,
191                                               buf_val, val_len,
192                                               buf_val, MERGE_BUF_SIZE);
193        } else {
194            ok = protocol_stats_merge_sum(prev_tokens[VALUE_TOKEN].value,
195                                          prev_tokens[VALUE_TOKEN].length,
196                                          buf_val, val_len,
197                                          buf_val, MERGE_BUF_SIZE);
198        }
199
200        if (ok) {
201            int   vlen = strlen(buf_val);
202            char *hval = malloc(prefix_len + 1 +
203                                name_len + 1 +
204                                vlen + 1);
205            if (hval != NULL) {
206                memcpy(hval, prefix, prefix_len);
207                hval[prefix_len] = ' ';
208
209                memcpy(hval + prefix_len + 1, name, name_len);
210                hval[prefix_len + 1 + name_len] = ' ';
211
212                strcpy(hval + prefix_len + 1 + name_len + 1, buf_val);
213                hval[prefix_len + 1 + name_len + 1 + vlen] = '\0';
214
215                genhash_update(merger, hval + prefix_len + 1, hval);
216
217                free(prev);
218            }
219        }
220
221        /* Note, if we couldn't merge, then just keep */
222        /* the previous value. */
223
224        return true;
225    }
226
227    return false;
228}
229
230bool protocol_stats_merge_sum(char *v1, int v1len,
231                              char *v2, int v2len,
232                              char *out, int outlen) {
233    int dot = count_dot_pair(v1, v1len, v2, v2len);
234    if (dot > 0) {
235        float v1f = strtof(v1, NULL);
236        float v2f = strtof(v2, NULL);
237        sprintf(out, "%f", v1f + v2f);
238        return true;
239    } else {
240        uint64_t v1i = 0;
241        uint64_t v2i = 0;
242
243        if (safe_strtoull(v1, &v1i) &&
244            safe_strtoull(v2, &v2i)) {
245            sprintf(out, "%"PRIu64"", (v1i + v2i));
246            return true;
247        }
248    }
249
250    (void)outlen;
251    return false;
252}
253
254bool protocol_stats_merge_smallest(char *v1, int v1len,
255                                   char *v2, int v2len,
256                                   char *out, int outlen) {
257    int dot = count_dot_pair(v1, v1len, v2, v2len);
258    if (dot > 0) {
259        float v1f = strtof(v1, NULL);
260        float v2f = strtof(v2, NULL);
261        sprintf(out, "%f", (v1f > v2f ? v1f : v2f));
262        return true;
263    } else {
264        uint64_t v1i = 0;
265        uint64_t v2i = 0;
266
267        if (safe_strtoull(v1, &v1i) &&
268            safe_strtoull(v2, &v2i)) {
269            sprintf(out, "%"PRIu64"", (v1i > v2i ? v1i : v2i));
270            return true;
271        }
272    }
273
274    (void)outlen;
275    return false;
276}
277
278/* Callback to hash table iteration that frees the multiget_entry list.
279 */
280void protocol_stats_foreach_free(const void *key,
281                                 const void *value,
282                                 void *user_data) {
283    (void)key;
284    (void)user_data;
285    cb_assert(value != NULL);
286    free((void*)value);
287}
288
289void protocol_stats_foreach_write(const void *key,
290                                  const void *value,
291                                  void *user_data) {
292
293    char *line = (char *) value;
294    conn *uc = (conn *) user_data;
295    int nline;
296    cb_assert(line != NULL);
297    cb_assert(uc != NULL);
298    (void)key;
299
300    nline = strlen(line);
301    if (nline > 0) {
302        item *it;
303        if (settings.verbose > 2) {
304            moxi_log_write("%d: cproxy_stats writing: %s\n", uc->sfd, line);
305        }
306
307        if (IS_BINARY(uc->protocol)) {
308            token_t line_tokens[MAX_TOKENS];
309            size_t  line_ntokens = scan_tokens(line, line_tokens, MAX_TOKENS, NULL);
310
311            if (line_ntokens == 4) {
312                uint16_t key_len  = line_tokens[NAME_TOKEN].length;
313                uint32_t data_len = line_tokens[VALUE_TOKEN].length;
314
315                it = item_alloc("s", 1, 0, 0,
316                                sizeof(protocol_binary_response_stats) + key_len + data_len);
317                if (it != NULL) {
318                    protocol_binary_response_stats *header =
319                        (protocol_binary_response_stats *) ITEM_data(it);
320
321                    memset(ITEM_data(it), 0, it->nbytes);
322
323                    header->message.header.response.magic = (uint8_t) PROTOCOL_BINARY_RES;
324                    header->message.header.response.opcode = uc->binary_header.request.opcode;
325                    header->message.header.response.keylen  = (uint16_t) htons(key_len);
326                    header->message.header.response.bodylen = htonl(key_len + data_len);
327                    header->message.header.response.opaque  = uc->opaque;
328
329                    memcpy((ITEM_data(it)) + sizeof(protocol_binary_response_stats),
330                           line_tokens[NAME_TOKEN].value, key_len);
331                    memcpy((ITEM_data(it)) + sizeof(protocol_binary_response_stats) + key_len,
332                           line_tokens[VALUE_TOKEN].value, data_len);
333
334                    if (add_conn_item(uc, it)) {
335                        add_iov(uc, ITEM_data(it), it->nbytes);
336
337                        if (settings.verbose > 2) {
338                            moxi_log_write("%d: cproxy_stats writing binary", uc->sfd);
339                            cproxy_dump_header(uc->sfd, ITEM_data(it));
340                        }
341
342                        return;
343                    }
344
345                    item_remove(it);
346                }
347            }
348
349            return;
350        }
351
352        it = item_alloc("s", 1, 0, 0, nline + 2);
353        if (it != NULL) {
354            strncpy(ITEM_data(it), line, nline);
355            strncpy(ITEM_data(it) + nline, "\r\n", 2);
356
357            if (add_conn_item(uc, it)) {
358                add_iov(uc, ITEM_data(it), nline + 2);
359                return;
360            }
361
362            item_remove(it);
363        }
364    }
365}
366
367int count_dot_pair(char *x, int xlen, char *y, int ylen) {
368    int xdot = count_dot(x, xlen);
369    int ydot = count_dot(y, ylen);
370
371    return (xdot > ydot ? xdot : ydot);
372}
373
374int count_dot(char *x, int len) { /* Number of '.' chars in a string. */
375    int dot = 0;
376    char *end;
377    for (end = x + len; x < end; x++) {
378        if (*x == '.')
379            dot++;
380    }
381
382    return dot;
383}
384
385/* ---------------------------------------- */
386
387void cproxy_reset_stats_td(proxy_stats_td *pstd) {
388    int j;
389
390    cb_assert(pstd);
391
392    cproxy_reset_stats(&pstd->stats);
393
394    for (j = 0; j < STATS_CMD_TYPE_last; j++) {
395        int k;
396        for (k = 0; k < STATS_CMD_last; k++) {
397            cproxy_reset_stats_cmd(&pstd->stats_cmd[j][k]);
398        }
399    }
400}
401
402void cproxy_reset_stats(proxy_stats *ps) {
403    cb_assert(ps);
404
405    /* Only clear the tot_xxx stats, not the num_xxx ones. */
406
407    ps->tot_upstream = 0;
408    ps->tot_downstream_conn = 0;
409    ps->tot_downstream_conn_acquired = 0;
410    ps->tot_downstream_conn_released = 0;
411    ps->tot_downstream_released = 0;
412    ps->tot_downstream_reserved = 0;
413    ps->tot_downstream_reserved_time = 0;
414    ps->max_downstream_reserved_time = 0;
415    ps->tot_downstream_freed = 0;
416    ps->tot_downstream_quit_server = 0;
417    ps->tot_downstream_max_reached = 0;
418    ps->tot_downstream_create_failed = 0;
419    ps->tot_downstream_connect_started = 0;
420    ps->tot_downstream_connect_wait = 0;
421    ps->tot_downstream_connect = 0;
422    ps->tot_downstream_connect_failed = 0;
423    ps->tot_downstream_connect_timeout = 0;
424    ps->tot_downstream_connect_interval = 0;
425    ps->tot_downstream_connect_max_reached = 0;
426    ps->tot_downstream_waiting_errors = 0;
427    ps->tot_downstream_auth = 0;
428    ps->tot_downstream_auth_failed = 0;
429    ps->tot_downstream_bucket = 0;
430    ps->tot_downstream_bucket_failed = 0;
431    ps->tot_downstream_propagate_failed = 0;
432    ps->tot_downstream_close_on_upstream_close = 0;
433    ps->tot_downstream_conn_queue_timeout = 0;
434    ps->tot_downstream_conn_queue_add = 0;
435    ps->tot_downstream_conn_queue_remove = 0;
436    ps->tot_downstream_timeout = 0;
437    ps->tot_wait_queue_timeout = 0;
438    ps->tot_assign_downstream = 0;
439    ps->tot_assign_upstream = 0;
440    ps->tot_assign_recursion = 0;
441    ps->tot_reset_upstream_avail = 0;
442    ps->tot_retry = 0;
443    ps->tot_retry_time = 0;
444    ps->max_retry_time = 0;
445    ps->tot_retry_vbucket = 0;
446    ps->tot_upstream_paused = 0;
447    ps->tot_upstream_unpaused = 0;
448    ps->tot_multiget_keys = 0;
449    ps->tot_multiget_keys_dedupe = 0;
450    ps->tot_multiget_bytes_dedupe = 0;
451    ps->tot_optimize_sets = 0;
452    ps->err_oom = 0;
453    ps->err_upstream_write_prep = 0;
454    ps->err_downstream_write_prep = 0;
455    ps->tot_cmd_time = 0;
456    ps->tot_cmd_count = 0;
457}
458
459void cproxy_reset_stats_cmd(proxy_stats_cmd *sc) {
460    cb_assert(sc);
461    memset(sc, 0, sizeof(proxy_stats_cmd));
462}
463
464/* ------------------------------------------------- */
465
466key_stats *find_key_stats(proxy_td *ptd, char *key, int key_len,
467                          uint64_t msec_time) {
468    key_stats *ks;
469    cb_assert(ptd);
470    cb_assert(key);
471    cb_assert(key_len > 0);
472
473    ks = mcache_get(&ptd->key_stats, key, key_len, msec_time);
474    if (ks == NULL) {
475        ks = calloc(1, sizeof(key_stats));
476        if (ks != NULL) {
477            memcpy(ks->key, key, key_len);
478            ks->key[key_len] = '\0';
479            ks->refcount = 1;
480            ks->added_at = msec_time;
481
482            mcache_set(&ptd->key_stats, ks,
483                       msec_time +
484                       ptd->behavior_pool.base.key_stats_lifespan,
485                       true, false);
486        }
487    }
488
489    return ks;
490}
491
492void touch_key_stats(proxy_td *ptd, char *key, int key_len,
493                     uint64_t msec_time,
494                     enum_stats_cmd_type cmd_type,
495                     enum_stats_cmd cmd,
496                     int delta_seen,
497                     int delta_hits,
498                     int delta_misses,
499                     int delta_read_bytes,
500                     int delta_write_bytes) {
501    key_stats *ks = find_key_stats(ptd, key, key_len, msec_time);
502    if (ks != NULL) {
503        proxy_stats_cmd *psc = &ks->stats_cmd[cmd_type][cmd];
504        if (psc != NULL) {
505            psc->seen        += delta_seen;
506            psc->hits        += delta_hits;
507            psc->misses      += delta_misses;
508            psc->read_bytes  += delta_read_bytes;
509            psc->write_bytes += delta_write_bytes;
510        }
511
512        key_stats_dec_ref(ks);
513    }
514}
515
516/* ------------------------------------------------- */
517
518static char *key_stats_key(void *it) {
519    key_stats *i = it;
520    cb_assert(i);
521    return i->key;
522}
523
524static int key_stats_key_len(void *it) {
525    key_stats *i = it;
526    cb_assert(i);
527    return strlen(i->key);
528}
529
530static int key_stats_len(void *it) {
531    (void)it;
532    return sizeof(key_stats);
533}
534
535void key_stats_add_ref(void *it) {
536    key_stats *i = it;
537    if (i != NULL) {
538        i->refcount++;
539    }
540}
541
542void key_stats_dec_ref(void *it) {
543    key_stats *i = it;
544    if (i != NULL) {
545        i->refcount--;
546        if (i->refcount <= 0) {
547            free(it);
548        }
549    }
550}
551
552static void *key_stats_get_next(void *it) {
553    key_stats *i = it;
554    cb_assert(i);
555    return i->next;
556}
557
558static void key_stats_set_next(void *it, void *next) {
559    key_stats *i = it;
560    cb_assert(i);
561    i->next = (key_stats *) next;
562}
563
564static void *key_stats_get_prev(void *it) {
565    key_stats *i = it;
566    cb_assert(i);
567    return i->prev;
568}
569
570static void key_stats_set_prev(void *it, void *prev) {
571    key_stats *i = it;
572    cb_assert(i);
573    i->prev = (key_stats *) prev;
574}
575
576static uint64_t key_stats_get_exptime(void *it) {
577    key_stats *i = it;
578    cb_assert(i);
579    return i->exptime;
580}
581
582static void key_stats_set_exptime(void *it, uint64_t exptime) {
583    key_stats *i = it;
584    cb_assert(i);
585    i->exptime = exptime;
586}
587
588