xref: /6.0.3/moxi/src/agent_stats.c (revision d0366df5)
1/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2#include "src/config.h"
3#include <stdio.h>
4#include <stdlib.h>
5#include <string.h>
6#include <unistd.h>
7#include <errno.h>
8#include <platform/cbassert.h>
9#include <math.h>
10#include <limits.h>
11#include <libconflate/conflate.h>
12#include "memcached.h"
13#include "cproxy.h"
14#include "work.h"
15#include "agent.h"
16
17#ifdef REDIRECTS_FOR_MOCKS
18#include "redirects.h"
19#endif
20
21/* Local declarations. */
22
23void xpassword(char *p);
24
25static void add_stat_prefix(const void *dump_opaque,
26                            const char *prefix,
27                            const char *key,
28                            const char *val);
29
30static void add_stat_prefix_ase(const char *key, const uint16_t klen,
31                                const char *val, const uint32_t vlen,
32                                const void *cookie);
33
34static void main_stats_collect(void *data0, void *data1);
35static void work_stats_collect(void *data0, void *data1);
36
37static void main_stats_reset(void *data0, void *data1);
38static void work_stats_reset(void *data0, void *data1);
39
40static void add_proxy_stats_td(proxy_stats_td *agg,
41                               proxy_stats_td *x);
42static void add_raw_key_stats(genhash_t *key_stats_map,
43                              mcache *key_stats);
44static void add_processed_key_stats(genhash_t *dest_map,
45                                    genhash_t *src_map);
46static void add_proxy_stats(proxy_stats *agg,
47                            proxy_stats *x);
48static void add_stats_cmd(proxy_stats_cmd *agg,
49                          proxy_stats_cmd *x);
50static void add_stats_cmd_with_rescale(proxy_stats_cmd *agg,
51                                       const proxy_stats_cmd *x,
52                                       float rescale_agg,
53                                       float rescale_x);
54
55void genhash_free_entry(const void *key,
56                        const void *value,
57                        void *user_data);
58void map_pstd_foreach_emit(const void *key,
59                           const void *value,
60                           void *user_data);
61
62void map_pstd_foreach_merge(const void *key,
63                            const void *value,
64                            void *user_data);
65
66static void map_key_stats_foreach_free(const void *key,
67                                       const void *value,
68                                       void *user_data);
69static void map_key_stats_foreach_emit(const void *key,
70                                       const void *value,
71                                       void *user_data);
72static void map_key_stats_foreach_merge(const void *key,
73                                        const void *value,
74                                        void *user_data);
75
76static void proxy_stats_dump_behavior(ADD_STAT add_stats,
77                                      conn *c,
78                                      const char *prefix,
79                                      proxy_behavior *b,
80                                      int level);
81static void proxy_stats_dump_frontcache(ADD_STAT add_stats,
82                                        conn *c,
83                                        const char *prefix,
84                                        proxy *p);
85static void proxy_stats_dump_pstd_stats(ADD_STAT add_stats,
86                                        conn *c,
87                                        const char *prefix,
88                                        proxy_stats *stats);
89static void proxy_stats_dump_stats_cmd(ADD_STAT add_stats,
90                                       conn *c, bool do_zeros,
91                                       const char *prefix,
92                                       proxy_stats_cmd stats_cmd[][STATS_CMD_last]);
93static void map_key_stats_foreach_dump(const void *key,
94                                       const void *value,
95                                       void *user_data);
96
/* Identifies one proxy whose memcached-protocol stats should be fetched. */
struct main_stats_proxy_info {
    /* Proxy name; on_conflate_get_stats() free()s it after use. */
    char *name;
    /* Port handed to collect_memcached_stats_for_proxy(). */
    int port;
};
101
102struct main_stats_collect_info {
103    proxy_main           *m;
104    conflate_form_result *result;
105    char                 *prefix;
106
107    char *type;
108    bool  do_settings;
109    bool  do_stats;
110    bool  do_zeros;
111
112    int nproxy;
113    struct main_stats_proxy_info *proxies;
114};
115
/* Printable command names, indexed by enum_stats_cmd. */
/* Keep sync'ed with enum_stats_cmd. */
static char *cmd_names[] = {
    "get",       "get_key",     "set",     "add",
    "replace",   "delete",      "append",  "prepend",
    "incr",      "decr",        "flush_all",
    "cas",       "stats",       "stats_reset",
    "version",   "verbosity",   "quit",
    "getl",      "unl",
    "ERROR"
};
138
/* Printable command-type names, indexed by enum_stats_cmd_type. */
/* Keep sync'ed with enum_stats_cmd_type. */
static char *cmd_type_names[] = { "regular", "quiet" };
143
144struct stats_gathering_pair {
145    genhash_t *map_pstd; /* maps "<proxy-name>:<port>" strings to (proxy_stats_td *) */
146    genhash_t *map_key_stats; /* maps "<proxy-name>:<port>" strings to (genhash that maps key names to (struct key_stats *)) */
147};
148
149#ifndef REDIRECTS_FOR_MOCKS
150static
151#else
152#undef collect_memcached_stats_for_proxy
153#endif
154void collect_memcached_stats_for_proxy(struct main_stats_collect_info *msci,
155                                       const char *proxy_name, int proxy_port) {
156    memcached_st mst;
157    memcached_return error;
158    memcached_stat_st *st;
159    char bufk[500];
160
161    memcached_create(&mst);
162    memcached_server_add(&mst, "127.0.0.1", proxy_port);
163    memcached_behavior_set(&mst, MEMCACHED_BEHAVIOR_TCP_NODELAY, 1);
164
165    st = memcached_stat(&mst, NULL, &error);
166
167    if (st == NULL) {
168        goto out_free;
169    }
170
171#define emit_s(key, val)                                 \
172    snprintf(bufk, sizeof(bufk), "%u:%s:stats:%s",       \
173             proxy_port,                                 \
174             proxy_name != NULL ? proxy_name : "", key); \
175    conflate_add_field(msci->result, bufk, val);
176
177    {
178        char **keys = memcached_stat_get_keys(&mst, st, &error);
179        for (; *keys; keys++) {
180            char *key = *keys;
181            char *value = memcached_stat_get_value(&mst, st, key, &error);
182            if (value == NULL) {
183                continue;
184            }
185
186            emit_s(key, value);
187            free(value);
188        }
189    }
190#undef emit_s
191
192    free(st);
193
194out_free:
195    memcached_free(&mst);
196}
197
198#ifdef REDIRECTS_FOR_MOCKS
199#define collect_memcached_stats_for_proxy redirected_collect_memcached_stats_for_proxy
200#endif
201
202/* This callback is invoked by conflate on a conflate thread
203 * when it wants proxy stats.
204 *
205 * We use the work_queues to retrieve the info, so that normal
206 * runtime has fewer locks, at the cost of scatter/gather
207 * complexity to handle the proxy stats request.
208 */
209enum conflate_mgmt_cb_result on_conflate_get_stats(void *userdata,
210                                                   conflate_handle_t *handle,
211                                                   const char *cmd,
212                                                   bool direct,
213                                                   kvpair_t *form,
214                                                   conflate_form_result *r)
215{
216    proxy_main *m = userdata;
217    LIBEVENT_THREAD *mthread;
218    char *type;
219    bool do_all;
220    struct main_stats_collect_info msci;
221    char buf[800];
222    work_collect *ca;
223
224    (void)handle;
225    (void)cmd;
226    (void)direct;
227
228    cb_assert(STATS_CMD_last      == sizeof(cmd_names) / sizeof(char *));
229    cb_assert(STATS_CMD_TYPE_last == sizeof(cmd_type_names) / sizeof(char *));
230
231    cb_assert(m);
232    cb_assert(m->nthreads > 1);
233
234    mthread = thread_by_index(0);
235    cb_assert(mthread);
236    cb_assert(mthread->work_queue);
237
238    type = get_simple_kvpair_val(form, "-subtype-");
239    do_all = (type == NULL ||
240              strlen(type) <= 0 ||
241              strcmp(type, "all") == 0);
242
243    memset(&msci, 0, sizeof(msci));
244    msci.m        = m;
245    msci.result   = r;
246    msci.prefix   = "";
247    msci.type     = type;
248    msci.do_settings = (do_all || strcmp(type, "settings") == 0);
249    msci.do_stats    = (do_all || strcmp(type, "stats") == 0);
250    msci.do_zeros    = (type != NULL &&
251                        strcmp(type, "all") == 0); /* Only when explicit "all". */
252        /* .nproxy   = 0, */
253    msci.proxies = 0;
254
255#define more_stat(spec, key, val)              \
256    if (msci.do_zeros || val) {                \
257        snprintf(buf, sizeof(buf), spec, val); \
258        conflate_add_field(r, key, buf);       \
259    }
260
261    more_stat("%s", "main_version",
262              VERSION);
263    more_stat("%u", "main_nthreads",
264              m->nthreads);
265
266    if (msci.do_settings) {
267        conflate_add_field(r, "main_hostname", cproxy_hostname);
268
269        cproxy_dump_behavior_ex(&m->behavior, "main_behavior", 2,
270                                add_stat_prefix, &msci);
271    }
272
273    if (msci.do_stats) {
274        more_stat("%"PRIu64, "main_configs",
275                  (uint64_t) m->stat_configs);
276        more_stat("%"PRIu64, "main_config_fails",
277                  (uint64_t) m->stat_config_fails);
278        more_stat("%"PRIu64, "main_proxy_starts",
279                  (uint64_t) m->stat_proxy_starts);
280        more_stat("%"PRIu64, "main_proxy_start_fails",
281                  (uint64_t) m->stat_proxy_start_fails);
282        more_stat("%"PRIu64, "main_proxy_existings",
283                  (uint64_t) m->stat_proxy_existings);
284        more_stat("%"PRIu64, "main_proxy_shutdowns",
285                  (uint64_t) m->stat_proxy_shutdowns);
286    }
287
288#undef more_stat
289
290    if (msci.do_settings) {
291        struct main_stats_collect_info ase = msci;
292        ase.prefix = "memcached_settings";
293
294        process_stat_settings(add_stat_prefix_ase, &ase, NULL);
295    }
296
297    if (msci.do_stats) {
298        struct main_stats_collect_info ase = msci;
299        ase.prefix = "memcached_stats";
300
301        server_stats(add_stat_prefix_ase, &ase, NULL);
302    }
303
304    /* Alloc here so the main listener thread has less work. */
305
306    ca = calloc(m->nthreads, sizeof(work_collect));
307    if (ca != NULL) {
308        int i;
309
310        for (i = 1; i < m->nthreads; i++) {
311            struct stats_gathering_pair *pair =
312                calloc(1, sizeof(struct stats_gathering_pair));
313            if (!pair) {
314                break;
315            }
316
317            /* Each thread gets its own collection hashmap, which */
318            /* is keyed by each proxy's "binding:name", and whose */
319            /* values are proxy_stats_td. */
320
321            if (!(pair->map_pstd = genhash_init(128, strhash_ops))) {
322                break;
323            }
324
325            /* Key stats hashmap has same keys and */
326            /* genhash<string, struct key_stats *> as values. */
327
328            if (!(pair->map_key_stats = genhash_init(128, strhash_ops))) {
329                break;
330            }
331            work_collect_init(&ca[i], -1, pair);
332        }
333
334        /* Continue on the main listener thread. */
335
336        if (i >= m->nthreads &&
337            work_send(mthread->work_queue, main_stats_collect,
338                      &msci, ca)) {
339            /* Wait for all the stats collecting to finish. */
340
341            for (i = 1; i < m->nthreads; i++) {
342                work_collect_wait(&ca[i]);
343            }
344
345            cb_assert(m->nthreads > 0);
346
347            if (msci.do_stats) {
348                struct stats_gathering_pair *end_pair = ca[1].data;
349                genhash_t *end_pstd = end_pair->map_pstd;
350                genhash_t *end_map_key_stats = end_pair->map_key_stats;
351                if (end_pstd != NULL) {
352                    /* Skip the first worker thread (index 1)'s results, */
353                    /* because that's where we'll aggregate final results. */
354
355                    for (i = 2; i < m->nthreads; i++) {
356                        struct stats_gathering_pair *pair = ca[i].data;
357                        genhash_t *map_pstd = pair->map_pstd;
358                        genhash_t *map_key_stats;
359
360                        if (map_pstd != NULL) {
361                            genhash_iter(map_pstd, map_pstd_foreach_merge,
362                                         end_pstd);
363                        }
364
365                        map_key_stats = pair->map_key_stats;
366                        if (map_key_stats != NULL) {
367                            genhash_iter(map_key_stats,
368                                         map_key_stats_foreach_merge,
369                                         end_map_key_stats);
370                        }
371                    }
372
373                    genhash_iter(end_pstd, map_pstd_foreach_emit, &msci);
374                    genhash_iter(end_map_key_stats,
375                                 map_key_stats_foreach_emit, &msci);
376                }
377            }
378        }
379
380        for (i = 0; i < msci.nproxy; i++) {
381            collect_memcached_stats_for_proxy(&msci,
382                                              msci.proxies[i].name,
383                                              msci.proxies[i].port);
384            free(msci.proxies[i].name);
385        }
386        free(msci.proxies);
387
388        for (i = 1; i < m->nthreads; i++) {
389            genhash_t *map_key_stats;
390            genhash_t *map_pstd;
391            struct stats_gathering_pair *pair = ca[i].data;
392            if (!pair) {
393                continue;
394            }
395            map_pstd = pair->map_pstd;
396            if (map_pstd != NULL) {
397                genhash_iter(map_pstd, genhash_free_entry, NULL);
398                genhash_free(map_pstd);
399            }
400            map_key_stats = pair->map_key_stats;
401            if (map_key_stats != NULL) {
402                genhash_iter(map_key_stats, map_key_stats_foreach_free, NULL);
403                genhash_free(map_key_stats);
404            }
405            free(pair);
406        }
407
408        free(ca);
409    }
410
411    return RV_OK;
412}
413
414void map_pstd_foreach_merge(const void *key,
415                            const void *value,
416                            void *user_data) {
417    genhash_t *map_end_pstd = user_data;
418    if (key != NULL &&
419        map_end_pstd != NULL) {
420        proxy_stats_td *cur_pstd = (proxy_stats_td *) value;
421        proxy_stats_td *end_pstd = genhash_find(map_end_pstd, key);
422        if (cur_pstd != NULL &&
423            end_pstd != NULL) {
424            add_proxy_stats_td(end_pstd, cur_pstd);
425        }
426    }
427}
428
429void map_key_stats_foreach_free(const void *key,
430                                const void *value,
431                                void *user_data) {
432    genhash_t *map_key_stats;
433    (void)user_data;
434    cb_assert(key);
435    cb_assert(value);
436
437    free((void *)key);
438
439    map_key_stats = (genhash_t *)value;
440    genhash_iter(map_key_stats, genhash_free_entry, NULL);
441    genhash_free(map_key_stats);
442}
443
444void map_key_stats_foreach_merge(const void *key,
445                                 const void *value,
446                                 void *user_data) {
447    genhash_t *end_map_key_stats = user_data;
448    if (key != NULL) {
449        genhash_t *key_stats_hash = (genhash_t *) value;
450        genhash_t *end_key_stats = genhash_find(end_map_key_stats, key);
451        if (key_stats_hash != NULL && end_key_stats != NULL) {
452            add_processed_key_stats(key_stats_hash, end_key_stats);
453        }
454    }
455}
456
457static void proxy_stats_dump_behavior(ADD_STAT add_stats,
458                                      conn *c, const char *prefix,
459                                      proxy_behavior *b, int level) {
460    if (level >= 2) {
461        APPEND_PREFIX_STAT("cycle", "%u", b->cycle);
462    }
463
464    if (level >= 1) {
465        APPEND_PREFIX_STAT("downstream_max", "%u", b->downstream_max);
466        APPEND_PREFIX_STAT("downstream_conn_max", "%u", b->downstream_conn_max);
467    }
468
469    APPEND_PREFIX_STAT("downstream_weight",   "%u", b->downstream_weight);
470    APPEND_PREFIX_STAT("downstream_retry",    "%u", b->downstream_retry);
471    APPEND_PREFIX_STAT("downstream_protocol", "%d", b->downstream_protocol);
472    APPEND_PREFIX_STAT("downstream_timeout", "%ld", /* In millisecs. */
473              (b->downstream_timeout.tv_sec * 1000 +
474               b->downstream_timeout.tv_usec / 1000));
475    APPEND_PREFIX_STAT("downstream_conn_queue_timeout", "%ld", /* In millisecs. */
476              (b->downstream_conn_queue_timeout.tv_sec * 1000 +
477               b->downstream_conn_queue_timeout.tv_usec / 1000));
478
479    APPEND_PREFIX_STAT("connect_timeout", "%ld", /* In millisecs. */
480                       (b->connect_timeout.tv_sec * 1000 +
481                        b->connect_timeout.tv_usec / 1000));
482    APPEND_PREFIX_STAT("auth_timeout", "%ld", /* In millisecs. */
483                       (b->auth_timeout.tv_sec * 1000 +
484                        b->auth_timeout.tv_usec / 1000));
485
486    if (level >= 1) {
487        APPEND_PREFIX_STAT("wait_queue_timeout", "%ld", /* In millisecs. */
488              (b->wait_queue_timeout.tv_sec * 1000 +
489               b->wait_queue_timeout.tv_usec / 1000));
490        APPEND_PREFIX_STAT("time_stats", "%d", b->time_stats);
491        APPEND_PREFIX_STAT("connect_max_errors", "%d", b->connect_max_errors);
492        APPEND_PREFIX_STAT("connect_retry_interval", "%d", b->connect_retry_interval);
493        APPEND_PREFIX_STAT("front_cache_max", "%u", b->front_cache_max);
494        APPEND_PREFIX_STAT("front_cache_lifespan", "%u", b->front_cache_lifespan);
495        APPEND_PREFIX_STAT("front_cache_spec", "%s", b->front_cache_spec);
496        APPEND_PREFIX_STAT("front_cache_unspec", "%s", b->front_cache_unspec);
497        APPEND_PREFIX_STAT("key_stats_max", "%u", b->key_stats_max);
498        APPEND_PREFIX_STAT("key_stats_lifespan", "%u", b->key_stats_lifespan);
499        APPEND_PREFIX_STAT("key_stats_spec", "%s", b->key_stats_spec);
500        APPEND_PREFIX_STAT("key_stats_unspec", "%s", b->key_stats_unspec);
501        APPEND_PREFIX_STAT("optimize_set", "%s", b->optimize_set);
502    }
503
504    APPEND_PREFIX_STAT("usr",    "%s", b->usr);
505    APPEND_PREFIX_STAT("host",   "%s", b->host);
506    APPEND_PREFIX_STAT("port",   "%d", b->port);
507    APPEND_PREFIX_STAT("bucket", "%s", b->bucket);
508
509    if (level >= 1) {
510        APPEND_PREFIX_STAT("port_listen", "%d", b->port_listen);
511        APPEND_PREFIX_STAT("default_bucket_name", "%s", b->default_bucket_name);
512    }
513}
514
515static void proxy_stats_dump_frontcache(ADD_STAT add_stats, conn *c,
516                                        const char *prefix, proxy *p) {
517    cb_mutex_enter(p->front_cache.lock);
518
519    if (p->front_cache.map != NULL) {
520        APPEND_PREFIX_STAT("size", "%u", genhash_size(p->front_cache.map));
521    }
522
523    APPEND_PREFIX_STAT("max", "%u", p->front_cache.max);
524    APPEND_PREFIX_STAT("oldest_live", "%u", p->front_cache.oldest_live);
525    APPEND_PREFIX_STAT("tot_get_hits",
526           "%"PRIu64, (uint64_t) p->front_cache.tot_get_hits);
527    APPEND_PREFIX_STAT("tot_get_expires",
528           "%"PRIu64, (uint64_t) p->front_cache.tot_get_expires);
529    APPEND_PREFIX_STAT("tot_get_misses",
530           "%"PRIu64, (uint64_t) p->front_cache.tot_get_misses);
531    APPEND_PREFIX_STAT("tot_get_bytes",
532           "%"PRIu64, (uint64_t) p->front_cache.tot_get_bytes);
533    APPEND_PREFIX_STAT("tot_adds",
534           "%"PRIu64, (uint64_t) p->front_cache.tot_adds);
535    APPEND_PREFIX_STAT("tot_add_skips",
536           "%"PRIu64, (uint64_t) p->front_cache.tot_add_skips);
537    APPEND_PREFIX_STAT("tot_add_fails",
538           "%"PRIu64, (uint64_t) p->front_cache.tot_add_fails);
539    APPEND_PREFIX_STAT("tot_add_bytes",
540           "%"PRIu64, (uint64_t) p->front_cache.tot_add_bytes);
541    APPEND_PREFIX_STAT("tot_deletes",
542           "%"PRIu64, (uint64_t) p->front_cache.tot_deletes);
543    APPEND_PREFIX_STAT("tot_evictions",
544           "%"PRIu64, (uint64_t) p->front_cache.tot_evictions);
545
546    cb_mutex_exit(p->front_cache.lock);
547}
548
549static void proxy_stats_dump_pstd_stats(ADD_STAT add_stats,
550                                        conn *c, const char *prefix,
551                                        proxy_stats *pstats) {
552    cb_assert(pstats != NULL);
553
554    APPEND_PREFIX_STAT("num_upstream",
555              "%"PRIu64, (uint64_t) pstats->num_upstream);
556    APPEND_PREFIX_STAT("tot_upstream",
557              "%"PRIu64, (uint64_t) pstats->tot_upstream);
558    APPEND_PREFIX_STAT("num_downstream_conn",
559              "%"PRIu64, (uint64_t) pstats->num_downstream_conn);
560    APPEND_PREFIX_STAT("tot_downstream_conn",
561              "%"PRIu64, (uint64_t) pstats->tot_downstream_conn);
562    APPEND_PREFIX_STAT("tot_downstream_conn_acquired",
563              "%"PRIu64, (uint64_t) pstats->tot_downstream_conn_acquired);
564    APPEND_PREFIX_STAT("tot_downstream_conn_released",
565              "%"PRIu64, (uint64_t) pstats->tot_downstream_conn_released);
566    APPEND_PREFIX_STAT("tot_downstream_released",
567              "%"PRIu64, (uint64_t) pstats->tot_downstream_released);
568    APPEND_PREFIX_STAT("tot_downstream_reserved",
569              "%"PRIu64, (uint64_t) pstats->tot_downstream_reserved);
570    APPEND_PREFIX_STAT("tot_downstream_reserved_time",
571              "%"PRIu64, (uint64_t) pstats->tot_downstream_reserved_time);
572    APPEND_PREFIX_STAT("max_downstream_reserved_time",
573              "%"PRIu64, (uint64_t) pstats->max_downstream_reserved_time);
574    APPEND_PREFIX_STAT("tot_downstream_freed",
575              "%"PRIu64, (uint64_t) pstats->tot_downstream_freed);
576    APPEND_PREFIX_STAT("tot_downstream_quit_server",
577              "%"PRIu64, (uint64_t) pstats->tot_downstream_quit_server);
578    APPEND_PREFIX_STAT("tot_downstream_max_reached",
579              "%"PRIu64, (uint64_t) pstats->tot_downstream_max_reached);
580    APPEND_PREFIX_STAT("tot_downstream_create_failed",
581              "%"PRIu64, (uint64_t) pstats->tot_downstream_create_failed);
582    APPEND_PREFIX_STAT("tot_downstream_connect_started",
583              "%"PRIu64, (uint64_t) pstats->tot_downstream_connect_started);
584    APPEND_PREFIX_STAT("tot_downstream_connect_wait",
585              "%"PRIu64, (uint64_t) pstats->tot_downstream_connect_wait);
586    APPEND_PREFIX_STAT("tot_downstream_connect",
587              "%"PRIu64, (uint64_t) pstats->tot_downstream_connect);
588    APPEND_PREFIX_STAT("tot_downstream_connect_failed",
589              "%"PRIu64, (uint64_t) pstats->tot_downstream_connect_failed);
590    APPEND_PREFIX_STAT("tot_downstream_connect_timeout",
591              "%"PRIu64, (uint64_t) pstats->tot_downstream_connect_timeout);
592    APPEND_PREFIX_STAT("tot_downstream_connect_interval",
593              "%"PRIu64, (uint64_t) pstats->tot_downstream_connect_interval);
594    APPEND_PREFIX_STAT("tot_downstream_connect_max_reached",
595              "%"PRIu64, (uint64_t) pstats->tot_downstream_connect_max_reached);
596    APPEND_PREFIX_STAT("tot_downstream_waiting_errors",
597              "%"PRIu64, (uint64_t) pstats->tot_downstream_waiting_errors);
598    APPEND_PREFIX_STAT("tot_downstream_auth",
599              "%"PRIu64, (uint64_t) pstats->tot_downstream_auth);
600    APPEND_PREFIX_STAT("tot_downstream_auth_failed",
601              "%"PRIu64, (uint64_t) pstats->tot_downstream_auth_failed);
602    APPEND_PREFIX_STAT("tot_downstream_bucket",
603              "%"PRIu64, (uint64_t) pstats->tot_downstream_bucket);
604    APPEND_PREFIX_STAT("tot_downstream_bucket_failed",
605              "%"PRIu64, (uint64_t) pstats->tot_downstream_bucket_failed);
606    APPEND_PREFIX_STAT("tot_downstream_propagate_failed",
607              "%"PRIu64, (uint64_t) pstats->tot_downstream_propagate_failed);
608    APPEND_PREFIX_STAT("tot_downstream_close_on_upstream_close",
609              "%"PRIu64, (uint64_t) pstats->tot_downstream_close_on_upstream_close);
610    APPEND_PREFIX_STAT("tot_downstream_conn_queue_timeout",
611              "%"PRIu64, (uint64_t) pstats->tot_downstream_conn_queue_timeout);
612    APPEND_PREFIX_STAT("tot_downstream_conn_queue_add",
613              "%"PRIu64, (uint64_t) pstats->tot_downstream_conn_queue_add);
614    APPEND_PREFIX_STAT("tot_downstream_conn_queue_remove",
615              "%"PRIu64, (uint64_t) pstats->tot_downstream_conn_queue_remove);
616    APPEND_PREFIX_STAT("tot_downstream_timeout",
617              "%"PRIu64, (uint64_t) pstats->tot_downstream_timeout);
618    APPEND_PREFIX_STAT("tot_wait_queue_timeout",
619              "%"PRIu64, (uint64_t) pstats->tot_wait_queue_timeout);
620    APPEND_PREFIX_STAT("tot_auth_timeout",
621              "%"PRIu64, (uint64_t) pstats->tot_auth_timeout);
622    APPEND_PREFIX_STAT("tot_assign_downstream",
623              "%"PRIu64, (uint64_t) pstats->tot_assign_downstream);
624    APPEND_PREFIX_STAT("tot_assign_upstream",
625              "%"PRIu64, (uint64_t) pstats->tot_assign_upstream);
626    APPEND_PREFIX_STAT("tot_assign_recursion",
627              "%"PRIu64, (uint64_t) pstats->tot_assign_recursion);
628    APPEND_PREFIX_STAT("tot_reset_upstream_avail",
629              "%"PRIu64, (uint64_t) pstats->tot_reset_upstream_avail);
630    APPEND_PREFIX_STAT("tot_multiget_keys",
631              "%"PRIu64, (uint64_t) pstats->tot_multiget_keys);
632    APPEND_PREFIX_STAT("tot_multiget_keys_dedupe",
633              "%"PRIu64, (uint64_t) pstats->tot_multiget_keys_dedupe);
634    APPEND_PREFIX_STAT("tot_multiget_bytes_dedupe",
635              "%"PRIu64, (uint64_t) pstats->tot_multiget_bytes_dedupe);
636    APPEND_PREFIX_STAT("tot_optimize_sets",
637              "%"PRIu64, (uint64_t) pstats->tot_optimize_sets);
638    APPEND_PREFIX_STAT("tot_retry",
639              "%"PRIu64, (uint64_t) pstats->tot_retry);
640    APPEND_PREFIX_STAT("tot_retry_time",
641              "%"PRIu64, (uint64_t) pstats->tot_retry_time);
642    APPEND_PREFIX_STAT("max_retry_time",
643              "%"PRIu64, (uint64_t) pstats->max_retry_time);
644    APPEND_PREFIX_STAT("tot_retry_vbucket",
645              "%"PRIu64, (uint64_t) pstats->tot_retry_vbucket);
646    APPEND_PREFIX_STAT("tot_upstream_paused",
647              "%"PRIu64, (uint64_t) pstats->tot_upstream_paused);
648    APPEND_PREFIX_STAT("tot_upstream_unpaused",
649              "%"PRIu64, (uint64_t) pstats->tot_upstream_unpaused);
650    APPEND_PREFIX_STAT("err_oom",
651              "%"PRIu64, (uint64_t) pstats->err_oom);
652    APPEND_PREFIX_STAT("err_upstream_write_prep",
653              "%"PRIu64, (uint64_t) pstats->err_upstream_write_prep);
654    APPEND_PREFIX_STAT("err_downstream_write_prep",
655              "%"PRIu64, (uint64_t) pstats->err_downstream_write_prep);
656    APPEND_PREFIX_STAT("tot_cmd_time",
657              "%"PRIu64, (uint64_t) pstats->tot_cmd_time);
658    APPEND_PREFIX_STAT("tot_cmd_count",
659              "%"PRIu64, (uint64_t) pstats->tot_cmd_count);
660    APPEND_PREFIX_STAT("tot_local_cmd_time",
661              "%"PRIu64, (uint64_t) pstats->tot_local_cmd_time);
662    APPEND_PREFIX_STAT("tot_local_cmd_count",
663              "%"PRIu64, (uint64_t) pstats->tot_local_cmd_count);
664
665}
666
667static void proxy_stats_dump_stats_cmd(ADD_STAT add_stats, conn *c, bool do_zeros,
668                                       const char *prefix,
669                                       proxy_stats_cmd stats_cmd[][STATS_CMD_last]) {
670    char keybuf[128];
671    int j;
672    int k;
673
674    for (j = 0; j < STATS_CMD_TYPE_last; j++) {
675        for (k = 0; k < STATS_CMD_last; k++) {
676            if (do_zeros || stats_cmd[j][k].seen != 0) {
677                snprintf(keybuf, sizeof(keybuf), "%s_%s:%s",
678                         cmd_type_names[j], cmd_names[k], "seen");
679                APPEND_PREFIX_STAT(keybuf,
680                         "%"PRIu64, (uint64_t) stats_cmd[j][k].seen);
681            }
682            if (do_zeros || stats_cmd[j][k].hits != 0) {
683                snprintf(keybuf, sizeof(keybuf), "%s_%s:%s",
684                         cmd_type_names[j], cmd_names[k], "hits");
685                APPEND_PREFIX_STAT(keybuf,
686                         "%"PRIu64, (uint64_t) stats_cmd[j][k].hits);
687            }
688            if (do_zeros || stats_cmd[j][k].misses != 0) {
689                snprintf(keybuf, sizeof(keybuf), "%s_%s:%s",
690                         cmd_type_names[j], cmd_names[k], "misses");
691                APPEND_PREFIX_STAT(keybuf,
692                         "%"PRIu64, (uint64_t) stats_cmd[j][k].misses);
693            }
694            if (do_zeros || stats_cmd[j][k].read_bytes != 0) {
695                snprintf(keybuf, sizeof(keybuf), "%s_%s:%s",
696                         cmd_type_names[j], cmd_names[k], "read_bytes");
697                APPEND_PREFIX_STAT(keybuf,
698                         "%"PRIu64, (uint64_t) stats_cmd[j][k].read_bytes);
699            }
700            if (do_zeros || stats_cmd[j][k].write_bytes != 0) {
701                snprintf(keybuf, sizeof(keybuf), "%s_%s:%s",
702                         cmd_type_names[j], cmd_names[k], "write_bytes");
703                APPEND_PREFIX_STAT(keybuf,
704                         "%"PRIu64, (uint64_t) stats_cmd[j][k].write_bytes);
705            }
706            if (do_zeros || stats_cmd[j][k].cas != 0) {
707                snprintf(keybuf, sizeof(keybuf), "%s_%s:%s",
708                         cmd_type_names[j], cmd_names[k], "cas");
709                APPEND_PREFIX_STAT(keybuf,
710                         "%"PRIu64, (uint64_t) stats_cmd[j][k].cas);
711            }
712        }
713    }
714}
715
716struct key_stats_dump_state {
717    const char *prefix;
718    ADD_STAT add_stats;
719    conn *conn;
720    struct proxy_stats_cmd_info *pscip;
721};
722
723static void map_key_stats_foreach_dump(const void *key, const void *value,
724                                       void *user_data) {
725    const char *name = (const char *)key;
726    struct key_stats *kstats = (struct key_stats *)value;
727    struct key_stats_dump_state *state = user_data;
728    ADD_STAT add_stats;
729    conn *c;
730    char prefix[200+KEY_MAX_LENGTH];
731
732    cb_assert(name != NULL);
733    cb_assert(kstats != NULL);
734    cb_assert(state != NULL);
735
736    cb_assert(strcmp(name, kstats->key) == 0);
737
738    add_stats = state->add_stats;
739    c = state->conn;
740    snprintf(prefix, sizeof(prefix), "%s:%s", state->prefix, name);
741
742    proxy_stats_dump_stats_cmd(add_stats, c, false, prefix, kstats->stats_cmd);
743
744    APPEND_PREFIX_STAT("added_at_msec", "%u", kstats->added_at);
745}
746
747void proxy_stats_dump_basic(ADD_STAT add_stats, conn *c, const char *prefix) {
748    APPEND_PREFIX_STAT("version", "%s", VERSION);
749    APPEND_PREFIX_STAT("nthreads", "%d", settings.num_threads);
750    APPEND_PREFIX_STAT("hostname", "%s", cproxy_hostname);
751}
752
753void proxy_stats_dump_proxy_main(ADD_STAT add_stats, conn *c,
754                                 struct proxy_stats_cmd_info *pscip) {
755    proxy_td *ptd;
756    proxy_main *pm;
757
758    cb_assert(c != NULL);
759
760    ptd = c->extra;
761    if (ptd == NULL ||
762        ptd->proxy == NULL ||
763        ptd->proxy->main == NULL) {
764        return;
765    }
766
767    pm = ptd->proxy->main;
768
769    if (pscip->do_info) {
770        const char *prefix = "proxy_main:";
771        APPEND_PREFIX_STAT("conf_type", "%s",
772               (pm->conf_type==PROXY_CONF_TYPE_STATIC ? "static" : "dynamic"));
773    }
774
775    if (pscip->do_behaviors)  {
776        proxy_stats_dump_behavior(add_stats, c, "proxy_main:behavior:",
777                                  &pm->behavior, 2);
778    }
779
780    if (pscip->do_stats) {
781        const char *prefix = "proxy_main:stats:";
782        APPEND_PREFIX_STAT("stat_configs",
783                    "%"PRIu64, (uint64_t) pm->stat_configs);
784        APPEND_PREFIX_STAT("stat_config_fails",
785                    "%"PRIu64, (uint64_t) pm->stat_config_fails);
786        APPEND_PREFIX_STAT("stat_proxy_starts",
787                    "%"PRIu64, (uint64_t) pm->stat_proxy_starts);
788        APPEND_PREFIX_STAT("stat_proxy_start_fails",
789                    "%"PRIu64, (uint64_t) pm->stat_proxy_start_fails);
790        APPEND_PREFIX_STAT("stat_proxy_existings",
791                    "%"PRIu64, (uint64_t) pm->stat_proxy_existings);
792        APPEND_PREFIX_STAT("stat_proxy_shutdowns",
793                    "%"PRIu64, (uint64_t) pm->stat_proxy_shutdowns);
794    }
795}
796
/* X out password values, in place, in a JSON-like config string.
 *
 * Example: ..."nodeLocator": "vbucket", "saslPassword": "test", "nodes":...
 * Becomes: ..."nodeLocator": "vbucket", "saslPassword": "XXXX", "nodes":...
 *
 * Every occurrence of `assword"` is treated as the end of a key name; the
 * next double-quoted string after it has each of its characters replaced
 * with 'X'.  The string length never changes.
 *
 * p - NUL-terminated, writable string; NULL is accepted and ignored.
 */
void xpassword(char *p) {
    static const char marker[] = "assword\"";

    if (p == NULL) {
        return;
    }

    for (;;) {
        p = strstr(p, marker);
        if (p == NULL) {
            return;
        }

        /* Step over the marker and find the quote that opens the value.
         * Without any further quote there can be no further marker either,
         * so stop here instead of feeding NULL back into strstr().
         */
        p = strchr(p + (sizeof(marker) - 1), '"');
        if (p == NULL) {
            return;
        }

        p++;
        while (*p != '\0' && *p != '"') {
            if (*p == '\\' && p[1] != '\0') {
                /* An escaped character (e.g. \") is still part of the
                 * value: mask the backslash and fall through to mask the
                 * character it escapes.
                 */
                *p++ = 'X';
            }
            *p++ = 'X';
        }
    }
}
818
819void proxy_stats_dump_proxies(ADD_STAT add_stats, conn *c,
820                              struct proxy_stats_cmd_info *pscip) {
821    proxy_td *ptd;
822    proxy_main *pm;
823    char prefix[200];
824    proxy *p;
825
826    cb_assert(c != NULL);
827
828    ptd = c->extra;
829    if (ptd == NULL ||
830        ptd->proxy == NULL ||
831        ptd->proxy->main == NULL) {
832        return;
833    }
834
835    pm = ptd->proxy->main;
836
837    if (cb_mutex_try_enter(&pm->proxy_main_lock) != 0) {
838        /* Do not dump proxy stats
839         * if dynamic reconfiguration is currently executing by other thread.
840         */
841        return;
842    }
843
844    for (p = pm->proxy_head; p != NULL; p = p->next) {
845        bool go = true;
846        cb_mutex_enter(&p->proxy_lock);
847
848        if (p->name != NULL &&
849            strcmp(p->name, NULL_BUCKET) != 0 &&
850            p->config != NULL) {
851            if (pscip->do_info) {
852                char *buf;
853
854                snprintf(prefix, sizeof(prefix), "%u:%s:info:", p->port, p->name);
855                APPEND_PREFIX_STAT("port",          "%u", p->port);
856                APPEND_PREFIX_STAT("name",          "%s", p->name);
857
858                buf = trimstrdup(p->config);
859                if (buf != NULL) {
860                    /* Remove embedded newlines, as config might be a JSON string. */
861                    char *slow = buf;
862                    char *fast;
863                    for (fast = buf; *fast != '\0'; fast++) {
864                        *slow = *fast;
865                        if (*slow != '\n' && *slow != '\r') {
866                            slow++;
867                        }
868                    }
869                    *slow = '\0';
870
871                    xpassword(buf);
872
873                    APPEND_PREFIX_STAT("config", "%s", buf);
874
875                    free(buf);
876                }
877
878                APPEND_PREFIX_STAT("config_ver",    "%u", p->config_ver);
879                APPEND_PREFIX_STAT("behaviors_num", "%u", p->behavior_pool.num);
880            }
881
882            if (pscip->do_behaviors) {
883                int i;
884
885                snprintf(prefix, sizeof(prefix), "%u:%s:behavior:",
886                         p->port, p->name);
887                proxy_stats_dump_behavior(add_stats, c, prefix,
888                                          &p->behavior_pool.base, 1);
889
890                for (i = 0; i < p->behavior_pool.num; i++) {
891                    snprintf(prefix, sizeof(prefix), "%u:%s:behavior-%u:",
892                             p->port, p->name, i);
893                    proxy_stats_dump_behavior(add_stats, c, prefix,
894                                              &p->behavior_pool.arr[i], 0);
895                }
896            }
897
898            if (pscip->do_stats) {
899                snprintf(prefix, sizeof(prefix), "%u:%s:stats:", p->port, p->name);
900                APPEND_PREFIX_STAT("listening", "%"PRIu64,
901                                   (uint64_t) p->listening);
902                APPEND_PREFIX_STAT("listening_failed", "%"PRIu64,
903                                   (uint64_t) p->listening_failed);
904            }
905        } else {
906            go = false;
907        }
908
909        cb_mutex_exit(&p->proxy_lock);
910
911        if (go == false) {
912            continue;
913        }
914
915        if (pscip->do_frontcache) {
916            snprintf(prefix, sizeof(prefix), "%u:%s:frontcache:",
917                     p->port, p->name);
918            proxy_stats_dump_frontcache(add_stats, c, prefix, p);
919        }
920
921        if (pscip->do_stats) {
922            proxy_stats_td *pstd = calloc(1, sizeof(proxy_stats_td));
923            if (pstd != NULL) {
924                int i;
925                cb_mutex_enter(&p->proxy_lock);
926                for (i = 1; i < pm->nthreads; i++) {
927                    proxy_td *thread_ptd = &p->thread_data[i];
928                    if (thread_ptd != NULL) {
929                        add_proxy_stats_td(pstd, &thread_ptd->stats);
930                    }
931                }
932                cb_mutex_exit(&p->proxy_lock);
933
934                snprintf(prefix, sizeof(prefix), "%u:%s:pstd_stats:",
935                         p->port, p->name);
936                proxy_stats_dump_pstd_stats(add_stats, c, prefix, &pstd->stats);
937                snprintf(prefix, sizeof(prefix), "%u:%s:pstd_stats_cmd:",
938                         p->port, p->name);
939                proxy_stats_dump_stats_cmd(add_stats, c, pscip->do_zeros, prefix,
940                                           pstd->stats_cmd);
941
942                free(pstd);
943            }
944        }
945
946        if (pscip->do_keystats) {
947            /* TODO: Test is needed */
948            genhash_t *key_stats_map = NULL;
949
950            if (key_stats_map != NULL) {
951                struct key_stats_dump_state state;
952                int i;
953
954                memset(&state, 0, sizeof(state));
955
956                cb_mutex_enter(&p->proxy_lock);
957                for (i = 1; i < pm->nthreads; i++) {
958                     proxy_td *thread_ptd = &p->thread_data[i];
959                     if (ptd != NULL) {
960                         add_raw_key_stats(key_stats_map, &thread_ptd->key_stats);
961                     }
962                }
963                cb_mutex_exit(&p->proxy_lock);
964
965                snprintf(prefix, sizeof(prefix), "%u:%s:key_stats:",
966                         p->port, p->name);
967                state.prefix = prefix;
968                state.add_stats = add_stats;
969                state.conn = c;
970                state.pscip = pscip;
971                genhash_iter(key_stats_map, map_key_stats_foreach_dump, &state);
972                genhash_free(key_stats_map);
973            }
974        }
975    }
976
977    cb_mutex_exit(&pm->proxy_main_lock);
978}
979
980/* Must be invoked on the main listener thread.
981 *
982 * Puts stats gathering work on every worker thread's work_queue.
983 */
984static void main_stats_collect(void *data0, void *data1) {
985    struct main_stats_collect_info *msci = data0;
986    proxy_main *m;
987    work_collect *ca;
988    struct main_stats_collect_info ase;
989    int sent = 0;
990    int nproxy = 0;
991    char bufk[200];
992    char bufv[4000];
993    int i;
994    proxy *p;
995
996    cb_assert(msci);
997    cb_assert(msci->result);
998
999    m = msci->m;
1000    cb_assert(m);
1001    cb_assert(m->nthreads > 1);
1002
1003    ca = data1;
1004    cb_assert(ca);
1005
1006    cb_assert(is_listen_thread());
1007
1008    ase = *msci;
1009    ase.prefix = "";
1010
1011    cb_mutex_enter(&m->proxy_main_lock);
1012
1013    for (p = m->proxy_head; p != NULL; p = p->next) {
1014        nproxy++;
1015
1016#define emit_s(key, val)                               \
1017        snprintf(bufk, sizeof(bufk), "%u:%s:%s",       \
1018                 p->port,                              \
1019                 p->name != NULL ? p->name : "", key); \
1020        conflate_add_field(msci->result, bufk, val);
1021
1022#define emit_f(key, fmtv, val)                   \
1023        snprintf(bufv, sizeof(bufv), fmtv, val); \
1024        emit_s(key, bufv);
1025
1026        cb_mutex_enter(&p->proxy_lock);
1027
1028        emit_f("port",          "%u", p->port);
1029        emit_s("name",                p->name);
1030        emit_s("config",              p->config);
1031        emit_f("config_ver",    "%u", p->config_ver);
1032        emit_f("behaviors_num", "%u", p->behavior_pool.num);
1033
1034        if (msci->do_settings) {
1035            snprintf(bufk, sizeof(bufk),
1036                     "%u:%s:behavior", p->port, p->name);
1037
1038            cproxy_dump_behavior_ex(&p->behavior_pool.base, bufk, 1,
1039                                    add_stat_prefix, &ase);
1040
1041            for (i = 0; i < p->behavior_pool.num; i++) {
1042                snprintf(bufk, sizeof(bufk),
1043                         "%u:%s:behavior-%u", p->port, p->name, i);
1044
1045                cproxy_dump_behavior_ex(&p->behavior_pool.arr[i], bufk, 0,
1046                                        add_stat_prefix, &ase);
1047            }
1048        }
1049
1050        if (msci->do_stats) {
1051            emit_f("listening",
1052                   "%"PRIu64, (uint64_t) p->listening);
1053            emit_f("listening_failed",
1054                   "%"PRIu64, (uint64_t) p->listening_failed);
1055        }
1056
1057        cb_mutex_exit(&p->proxy_lock);
1058
1059        /* Emit front_cache stats. */
1060
1061        if (msci->do_stats) {
1062            cb_mutex_enter(p->front_cache.lock);
1063
1064            if (p->front_cache.map != NULL) {
1065                emit_f("front_cache_size", "%u", genhash_size(p->front_cache.map));
1066            }
1067
1068            emit_f("front_cache_max",
1069                   "%u", p->front_cache.max);
1070            emit_f("front_cache_oldest_live",
1071                   "%u", p->front_cache.oldest_live);
1072
1073            emit_f("front_cache_tot_get_hits",
1074                   "%"PRIu64,
1075                   (uint64_t) p->front_cache.tot_get_hits);
1076            emit_f("front_cache_tot_get_expires",
1077                   "%"PRIu64,
1078                   (uint64_t) p->front_cache.tot_get_expires);
1079            emit_f("front_cache_tot_get_misses",
1080                   "%"PRIu64,
1081                   (uint64_t) p->front_cache.tot_get_misses);
1082            emit_f("front_cache_tot_get_bytes",
1083                   "%"PRIu64,
1084                   (uint64_t) p->front_cache.tot_get_bytes);
1085            emit_f("front_cache_tot_adds",
1086                   "%"PRIu64,
1087                   (uint64_t) p->front_cache.tot_adds);
1088            emit_f("front_cache_tot_add_skips",
1089                   "%"PRIu64,
1090                   (uint64_t) p->front_cache.tot_add_skips);
1091            emit_f("front_cache_tot_add_fails",
1092                   "%"PRIu64,
1093                   (uint64_t) p->front_cache.tot_add_fails);
1094            emit_f("front_cache_tot_add_bytes",
1095                   "%"PRIu64,
1096                   (uint64_t) p->front_cache.tot_add_bytes);
1097            emit_f("front_cache_tot_deletes",
1098                   "%"PRIu64,
1099                   (uint64_t) p->front_cache.tot_deletes);
1100            emit_f("front_cache_tot_evictions",
1101                   "%"PRIu64,
1102                   (uint64_t) p->front_cache.tot_evictions);
1103
1104            cb_mutex_exit(p->front_cache.lock);
1105        }
1106    }
1107
1108    cb_mutex_exit(&m->proxy_main_lock);
1109
1110    /* Starting at 1 because 0 is the main listen thread. */
1111
1112    for (i = 1; i < m->nthreads; i++) {
1113        work_collect *c = &ca[i];
1114
1115        work_collect_count(c, nproxy);
1116
1117        if (nproxy > 0) {
1118            LIBEVENT_THREAD *t = thread_by_index(i);
1119            cb_assert(t);
1120            cb_assert(t->work_queue);
1121
1122            cb_mutex_enter(&m->proxy_main_lock);
1123
1124            for (p = m->proxy_head; p != NULL; p = p->next) {
1125                proxy_td *ptd = &p->thread_data[i];
1126                if (ptd != NULL &&
1127                    work_send(t->work_queue, work_stats_collect, ptd, c)) {
1128                    sent++;
1129                }
1130            }
1131
1132            cb_mutex_exit(&m->proxy_main_lock);
1133        }
1134    }
1135
1136    {
1137        struct main_stats_proxy_info *infos =
1138            calloc(nproxy, sizeof(struct main_stats_proxy_info));
1139
1140        cb_mutex_enter(&m->proxy_main_lock);
1141
1142        p = m->proxy_head;
1143        for (i = 0; i < nproxy; i++, p = p->next) {
1144            if (p == NULL) {
1145                break;
1146            }
1147
1148            cb_mutex_enter(&p->proxy_lock);
1149            infos[i].name = p->name != NULL ? strdup(p->name) : NULL;
1150            infos[i].port = p->port;
1151            cb_mutex_exit(&p->proxy_lock);
1152        }
1153
1154        cb_mutex_exit(&m->proxy_main_lock);
1155
1156        msci->proxies = infos;
1157        msci->nproxy = nproxy;
1158    }
1159
1160    /* Normally, no need to wait for the worker threads to finish, */
1161    /* as the workers will signal using work_collect_one(). */
1162
1163    /* TODO: If sent is too small, then some proxies were disabled? */
1164    /*       Need to decrement count? */
1165
1166    /* TODO: Might want to block here until worker threads are done, */
1167    /*       so that concurrent reconfigs don't cause issues. */
1168
1169    /* In the case when config/config_ver changes might already */
1170    /* be inflight, as long as they're not removing proxies, */
1171    /* we're ok.  New proxies that happen afterwards are fine, too. */
1172}
1173
1174static void work_stats_collect(void *data0, void *data1) {
1175    proxy_td *ptd = data0;
1176    proxy *p;
1177    work_collect *c;
1178    struct stats_gathering_pair *pair;
1179    genhash_t *map_pstd;
1180    bool locked = true;
1181
1182    cb_assert(ptd);
1183
1184    p = ptd->proxy;
1185    cb_assert(p);
1186
1187    c = data1;
1188    cb_assert(c);
1189
1190    cb_assert(is_listen_thread() == false); /* Expecting a worker thread. */
1191
1192    pair = c->data;
1193    map_pstd = pair->map_pstd;
1194    cb_assert(map_pstd != NULL);
1195
1196    cb_mutex_enter(&p->proxy_lock);
1197
1198    if (p->name != NULL) {
1199        int   key_len = (int)strlen(p->name) + 50;
1200        char *key_buf = malloc(key_len);
1201        if (key_buf != NULL) {
1202            proxy_stats_td *pstd;
1203            genhash_t *key_stats_map;
1204
1205            snprintf(key_buf, key_len, "%d:%s", p->port, p->name);
1206
1207            cb_mutex_exit(&p->proxy_lock);
1208            locked = false;
1209
1210            pstd = genhash_find(map_pstd, key_buf);
1211            if (pstd == NULL) {
1212                pstd = calloc(1, sizeof(proxy_stats_td));
1213                if (pstd != NULL) {
1214                    char *key = strdup(key_buf);
1215                    if (key == NULL) {
1216                        free(pstd);
1217                        pstd = NULL;
1218                    } else {
1219                        genhash_update(map_pstd, key, pstd);
1220                    }
1221                }
1222            }
1223
1224            if (pstd != NULL) {
1225                add_proxy_stats_td(pstd, &ptd->stats);
1226            }
1227
1228            key_stats_map = genhash_find(pair->map_key_stats, key_buf);
1229            if (key_stats_map == NULL) {
1230                key_stats_map = genhash_init(16, strhash_ops);
1231                if (key_stats_map != NULL) {
1232                    char *key = strdup(key_buf);
1233                    if (key == NULL) {
1234                        genhash_free(key_stats_map);
1235                        key_stats_map = NULL;
1236                    } else {
1237                        genhash_update(pair->map_key_stats, key, key_stats_map);
1238                    }
1239                }
1240            }
1241
1242            if (key_stats_map != NULL) {
1243                add_raw_key_stats(key_stats_map, &ptd->key_stats);
1244            }
1245
1246            free(key_buf);
1247        }
1248    }
1249
1250    if (locked) {
1251        cb_mutex_exit(&p->proxy_lock);
1252    }
1253
1254    work_collect_one(c);
1255}
1256
1257static void add_proxy_stats_td(proxy_stats_td *agg, proxy_stats_td *x) {
1258    int j;
1259
1260    cb_assert(agg);
1261    cb_assert(x);
1262
1263    add_proxy_stats(&agg->stats, &x->stats);
1264
1265    for (j = 0; j < STATS_CMD_TYPE_last; j++) {
1266        int k;
1267        for (k = 0; k < STATS_CMD_last; k++) {
1268            add_stats_cmd(&agg->stats_cmd[j][k],
1269                          &x->stats_cmd[j][k]);
1270        }
1271    }
1272}
1273
1274static void add_proxy_stats(proxy_stats *agg, proxy_stats *x) {
1275    cb_assert(agg);
1276    cb_assert(x);
1277
1278    agg->num_upstream += x->num_upstream;
1279    agg->tot_upstream += x->tot_upstream;
1280
1281    agg->num_downstream_conn += x->num_downstream_conn;
1282    agg->tot_downstream_conn += x->tot_downstream_conn;
1283    agg->tot_downstream_conn_acquired += x->tot_downstream_conn_acquired;
1284    agg->tot_downstream_conn_released += x->tot_downstream_conn_released;
1285    agg->tot_downstream_released += x->tot_downstream_released;
1286    agg->tot_downstream_reserved += x->tot_downstream_reserved;
1287    agg->tot_downstream_reserved_time  += x->tot_downstream_reserved_time;
1288
1289    if (agg->max_downstream_reserved_time < x->max_downstream_reserved_time) {
1290        agg->max_downstream_reserved_time = x->max_downstream_reserved_time;
1291    }
1292
1293    agg->tot_downstream_freed          += x->tot_downstream_freed;
1294    agg->tot_downstream_quit_server    += x->tot_downstream_quit_server;
1295    agg->tot_downstream_max_reached    += x->tot_downstream_max_reached;
1296    agg->tot_downstream_create_failed  += x->tot_downstream_create_failed;
1297    agg->tot_downstream_connect_started += x->tot_downstream_connect_started;
1298    agg->tot_downstream_connect_wait   += x->tot_downstream_connect_wait;
1299    agg->tot_downstream_connect        += x->tot_downstream_connect;
1300    agg->tot_downstream_connect_failed += x->tot_downstream_connect_failed;
1301    agg->tot_downstream_connect_timeout += x->tot_downstream_connect_timeout;
1302    agg->tot_downstream_connect_interval += x->tot_downstream_connect_interval;
1303    agg->tot_downstream_connect_max_reached += x->tot_downstream_connect_max_reached;
1304    agg->tot_downstream_waiting_errors += x->tot_downstream_waiting_errors;
1305    agg->tot_downstream_auth           += x->tot_downstream_auth;
1306    agg->tot_downstream_auth_failed    += x->tot_downstream_auth_failed;
1307    agg->tot_downstream_bucket         += x->tot_downstream_bucket;
1308    agg->tot_downstream_bucket_failed  += x->tot_downstream_bucket_failed;
1309    agg->tot_downstream_propagate_failed +=
1310        x->tot_downstream_propagate_failed;
1311    agg->tot_downstream_close_on_upstream_close +=
1312        x->tot_downstream_close_on_upstream_close;
1313    agg->tot_downstream_conn_queue_timeout +=
1314        x->tot_downstream_conn_queue_timeout;
1315    agg->tot_downstream_conn_queue_add +=
1316        x->tot_downstream_conn_queue_add;
1317    agg->tot_downstream_conn_queue_remove +=
1318        x->tot_downstream_conn_queue_remove;
1319    agg->tot_downstream_timeout   += x->tot_downstream_timeout;
1320    agg->tot_wait_queue_timeout   += x->tot_wait_queue_timeout;
1321    agg->tot_auth_timeout         += x->tot_auth_timeout;
1322    agg->tot_assign_downstream    += x->tot_assign_downstream;
1323    agg->tot_assign_upstream      += x->tot_assign_upstream;
1324    agg->tot_assign_recursion     += x->tot_assign_recursion;
1325    agg->tot_reset_upstream_avail += x->tot_reset_upstream_avail;
1326    agg->tot_multiget_keys        += x->tot_multiget_keys;
1327    agg->tot_multiget_keys_dedupe += x->tot_multiget_keys_dedupe;
1328    agg->tot_multiget_bytes_dedupe += x->tot_multiget_bytes_dedupe;
1329    agg->tot_optimize_sets        += x->tot_optimize_sets;
1330    agg->tot_retry                += x->tot_retry;
1331    agg->tot_retry_time           += x->tot_retry_time;
1332
1333    if (agg->max_retry_time < x->max_retry_time) {
1334        agg->max_retry_time = x->max_retry_time;
1335    }
1336
1337    agg->tot_retry_vbucket        += x->tot_retry_vbucket;
1338    agg->tot_upstream_paused      += x->tot_upstream_paused;
1339    agg->tot_upstream_unpaused    += x->tot_upstream_unpaused;
1340    agg->err_oom                  += x->err_oom;
1341    agg->err_upstream_write_prep   += x->err_upstream_write_prep;
1342    agg->err_downstream_write_prep += x->err_downstream_write_prep;
1343
1344    agg->tot_cmd_time             += x->tot_cmd_time;
1345    agg->tot_cmd_count            += x->tot_cmd_count;
1346    agg->tot_local_cmd_time       += x->tot_local_cmd_time;
1347    agg->tot_local_cmd_count      += x->tot_local_cmd_count;
1348}
1349
1350static void add_stats_cmd(proxy_stats_cmd *agg,
1351                          proxy_stats_cmd *x) {
1352    cb_assert(agg);
1353    cb_assert(x);
1354
1355    agg->seen        += x->seen;
1356    agg->hits        += x->hits;
1357    agg->misses      += x->misses;
1358    agg->read_bytes  += x->read_bytes;
1359    agg->write_bytes += x->write_bytes;
1360    agg->cas         += x->cas;
1361}
1362
1363static void add_stats_cmd_with_rescale(proxy_stats_cmd *agg,
1364                                       const proxy_stats_cmd *x,
1365                                       float rescale_agg,
1366                                       float rescale_x) {
1367    cb_assert(agg);
1368    cb_assert(x);
1369
1370#define AGG(field) do {agg->field = llrintf(agg->field * rescale_agg + x->field * rescale_x);} while (0)
1371
1372    AGG(seen);
1373    AGG(hits);
1374    AGG(misses);
1375    AGG(read_bytes);
1376    AGG(write_bytes);
1377    AGG(cas);
1378
1379#undef AGG
1380}
1381
1382static void add_key_stats_inner(const void *data, void *userdata) {
1383    genhash_t *key_stats_map = userdata;
1384    const struct key_stats *kstats = data;
1385    struct key_stats *dest_stats = genhash_find(key_stats_map, kstats->key);
1386    int j;
1387    uint64_t current_time_msec;
1388    float rescale_factor_dest = 1.0, rescale_factor_src = 1.0;
1389
1390    if (dest_stats == NULL) {
1391        dest_stats = calloc(1, sizeof(struct key_stats));
1392        if (dest_stats != NULL) {
1393            *dest_stats = *kstats;
1394        }
1395        genhash_update(key_stats_map, strdup(kstats->key), dest_stats);
1396        return;
1397    }
1398
1399    current_time_msec = msec_current_time;
1400    if (dest_stats->added_at < kstats->added_at) {
1401        rescale_factor_src = (float)(current_time_msec - dest_stats->added_at)/(current_time_msec - kstats->added_at);
1402    } else {
1403        rescale_factor_dest = (float)(current_time_msec - kstats->added_at)/(current_time_msec - dest_stats->added_at);
1404        dest_stats->added_at = kstats->added_at;
1405    }
1406
1407    cb_assert(rescale_factor_dest >= 1.0);
1408    cb_assert(rescale_factor_src >= 1.0);
1409
1410    for (j = 0; j < STATS_CMD_TYPE_last; j++) {
1411        int k;
1412        for (k = 0; k < STATS_CMD_last; k++) {
1413            add_stats_cmd_with_rescale(&(dest_stats->stats_cmd[j][k]),
1414                                       &(kstats->stats_cmd[j][k]),
1415                                       rescale_factor_dest,
1416                                       rescale_factor_src);
1417        }
1418    }
1419}
1420
1421static void add_raw_key_stats(genhash_t *key_stats_map,
1422                              mcache *kstats) {
1423    cb_assert(key_stats_map);
1424    cb_assert(kstats);
1425
1426    mcache_foreach(kstats, add_key_stats_inner, key_stats_map);
1427}
1428
/* genhash_iter() adapter: drops the key and forwards the value (the key
 * stats) together with arg (the destination map) to add_key_stats_inner().
 */
static void add_processed_key_stats_inner(const void *key,
                                          const void *val,
                                          void *arg) {
    (void)key;

    add_key_stats_inner(val, arg);
}
1433
1434static void add_processed_key_stats(genhash_t *dest_map,
1435                                    genhash_t *src_map) {
1436    cb_assert(dest_map);
1437    cb_assert(src_map);
1438
1439    genhash_iter(src_map, add_processed_key_stats_inner, dest_map);
1440}
1441
/* Map iteration callback that releases one entry: both the key and the
 * value are asserted non-NULL and passed to free().  user_data is unused.
 */
void genhash_free_entry(const void *key,
                        const void *value,
                        void *user_data) {
    /* Constness is cast away only so the pointers can be handed to free(). */
    void *entry_key = (void *)key;
    void *entry_value = (void *)value;

    (void)user_data;

    cb_assert(key != NULL);
    free(entry_key);

    cb_assert(value != NULL);
    free(entry_value);
}
1452
1453static void emit_proxy_stats_cmd(conflate_form_result *result,
1454                                 const char *prefix,
1455                                 const char *format,
1456                                 proxy_stats_cmd stats_cmd[][STATS_CMD_last]) {
1457    size_t prefix_len = strlen(prefix);
1458    const size_t bufsize = 200;
1459    char *buf_key = malloc(bufsize + prefix_len);
1460    char buf_val[100];
1461    int j;
1462    char *buf = buf_key+prefix_len;
1463    cb_assert(buf_key != NULL);
1464    memcpy(buf_key, prefix, prefix_len);
1465
1466
1467#define more_cmd_stat(type, cmd, key, val)                       \
1468    if (val != 0) {                                              \
1469        snprintf(buf, bufsize,                                   \
1470                 format, type, cmd, key);                        \
1471        snprintf(buf_val, sizeof(buf_val),                       \
1472                 "%"PRIu64, (uint64_t) val);          \
1473        conflate_add_field(result, buf_key, buf_val);      \
1474    }
1475
1476    for (j = 0; j < STATS_CMD_TYPE_last; j++) {
1477        int k;
1478        for (k = 0; k < STATS_CMD_last; k++) {
1479            more_cmd_stat(cmd_type_names[j], cmd_names[k],
1480                          "seen",
1481                          stats_cmd[j][k].seen);
1482            more_cmd_stat(cmd_type_names[j], cmd_names[k],
1483                          "hits",
1484                          stats_cmd[j][k].hits);
1485            more_cmd_stat(cmd_type_names[j], cmd_names[k],
1486                          "misses",
1487                          stats_cmd[j][k].misses);
1488            more_cmd_stat(cmd_type_names[j], cmd_names[k],
1489                          "read_bytes",
1490                          stats_cmd[j][k].read_bytes);
1491            more_cmd_stat(cmd_type_names[j], cmd_names[k],
1492                          "write_bytes",
1493                          stats_cmd[j][k].write_bytes);
1494            more_cmd_stat(cmd_type_names[j], cmd_names[k],
1495                          "cas",
1496                          stats_cmd[j][k].cas);
1497        }
1498    }
1499
1500#undef more_cmd_stat
1501    free(buf_key);
1502}
1503
1504void map_pstd_foreach_emit(const void *k,
1505                           const void *value,
1506                           void *user_data) {
1507    char buf_key[200];
1508    char buf_val[100];
1509    const char *name = (const char*)k;
1510    proxy_stats_td *pstd = (proxy_stats_td *) value;
1511    const struct main_stats_collect_info *emit = user_data;
1512
1513    cb_assert(name != NULL);
1514    cb_assert(pstd != NULL);
1515    cb_assert(emit != NULL);
1516    cb_assert(emit->result);
1517
1518
1519#define more_stat(key, val)                             \
1520    if (emit->do_zeros || val) {                        \
1521        snprintf(buf_key, sizeof(buf_key),              \
1522                 "%s:stats_%s", name, key);             \
1523        snprintf(buf_val, sizeof(buf_val),              \
1524                 "%"PRIu64, (uint64_t) val); \
1525        conflate_add_field(emit->result, buf_key, buf_val); \
1526    }
1527
1528    more_stat("num_upstream",
1529              pstd->stats.num_upstream);
1530    more_stat("tot_upstream",
1531              pstd->stats.tot_upstream);
1532    more_stat("num_downstream_conn",
1533              pstd->stats.num_downstream_conn);
1534    more_stat("tot_downstream_conn",
1535              pstd->stats.tot_downstream_conn);
1536    more_stat("tot_downstream_conn_acquired",
1537              pstd->stats.tot_downstream_conn_acquired);
1538    more_stat("tot_downstream_conn_released",
1539              pstd->stats.tot_downstream_conn_released);
1540    more_stat("tot_downstream_released",
1541              pstd->stats.tot_downstream_released);
1542    more_stat("tot_downstream_reserved",
1543              pstd->stats.tot_downstream_reserved);
1544    more_stat("tot_downstream_reserved_time",
1545              pstd->stats.tot_downstream_reserved_time);
1546    more_stat("max_downstream_reserved_time",
1547              pstd->stats.max_downstream_reserved_time);
1548    more_stat("tot_downstream_freed",
1549              pstd->stats.tot_downstream_freed);
1550    more_stat("tot_downstream_quit_server",
1551              pstd->stats.tot_downstream_quit_server);
1552    more_stat("tot_downstream_max_reached",
1553              pstd->stats.tot_downstream_max_reached);
1554    more_stat("tot_downstream_create_failed",
1555              pstd->stats.tot_downstream_create_failed);
1556    more_stat("tot_downstream_connect_started",
1557              pstd->stats.tot_downstream_connect_started);
1558    more_stat("tot_downstream_connect_wait",
1559              pstd->stats.tot_downstream_connect_wait);
1560    more_stat("tot_downstream_connect",
1561              pstd->stats.tot_downstream_connect);
1562    more_stat("tot_downstream_connect_failed",
1563              pstd->stats.tot_downstream_connect_failed);
1564    more_stat("tot_downstream_connect_timeout",
1565              pstd->stats.tot_downstream_connect_timeout);
1566    more_stat("tot_downstream_connect_interval",
1567              pstd->stats.tot_downstream_connect_interval);
1568    more_stat("tot_downstream_connect_max_reached",
1569              pstd->stats.tot_downstream_connect_max_reached);
1570    more_stat("tot_downstream_waiting_errors",
1571              pstd->stats.tot_downstream_waiting_errors);
1572    more_stat("tot_downstream_auth",
1573              pstd->stats.tot_downstream_auth);
1574    more_stat("tot_downstream_auth_failed",
1575              pstd->stats.tot_downstream_auth_failed);
1576    more_stat("tot_downstream_bucket",
1577              pstd->stats.tot_downstream_bucket);
1578    more_stat("tot_downstream_bucket_failed",
1579              pstd->stats.tot_downstream_bucket_failed);
1580    more_stat("tot_downstream_propagate_failed",
1581              pstd->stats.tot_downstream_propagate_failed);
1582    more_stat("tot_downstream_close_on_upstream_close",
1583              pstd->stats.tot_downstream_close_on_upstream_close);
1584    more_stat("tot_downstream_conn_queue_timeout",
1585              pstd->stats.tot_downstream_conn_queue_timeout);
1586    more_stat("tot_downstream_conn_queue_add",
1587              pstd->stats.tot_downstream_conn_queue_add);
1588    more_stat("tot_downstream_conn_queue_remove",
1589              pstd->stats.tot_downstream_conn_queue_remove);
1590    more_stat("tot_downstream_timeout",
1591              pstd->stats.tot_downstream_timeout);
1592    more_stat("tot_wait_queue_timeout",
1593              pstd->stats.tot_wait_queue_timeout);
1594    more_stat("tot_auth_timeout",
1595              pstd->stats.tot_auth_timeout);
1596    more_stat("tot_assign_downstream",
1597              pstd->stats.tot_assign_downstream);
1598    more_stat("tot_assign_upstream",
1599              pstd->stats.tot_assign_upstream);
1600    more_stat("tot_assign_recursion",
1601              pstd->stats.tot_assign_recursion);
1602    more_stat("tot_reset_upstream_avail",
1603              pstd->stats.tot_reset_upstream_avail);
1604    more_stat("tot_multiget_keys",
1605              pstd->stats.tot_multiget_keys);
1606    more_stat("tot_multiget_keys_dedupe",
1607              pstd->stats.tot_multiget_keys_dedupe);
1608    more_stat("tot_multiget_bytes_dedupe",
1609              pstd->stats.tot_multiget_bytes_dedupe);
1610    more_stat("tot_optimize_sets",
1611              pstd->stats.tot_optimize_sets);
1612    more_stat("tot_retry",
1613              pstd->stats.tot_retry);
1614    more_stat("tot_retry_time",
1615              pstd->stats.tot_retry_time);
1616    more_stat("max_retry_time",
1617              pstd->stats.max_retry_time);
1618    more_stat("tot_retry_vbucket",
1619              pstd->stats.tot_retry_vbucket);
1620    more_stat("tot_upstream_paused",
1621              pstd->stats.tot_upstream_paused);
1622    more_stat("tot_upstream_unpaused",
1623              pstd->stats.tot_upstream_unpaused);
1624    more_stat("err_oom",
1625              pstd->stats.err_oom);
1626    more_stat("err_upstream_write_prep",
1627              pstd->stats.err_upstream_write_prep);
1628    more_stat("err_downstream_write_prep",
1629              pstd->stats.err_downstream_write_prep);
1630
1631    more_stat("tot_cmd_time",
1632              pstd->stats.tot_cmd_time);
1633    more_stat("tot_cmd_count",
1634              pstd->stats.tot_cmd_count);
1635
1636    more_stat("tot_local_cmd_time",
1637              pstd->stats.tot_local_cmd_time);
1638    more_stat("tot_local_cmd_count",
1639              pstd->stats.tot_local_cmd_count);
1640
1641    snprintf(buf_key, sizeof(buf_key), "%s:stats_cmd_", name);
1642    emit_proxy_stats_cmd(emit->result, buf_key, "%s_%s_%s", pstd->stats_cmd);
1643}
1644
/* Context handed to map_key_stats_foreach_emit_inner() through the
 * genhash_iter() user_data pointer.
 */
struct key_stats_emit_state {
    /* Leading component of every emitted field name
     * ("<name>:keys_stats:<key>:..."). */
    const char *name;

    /* Collection info; its result member receives the emitted fields. */
    const struct main_stats_collect_info *emit;
};
1649
1650static void map_key_stats_foreach_emit_inner(const void *_key,
1651                                             const void *value,
1652                                             void *user_data) {
1653    struct key_stats_emit_state *state = user_data;
1654    const char *key = _key;
1655    struct key_stats *kstats = (struct key_stats *) value;
1656    char buf[200+KEY_MAX_LENGTH];
1657
1658    cb_assert(strcmp(key, kstats->key) == 0);
1659
1660    snprintf(buf, sizeof(buf), "%s:keys_stats:%s:", state->name, key);
1661    emit_proxy_stats_cmd(state->emit->result, buf, "%s_%s_%s",
1662                         kstats->stats_cmd);
1663
1664    {
1665        char buf_val[32];
1666        snprintf(buf, sizeof(buf), "%s:keys_stats:%s:added_at_msec",
1667                 state->name, key);
1668        snprintf(buf_val, sizeof(buf_val), "%"PRIu64,
1669                 (uint64_t) kstats->added_at);
1670        conflate_add_field(state->emit->result, buf, buf_val);
1671    }
1672}
1673
1674void map_key_stats_foreach_emit(const void *k,
1675                                const void *value,
1676                                void *user_data) {
1677
1678    struct key_stats_emit_state state;
1679    const char *name = (const char *) k;
1680    genhash_t *map_key_stats = (genhash_t *) value;
1681    const struct main_stats_collect_info *emit = user_data;
1682    cb_assert(name != NULL);
1683    cb_assert(map_key_stats != NULL);
1684    cb_assert(emit != NULL);
1685    cb_assert(emit->result);
1686
1687    state.name = name;
1688    state.emit = emit;
1689
1690    genhash_iter(map_key_stats, map_key_stats_foreach_emit_inner, &state);
1691}
1692
1693/* This callback is invoked by conflate on a conflate thread
1694 * when it wants proxy stats.
1695 *
1696 * We use the work_queues to scatter the request across our
1697 * threads, so that normal runtime has fewer locks at the
1698 * cost of infrequent reset complexity.
1699 */
1700enum conflate_mgmt_cb_result on_conflate_reset_stats(void *userdata,
1701                                                     conflate_handle_t *handle,
1702                                                     const char *cmd,
1703                                                     bool direct,
1704                                                     kvpair_t *form,
1705                                                     conflate_form_result *r) {
1706    proxy_main *m = userdata;
1707
1708    (void)handle;
1709    (void)cmd;
1710    (void)direct;
1711    (void)form;
1712    (void)r;
1713
1714    cb_assert(m);
1715    cb_assert(m->nthreads > 1);
1716
1717    proxy_stats_reset(m);
1718
1719    return RV_OK;
1720}
1721
1722void proxy_stats_reset(proxy_main *m) {
1723    LIBEVENT_THREAD *mthread = thread_by_index(0);
1724    cb_assert(mthread);
1725    cb_assert(mthread->work_queue);
1726
1727    work_send(mthread->work_queue, main_stats_reset, m, NULL);
1728}
1729
1730/* Must be invoked on the main listener thread.
1731 *
1732 * Puts stats reset work on every worker thread's work_queue.
1733 */
1734static void main_stats_reset(void *data0, void *data1) {
1735    proxy *p;
1736    proxy_main *m = data0;
1737    int sent   = 0;
1738    int nproxy = 0;
1739
1740    (void) data1;
1741    cb_assert(m);
1742    cb_assert(m->nthreads > 1);
1743
1744    cb_assert(is_listen_thread());
1745
1746    m->stat_configs = 0;
1747    m->stat_config_fails = 0;
1748    m->stat_proxy_starts = 0;
1749    m->stat_proxy_start_fails = 0;
1750    m->stat_proxy_existings = 0;
1751    m->stat_proxy_shutdowns = 0;
1752
1753    cb_mutex_enter(&m->proxy_main_lock);
1754
1755    for (p = m->proxy_head; p != NULL; p = p->next) {
1756        nproxy++;
1757
1758        /* We don't clear p->listening because it's meant to */
1759        /* increase and decrease. */
1760
1761        p->listening_failed = 0;
1762
1763        mcache_reset_stats(&p->front_cache);
1764    }
1765
1766    cb_mutex_exit(&m->proxy_main_lock);
1767
1768    if (nproxy > 0) {
1769        work_collect *ca = calloc(m->nthreads, sizeof(work_collect));
1770        if (ca != NULL) {
1771            /* Starting at 1 because 0 is the main listen thread. */
1772            int i;
1773
1774            for (i = 1; i < m->nthreads; i++) {
1775                work_collect *c = &ca[i];
1776                LIBEVENT_THREAD *t;
1777
1778                work_collect_init(c, nproxy, NULL);
1779
1780                t = thread_by_index(i);
1781                cb_assert(t);
1782                cb_assert(t->work_queue);
1783
1784                cb_mutex_enter(&m->proxy_main_lock);
1785
1786                for (p = m->proxy_head; p != NULL; p = p->next) {
1787                    proxy_td *ptd = &p->thread_data[i];
1788                    if (ptd != NULL &&
1789                        work_send(t->work_queue, work_stats_reset, ptd, c)) {
1790                        sent++;
1791                    }
1792                }
1793
1794                cb_mutex_exit(&m->proxy_main_lock);
1795            }
1796
1797            /* Wait for all resets to finish. */
1798
1799            for (i = 1; i < m->nthreads; i++) {
1800                work_collect_wait(&ca[i]);
1801            }
1802
1803            free(ca);
1804        }
1805    }
1806
1807    /* TODO: If sent is too small, then some proxies were disabled? */
1808    /*       Need to decrement count? */
1809
1810    /* TODO: Might want to block here until worker threads are done, */
1811    /*       so that concurrent reconfigs don't cause issues. */
1812
1813    /* In the case when config/config_ver changes might already */
1814    /* be inflight, as long as they're not removing proxies, */
1815    /* we're ok.  New proxies that happen afterwards are fine, too. */
1816}
1817
1818static void work_stats_reset(void *data0, void *data1) {
1819    proxy_td *ptd = data0;
1820    work_collect *c = data1;
1821    cb_assert(ptd);
1822    cb_assert(c);
1823
1824    cb_assert(is_listen_thread() == false); /* Expecting a worker thread. */
1825
1826    cproxy_reset_stats_td(&ptd->stats);
1827
1828    mcache_flush_all(&ptd->key_stats, 0);
1829
1830    if (ptd->stats.downstream_reserved_time_htgram != NULL) {
1831        htgram_reset(ptd->stats.downstream_reserved_time_htgram);
1832    }
1833
1834    if (ptd->stats.downstream_connect_time_htgram != NULL) {
1835        htgram_reset(ptd->stats.downstream_connect_time_htgram);
1836    }
1837
1838    work_collect_one(c);
1839}
1840
1841static void add_stat_prefix(const void *dump_opaque,
1842                            const char *prefix,
1843                            const char *key,
1844                            const char *val) {
1845    char buf[2000];
1846    const struct main_stats_collect_info *ase = dump_opaque;
1847    cb_assert(ase);
1848
1849    snprintf(buf, sizeof(buf), "%s_%s", prefix, key);
1850
1851    conflate_add_field(ase->result, buf, val);
1852}
1853
1854static void add_stat_prefix_ase(const char *key, const uint16_t klen,
1855                                const char *val, const uint32_t vlen,
1856                                const void *cookie) {
1857    const struct main_stats_collect_info *ase = cookie;
1858    cb_assert(ase);
1859
1860    add_stat_prefix(cookie, ase->prefix, key, val);
1861    (void)klen;
1862    (void)vlen;
1863}
1864
1865struct htgram_dump_callback_data {
1866    ADD_STAT add_stats;
1867    char *prefix;
1868    conn *conn;
1869};
1870
1871static void htgram_dump_callback(HTGRAM_HANDLE h, const char *dump_line, void *cbdata) {
1872    ADD_STAT add_stats = ((struct htgram_dump_callback_data *) cbdata)->add_stats;
1873    char *prefix       = ((struct htgram_dump_callback_data *) cbdata)->prefix;
1874    conn *c            = ((struct htgram_dump_callback_data *) cbdata)->conn;
1875
1876    APPEND_STAT(prefix, "%s", dump_line);
1877    (void) h;
1878}
1879
1880void proxy_stats_dump_timings(ADD_STAT add_stats, conn *c) {
1881    char prefix[200];
1882    proxy_td *ptd;
1883    proxy_main *pm;
1884    proxy *p;
1885
1886    cb_assert(c != NULL);
1887
1888    ptd = c->extra;
1889    if (ptd == NULL ||
1890        ptd->proxy == NULL ||
1891        ptd->proxy->main == NULL) {
1892        return;
1893    }
1894
1895    pm = ptd->proxy->main;
1896
1897    if (cb_mutex_try_enter(&pm->proxy_main_lock) != 0) {
1898        return;
1899    }
1900
1901    for (p = pm->proxy_head; p != NULL; p = p->next) {
1902        HTGRAM_HANDLE hreserved = cproxy_create_timing_histogram();
1903        HTGRAM_HANDLE hconnect = cproxy_create_timing_histogram();
1904        if (hreserved != NULL && hconnect != NULL) {
1905            struct htgram_dump_callback_data cbdata;
1906            int i;
1907
1908            cb_mutex_enter(&p->proxy_lock);
1909            for (i = 1; i < pm->nthreads; i++) {
1910                proxy_td *thread_ptd = &p->thread_data[i];
1911                if (thread_ptd != NULL &&
1912                    thread_ptd->stats.downstream_reserved_time_htgram != NULL) {
1913                    htgram_add(hreserved, thread_ptd->stats.downstream_reserved_time_htgram);
1914                    htgram_add(hconnect, thread_ptd->stats.downstream_connect_time_htgram);
1915                }
1916            }
1917            cb_mutex_exit(&p->proxy_lock);
1918
1919            cbdata.add_stats = add_stats;
1920            cbdata.prefix    = prefix;
1921            cbdata.conn      = c;
1922
1923            snprintf(prefix, sizeof(prefix), "%u:%s:connect", p->port, p->name);
1924            htgram_dump(hconnect, htgram_dump_callback, &cbdata);
1925
1926            snprintf(prefix, sizeof(prefix), "%u:%s:reserved", p->port, p->name);
1927            htgram_dump(hreserved, htgram_dump_callback, &cbdata);
1928        }
1929
1930        if (hreserved != NULL) {
1931            htgram_destroy(hreserved);
1932        }
1933
1934        if (hconnect != NULL) {
1935            htgram_destroy(hconnect);
1936        }
1937    }
1938
1939    cb_mutex_exit(&pm->proxy_main_lock);
1940}
1941
1942void proxy_stats_dump_config(ADD_STAT add_stats, conn *c) {
1943    char prefix[200];
1944    proxy_td *ptd;
1945    proxy_main *pm;
1946    proxy *p;
1947
1948    cb_assert(c != NULL);
1949
1950    ptd = c->extra;
1951    if (ptd == NULL ||
1952        ptd->proxy == NULL ||
1953        ptd->proxy->main == NULL) {
1954        return;
1955    }
1956
1957    pm = ptd->proxy->main;
1958
1959    if (cb_mutex_try_enter(&pm->proxy_main_lock) != 0) {
1960        return;
1961    }
1962
1963    for (p = pm->proxy_head; p != NULL; p = p->next) {
1964        cb_mutex_enter(&p->proxy_lock);
1965
1966        if (p->name != NULL &&
1967            p->config != NULL) {
1968            snprintf(prefix, sizeof(prefix), "%u:%s:config", p->port, p->name);
1969
1970            add_stats(prefix, (uint16_t)strlen(prefix), p->config, (uint32_t)strlen(p->config), c);
1971        }
1972
1973        cb_mutex_exit(&p->proxy_lock);
1974    }
1975
1976    cb_mutex_exit(&pm->proxy_main_lock);
1977}
1978