xref: /5.5.2/moxi/src/cproxy_config.c (revision d0366df5)
1/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2#include "src/config.h"
3#include <stdio.h>
4#include <stdlib.h>
5#include <string.h>
6#include <errno.h>
7#include <ctype.h>
8#include <unistd.h>
9#include <platform/cbassert.h>
10#include <math.h>
11#include "memcached.h"
12#include "cproxy.h"
13#include "work.h"
14#include "log.h"
15
16/* Local declarations. */
17
18static char *readfile(char *path);
19
20volatile uint64_t  msec_current_time = 0;
21int                msec_cycle = 0;
22struct event       msec_clockevent;
23struct event_base *msec_clockevent_base = NULL;
24
25char cproxy_hostname[300] = {0}; /* Immutable after init. */
26
27static void msec_clock_handler(evutil_socket_t fd, short which, void *arg);
28void msec_set_current_time(void);
29
30int cproxy_init_string(char *cfg_str,
31                       proxy_behavior behavior,
32                       int nthreads);
33
34int cproxy_init_agent(char *cfg_str,
35                      proxy_behavior behavior,
36                      int nthreads);
37
38int cproxy_init_mcmux_mode(int proxy_port,
39                           proxy_behavior behavior,
40                           int nthreads);
41
42proxy_behavior behavior_default_g = {
43    .cycle = 200, /* Clock cycle or quantum, in milliseconds. */
44    .downstream_max = 1024,
45    .downstream_conn_max = 4, /* Use 0 for unlimited. */
46    .downstream_weight = 0,
47    .downstream_retry = 1,
48    .downstream_protocol = proxy_downstream_ascii_prot,
49    .downstream_timeout = {
50        .tv_sec  = 5,
51        .tv_usec = 0
52    },
53    .downstream_conn_queue_timeout = {
54        .tv_sec  = 0,
55        .tv_usec = 200000
56    },
57    .wait_queue_timeout = {
58        .tv_sec  = 0,
59        .tv_usec = 200000
60    },
61    .connect_timeout = {
62        .tv_sec  = 0,
63        .tv_usec = 400000
64    },
65    .auth_timeout = {
66        .tv_sec  = 0,
67        .tv_usec = 100000
68    },
69    .time_stats = false,
70    .mcs_opts = {0},
71    .connect_max_errors = 5,         /* In zstored, 10. */
72    .connect_retry_interval = 30000, /* In zstored, 30000. */
73    .front_cache_max = 200,
74    .front_cache_lifespan = 0,
75    .front_cache_spec = {0},
76    .front_cache_unspec = {0},
77    .key_stats_max = 4000,
78    .key_stats_lifespan = 0,
79    .key_stats_spec = {0},
80    .key_stats_unspec = {0},
81    .optimize_set = {0},
82    .host = {0},
83    .port = 0,
84    .bucket = {0},
85    .nodeLocator = {0},
86    .usr = {0},
87    .pwd = {0},
88    .port_listen = MOXI_DEFAULT_LISTEN_PORT,
89    .default_bucket_name = FIRST_BUCKET
90};
91
/** Returns the number of characters in key before its first space
 *  or its NUL terminator, whichever comes first.
 *
 *  key must not be NULL (asserted).
 */
size_t skey_len(const char *key) {
    const char *end;

    cb_assert(key);

    for (end = key; *end != ' ' && *end != '\0'; end++) {
        /* Just advance to the first delimiter. */
    }

    return end - key;
}
104
/** Hash callback for keys that end at a space or at the NUL
 *  terminator: only the characters counted by skey_len() are
 *  fed to murmur_hash().
 *
 *  v must not be NULL (asserted).
 */
int skey_hash(const void *v) {
    const char *key = v;

    cb_assert(v);

    return murmur_hash(key, skey_len(key));
}
118
/** Equality callback for keys that end at a space or at the NUL
 *  terminator.  Returns non-zero when both keys have the same
 *  skey_len() and the same characters over that length.
 *
 *  Neither argument may be NULL (asserted).
 */
int skey_equal(const void *v1, const void *v2) {
    const char *lhs = v1;
    const char *rhs = v2;
    size_t lhs_len;
    size_t rhs_len;

    cb_assert(v1);
    cb_assert(v2);

    lhs_len = skey_len(lhs);
    rhs_len = skey_len(rhs);

    if (lhs_len != rhs_len) {
        return 0;
    }

    return strncmp(lhs, rhs, lhs_len) == 0;
}
138
/* "Duplicate" callback that makes no copy: it hands back the very
 * pointer it was given, with the const qualifier dropped. */
static void *noop_dup(const void *v)
{
    void *same = (void *) v;

    return same;
}
143
/* "Free" callback that deliberately releases nothing. */
void noop_free(void *v) {
    /* Nothing */
    (void) v;
}
148
/* Equality callback for NUL-terminated strings: non-zero when the two
 * strings compare equal with strcmp().  Neither may be NULL (asserted). */
static int
str_eq(const void* p1, const void*p2)
{
    const char *lhs = p1;
    const char *rhs = p2;

    cb_assert(lhs != NULL);
    cb_assert(rhs != NULL);

    return 0 == strcmp(lhs, rhs);
}
158
159struct hash_ops strhash_ops = {
160   .hashfunc = genhash_string_hash,
161   .hasheq = str_eq,
162   .dupKey = noop_dup,
163   .dupValue = noop_dup,
164   .freeKey = noop_free,
165   .freeValue = noop_free
166};
167
168struct hash_ops skeyhash_ops = {
169   .hashfunc = skey_hash,
170   .hasheq = skey_equal,
171   .dupKey = noop_dup,
172   .dupValue = noop_dup,
173   .freeKey = noop_free,
174   .freeValue = noop_free
175};
176
/** Returns pointer to first non-space char in string.
 *
 *  The string is not modified.  Returns NULL when s is NULL; when the
 *  string is all whitespace the result points at its NUL terminator.
 */
char *skipspace(char *s) {
    if (s == NULL) {
        return NULL;
    }

    /* isspace() is only defined for EOF and unsigned char values, so a
     * plain (possibly negative) char is converted first.  The NUL
     * terminator is not whitespace, so it ends the scan by itself. */
    while (isspace((unsigned char) *s)) {
        s++;
    }

    return s;
}
190
/** Modifies string by zero'ing out any trailing spaces.
 *
 *  Returns s itself (NULL when s is NULL).
 */
char *trailspace(char *s) {
    size_t len;

    if (s == NULL) {
        return NULL;
    }

    /* Walk back by index rather than by pointer so that an empty or
     * all-whitespace string never forms a pointer before s.  The cast
     * keeps isspace() defined for chars with the high bit set. */
    len = strlen(s);
    while (len > 0 && isspace((unsigned char) s[len - 1])) {
        len--;
        s[len] = '\0';
    }

    return s;
}
206
/** Trims whitespace from both ends of s, in place.
 *
 *  Leading whitespace is skipped (not overwritten) and trailing
 *  whitespace is overwritten with NULs, so the returned pointer may
 *  point past the start of s; use the returned pointer, not s.
 */
char *trimstr(char *s) {
    char *start = skipspace(s);

    return trailspace(start);
}
213
/** Like strdup(), but trims spaces from start and end.
 *  Unlike with trimstr(strdup(s)), you can call free(trimstrdup(s)),
 *  and the correct memory is free()'ed.
 *
 *  Returns NULL when s is NULL or when strdup() fails.
 */
char *trimstrdup(char *s) {
    char *copy;

    /* skipspace(NULL) yields NULL, and strdup(NULL) is undefined. */
    if (s == NULL) {
        return NULL;
    }

    /* Only the part after the leading whitespace is duplicated, so the
     * returned pointer is exactly what strdup() allocated. */
    copy = strdup(skipspace(s));

    /* trailspace() passes a NULL (failed strdup) straight through. */
    return trailspace(copy);
}
221
/** Returns true if first word in a string equals a given word.
 *
 *  The first word of s runs up to the first whitespace character or
 *  the NUL terminator.  The match must be exact: a first word that is
 *  merely a prefix of word (including an empty first word) does not
 *  count as equal.
 *
 *  Neither argument may be NULL (asserted).
 */
bool wordeq(char *s, char *word) {
    size_t len = 0;

    cb_assert(s);
    cb_assert(word);

    /* Cast keeps isspace() defined for chars with the high bit set. */
    while (s[len] != '\0' && !isspace((unsigned char) s[len])) {
        len++;
    }

    /* strncmp() alone would accept any prefix of word, so also require
     * that word ends exactly where the first word of s does.  When the
     * strncmp() matched, word has at least len non-NUL characters, so
     * word[len] is in bounds. */
    return strncmp(s, word, len) == 0 && word[len] == '\0';
}
235
236/* --------------------------------------- */
237
238static bool cproxy_core_initted = false;
239
240/** The cfg_str may be a path to a file like...
241 *
242 *    /full/path
243 *      or...
244 *    ./relative/path
245 *      or...
246 *    ../another/relative/path
247 *
248 *  But not...
249 *
250 *    incorrect/relative/path
251 *
252 *  Paths are detected by the first character of  '/' or '.'.
253 *
254 *  The contents of the file should be in cfg_str format.
255 */
256int cproxy_init(char *cfg_str,
257                char *behavior_str,
258                int nthreads,
259                struct event_base *main_base) {
260    char *env_usr;
261    char *env_pwd;
262    proxy_behavior behavior;
263
264    cb_assert(nthreads > 1); /* Main + at least one worker. */
265    cb_assert(nthreads == settings.num_threads);
266
267    if (cproxy_core_initted == false) {
268        cproxy_core_initted = true;
269
270        gethostname(cproxy_hostname, sizeof(cproxy_hostname));
271
272        cproxy_init_a2a();
273        cproxy_init_a2b();
274        cproxy_init_b2b();
275    }
276
277    if ((cfg_str == NULL || strlen(cfg_str) <= 0) &&
278        (false == settings.enable_mcmux_mode)) {
279        return 0;
280    }
281
282    if (behavior_str != NULL && (behavior_str[0] == '.' || behavior_str[0] == '/')) {
283        char *buf = readfile(behavior_str);
284        if (buf != NULL) {
285            int rv = cproxy_init(cfg_str, buf, nthreads, main_base);
286            free(buf);
287            return rv;
288        } else {
289            moxi_log_write("ERROR: could not read behavior file: %s\n", behavior_str);
290            if (ml->log_mode != ERRORLOG_STDERR) {
291                fprintf(stderr, "ERROR: could not read behavior file: %s\n", behavior_str);
292            }
293            exit(EXIT_FAILURE);
294        }
295    }
296
297    if ((false == settings.enable_mcmux_mode) &&
298        (cfg_str[0] == '.' || cfg_str[0] == '/')) {
299        char *buf = readfile(cfg_str);
300        if (buf != NULL) {
301            int rv = cproxy_init(buf, behavior_str, nthreads, main_base);
302            free(buf);
303            return rv;
304        } else {
305            moxi_log_write("ERROR: could not read cfg file: %s\n", cfg_str);
306            if (ml->log_mode != ERRORLOG_STDERR) {
307                fprintf(stderr, "ERROR: could not read cfg file: %s\n", cfg_str);
308            }
309            exit(EXIT_FAILURE);
310        }
311    }
312
313    if (behavior_str == NULL) {
314        behavior_str = "";
315    }
316
317    /* The vbucket feature only works in binary protocol. */
318
319    behavior_default_g.downstream_protocol = proxy_downstream_binary_prot;
320
321    env_usr = getenv("MOXI_SASL_PLAIN_USR");
322    if (env_usr != NULL) {
323        strncpy(behavior_default_g.usr, env_usr, sizeof(behavior_default_g.usr) - 1);
324        behavior_default_g.usr[sizeof(behavior_default_g.usr) - 1] = '\0';
325
326        moxi_log_write("env: MOXI_SASL_PLAIN_USR (%d)\n",
327                       strlen(behavior_default_g.usr));
328    }
329
330    env_pwd = getenv("MOXI_SASL_PLAIN_PWD");
331    if (env_pwd != NULL) {
332        strncpy(behavior_default_g.pwd, env_pwd, sizeof(behavior_default_g.pwd) - 1);
333        behavior_default_g.pwd[sizeof(behavior_default_g.pwd) - 1] = '\0';
334
335        moxi_log_write("env: MOXI_SASL_PLAIN_PWD (%d)\n",
336                       strlen(behavior_default_g.pwd));
337    }
338
339    behavior = cproxy_parse_behavior(behavior_str, behavior_default_g);
340    if (behavior.cycle > 0) {
341        msec_cycle = behavior.cycle;
342    }
343
344    msec_clockevent_base = main_base;
345    msec_clock_handler(0, 0, NULL);
346
347    if (settings.enable_mcmux_mode) {
348        return cproxy_init_mcmux_mode(settings.port,
349                                      behavior,
350                                      nthreads);
351    }
352
353    /* Not jid format and not a URL, so it must be a simple cmd-line */
354    /* or file-based config. */
355
356    if (strchr(cfg_str, '@') == NULL &&
357        strstr(cfg_str, "http://") == NULL) {
358        return cproxy_init_string(cfg_str,
359                                  behavior,
360                                  nthreads);
361    }
362
363    if (settings.verbose > 2) {
364        cproxy_dump_behavior(&behavior, "cproxy_init_agent", 2);
365    }
366
367    return cproxy_init_agent(cfg_str,
368                             behavior,
369                             nthreads);
370}
371
372int cproxy_init_mcmux_mode(int proxy_port,
373                           proxy_behavior behavior,
374                           int nthreads) {
375
376    int behaviors_num = 1; /* Number of servers. */
377    proxy_behavior_pool behavior_pool;
378    char *proxy_name = "default";
379
380    if (settings.verbose > 1) {
381        cproxy_dump_behavior(&behavior, "init_string", 2);
382    }
383
384    memset(&behavior_pool, 0, sizeof(proxy_behavior_pool));
385
386    behavior_pool.base = behavior;
387    behavior_pool.num  = behaviors_num;
388    behavior_pool.arr  = calloc(behaviors_num,
389            sizeof(proxy_behavior));
390
391    if (behavior_pool.arr != NULL) {
392        int i;
393        proxy_main *m;
394        proxy *p;
395
396        for (i = 0; i < behaviors_num; i++) {
397            behavior_pool.arr[i] = behavior;
398        }
399
400        m = cproxy_gen_proxy_main(behavior, nthreads,
401                PROXY_CONF_TYPE_STATIC);
402        if (m == NULL) {
403            moxi_log_write("could not alloc proxy_main\n");
404            exit(EXIT_FAILURE);
405        }
406
407        p = cproxy_create(m, proxy_name, proxy_port, "mcmux_config",
408                          0, /* config_ver. */
409                          &behavior_pool, nthreads);
410        if (p != NULL) {
411            int n;
412
413            cb_mutex_enter(&m->proxy_main_lock);
414            p->next = m->proxy_head;
415            m->proxy_head = p;
416            cb_mutex_exit(&m->proxy_main_lock);
417
418            n = cproxy_listen(p);
419            if (n > 0) {
420                if (settings.verbose > 1) {
421                    moxi_log_write("moxi listening on %d with %d conns\n",
422                            proxy_port, n);
423                }
424            } else {
425                moxi_log_write("moxi error -- port %d unavailable?\n",
426                               proxy_port);
427                exit(EXIT_FAILURE);
428            }
429        } else {
430            moxi_log_write("could not alloc proxy\n");
431            exit(EXIT_FAILURE);
432        }
433
434        free(behavior_pool.arr);
435    } else {
436        moxi_log_write("could not alloc behaviors\n");
437        exit(EXIT_FAILURE);
438    }
439
440    return 0;
441}
442
443int cproxy_init_string(char *cfg_str,
444                       proxy_behavior behavior,
445                       int nthreads) {
446    char *buff;
447    char *next;
448    char *proxy_name = "default";
449    char *proxy_sect;
450    char *proxy_port_str;
451    int   proxy_port;
452
453    /* cfg looks like "local_port=host:port,host:port;local_port=host:port"
454     * like "11222=memcached1.foo.net:11211"  This means local port 11222
455     * will be a proxy to downstream memcached server running at
456     * host memcached1.foo.net on port 11211.
457     */
458    if (cfg_str== NULL ||
459        strlen(cfg_str) <= 0) {
460        return 0;
461    }
462
463    if (settings.verbose > 1) {
464        cproxy_dump_behavior(&behavior, "init_string", 2);
465    }
466
467    buff = trimstrdup(cfg_str);
468    next = buff;
469    while (next != NULL) {
470        proxy_behavior_pool behavior_pool;
471        int behaviors_num = 1; /* Number of servers. */
472        char *x;
473
474        proxy_sect = strsep(&next, ";");
475
476        proxy_port_str = trimstr(strsep(&proxy_sect, "="));
477        if (proxy_sect == NULL) {
478            moxi_log_write("bad moxi config, missing =\n");
479            exit(EXIT_FAILURE);
480        }
481        proxy_port = atoi(proxy_port_str);
482        if (proxy_port <= 0) {
483            moxi_log_write("missing proxy port\n");
484            exit(EXIT_FAILURE);
485        }
486        proxy_sect = trimstr(proxy_sect);
487
488        for (x = proxy_sect; *x != '\0'; x++) {
489            if (*x == ',') {
490                behaviors_num++;
491            }
492        }
493
494        memset(&behavior_pool, 0, sizeof(proxy_behavior_pool));
495
496        behavior_pool.base = behavior;
497        behavior_pool.num = behaviors_num;
498        behavior_pool.arr = calloc(behaviors_num, sizeof(proxy_behavior));
499
500        if (behavior_pool.arr != NULL) {
501            proxy *p;
502            proxy_main *m;
503            int i;
504
505            for (i = 0; i < behaviors_num; i++) {
506                behavior_pool.arr[i] = behavior;
507            }
508
509            m = cproxy_gen_proxy_main(behavior, nthreads,
510                                      PROXY_CONF_TYPE_STATIC);
511            if (m == NULL) {
512                moxi_log_write("could not alloc proxy_main\n");
513                exit(EXIT_FAILURE);
514            }
515
516            p = cproxy_create(m,
517                              proxy_name,
518                              proxy_port,
519                              proxy_sect,
520                              0, /* config_ver. */
521                              &behavior_pool,
522                              nthreads);
523            if (p != NULL) {
524                int n;
525                cb_mutex_enter(&m->proxy_main_lock);
526                p->next = m->proxy_head;
527                m->proxy_head = p;
528                cb_mutex_exit(&m->proxy_main_lock);
529
530                n = cproxy_listen(p);
531                if (n > 0) {
532                    if (settings.verbose > 1) {
533                        moxi_log_write("moxi listening on %d with %d conns\n",
534                                proxy_port, n);
535                    }
536                } else {
537                    moxi_log_write("moxi error -- port %d unavailable?\n",
538                            proxy_port);
539                    exit(EXIT_FAILURE);
540                }
541            } else {
542                moxi_log_write("could not alloc proxy\n");
543                exit(EXIT_FAILURE);
544            }
545
546            free(behavior_pool.arr);
547        } else {
548            moxi_log_write("could not alloc behaviors\n");
549            exit(EXIT_FAILURE);
550        }
551    }
552
553    free(buff);
554
555    return 0;
556}
557
558proxy_main *diag_last_proxy_main;
559
560proxy_main *cproxy_gen_proxy_main(proxy_behavior behavior, int nthreads,
561                                  enum_proxy_conf_type conf_type) {
562    proxy_main *m = calloc(1, sizeof(proxy_main));
563    if (m != NULL) {
564        m->proxy_head = NULL;
565        m->behavior   = behavior;
566        m->nthreads   = nthreads;
567        m->conf_type  = conf_type;
568
569        cb_mutex_initialize(&m->proxy_main_lock);
570
571        m->stat_configs      = 0;
572        m->stat_config_fails = 0;
573        m->stat_proxy_starts      = 0;
574        m->stat_proxy_start_fails = 0;
575        m->stat_proxy_existings   = 0;
576        m->stat_proxy_shutdowns   = 0;
577
578        diag_last_proxy_main = m;
579    }
580
581    return m;
582}
583
584proxy_behavior cproxy_parse_behavior(char          *behavior_str,
585                                     proxy_behavior behavior_default) {
586    /* These are the default proxy behaviors. */
587    char *buff;
588    char *next;
589
590    struct proxy_behavior behavior = behavior_default;
591
592    if (behavior_str == NULL ||
593        strlen(behavior_str) <= 0) {
594        return behavior;
595    }
596
597    /* Parse the key-value behavior_str, to override the defaults. */
598
599    buff = trimstrdup(behavior_str);
600    next = buff;
601
602    while (next != NULL) {
603        char *key_val = trimstr(strsep(&next, ","));
604        if (key_val != NULL) {
605            cproxy_parse_behavior_key_val_str(key_val, &behavior);
606        }
607    }
608
609    free(buff);
610
611    return behavior;
612}
613
614/** Note: the key_val param buffer is modified.
615 */
616void cproxy_parse_behavior_key_val_str(char *key_val,
617                                       proxy_behavior *behavior) {
618    cb_assert(behavior != NULL);
619
620    if (key_val != NULL) {
621        char *key = strsep(&key_val, "=");
622        char *val = key_val;
623        cproxy_parse_behavior_key_val(key, val, behavior);
624    }
625}
626
627/** Note: the key and val param buffers are modified.
628 */
629void cproxy_parse_behavior_key_val(char *key,
630                                   char *val,
631                                   proxy_behavior *behavior) {
632    uint32_t ms = 0;
633    uint32_t x = 0;
634    bool ok = false;
635
636    cb_assert(behavior != NULL);
637
638    if (key != NULL &&
639        val != NULL) {
640        key = trimstr(key);
641        val = trimstr(val);
642
643        if (wordeq(key, "cycle")) {
644            ok = safe_strtoul(val, &behavior->cycle);
645        } else if (wordeq(key, "downstream_max") ||
646                   wordeq(key, "concurrency")) {
647            ok = safe_strtoul(val, &behavior->downstream_max);
648        } else if (wordeq(key, "downstream_conn_max")) {
649            ok = safe_strtoul(val, &behavior->downstream_conn_max);
650        } else if (wordeq(key, "weight") ||
651                   wordeq(key, "downstream_weight")) {
652            ok = safe_strtoul(val, &behavior->downstream_weight);
653        } else if (wordeq(key, "retry") ||
654                   wordeq(key, "downstream_retry")) {
655            ok = safe_strtoul(val, &behavior->downstream_retry);
656        } else if (wordeq(key, "protocol") ||
657                   wordeq(key, "downstream_protocol")) {
658            if (wordeq(val, "ascii") ||
659                wordeq(val, "memcached-ascii") ||
660                wordeq(val, "membase-ascii")) {
661                behavior->downstream_protocol =
662                    proxy_downstream_ascii_prot;
663                ok = true;
664            } else if (wordeq(val, "binary") ||
665                       wordeq(val, "memcached-binary") ||
666                       wordeq(val, "membase-binary")) {
667                behavior->downstream_protocol =
668                    proxy_downstream_binary_prot;
669                ok = true;
670            } else {
671                if (settings.verbose > 1) {
672                    moxi_log_write("unknown behavior prot: %s\n", val);
673                }
674            }
675        } else if (wordeq(key, "timeout") ||
676                   wordeq(key, "downstream_timeout") ||
677                   wordeq(key, "downstream_conn_timeout")) {
678            ok = safe_strtoul(val, &ms);
679            behavior->downstream_timeout.tv_sec  = floor(ms / 1000.0);
680            behavior->downstream_timeout.tv_usec = (ms % 1000) * 1000;
681        } else if (wordeq(key, "downstream_conn_queue_timeout")) {
682            ok = safe_strtoul(val, &ms);
683            behavior->downstream_conn_queue_timeout.tv_sec  = floor(ms / 1000.0);
684            behavior->downstream_conn_queue_timeout.tv_usec = (ms % 1000) * 1000;
685        } else if (wordeq(key, "wait_queue_timeout")) {
686            ok = safe_strtoul(val, &ms);
687            behavior->wait_queue_timeout.tv_sec  = floor(ms / 1000.0);
688            behavior->wait_queue_timeout.tv_usec = (ms % 1000) * 1000;
689        } else if (wordeq(key, "connect_timeout")) {
690            ok = safe_strtoul(val, &ms);
691            behavior->connect_timeout.tv_sec  = floor(ms / 1000.0);
692            behavior->connect_timeout.tv_usec = (ms % 1000) * 1000;
693        } else if (wordeq(key, "auth_timeout")) {
694            ok = safe_strtoul(val, &ms);
695            behavior->auth_timeout.tv_sec  = floor(ms / 1000.0);
696            behavior->auth_timeout.tv_usec = (ms % 1000) * 1000;
697        } else if (wordeq(key, "time_stats")) {
698            ok = safe_strtoul(val, &x);
699            behavior->time_stats = x;
700        } else if (wordeq(key, "mcs_opts")) {
701            if (strlen(val) < sizeof(behavior->mcs_opts)) {
702                strcpy(behavior->mcs_opts, val);
703                ok = true;
704            }
705        } else if (wordeq(key, "connect_max_errors")) {
706            ok = safe_strtoul(val, &behavior->connect_max_errors);
707        } else if (wordeq(key, "connect_retry_interval")) {
708            ok = safe_strtoul(val, &behavior->connect_retry_interval);
709        } else if (wordeq(key, "front_cache_max")) {
710            ok = safe_strtoul(val, &behavior->front_cache_max);
711        } else if (wordeq(key, "front_cache_lifespan")) {
712            ok = safe_strtoul(val, &behavior->front_cache_lifespan);
713        } else if (wordeq(key, "front_cache_spec")) {
714            if (strlen(val) < sizeof(behavior->front_cache_spec)) {
715                strcpy(behavior->front_cache_spec, val);
716                ok = true;
717            }
718        } else if (wordeq(key, "front_cache_unspec")) {
719            if (strlen(val) < sizeof(behavior->front_cache_unspec)) {
720                strcpy(behavior->front_cache_unspec, val);
721                ok = true;
722            }
723        } else if (wordeq(key, "key_stats_max")) {
724            ok = safe_strtoul(val, &behavior->key_stats_max);
725        } else if (wordeq(key, "key_stats_lifespan")) {
726            ok = safe_strtoul(val, &behavior->key_stats_lifespan);
727        } else if (wordeq(key, "key_stats_spec")) {
728            if (strlen(val) < sizeof(behavior->key_stats_spec)) {
729                strcpy(behavior->key_stats_spec, val);
730                ok = true;
731            }
732        } else if (wordeq(key, "key_stats_unspec")) {
733            if (strlen(val) < sizeof(behavior->key_stats_unspec)) {
734                strcpy(behavior->key_stats_unspec, val);
735                ok = true;
736            }
737        } else if (wordeq(key, "optimize_set")) {
738            if (strlen(val) < sizeof(behavior->optimize_set)) {
739                strcpy(behavior->optimize_set, val);
740                ok = true;
741            }
742        } else if (wordeq(key, "usr")) {
743            if (strlen(val) < sizeof(behavior->usr)) {
744                strcpy(behavior->usr, val);
745                ok = true;
746            }
747        } else if (wordeq(key, "pwd")) {
748            if (strlen(val) < sizeof(behavior->pwd)) {
749                strcpy(behavior->pwd, val);
750                ok = true;
751            }
752        } else if (wordeq(key, "host")) {
753            if (strlen(val) < sizeof(behavior->host)) {
754                strcpy(behavior->host, val);
755                ok = true;
756            }
757        } else if (wordeq(key, "port")) {
758            ok = safe_strtol(val, &behavior->port);
759        } else if (wordeq(key, "bucket")) {
760            if (strlen(val) < sizeof(behavior->bucket)) {
761                strcpy(behavior->bucket, val);
762                ok = true;
763            }
764        } else if (wordeq(key, "port_listen")) {
765            ok = safe_strtol(val, &behavior->port_listen);
766        } else if (wordeq(key, "default_bucket_name")) {
767            if (strlen(val) < sizeof(behavior->default_bucket_name)) {
768                strcpy(behavior->default_bucket_name, val);
769                ok = true;
770            }
771        } else if (key[0] == '#') { /* Comment. */
772            ok = true;
773        } else {
774            if (settings.verbose > 1) {
775                moxi_log_write("ERROR: unknown behavior key: %s\n", key);
776            }
777        }
778    }
779
780    if (ok == false) {
781        moxi_log_write("ERROR: config error in key: %s value: %s\n",
782                       key, val);
783    }
784}
785
786/**
787 * Size of array should be arr_size.
788 */
789proxy_behavior *cproxy_copy_behaviors(int arr_size, proxy_behavior *arr) {
790    int arr_size_alloc = arr_size > 0 ? arr_size : 1;
791    proxy_behavior *rv = calloc(arr_size_alloc, sizeof(proxy_behavior));
792    if (rv != NULL) {
793        memcpy(rv, arr, arr_size * sizeof(proxy_behavior));
794    }
795    return rv;
796}
797
798/**
799 * Size of x/y array should be x/y_size.
800 */
801bool cproxy_equal_behaviors(int x_size, proxy_behavior *x,
802                            int y_size, proxy_behavior *y) {
803    int i;
804    if (x_size != y_size) {
805        return false;
806    }
807
808    if (x == NULL && y == NULL) {
809        return true;
810    }
811
812    if (x == NULL || y == NULL) {
813        return false;
814    }
815
816    for (i = 0; i < x_size; i++) {
817        if (cproxy_equal_behavior(&x[i], &y[i]) == false) {
818            if (settings.verbose > 1) {
819                moxi_log_write("behaviors not equal (%d)\n", i);
820                cproxy_dump_behavior(&x[i], "x", 0);
821                cproxy_dump_behavior(&y[i], "y", 0);
822            }
823
824            return false;
825        }
826    }
827
828    return true;
829}
830
831bool cproxy_equal_behavior(proxy_behavior *x,
832                           proxy_behavior *y) {
833    if (x == NULL && y == NULL) {
834        return true;
835    }
836
837    if (x == NULL || y == NULL) {
838        return false;
839    }
840
841    return memcmp(x, y, sizeof(proxy_behavior)) == 0;
842}
843
844void cproxy_dump_behavior(proxy_behavior *b, char *prefix, int level) {
845    cproxy_dump_behavior_ex(b, prefix, level,
846                            cproxy_dump_behavior_stderr, NULL);
847}
848
849void cproxy_dump_behavior_ex(proxy_behavior *b, char *prefix, int level,
850                             void (*dump)(const void *dump_opaque,
851                                          const char *prefix,
852                                          const char *key,
853                                          const char *buf),
854                             const void *dump_opaque) {
855    char vbuf[8000];
856    cb_assert(b);
857    cb_assert(dump);
858
859
860#define vdump(key, vfmt, val) {              \
861    snprintf(vbuf, sizeof(vbuf), vfmt, val); \
862    dump(dump_opaque, prefix, key, vbuf);    \
863}
864
865    if (level >= 2) {
866        vdump("cycle", "%u", b->cycle);
867    }
868    if (level >= 1) {
869        vdump("downstream_max", "%u", b->downstream_max);
870        vdump("downstream_conn_max", "%u", b->downstream_conn_max);
871    }
872
873    vdump("downstream_weight",   "%u", b->downstream_weight);
874    vdump("downstream_retry",    "%u", b->downstream_retry);
875    vdump("downstream_protocol", "%d", b->downstream_protocol);
876    vdump("downstream_timeout", "%ld", /* In millisecs. */
877          (b->downstream_timeout.tv_sec * 1000 +
878           b->downstream_timeout.tv_usec / 1000));
879    vdump("downstream_conn_queue_timeout", "%ld", /* In millisecs. */
880          (b->downstream_conn_queue_timeout.tv_sec * 1000 +
881           b->downstream_conn_queue_timeout.tv_usec / 1000));
882
883    if (level >= 1) {
884        vdump("wait_queue_timeout", "%ld", /* In millisecs. */
885              (b->wait_queue_timeout.tv_sec * 1000 +
886               b->wait_queue_timeout.tv_usec / 1000));
887        vdump("connect_timeout", "%ld", /* In millisecs. */
888              (b->connect_timeout.tv_sec * 1000 +
889               b->connect_timeout.tv_usec / 1000));
890        vdump("auth_timeout", "%ld", /* In millisecs. */
891              (b->auth_timeout.tv_sec * 1000 +
892               b->auth_timeout.tv_usec / 1000));
893        vdump("time_stats", "%d", b->time_stats);
894        vdump("mcs_opts", "%s", b->mcs_opts);
895        vdump("connect_max_errors", "%u", b->connect_max_errors);
896        vdump("connect_retry_interval", "%u", b->connect_retry_interval);
897        vdump("front_cache_max", "%u", b->front_cache_max);
898        vdump("front_cache_lifespan", "%u", b->front_cache_lifespan);
899        vdump("front_cache_spec", "%s", b->front_cache_spec);
900        vdump("front_cache_unspec", "%s", b->front_cache_unspec);
901        vdump("key_stats_max", "%u", b->key_stats_max);
902        vdump("key_stats_lifespan", "%u", b->key_stats_lifespan);
903        vdump("key_stats_spec", "%s", b->key_stats_spec);
904        vdump("key_stats_unspec", "%s", b->key_stats_unspec);
905        vdump("optimize_set", "%s", b->optimize_set);
906    }
907
908    vdump("usr",    "%s", b->usr);
909    vdump("host",   "%s", b->host);
910    vdump("port",   "%d", b->port);
911    vdump("bucket", "%s", b->bucket);
912
913    if (level >= 1) {
914        vdump("port_listen", "%d", b->port_listen);
915        vdump("default_bucket_name", "%s", b->default_bucket_name);
916    }
917}
918
/** Dump callback that writes one "prefix key: val" line through
 *  moxi_log_write().
 *
 *  dump_opaque is unused; a NULL prefix is printed as an empty
 *  string; key and val must not be NULL (asserted).
 */
void cproxy_dump_behavior_stderr(const void *dump_opaque,
                                 const char *prefix,
                                 const char *key,
                                 const char *val) {
    const char *shown_prefix = (prefix != NULL) ? prefix : "";

    (void)dump_opaque;
    cb_assert(key);
    cb_assert(val);

    moxi_log_write("%s %s: %s\n",
                   shown_prefix, key, val);
}
934
935/* --------------------------------------- */
936
937uint64_t usec_now(void) {
938    struct timeval timer;
939    gettimeofday(&timer, NULL);
940    return ((timer.tv_sec - process_started) * 1000000) + timer.tv_usec;
941}
942
943/* Time-sensitive callers can call it by hand with this,
944 * outside the normal subsecond timer
945 */
946void msec_set_current_time(void) {
947    struct timeval timer;
948    gettimeofday(&timer, NULL);
949    msec_current_time =
950        (timer.tv_sec - process_started) * 1000 + (timer.tv_usec / 1000);
951}
952
953static void msec_clock_handler(evutil_socket_t fd, short which, void *arg) {
954    static bool initialized = false;
955    struct timeval t;
956
957    if (msec_cycle <= 0) {
958        return;
959    }
960
961    /* Subsecond resolution timer. */
962    t.tv_sec = 0;
963    t.tv_usec = msec_cycle * 1000;
964
965    if (initialized) {
966        /* only delete the event if it's actually there. */
967        evtimer_del(&msec_clockevent);
968    } else {
969        initialized = true;
970    }
971
972    evtimer_set(&msec_clockevent, msec_clock_handler, 0);
973    event_base_set(msec_clockevent_base, &msec_clockevent);
974    evtimer_add(&msec_clockevent, &t);
975
976    msec_set_current_time();
977
978    (void)fd;
979    (void)which;
980    (void)arg;
981}
982
983/* --------------------------------------- */
984
/* Reads the whole file at path into a newly allocated, NUL-terminated
 * buffer.
 *
 * Returns the buffer, which the caller releases with free(), or NULL
 * when the file cannot be opened, measured or fully read, when it is
 * empty, or when allocation fails.
 */
static char *readfile(char *path) {
    char *contents = NULL;
    long size;
    FILE *in = fopen(path, "r");

    if (in == NULL) {
        return NULL;
    }

    /* Seek to the end to learn the size, then rewind for the read. */
    if (fseek(in, 0, SEEK_END) != 0) {
        goto done;
    }

    size = ftell(in);
    if (size <= 0 ||
        fseek(in, 0, SEEK_SET) != 0) {
        goto done;
    }

    contents = (char *) malloc(size + 1);
    if (contents == NULL) {
        goto done;
    }

    /* All-or-nothing: a short read discards the buffer. */
    if (fread(contents, size, 1, in) != 1) {
        free(contents);
        contents = NULL;
        goto done;
    }

    contents[size] = '\0';

done:
    fclose(in);
    return contents;
}
1007