1 /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 #include "src/config.h"
3 #include <stdio.h>
4 #include <stdlib.h>
5 #include <string.h>
6 #include <errno.h>
7 #include <assert.h>
8 #include "memcached.h"
9 #include "cproxy.h"
10 #include "work.h"
11 #include "log.h"
12
13 /* Protocol STATS command handling. */
14
15 /* Special STATS value merging rules, instead of the */
16 /* default to just sum the values. Note the trailing space. */
17
18 char *protocol_stats_keys_first =
19 "pid version libevent "
20 "ep_version ep_dbname ep_storage_type ep_flusher_state ep_warmup_thread ";
21
22 char *protocol_stats_keys_smallest =
23 "uptime "
24 "time "
25 "pointer_size "
26 "limit_maxbytes "
27 "accepting_conns "
28 ":chunk_size "
29 ":chunk_per_page "
30 ":age "; /* TODO: Should age merge be largest, not smallest? */
31
32 bool protocol_stats_merge_sum(char *v1, int v1len,
33 char *v2, int v2len,
34 char *out, int outlen);
35 bool protocol_stats_merge_smallest(char *v1, int v1len,
36 char *v2, int v2len,
37 char *out, int outlen);
38
39 int count_dot_pair(char *x, int xlen, char *y, int ylen);
40 int count_dot(char *x, int len);
41
42 /* Per-key stats. */
43
44 static char *key_stats_key(void *it);
45 static int key_stats_key_len(void *it);
46 static int key_stats_len(void *it);
47 static void *key_stats_get_next(void *it);
48 static void key_stats_set_next(void *it, void *next);
49 static void *key_stats_get_prev(void *it);
50 static void key_stats_set_prev(void *it, void *prev);
51 static uint64_t key_stats_get_exptime(void *it);
52 static void key_stats_set_exptime(void *it, uint64_t exptime);
53
54 mcache_funcs mcache_key_stats_funcs = {
55 .item_key = key_stats_key,
56 .item_key_len = key_stats_key_len,
57 .item_len = key_stats_len,
58 .item_add_ref = key_stats_add_ref,
59 .item_dec_ref = key_stats_dec_ref,
60 .item_get_next = key_stats_get_next,
61 .item_set_next = key_stats_set_next,
62 .item_get_prev = key_stats_get_prev,
63 .item_set_prev = key_stats_set_prev,
64 .item_get_exptime = key_stats_get_exptime,
65 .item_set_exptime = key_stats_set_exptime
66 };
67
68 #define MAX_TOKENS 5
69 #define PREFIX_TOKEN 0
70 #define NAME_TOKEN 1
71 #define VALUE_TOKEN 2
72 #define MERGE_BUF_SIZE 300
73
protocol_stats_merge_line(genhash_t *merger, char *line)74 bool protocol_stats_merge_line(genhash_t *merger, char *line) {
75 int nline;
76 token_t tokens[MAX_TOKENS];
77 int ntokens;
78 char *name;
79 int name_len;
80
81
82 assert(merger != NULL);
83 assert(line != NULL);
84
85 nline = strlen(line); /* Ex: "STATS uptime 123455" */
86 if (nline <= 0 ||
87 nline >= MERGE_BUF_SIZE) {
88 return false;
89 }
90
91 ntokens = scan_tokens(line, tokens, MAX_TOKENS, NULL);
92
93 if (ntokens != 4) { /* 3 + 1 for the terminal token. */
94 return false;
95 }
96
97 name = tokens[NAME_TOKEN].value;
98 name_len = tokens[NAME_TOKEN].length;
99 if (name == NULL ||
100 name_len <= 0 ||
101 tokens[VALUE_TOKEN].value == NULL ||
102 tokens[VALUE_TOKEN].length <= 0 ||
103 tokens[VALUE_TOKEN].length >= MERGE_BUF_SIZE) {
104 return false;
105 }
106
107 return protocol_stats_merge_name_val(merger,
108 tokens[PREFIX_TOKEN].value,
109 tokens[PREFIX_TOKEN].length,
110 name, name_len,
111 tokens[VALUE_TOKEN].value,
112 tokens[VALUE_TOKEN].length);
113 }
114
115 /* TODO: The stats merge assumes an ascii upstream. */
116
protocol_stats_merge_name_val(genhash_t *merger, char *prefix, int prefix_len, char *name, int name_len, char *val, int val_len)117 bool protocol_stats_merge_name_val(genhash_t *merger,
118 char *prefix,
119 int prefix_len,
120 char *name,
121 int name_len,
122 char *val,
123 int val_len) {
124
125 char *key;
126 int key_len;
127
128 assert(merger);
129 assert(name);
130 assert(val);
131
132 key = name + name_len - 1; /* Key part for merge rule lookup. */
133 while (key >= name && *key != ':') { /* Scan for last colon. */
134 key--;
135 }
136 if (key < name) {
137 key = name;
138 }
139 key_len = name_len - (key - name);
140 if (key_len > 0 && key_len < MERGE_BUF_SIZE) {
141 char buf_name[MERGE_BUF_SIZE];
142 char buf_key[MERGE_BUF_SIZE];
143 char buf_val[MERGE_BUF_SIZE];
144 char *prev;
145 token_t prev_tokens[MAX_TOKENS];
146 size_t prev_ntokens;
147 bool ok;
148
149 strncpy(buf_name, name, name_len);
150 buf_name[name_len] = '\0';
151
152 prev = (char *) genhash_find(merger, buf_name);
153 if (prev == NULL) {
154 char *hval = malloc(prefix_len + 1 +
155 name_len + 1 +
156 val_len + 1);
157 if (hval != NULL) {
158 memcpy(hval, prefix, prefix_len);
159 hval[prefix_len] = ' ';
160
161 memcpy(hval + prefix_len + 1, name, name_len);
162 hval[prefix_len + 1 + name_len] = ' ';
163
164 memcpy(hval + prefix_len + 1 + name_len + 1, val, val_len);
165 hval[prefix_len + 1 + name_len + 1 + val_len] = '\0';
166
167 genhash_update(merger, hval + prefix_len + 1, hval);
168 }
169
170 return true;
171 }
172
173 strncpy(buf_key, key, key_len);
174 buf_key[key_len] = '\0';
175
176 if (strstr(protocol_stats_keys_first, buf_key) != NULL) {
177 return true;
178 }
179
180 prev_ntokens = scan_tokens(prev, prev_tokens, MAX_TOKENS, NULL);
181 if (prev_ntokens != 4) {
182 return true;
183 }
184
185 strncpy(buf_val, val, val_len);
186 buf_val[val_len] = '\0';
187
188 if (strstr(protocol_stats_keys_smallest, buf_key) != NULL) {
189 ok = protocol_stats_merge_smallest(prev_tokens[VALUE_TOKEN].value,
190 prev_tokens[VALUE_TOKEN].length,
191 buf_val, val_len,
192 buf_val, MERGE_BUF_SIZE);
193 } else {
194 ok = protocol_stats_merge_sum(prev_tokens[VALUE_TOKEN].value,
195 prev_tokens[VALUE_TOKEN].length,
196 buf_val, val_len,
197 buf_val, MERGE_BUF_SIZE);
198 }
199
200 if (ok) {
201 int vlen = strlen(buf_val);
202 char *hval = malloc(prefix_len + 1 +
203 name_len + 1 +
204 vlen + 1);
205 if (hval != NULL) {
206 memcpy(hval, prefix, prefix_len);
207 hval[prefix_len] = ' ';
208
209 memcpy(hval + prefix_len + 1, name, name_len);
210 hval[prefix_len + 1 + name_len] = ' ';
211
212 strcpy(hval + prefix_len + 1 + name_len + 1, buf_val);
213 hval[prefix_len + 1 + name_len + 1 + vlen] = '\0';
214
215 genhash_update(merger, hval + prefix_len + 1, hval);
216
217 free(prev);
218 }
219 }
220
221 /* Note, if we couldn't merge, then just keep */
222 /* the previous value. */
223
224 return true;
225 }
226
227 return false;
228 }
229
protocol_stats_merge_sum(char *v1, int v1len, char *v2, int v2len, char *out, int outlen)230 bool protocol_stats_merge_sum(char *v1, int v1len,
231 char *v2, int v2len,
232 char *out, int outlen) {
233 int dot = count_dot_pair(v1, v1len, v2, v2len);
234 if (dot > 0) {
235 float v1f = strtof(v1, NULL);
236 float v2f = strtof(v2, NULL);
237 sprintf(out, "%f", v1f + v2f);
238 return true;
239 } else {
240 uint64_t v1i = 0;
241 uint64_t v2i = 0;
242
243 if (safe_strtoull(v1, &v1i) &&
244 safe_strtoull(v2, &v2i)) {
245 sprintf(out, "%"PRIu64"", (v1i + v2i));
246 return true;
247 }
248 }
249
250 (void)outlen;
251 return false;
252 }
253
protocol_stats_merge_smallest(char *v1, int v1len, char *v2, int v2len, char *out, int outlen)254 bool protocol_stats_merge_smallest(char *v1, int v1len,
255 char *v2, int v2len,
256 char *out, int outlen) {
257 int dot = count_dot_pair(v1, v1len, v2, v2len);
258 if (dot > 0) {
259 float v1f = strtof(v1, NULL);
260 float v2f = strtof(v2, NULL);
261 sprintf(out, "%f", (v1f > v2f ? v1f : v2f));
262 return true;
263 } else {
264 uint64_t v1i = 0;
265 uint64_t v2i = 0;
266
267 if (safe_strtoull(v1, &v1i) &&
268 safe_strtoull(v2, &v2i)) {
269 sprintf(out, "%"PRIu64"", (v1i > v2i ? v1i : v2i));
270 return true;
271 }
272 }
273
274 (void)outlen;
275 return false;
276 }
277
278 /* Callback to hash table iteration that frees the multiget_entry list.
279 */
protocol_stats_foreach_free(const void *key, const void *value, void *user_data)280 void protocol_stats_foreach_free(const void *key,
281 const void *value,
282 void *user_data) {
283 (void)key;
284 (void)user_data;
285 assert(value != NULL);
286 free((void*)value);
287 }
288
protocol_stats_foreach_write(const void *key, const void *value, void *user_data)289 void protocol_stats_foreach_write(const void *key,
290 const void *value,
291 void *user_data) {
292
293 char *line = (char *) value;
294 conn *uc = (conn *) user_data;
295 int nline;
296 assert(line != NULL);
297 assert(uc != NULL);
298 (void)key;
299
300 nline = strlen(line);
301 if (nline > 0) {
302 item *it;
303 if (settings.verbose > 2) {
304 moxi_log_write("%d: cproxy_stats writing: %s\n", uc->sfd, line);
305 }
306
307 if (IS_BINARY(uc->protocol)) {
308 token_t line_tokens[MAX_TOKENS];
309 size_t line_ntokens = scan_tokens(line, line_tokens, MAX_TOKENS, NULL);
310
311 if (line_ntokens == 4) {
312 uint16_t key_len = line_tokens[NAME_TOKEN].length;
313 uint32_t data_len = line_tokens[VALUE_TOKEN].length;
314
315 it = item_alloc("s", 1, 0, 0,
316 sizeof(protocol_binary_response_stats) + key_len + data_len);
317 if (it != NULL) {
318 protocol_binary_response_stats *header =
319 (protocol_binary_response_stats *) ITEM_data(it);
320
321 memset(ITEM_data(it), 0, it->nbytes);
322
323 header->message.header.response.magic = (uint8_t) PROTOCOL_BINARY_RES;
324 header->message.header.response.opcode = uc->binary_header.request.opcode;
325 header->message.header.response.keylen = (uint16_t) htons(key_len);
326 header->message.header.response.bodylen = htonl(key_len + data_len);
327 header->message.header.response.opaque = uc->opaque;
328
329 memcpy((ITEM_data(it)) + sizeof(protocol_binary_response_stats),
330 line_tokens[NAME_TOKEN].value, key_len);
331 memcpy((ITEM_data(it)) + sizeof(protocol_binary_response_stats) + key_len,
332 line_tokens[VALUE_TOKEN].value, data_len);
333
334 if (add_conn_item(uc, it)) {
335 add_iov(uc, ITEM_data(it), it->nbytes);
336
337 if (settings.verbose > 2) {
338 moxi_log_write("%d: cproxy_stats writing binary", uc->sfd);
339 cproxy_dump_header(uc->sfd, ITEM_data(it));
340 }
341
342 return;
343 }
344
345 item_remove(it);
346 }
347 }
348
349 return;
350 }
351
352 it = item_alloc("s", 1, 0, 0, nline + 2);
353 if (it != NULL) {
354 strncpy(ITEM_data(it), line, nline);
355 strncpy(ITEM_data(it) + nline, "\r\n", 2);
356
357 if (add_conn_item(uc, it)) {
358 add_iov(uc, ITEM_data(it), nline + 2);
359 return;
360 }
361
362 item_remove(it);
363 }
364 }
365 }
366
count_dot_pair(char *x, int xlen, char *y, int ylen)367 int count_dot_pair(char *x, int xlen, char *y, int ylen) {
368 int xdot = count_dot(x, xlen);
369 int ydot = count_dot(y, ylen);
370
371 return (xdot > ydot ? xdot : ydot);
372 }
373
count_dot(char *x, int len)374 int count_dot(char *x, int len) { /* Number of '.' chars in a string. */
375 int dot = 0;
376 char *end;
377 for (end = x + len; x < end; x++) {
378 if (*x == '.')
379 dot++;
380 }
381
382 return dot;
383 }
384
385 /* ---------------------------------------- */
386
cproxy_reset_stats_td(proxy_stats_td *pstd)387 void cproxy_reset_stats_td(proxy_stats_td *pstd) {
388 int j;
389
390 assert(pstd);
391
392 cproxy_reset_stats(&pstd->stats);
393
394 for (j = 0; j < STATS_CMD_TYPE_last; j++) {
395 int k;
396 for (k = 0; k < STATS_CMD_last; k++) {
397 cproxy_reset_stats_cmd(&pstd->stats_cmd[j][k]);
398 }
399 }
400 }
401
cproxy_reset_stats(proxy_stats *ps)402 void cproxy_reset_stats(proxy_stats *ps) {
403 assert(ps);
404
405 /* Only clear the tot_xxx stats, not the num_xxx ones. */
406
407 ps->tot_upstream = 0;
408 ps->tot_downstream_conn = 0;
409 ps->tot_downstream_conn_acquired = 0;
410 ps->tot_downstream_conn_released = 0;
411 ps->tot_downstream_released = 0;
412 ps->tot_downstream_reserved = 0;
413 ps->tot_downstream_reserved_time = 0;
414 ps->max_downstream_reserved_time = 0;
415 ps->tot_downstream_freed = 0;
416 ps->tot_downstream_quit_server = 0;
417 ps->tot_downstream_max_reached = 0;
418 ps->tot_downstream_create_failed = 0;
419 ps->tot_downstream_connect_started = 0;
420 ps->tot_downstream_connect_wait = 0;
421 ps->tot_downstream_connect = 0;
422 ps->tot_downstream_connect_failed = 0;
423 ps->tot_downstream_connect_timeout = 0;
424 ps->tot_downstream_connect_interval = 0;
425 ps->tot_downstream_connect_max_reached = 0;
426 ps->tot_downstream_waiting_errors = 0;
427 ps->tot_downstream_auth = 0;
428 ps->tot_downstream_auth_failed = 0;
429 ps->tot_downstream_bucket = 0;
430 ps->tot_downstream_bucket_failed = 0;
431 ps->tot_downstream_propagate_failed = 0;
432 ps->tot_downstream_close_on_upstream_close = 0;
433 ps->tot_downstream_conn_queue_timeout = 0;
434 ps->tot_downstream_conn_queue_add = 0;
435 ps->tot_downstream_conn_queue_remove = 0;
436 ps->tot_downstream_timeout = 0;
437 ps->tot_wait_queue_timeout = 0;
438 ps->tot_assign_downstream = 0;
439 ps->tot_assign_upstream = 0;
440 ps->tot_assign_recursion = 0;
441 ps->tot_reset_upstream_avail = 0;
442 ps->tot_retry = 0;
443 ps->tot_retry_time = 0;
444 ps->max_retry_time = 0;
445 ps->tot_retry_vbucket = 0;
446 ps->tot_upstream_paused = 0;
447 ps->tot_upstream_unpaused = 0;
448 ps->tot_multiget_keys = 0;
449 ps->tot_multiget_keys_dedupe = 0;
450 ps->tot_multiget_bytes_dedupe = 0;
451 ps->tot_optimize_sets = 0;
452 ps->err_oom = 0;
453 ps->err_upstream_write_prep = 0;
454 ps->err_downstream_write_prep = 0;
455 ps->tot_cmd_time = 0;
456 ps->tot_cmd_count = 0;
457 }
458
cproxy_reset_stats_cmd(proxy_stats_cmd *sc)459 void cproxy_reset_stats_cmd(proxy_stats_cmd *sc) {
460 assert(sc);
461 memset(sc, 0, sizeof(proxy_stats_cmd));
462 }
463
464 /* ------------------------------------------------- */
465
find_key_stats(proxy_td *ptd, char *key, int key_len, uint64_t msec_time)466 key_stats *find_key_stats(proxy_td *ptd, char *key, int key_len,
467 uint64_t msec_time) {
468 key_stats *ks;
469 assert(ptd);
470 assert(key);
471 assert(key_len > 0);
472
473 ks = mcache_get(&ptd->key_stats, key, key_len, msec_time);
474 if (ks == NULL) {
475 ks = calloc(1, sizeof(key_stats));
476 if (ks != NULL) {
477 memcpy(ks->key, key, key_len);
478 ks->key[key_len] = '\0';
479 ks->refcount = 1;
480 ks->added_at = msec_time;
481
482 mcache_set(&ptd->key_stats, ks,
483 msec_time +
484 ptd->behavior_pool.base.key_stats_lifespan,
485 true, false);
486 }
487 }
488
489 return ks;
490 }
491
touch_key_stats(proxy_td *ptd, char *key, int key_len, uint64_t msec_time, enum_stats_cmd_type cmd_type, enum_stats_cmd cmd, int delta_seen, int delta_hits, int delta_misses, int delta_read_bytes, int delta_write_bytes)492 void touch_key_stats(proxy_td *ptd, char *key, int key_len,
493 uint64_t msec_time,
494 enum_stats_cmd_type cmd_type,
495 enum_stats_cmd cmd,
496 int delta_seen,
497 int delta_hits,
498 int delta_misses,
499 int delta_read_bytes,
500 int delta_write_bytes) {
501 key_stats *ks = find_key_stats(ptd, key, key_len, msec_time);
502 if (ks != NULL) {
503 proxy_stats_cmd *psc = &ks->stats_cmd[cmd_type][cmd];
504 if (psc != NULL) {
505 psc->seen += delta_seen;
506 psc->hits += delta_hits;
507 psc->misses += delta_misses;
508 psc->read_bytes += delta_read_bytes;
509 psc->write_bytes += delta_write_bytes;
510 }
511
512 key_stats_dec_ref(ks);
513 }
514 }
515
516 /* ------------------------------------------------- */
517
key_stats_key(void *it)518 static char *key_stats_key(void *it) {
519 key_stats *i = it;
520 assert(i);
521 return i->key;
522 }
523
key_stats_key_len(void *it)524 static int key_stats_key_len(void *it) {
525 key_stats *i = it;
526 assert(i);
527 return strlen(i->key);
528 }
529
key_stats_len(void *it)530 static int key_stats_len(void *it) {
531 (void)it;
532 return sizeof(key_stats);
533 }
534
key_stats_add_ref(void *it)535 void key_stats_add_ref(void *it) {
536 key_stats *i = it;
537 if (i != NULL) {
538 i->refcount++;
539 }
540 }
541
key_stats_dec_ref(void *it)542 void key_stats_dec_ref(void *it) {
543 key_stats *i = it;
544 if (i != NULL) {
545 i->refcount--;
546 if (i->refcount <= 0) {
547 free(it);
548 }
549 }
550 }
551
key_stats_get_next(void *it)552 static void *key_stats_get_next(void *it) {
553 key_stats *i = it;
554 assert(i);
555 return i->next;
556 }
557
key_stats_set_next(void *it, void *next)558 static void key_stats_set_next(void *it, void *next) {
559 key_stats *i = it;
560 assert(i);
561 i->next = (key_stats *) next;
562 }
563
key_stats_get_prev(void *it)564 static void *key_stats_get_prev(void *it) {
565 key_stats *i = it;
566 assert(i);
567 return i->prev;
568 }
569
key_stats_set_prev(void *it, void *prev)570 static void key_stats_set_prev(void *it, void *prev) {
571 key_stats *i = it;
572 assert(i);
573 i->prev = (key_stats *) prev;
574 }
575
key_stats_get_exptime(void *it)576 static uint64_t key_stats_get_exptime(void *it) {
577 key_stats *i = it;
578 assert(i);
579 return i->exptime;
580 }
581
key_stats_set_exptime(void *it, uint64_t exptime)582 static void key_stats_set_exptime(void *it, uint64_t exptime) {
583 key_stats *i = it;
584 assert(i);
585 i->exptime = exptime;
586 }
587
588