1 /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 #include "src/config.h"
3 #include <stdio.h>
4 #include <stdlib.h>
5 #include <string.h>
6 #include <errno.h>
7 #include <assert.h>
8 #include "memcached.h"
9 #include "cproxy.h"
10 #include "work.h"
11 #include "log.h"
12 
13 /* Protocol STATS command handling. */
14 
15 /* Special STATS value merging rules, instead of the */
16 /* default to just sum the values.  Note the trailing space. */
17 
18 char *protocol_stats_keys_first =
19     "pid version libevent "
20     "ep_version ep_dbname ep_storage_type ep_flusher_state ep_warmup_thread ";
21 
22 char *protocol_stats_keys_smallest =
23     "uptime "
24     "time "
25     "pointer_size "
26     "limit_maxbytes "
27     "accepting_conns "
28     ":chunk_size "
29     ":chunk_per_page "
30     ":age "; /* TODO: Should age merge be largest, not smallest? */
31 
32 bool protocol_stats_merge_sum(char *v1, int v1len,
33                               char *v2, int v2len,
34                               char *out, int outlen);
35 bool protocol_stats_merge_smallest(char *v1, int v1len,
36                                    char *v2, int v2len,
37                                    char *out, int outlen);
38 
39 int count_dot_pair(char *x, int xlen, char *y, int ylen);
40 int count_dot(char *x, int len);
41 
42 /* Per-key stats. */
43 
44 static char *key_stats_key(void *it);
45 static int key_stats_key_len(void *it);
46 static int key_stats_len(void *it);
47 static void *key_stats_get_next(void *it);
48 static void key_stats_set_next(void *it, void *next);
49 static void *key_stats_get_prev(void *it);
50 static void key_stats_set_prev(void *it, void *prev);
51 static uint64_t key_stats_get_exptime(void *it);
52 static void key_stats_set_exptime(void *it, uint64_t exptime);
53 
54 mcache_funcs mcache_key_stats_funcs = {
55     .item_key         = key_stats_key,
56     .item_key_len     = key_stats_key_len,
57     .item_len         = key_stats_len,
58     .item_add_ref     = key_stats_add_ref,
59     .item_dec_ref     = key_stats_dec_ref,
60     .item_get_next    = key_stats_get_next,
61     .item_set_next    = key_stats_set_next,
62     .item_get_prev    = key_stats_get_prev,
63     .item_set_prev    = key_stats_set_prev,
64     .item_get_exptime = key_stats_get_exptime,
65     .item_set_exptime = key_stats_set_exptime
66 };
67 
68 #define MAX_TOKENS     5
69 #define PREFIX_TOKEN   0
70 #define NAME_TOKEN     1
71 #define VALUE_TOKEN    2
72 #define MERGE_BUF_SIZE 300
73 
protocol_stats_merge_line(genhash_t *merger, char *line)74 bool protocol_stats_merge_line(genhash_t *merger, char *line) {
75     int nline;
76     token_t tokens[MAX_TOKENS];
77     int ntokens;
78     char *name;
79     int name_len;
80 
81 
82     assert(merger != NULL);
83     assert(line != NULL);
84 
85     nline = strlen(line); /* Ex: "STATS uptime 123455" */
86     if (nline <= 0 ||
87         nline >= MERGE_BUF_SIZE) {
88         return false;
89     }
90 
91     ntokens = scan_tokens(line, tokens, MAX_TOKENS, NULL);
92 
93     if (ntokens != 4) { /* 3 + 1 for the terminal token. */
94         return false;
95     }
96 
97     name = tokens[NAME_TOKEN].value;
98     name_len = tokens[NAME_TOKEN].length;
99     if (name == NULL ||
100         name_len <= 0 ||
101         tokens[VALUE_TOKEN].value == NULL ||
102         tokens[VALUE_TOKEN].length <= 0 ||
103         tokens[VALUE_TOKEN].length >= MERGE_BUF_SIZE) {
104         return false;
105     }
106 
107     return protocol_stats_merge_name_val(merger,
108                                          tokens[PREFIX_TOKEN].value,
109                                          tokens[PREFIX_TOKEN].length,
110                                          name, name_len,
111                                          tokens[VALUE_TOKEN].value,
112                                          tokens[VALUE_TOKEN].length);
113 }
114 
115 /* TODO: The stats merge assumes an ascii upstream. */
116 
protocol_stats_merge_name_val(genhash_t *merger, char *prefix, int prefix_len, char *name, int name_len, char *val, int val_len)117 bool protocol_stats_merge_name_val(genhash_t *merger,
118                                    char *prefix,
119                                    int   prefix_len,
120                                    char *name,
121                                    int   name_len,
122                                    char *val,
123                                    int   val_len) {
124 
125     char *key;
126     int key_len;
127 
128     assert(merger);
129     assert(name);
130     assert(val);
131 
132     key = name + name_len - 1;     /* Key part for merge rule lookup. */
133     while (key >= name && *key != ':') { /* Scan for last colon. */
134         key--;
135     }
136     if (key < name) {
137         key = name;
138     }
139     key_len = name_len - (key - name);
140     if (key_len > 0 && key_len < MERGE_BUF_SIZE) {
141         char buf_name[MERGE_BUF_SIZE];
142         char buf_key[MERGE_BUF_SIZE];
143         char buf_val[MERGE_BUF_SIZE];
144         char *prev;
145         token_t prev_tokens[MAX_TOKENS];
146         size_t prev_ntokens;
147         bool ok;
148 
149         strncpy(buf_name, name, name_len);
150         buf_name[name_len] = '\0';
151 
152         prev = (char *) genhash_find(merger, buf_name);
153         if (prev == NULL) {
154             char *hval = malloc(prefix_len + 1 +
155                                 name_len + 1 +
156                                 val_len + 1);
157             if (hval != NULL) {
158                 memcpy(hval, prefix, prefix_len);
159                 hval[prefix_len] = ' ';
160 
161                 memcpy(hval + prefix_len + 1, name, name_len);
162                 hval[prefix_len + 1 + name_len] = ' ';
163 
164                 memcpy(hval + prefix_len + 1 + name_len + 1, val, val_len);
165                 hval[prefix_len + 1 + name_len + 1 + val_len] = '\0';
166 
167                 genhash_update(merger, hval + prefix_len + 1, hval);
168             }
169 
170             return true;
171         }
172 
173         strncpy(buf_key, key, key_len);
174         buf_key[key_len] = '\0';
175 
176         if (strstr(protocol_stats_keys_first, buf_key) != NULL) {
177             return true;
178         }
179 
180         prev_ntokens = scan_tokens(prev, prev_tokens, MAX_TOKENS, NULL);
181         if (prev_ntokens != 4) {
182             return true;
183         }
184 
185         strncpy(buf_val, val, val_len);
186         buf_val[val_len] = '\0';
187 
188         if (strstr(protocol_stats_keys_smallest, buf_key) != NULL) {
189             ok = protocol_stats_merge_smallest(prev_tokens[VALUE_TOKEN].value,
190                                                prev_tokens[VALUE_TOKEN].length,
191                                                buf_val, val_len,
192                                                buf_val, MERGE_BUF_SIZE);
193         } else {
194             ok = protocol_stats_merge_sum(prev_tokens[VALUE_TOKEN].value,
195                                           prev_tokens[VALUE_TOKEN].length,
196                                           buf_val, val_len,
197                                           buf_val, MERGE_BUF_SIZE);
198         }
199 
200         if (ok) {
201             int   vlen = strlen(buf_val);
202             char *hval = malloc(prefix_len + 1 +
203                                 name_len + 1 +
204                                 vlen + 1);
205             if (hval != NULL) {
206                 memcpy(hval, prefix, prefix_len);
207                 hval[prefix_len] = ' ';
208 
209                 memcpy(hval + prefix_len + 1, name, name_len);
210                 hval[prefix_len + 1 + name_len] = ' ';
211 
212                 strcpy(hval + prefix_len + 1 + name_len + 1, buf_val);
213                 hval[prefix_len + 1 + name_len + 1 + vlen] = '\0';
214 
215                 genhash_update(merger, hval + prefix_len + 1, hval);
216 
217                 free(prev);
218             }
219         }
220 
221         /* Note, if we couldn't merge, then just keep */
222         /* the previous value. */
223 
224         return true;
225     }
226 
227     return false;
228 }
229 
protocol_stats_merge_sum(char *v1, int v1len, char *v2, int v2len, char *out, int outlen)230 bool protocol_stats_merge_sum(char *v1, int v1len,
231                               char *v2, int v2len,
232                               char *out, int outlen) {
233     int dot = count_dot_pair(v1, v1len, v2, v2len);
234     if (dot > 0) {
235         float v1f = strtof(v1, NULL);
236         float v2f = strtof(v2, NULL);
237         sprintf(out, "%f", v1f + v2f);
238         return true;
239     } else {
240         uint64_t v1i = 0;
241         uint64_t v2i = 0;
242 
243         if (safe_strtoull(v1, &v1i) &&
244             safe_strtoull(v2, &v2i)) {
245             sprintf(out, "%"PRIu64"", (v1i + v2i));
246             return true;
247         }
248     }
249 
250     (void)outlen;
251     return false;
252 }
253 
protocol_stats_merge_smallest(char *v1, int v1len, char *v2, int v2len, char *out, int outlen)254 bool protocol_stats_merge_smallest(char *v1, int v1len,
255                                    char *v2, int v2len,
256                                    char *out, int outlen) {
257     int dot = count_dot_pair(v1, v1len, v2, v2len);
258     if (dot > 0) {
259         float v1f = strtof(v1, NULL);
260         float v2f = strtof(v2, NULL);
261         sprintf(out, "%f", (v1f > v2f ? v1f : v2f));
262         return true;
263     } else {
264         uint64_t v1i = 0;
265         uint64_t v2i = 0;
266 
267         if (safe_strtoull(v1, &v1i) &&
268             safe_strtoull(v2, &v2i)) {
269             sprintf(out, "%"PRIu64"", (v1i > v2i ? v1i : v2i));
270             return true;
271         }
272     }
273 
274     (void)outlen;
275     return false;
276 }
277 
278 /* Callback to hash table iteration that frees the multiget_entry list.
279  */
protocol_stats_foreach_free(const void *key, const void *value, void *user_data)280 void protocol_stats_foreach_free(const void *key,
281                                  const void *value,
282                                  void *user_data) {
283     (void)key;
284     (void)user_data;
285     assert(value != NULL);
286     free((void*)value);
287 }
288 
protocol_stats_foreach_write(const void *key, const void *value, void *user_data)289 void protocol_stats_foreach_write(const void *key,
290                                   const void *value,
291                                   void *user_data) {
292 
293     char *line = (char *) value;
294     conn *uc = (conn *) user_data;
295     int nline;
296     assert(line != NULL);
297     assert(uc != NULL);
298     (void)key;
299 
300     nline = strlen(line);
301     if (nline > 0) {
302         item *it;
303         if (settings.verbose > 2) {
304             moxi_log_write("%d: cproxy_stats writing: %s\n", uc->sfd, line);
305         }
306 
307         if (IS_BINARY(uc->protocol)) {
308             token_t line_tokens[MAX_TOKENS];
309             size_t  line_ntokens = scan_tokens(line, line_tokens, MAX_TOKENS, NULL);
310 
311             if (line_ntokens == 4) {
312                 uint16_t key_len  = line_tokens[NAME_TOKEN].length;
313                 uint32_t data_len = line_tokens[VALUE_TOKEN].length;
314 
315                 it = item_alloc("s", 1, 0, 0,
316                                 sizeof(protocol_binary_response_stats) + key_len + data_len);
317                 if (it != NULL) {
318                     protocol_binary_response_stats *header =
319                         (protocol_binary_response_stats *) ITEM_data(it);
320 
321                     memset(ITEM_data(it), 0, it->nbytes);
322 
323                     header->message.header.response.magic = (uint8_t) PROTOCOL_BINARY_RES;
324                     header->message.header.response.opcode = uc->binary_header.request.opcode;
325                     header->message.header.response.keylen  = (uint16_t) htons(key_len);
326                     header->message.header.response.bodylen = htonl(key_len + data_len);
327                     header->message.header.response.opaque  = uc->opaque;
328 
329                     memcpy((ITEM_data(it)) + sizeof(protocol_binary_response_stats),
330                            line_tokens[NAME_TOKEN].value, key_len);
331                     memcpy((ITEM_data(it)) + sizeof(protocol_binary_response_stats) + key_len,
332                            line_tokens[VALUE_TOKEN].value, data_len);
333 
334                     if (add_conn_item(uc, it)) {
335                         add_iov(uc, ITEM_data(it), it->nbytes);
336 
337                         if (settings.verbose > 2) {
338                             moxi_log_write("%d: cproxy_stats writing binary", uc->sfd);
339                             cproxy_dump_header(uc->sfd, ITEM_data(it));
340                         }
341 
342                         return;
343                     }
344 
345                     item_remove(it);
346                 }
347             }
348 
349             return;
350         }
351 
352         it = item_alloc("s", 1, 0, 0, nline + 2);
353         if (it != NULL) {
354             strncpy(ITEM_data(it), line, nline);
355             strncpy(ITEM_data(it) + nline, "\r\n", 2);
356 
357             if (add_conn_item(uc, it)) {
358                 add_iov(uc, ITEM_data(it), nline + 2);
359                 return;
360             }
361 
362             item_remove(it);
363         }
364     }
365 }
366 
count_dot_pair(char *x, int xlen, char *y, int ylen)367 int count_dot_pair(char *x, int xlen, char *y, int ylen) {
368     int xdot = count_dot(x, xlen);
369     int ydot = count_dot(y, ylen);
370 
371     return (xdot > ydot ? xdot : ydot);
372 }
373 
count_dot(char *x, int len)374 int count_dot(char *x, int len) { /* Number of '.' chars in a string. */
375     int dot = 0;
376     char *end;
377     for (end = x + len; x < end; x++) {
378         if (*x == '.')
379             dot++;
380     }
381 
382     return dot;
383 }
384 
385 /* ---------------------------------------- */
386 
cproxy_reset_stats_td(proxy_stats_td *pstd)387 void cproxy_reset_stats_td(proxy_stats_td *pstd) {
388     int j;
389 
390     assert(pstd);
391 
392     cproxy_reset_stats(&pstd->stats);
393 
394     for (j = 0; j < STATS_CMD_TYPE_last; j++) {
395         int k;
396         for (k = 0; k < STATS_CMD_last; k++) {
397             cproxy_reset_stats_cmd(&pstd->stats_cmd[j][k]);
398         }
399     }
400 }
401 
cproxy_reset_stats(proxy_stats *ps)402 void cproxy_reset_stats(proxy_stats *ps) {
403     assert(ps);
404 
405     /* Only clear the tot_xxx stats, not the num_xxx ones. */
406 
407     ps->tot_upstream = 0;
408     ps->tot_downstream_conn = 0;
409     ps->tot_downstream_conn_acquired = 0;
410     ps->tot_downstream_conn_released = 0;
411     ps->tot_downstream_released = 0;
412     ps->tot_downstream_reserved = 0;
413     ps->tot_downstream_reserved_time = 0;
414     ps->max_downstream_reserved_time = 0;
415     ps->tot_downstream_freed = 0;
416     ps->tot_downstream_quit_server = 0;
417     ps->tot_downstream_max_reached = 0;
418     ps->tot_downstream_create_failed = 0;
419     ps->tot_downstream_connect_started = 0;
420     ps->tot_downstream_connect_wait = 0;
421     ps->tot_downstream_connect = 0;
422     ps->tot_downstream_connect_failed = 0;
423     ps->tot_downstream_connect_timeout = 0;
424     ps->tot_downstream_connect_interval = 0;
425     ps->tot_downstream_connect_max_reached = 0;
426     ps->tot_downstream_waiting_errors = 0;
427     ps->tot_downstream_auth = 0;
428     ps->tot_downstream_auth_failed = 0;
429     ps->tot_downstream_bucket = 0;
430     ps->tot_downstream_bucket_failed = 0;
431     ps->tot_downstream_propagate_failed = 0;
432     ps->tot_downstream_close_on_upstream_close = 0;
433     ps->tot_downstream_conn_queue_timeout = 0;
434     ps->tot_downstream_conn_queue_add = 0;
435     ps->tot_downstream_conn_queue_remove = 0;
436     ps->tot_downstream_timeout = 0;
437     ps->tot_wait_queue_timeout = 0;
438     ps->tot_assign_downstream = 0;
439     ps->tot_assign_upstream = 0;
440     ps->tot_assign_recursion = 0;
441     ps->tot_reset_upstream_avail = 0;
442     ps->tot_retry = 0;
443     ps->tot_retry_time = 0;
444     ps->max_retry_time = 0;
445     ps->tot_retry_vbucket = 0;
446     ps->tot_upstream_paused = 0;
447     ps->tot_upstream_unpaused = 0;
448     ps->tot_multiget_keys = 0;
449     ps->tot_multiget_keys_dedupe = 0;
450     ps->tot_multiget_bytes_dedupe = 0;
451     ps->tot_optimize_sets = 0;
452     ps->err_oom = 0;
453     ps->err_upstream_write_prep = 0;
454     ps->err_downstream_write_prep = 0;
455     ps->tot_cmd_time = 0;
456     ps->tot_cmd_count = 0;
457 }
458 
cproxy_reset_stats_cmd(proxy_stats_cmd *sc)459 void cproxy_reset_stats_cmd(proxy_stats_cmd *sc) {
460     assert(sc);
461     memset(sc, 0, sizeof(proxy_stats_cmd));
462 }
463 
464 /* ------------------------------------------------- */
465 
find_key_stats(proxy_td *ptd, char *key, int key_len, uint64_t msec_time)466 key_stats *find_key_stats(proxy_td *ptd, char *key, int key_len,
467                           uint64_t msec_time) {
468     key_stats *ks;
469     assert(ptd);
470     assert(key);
471     assert(key_len > 0);
472 
473     ks = mcache_get(&ptd->key_stats, key, key_len, msec_time);
474     if (ks == NULL) {
475         ks = calloc(1, sizeof(key_stats));
476         if (ks != NULL) {
477             memcpy(ks->key, key, key_len);
478             ks->key[key_len] = '\0';
479             ks->refcount = 1;
480             ks->added_at = msec_time;
481 
482             mcache_set(&ptd->key_stats, ks,
483                        msec_time +
484                        ptd->behavior_pool.base.key_stats_lifespan,
485                        true, false);
486         }
487     }
488 
489     return ks;
490 }
491 
touch_key_stats(proxy_td *ptd, char *key, int key_len, uint64_t msec_time, enum_stats_cmd_type cmd_type, enum_stats_cmd cmd, int delta_seen, int delta_hits, int delta_misses, int delta_read_bytes, int delta_write_bytes)492 void touch_key_stats(proxy_td *ptd, char *key, int key_len,
493                      uint64_t msec_time,
494                      enum_stats_cmd_type cmd_type,
495                      enum_stats_cmd cmd,
496                      int delta_seen,
497                      int delta_hits,
498                      int delta_misses,
499                      int delta_read_bytes,
500                      int delta_write_bytes) {
501     key_stats *ks = find_key_stats(ptd, key, key_len, msec_time);
502     if (ks != NULL) {
503         proxy_stats_cmd *psc = &ks->stats_cmd[cmd_type][cmd];
504         if (psc != NULL) {
505             psc->seen        += delta_seen;
506             psc->hits        += delta_hits;
507             psc->misses      += delta_misses;
508             psc->read_bytes  += delta_read_bytes;
509             psc->write_bytes += delta_write_bytes;
510         }
511 
512         key_stats_dec_ref(ks);
513     }
514 }
515 
516 /* ------------------------------------------------- */
517 
key_stats_key(void *it)518 static char *key_stats_key(void *it) {
519     key_stats *i = it;
520     assert(i);
521     return i->key;
522 }
523 
key_stats_key_len(void *it)524 static int key_stats_key_len(void *it) {
525     key_stats *i = it;
526     assert(i);
527     return strlen(i->key);
528 }
529 
key_stats_len(void *it)530 static int key_stats_len(void *it) {
531     (void)it;
532     return sizeof(key_stats);
533 }
534 
key_stats_add_ref(void *it)535 void key_stats_add_ref(void *it) {
536     key_stats *i = it;
537     if (i != NULL) {
538         i->refcount++;
539     }
540 }
541 
key_stats_dec_ref(void *it)542 void key_stats_dec_ref(void *it) {
543     key_stats *i = it;
544     if (i != NULL) {
545         i->refcount--;
546         if (i->refcount <= 0) {
547             free(it);
548         }
549     }
550 }
551 
key_stats_get_next(void *it)552 static void *key_stats_get_next(void *it) {
553     key_stats *i = it;
554     assert(i);
555     return i->next;
556 }
557 
key_stats_set_next(void *it, void *next)558 static void key_stats_set_next(void *it, void *next) {
559     key_stats *i = it;
560     assert(i);
561     i->next = (key_stats *) next;
562 }
563 
key_stats_get_prev(void *it)564 static void *key_stats_get_prev(void *it) {
565     key_stats *i = it;
566     assert(i);
567     return i->prev;
568 }
569 
key_stats_set_prev(void *it, void *prev)570 static void key_stats_set_prev(void *it, void *prev) {
571     key_stats *i = it;
572     assert(i);
573     i->prev = (key_stats *) prev;
574 }
575 
key_stats_get_exptime(void *it)576 static uint64_t key_stats_get_exptime(void *it) {
577     key_stats *i = it;
578     assert(i);
579     return i->exptime;
580 }
581 
key_stats_set_exptime(void *it, uint64_t exptime)582 static void key_stats_set_exptime(void *it, uint64_t exptime) {
583     key_stats *i = it;
584     assert(i);
585     i->exptime = exptime;
586 }
587 
588