1 /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 #include "src/config.h"
3 #include <stdio.h>
4 #include <stdlib.h>
5 #include <string.h>
6 #include <errno.h>
7 #include <ctype.h>
8 #include <unistd.h>
9 #include <assert.h>
10 #include <math.h>
11 #include "memcached.h"
12 #include "cproxy.h"
13 #include "work.h"
14 #include "log.h"
15 
16 /* Local declarations. */
17 
18 static char *readfile(char *path);
19 
20 volatile uint64_t  msec_current_time = 0;
21 int                msec_cycle = 0;
22 struct event       msec_clockevent;
23 struct event_base *msec_clockevent_base = NULL;
24 
25 char cproxy_hostname[300] = {0}; /* Immutable after init. */
26 
27 static void msec_clock_handler(evutil_socket_t fd, short which, void *arg);
28 void msec_set_current_time(void);
29 
30 int cproxy_init_string(char *cfg_str,
31                        proxy_behavior behavior,
32                        int nthreads);
33 
34 int cproxy_init_agent(char *cfg_str,
35                       proxy_behavior behavior,
36                       int nthreads);
37 
38 int cproxy_init_mcmux_mode(int proxy_port,
39                            proxy_behavior behavior,
40                            int nthreads);
41 
42 proxy_behavior behavior_default_g = {
43     .cycle = 200, /* Clock cycle or quantum, in milliseconds. */
44     .downstream_max = 1024,
45     .downstream_conn_max = 4, /* Use 0 for unlimited. */
46     .downstream_weight = 0,
47     .downstream_retry = 1,
48     .downstream_protocol = proxy_downstream_ascii_prot,
49     .downstream_timeout = {
50         .tv_sec  = 5,
51         .tv_usec = 0
52     },
53     .downstream_conn_queue_timeout = {
54         .tv_sec  = 0,
55         .tv_usec = 200000
56     },
57     .wait_queue_timeout = {
58         .tv_sec  = 0,
59         .tv_usec = 200000
60     },
61     .connect_timeout = {
62         .tv_sec  = 0,
63         .tv_usec = 400000
64     },
65     .auth_timeout = {
66         .tv_sec  = 0,
67         .tv_usec = 100000
68     },
69     .time_stats = false,
70     .mcs_opts = {0},
71     .connect_max_errors = 5,         /* In zstored, 10. */
72     .connect_retry_interval = 30000, /* In zstored, 30000. */
73     .front_cache_max = 200,
74     .front_cache_lifespan = 0,
75     .front_cache_spec = {0},
76     .front_cache_unspec = {0},
77     .key_stats_max = 4000,
78     .key_stats_lifespan = 0,
79     .key_stats_spec = {0},
80     .key_stats_unspec = {0},
81     .optimize_set = {0},
82     .host = {0},
83     .port = 0,
84     .bucket = {0},
85     .nodeLocator = {0},
86     .usr = {0},
87     .pwd = {0},
88     .port_listen = MOXI_DEFAULT_LISTEN_PORT,
89     .default_bucket_name = FIRST_BUCKET
90 };
91 
/** Number of characters in key before the first space or the
 *  terminating NUL, whichever comes first.
 */
size_t skey_len(const char *key) {
    const char *end;

    assert(key);

    for (end = key; *end != '\0' && *end != ' '; end++) {
        /* Just scanning for the terminator. */
    }

    return (size_t) (end - key);
}
104 
/** murmur_hash() over the key characters that precede the first
 *  space or NUL (see skey_len()).
 */
int skey_hash(const void *v) {
    const char *key = v;

    assert(v);

    return murmur_hash(key, skey_len(key));
}
118 
/** Returns non-zero when the two keys are the same, where each key
 *  ends at its first space or NUL.
 */
int skey_equal(const void *v1, const void *v2) {
    const char *lhs = v1;
    const char *rhs = v2;
    size_t lhs_len;

    assert(v1);
    assert(v2);

    lhs_len = skey_len(lhs);
    if (lhs_len != skey_len(rhs)) {
        return 0;
    }

    /* Both keys hold exactly lhs_len non-terminator characters,
     * so a byte-wise comparison is sufficient.
     */
    return memcmp(lhs, rhs, lhs_len) == 0;
}
138 
/** Hash "dup" callback that does not copy: hands back the very
 *  same pointer, with constness dropped.
 */
static void *noop_dup(const void *v)
{
    void *same = (void *) v;
    return same;
}
143 
/** Hash "free" callback that deliberately releases nothing. */
void noop_free(void *v) {
    /* Intentionally empty; the cast only silences unused warnings. */
    (void) v;
}
148 
/** Hash equality callback for NUL-terminated strings.
 *  Returns 1 when the strings match, 0 otherwise.
 */
static int
str_eq(const void* p1, const void*p2)
{
    const char *lhs = p1;
    const char *rhs = p2;

    assert(lhs != NULL);
    assert(rhs != NULL);

    if (strcmp(lhs, rhs) != 0) {
        return 0;
    }
    return 1;
}
158 
159 struct hash_ops strhash_ops = {
160    .hashfunc = genhash_string_hash,
161    .hasheq = str_eq,
162    .dupKey = noop_dup,
163    .dupValue = noop_dup,
164    .freeKey = noop_free,
165    .freeValue = noop_free
166 };
167 
168 struct hash_ops skeyhash_ops = {
169    .hashfunc = skey_hash,
170    .hasheq = skey_equal,
171    .dupKey = noop_dup,
172    .dupValue = noop_dup,
173    .freeKey = noop_free,
174    .freeValue = noop_free
175 };
176 
/** Returns pointer to first non-space char in string.
 *
 *  A NULL input yields NULL.  If the string is all whitespace, the
 *  result points at its terminating NUL.
 */
char *skipspace(char *s) {
    if (s == NULL) {
        return NULL;
    }

    /* <ctype.h> functions require an unsigned char value (or EOF);
     * a plain char may be negative.  isspace('\0') is false, so the
     * loop also stops at the terminator.
     */
    while (isspace((unsigned char) *s)) {
        s++;
    }

    return s;
}
190 
/** Modifies string by zero'ing out any trailing spaces.
 *
 *  Returns s itself (NULL for a NULL input).
 */
char *trailspace(char *s) {
    size_t len;

    if (s == NULL) {
        return NULL;
    }

    /* Walk back by index rather than by pointer, so that an empty or
     * all-whitespace string never forms a pointer before the buffer.
     */
    len = strlen(s);
    while (len > 0 && isspace((unsigned char) s[len - 1])) {
        len--;
        s[len] = '\0';
    }

    return s;
}
206 
/** Strips whitespace from both ends of the string, in place.
 *  Trailing whitespace is overwritten with NULs; the returned
 *  pointer skips the leading whitespace, so use it rather than s.
 */
char *trimstr(char *s) {
    char *start = skipspace(s);

    return trailspace(start);
}
213 
/** Like strdup(), but trims spaces from start and end.
 *  Unlike with trimstr(strdup(s)), you can call free(trimstrdup(s)),
 *  and the correct memory is free()'ed.
 *
 *  Returns NULL when s is NULL or when strdup() fails.
 */
char *trimstrdup(char *s) {
    /* skipspace(NULL) is NULL, and strdup(NULL) is undefined. */
    if (s == NULL) {
        return NULL;
    }

    /* Leading spaces are skipped before the copy so that the returned
     * pointer is the start of the allocation.  trailspace() passes a
     * NULL (failed strdup) straight through.
     */
    return trailspace(strdup(skipspace(s)));
}
221 
/** Returns true if first word in a string equals a given word.
 *
 *  The first word of s runs up to the first whitespace character or
 *  the terminating NUL.  The whole of word must match it: a first
 *  word that is merely a prefix of word (including an empty first
 *  word) does not count as equal.
 */
bool wordeq(char *s, char *word) {
    size_t len = 0;

    assert(s);
    assert(word);

    while (s[len] != '\0' && !isspace((unsigned char) s[len])) {
        len++;
    }

    /* When the first len chars match, word[len] is in bounds, and it
     * must be the terminator for the two words to be the same length.
     */
    return strncmp(s, word, len) == 0 && word[len] == '\0';
}
235 
236 /* --------------------------------------- */
237 
238 static bool cproxy_core_initted = false;
239 
240 /** The cfg_str may be a path to a file like...
241  *
242  *    /full/path
243  *      or...
244  *    ./relative/path
245  *      or...
246  *    ../another/relative/path
247  *
248  *  But not...
249  *
250  *    incorrect/relative/path
251  *
252  *  Paths are detected by the first character of  '/' or '.'.
253  *
254  *  The contents of the file should be in cfg_str format.
255  */
cproxy_init(char *cfg_str, char *behavior_str, int nthreads, struct event_base *main_base)256 int cproxy_init(char *cfg_str,
257                 char *behavior_str,
258                 int nthreads,
259                 struct event_base *main_base) {
260     char *env_usr;
261     char *env_pwd;
262     proxy_behavior behavior;
263 
264     assert(nthreads > 1); /* Main + at least one worker. */
265     assert(nthreads == settings.num_threads);
266 
267     if (cproxy_core_initted == false) {
268         cproxy_core_initted = true;
269 
270         gethostname(cproxy_hostname, sizeof(cproxy_hostname));
271 
272         cproxy_init_a2a();
273         cproxy_init_a2b();
274         cproxy_init_b2b();
275     }
276 
277     if ((cfg_str == NULL || strlen(cfg_str) <= 0) &&
278         (false == settings.enable_mcmux_mode)) {
279         return 0;
280     }
281 
282     if (behavior_str != NULL && (behavior_str[0] == '.' || behavior_str[0] == '/')) {
283         char *buf = readfile(behavior_str);
284         if (buf != NULL) {
285             int rv = cproxy_init(cfg_str, buf, nthreads, main_base);
286             free(buf);
287             return rv;
288         } else {
289             moxi_log_write("ERROR: could not read behavior file: %s\n", behavior_str);
290             if (ml->log_mode != ERRORLOG_STDERR) {
291                 fprintf(stderr, "ERROR: could not read behavior file: %s\n", behavior_str);
292             }
293             exit(EXIT_FAILURE);
294         }
295     }
296 
297     if ((false == settings.enable_mcmux_mode) &&
298         (cfg_str[0] == '.' || cfg_str[0] == '/')) {
299         char *buf = readfile(cfg_str);
300         if (buf != NULL) {
301             int rv = cproxy_init(buf, behavior_str, nthreads, main_base);
302             free(buf);
303             return rv;
304         } else {
305             moxi_log_write("ERROR: could not read cfg file: %s\n", cfg_str);
306             if (ml->log_mode != ERRORLOG_STDERR) {
307                 fprintf(stderr, "ERROR: could not read cfg file: %s\n", cfg_str);
308             }
309             exit(EXIT_FAILURE);
310         }
311     }
312 
313     if (behavior_str == NULL) {
314         behavior_str = "";
315     }
316 
317     /* The vbucket feature only works in binary protocol. */
318 
319     behavior_default_g.downstream_protocol = proxy_downstream_binary_prot;
320 
321     env_usr = getenv("MOXI_SASL_PLAIN_USR");
322     if (env_usr != NULL) {
323         strncpy(behavior_default_g.usr, env_usr, sizeof(behavior_default_g.usr) - 1);
324         behavior_default_g.usr[sizeof(behavior_default_g.usr) - 1] = '\0';
325 
326         moxi_log_write("env: MOXI_SASL_PLAIN_USR (%d)\n",
327                        strlen(behavior_default_g.usr));
328     }
329 
330     env_pwd = getenv("MOXI_SASL_PLAIN_PWD");
331     if (env_pwd != NULL) {
332         strncpy(behavior_default_g.pwd, env_pwd, sizeof(behavior_default_g.pwd) - 1);
333         behavior_default_g.pwd[sizeof(behavior_default_g.pwd) - 1] = '\0';
334 
335         moxi_log_write("env: MOXI_SASL_PLAIN_PWD (%d)\n",
336                        strlen(behavior_default_g.pwd));
337     }
338 
339     behavior = cproxy_parse_behavior(behavior_str, behavior_default_g);
340     if (behavior.cycle > 0) {
341         msec_cycle = behavior.cycle;
342     }
343 
344     msec_clockevent_base = main_base;
345     msec_clock_handler(0, 0, NULL);
346 
347     if (settings.enable_mcmux_mode) {
348         return cproxy_init_mcmux_mode(settings.port,
349                                       behavior,
350                                       nthreads);
351     }
352 
353     /* Not jid format and not a URL, so it must be a simple cmd-line */
354     /* or file-based config. */
355 
356     if (strchr(cfg_str, '@') == NULL &&
357         strstr(cfg_str, "http://") == NULL) {
358         return cproxy_init_string(cfg_str,
359                                   behavior,
360                                   nthreads);
361     }
362 
363     if (settings.verbose > 2) {
364         cproxy_dump_behavior(&behavior, "cproxy_init_agent", 2);
365     }
366 
367     return cproxy_init_agent(cfg_str,
368                              behavior,
369                              nthreads);
370 }
371 
/** Creates the single "default" proxy used in mcmux mode and starts
 *  listening on proxy_port.
 *
 *  Any failure (allocation, proxy creation, listen) is logged and
 *  terminates the process, so the function only ever returns 0.
 */
int cproxy_init_mcmux_mode(int proxy_port,
                           proxy_behavior behavior,
                           int nthreads) {
    const int pool_size = 1; /* Number of servers. */
    char *proxy_name = "default";
    proxy_behavior_pool pool;
    proxy_main *m;
    proxy *p;
    int idx;
    int nconns;

    if (settings.verbose > 1) {
        cproxy_dump_behavior(&behavior, "init_string", 2);
    }

    memset(&pool, 0, sizeof(pool));
    pool.base = behavior;
    pool.num  = pool_size;
    pool.arr  = calloc(pool_size, sizeof(proxy_behavior));
    if (pool.arr == NULL) {
        moxi_log_write("could not alloc behaviors\n");
        exit(EXIT_FAILURE);
    }

    /* Every server slot starts out with the base behavior. */
    for (idx = 0; idx < pool_size; idx++) {
        pool.arr[idx] = behavior;
    }

    m = cproxy_gen_proxy_main(behavior, nthreads,
                              PROXY_CONF_TYPE_STATIC);
    if (m == NULL) {
        moxi_log_write("could not alloc proxy_main\n");
        exit(EXIT_FAILURE);
    }

    p = cproxy_create(m, proxy_name, proxy_port, "mcmux_config",
                      0, /* config_ver. */
                      &pool, nthreads);
    if (p == NULL) {
        moxi_log_write("could not alloc proxy\n");
        exit(EXIT_FAILURE);
    }

    /* Push the new proxy onto the head of the proxy_main's list. */
    cb_mutex_enter(&m->proxy_main_lock);
    p->next = m->proxy_head;
    m->proxy_head = p;
    cb_mutex_exit(&m->proxy_main_lock);

    nconns = cproxy_listen(p);
    if (nconns <= 0) {
        moxi_log_write("moxi error -- port %d unavailable?\n",
                       proxy_port);
        exit(EXIT_FAILURE);
    }

    if (settings.verbose > 1) {
        moxi_log_write("moxi listening on %d with %d conns\n",
                       proxy_port, nconns);
    }

    free(pool.arr);

    return 0;
}
442 
/** Creates one listening proxy per ';'-separated section of cfg_str.
 *
 *  cfg looks like "local_port=host:port,host:port;local_port=host:port"
 *  like "11222=memcached1.foo.net:11211"  This means local port 11222
 *  will be a proxy to downstream memcached server running at
 *  host memcached1.foo.net on port 11211.
 *
 *  A malformed section or any allocation/listen failure is logged and
 *  terminates the process, so the function only ever returns 0.
 */
int cproxy_init_string(char *cfg_str,
                       proxy_behavior behavior,
                       int nthreads) {
    char *proxy_name = "default";
    char *cfg_copy;
    char *rest;

    if (cfg_str == NULL || cfg_str[0] == '\0') {
        return 0;
    }

    if (settings.verbose > 1) {
        cproxy_dump_behavior(&behavior, "init_string", 2);
    }

    /* strsep() and trimstr() write into the string, so work on a copy. */
    cfg_copy = trimstrdup(cfg_str);
    rest = cfg_copy;

    while (rest != NULL) {
        proxy_behavior_pool pool;
        proxy_main *m;
        proxy *p;
        char *servers;
        char *port_str;
        char *comma;
        int port;
        int nservers = 1; /* Number of servers. */
        int idx;
        int nconns;

        /* Split "port=servers" off the front of the remaining config. */
        servers = strsep(&rest, ";");
        port_str = trimstr(strsep(&servers, "="));
        if (servers == NULL) {
            moxi_log_write("bad moxi config, missing =\n");
            exit(EXIT_FAILURE);
        }

        port = atoi(port_str);
        if (port <= 0) {
            moxi_log_write("missing proxy port\n");
            exit(EXIT_FAILURE);
        }

        servers = trimstr(servers);

        /* One server per comma-separated entry. */
        for (comma = strchr(servers, ',');
             comma != NULL;
             comma = strchr(comma + 1, ',')) {
            nservers++;
        }

        memset(&pool, 0, sizeof(pool));
        pool.base = behavior;
        pool.num = nservers;
        pool.arr = calloc(nservers, sizeof(proxy_behavior));
        if (pool.arr == NULL) {
            moxi_log_write("could not alloc behaviors\n");
            exit(EXIT_FAILURE);
        }

        for (idx = 0; idx < nservers; idx++) {
            pool.arr[idx] = behavior;
        }

        m = cproxy_gen_proxy_main(behavior, nthreads,
                                  PROXY_CONF_TYPE_STATIC);
        if (m == NULL) {
            moxi_log_write("could not alloc proxy_main\n");
            exit(EXIT_FAILURE);
        }

        p = cproxy_create(m,
                          proxy_name,
                          port,
                          servers,
                          0, /* config_ver. */
                          &pool,
                          nthreads);
        if (p == NULL) {
            moxi_log_write("could not alloc proxy\n");
            exit(EXIT_FAILURE);
        }

        /* Push the new proxy onto the head of the proxy_main's list. */
        cb_mutex_enter(&m->proxy_main_lock);
        p->next = m->proxy_head;
        m->proxy_head = p;
        cb_mutex_exit(&m->proxy_main_lock);

        nconns = cproxy_listen(p);
        if (nconns <= 0) {
            moxi_log_write("moxi error -- port %d unavailable?\n",
                           port);
            exit(EXIT_FAILURE);
        }

        if (settings.verbose > 1) {
            moxi_log_write("moxi listening on %d with %d conns\n",
                           port, nconns);
        }

        free(pool.arr);
    }

    free(cfg_copy);

    return 0;
}
557 
558 proxy_main *diag_last_proxy_main;
559 
cproxy_gen_proxy_main(proxy_behavior behavior, int nthreads, enum_proxy_conf_type conf_type)560 proxy_main *cproxy_gen_proxy_main(proxy_behavior behavior, int nthreads,
561                                   enum_proxy_conf_type conf_type) {
562     proxy_main *m = calloc(1, sizeof(proxy_main));
563     if (m != NULL) {
564         m->proxy_head = NULL;
565         m->behavior   = behavior;
566         m->nthreads   = nthreads;
567         m->conf_type  = conf_type;
568 
569         cb_mutex_initialize(&m->proxy_main_lock);
570 
571         m->stat_configs      = 0;
572         m->stat_config_fails = 0;
573         m->stat_proxy_starts      = 0;
574         m->stat_proxy_start_fails = 0;
575         m->stat_proxy_existings   = 0;
576         m->stat_proxy_shutdowns   = 0;
577 
578         diag_last_proxy_main = m;
579     }
580 
581     return m;
582 }
583 
/** Returns a copy of behavior_default with the comma-separated
 *  key=value pairs of behavior_str applied on top.
 *
 *  A NULL or empty behavior_str yields the defaults unchanged.  The
 *  caller's string is not modified; parsing works on a trimmed copy.
 */
proxy_behavior cproxy_parse_behavior(char          *behavior_str,
                                     proxy_behavior behavior_default) {
    proxy_behavior result = behavior_default;
    char *copy;
    char *cursor;

    if (behavior_str == NULL || behavior_str[0] == '\0') {
        return result;
    }

    /* Parse the key-value behavior_str, to override the defaults. */

    copy = trimstrdup(behavior_str);

    for (cursor = copy; cursor != NULL; ) {
        char *pair = trimstr(strsep(&cursor, ","));

        if (pair != NULL) {
            cproxy_parse_behavior_key_val_str(pair, &result);
        }
    }

    free(copy);

    return result;
}
613 
614 /** Note: the key_val param buffer is modified.
615  */
cproxy_parse_behavior_key_val_str(char *key_val, proxy_behavior *behavior)616 void cproxy_parse_behavior_key_val_str(char *key_val,
617                                        proxy_behavior *behavior) {
618     assert(behavior != NULL);
619 
620     if (key_val != NULL) {
621         char *key = strsep(&key_val, "=");
622         char *val = key_val;
623         cproxy_parse_behavior_key_val(key, val, behavior);
624     }
625 }
626 
627 /** Note: the key and val param buffers are modified.
628  */
cproxy_parse_behavior_key_val(char *key, char *val, proxy_behavior *behavior)629 void cproxy_parse_behavior_key_val(char *key,
630                                    char *val,
631                                    proxy_behavior *behavior) {
632     uint32_t ms = 0;
633     uint32_t x = 0;
634     bool ok = false;
635 
636     assert(behavior != NULL);
637 
638     if (key != NULL &&
639         val != NULL) {
640         key = trimstr(key);
641         val = trimstr(val);
642 
643         if (wordeq(key, "cycle")) {
644             ok = safe_strtoul(val, &behavior->cycle);
645         } else if (wordeq(key, "downstream_max") ||
646                    wordeq(key, "concurrency")) {
647             ok = safe_strtoul(val, &behavior->downstream_max);
648         } else if (wordeq(key, "downstream_conn_max")) {
649             ok = safe_strtoul(val, &behavior->downstream_conn_max);
650         } else if (wordeq(key, "weight") ||
651                    wordeq(key, "downstream_weight")) {
652             ok = safe_strtoul(val, &behavior->downstream_weight);
653         } else if (wordeq(key, "retry") ||
654                    wordeq(key, "downstream_retry")) {
655             ok = safe_strtoul(val, &behavior->downstream_retry);
656         } else if (wordeq(key, "protocol") ||
657                    wordeq(key, "downstream_protocol")) {
658             if (wordeq(val, "ascii") ||
659                 wordeq(val, "memcached-ascii") ||
660                 wordeq(val, "membase-ascii")) {
661                 behavior->downstream_protocol =
662                     proxy_downstream_ascii_prot;
663                 ok = true;
664             } else if (wordeq(val, "binary") ||
665                        wordeq(val, "memcached-binary") ||
666                        wordeq(val, "membase-binary")) {
667                 behavior->downstream_protocol =
668                     proxy_downstream_binary_prot;
669                 ok = true;
670             } else {
671                 if (settings.verbose > 1) {
672                     moxi_log_write("unknown behavior prot: %s\n", val);
673                 }
674             }
675         } else if (wordeq(key, "timeout") ||
676                    wordeq(key, "downstream_timeout") ||
677                    wordeq(key, "downstream_conn_timeout")) {
678             ok = safe_strtoul(val, &ms);
679             behavior->downstream_timeout.tv_sec  = floor(ms / 1000.0);
680             behavior->downstream_timeout.tv_usec = (ms % 1000) * 1000;
681         } else if (wordeq(key, "downstream_conn_queue_timeout")) {
682             ok = safe_strtoul(val, &ms);
683             behavior->downstream_conn_queue_timeout.tv_sec  = floor(ms / 1000.0);
684             behavior->downstream_conn_queue_timeout.tv_usec = (ms % 1000) * 1000;
685         } else if (wordeq(key, "wait_queue_timeout")) {
686             ok = safe_strtoul(val, &ms);
687             behavior->wait_queue_timeout.tv_sec  = floor(ms / 1000.0);
688             behavior->wait_queue_timeout.tv_usec = (ms % 1000) * 1000;
689         } else if (wordeq(key, "connect_timeout")) {
690             ok = safe_strtoul(val, &ms);
691             behavior->connect_timeout.tv_sec  = floor(ms / 1000.0);
692             behavior->connect_timeout.tv_usec = (ms % 1000) * 1000;
693         } else if (wordeq(key, "auth_timeout")) {
694             ok = safe_strtoul(val, &ms);
695             behavior->auth_timeout.tv_sec  = floor(ms / 1000.0);
696             behavior->auth_timeout.tv_usec = (ms % 1000) * 1000;
697         } else if (wordeq(key, "time_stats")) {
698             ok = safe_strtoul(val, &x);
699             behavior->time_stats = x;
700         } else if (wordeq(key, "mcs_opts")) {
701             if (strlen(val) < sizeof(behavior->mcs_opts)) {
702                 strcpy(behavior->mcs_opts, val);
703                 ok = true;
704             }
705         } else if (wordeq(key, "connect_max_errors")) {
706             ok = safe_strtoul(val, &behavior->connect_max_errors);
707         } else if (wordeq(key, "connect_retry_interval")) {
708             ok = safe_strtoul(val, &behavior->connect_retry_interval);
709         } else if (wordeq(key, "front_cache_max")) {
710             ok = safe_strtoul(val, &behavior->front_cache_max);
711         } else if (wordeq(key, "front_cache_lifespan")) {
712             ok = safe_strtoul(val, &behavior->front_cache_lifespan);
713         } else if (wordeq(key, "front_cache_spec")) {
714             if (strlen(val) < sizeof(behavior->front_cache_spec)) {
715                 strcpy(behavior->front_cache_spec, val);
716                 ok = true;
717             }
718         } else if (wordeq(key, "front_cache_unspec")) {
719             if (strlen(val) < sizeof(behavior->front_cache_unspec)) {
720                 strcpy(behavior->front_cache_unspec, val);
721                 ok = true;
722             }
723         } else if (wordeq(key, "key_stats_max")) {
724             ok = safe_strtoul(val, &behavior->key_stats_max);
725         } else if (wordeq(key, "key_stats_lifespan")) {
726             ok = safe_strtoul(val, &behavior->key_stats_lifespan);
727         } else if (wordeq(key, "key_stats_spec")) {
728             if (strlen(val) < sizeof(behavior->key_stats_spec)) {
729                 strcpy(behavior->key_stats_spec, val);
730                 ok = true;
731             }
732         } else if (wordeq(key, "key_stats_unspec")) {
733             if (strlen(val) < sizeof(behavior->key_stats_unspec)) {
734                 strcpy(behavior->key_stats_unspec, val);
735                 ok = true;
736             }
737         } else if (wordeq(key, "optimize_set")) {
738             if (strlen(val) < sizeof(behavior->optimize_set)) {
739                 strcpy(behavior->optimize_set, val);
740                 ok = true;
741             }
742         } else if (wordeq(key, "usr")) {
743             if (strlen(val) < sizeof(behavior->usr)) {
744                 strcpy(behavior->usr, val);
745                 ok = true;
746             }
747         } else if (wordeq(key, "pwd")) {
748             if (strlen(val) < sizeof(behavior->pwd)) {
749                 strcpy(behavior->pwd, val);
750                 ok = true;
751             }
752         } else if (wordeq(key, "host")) {
753             if (strlen(val) < sizeof(behavior->host)) {
754                 strcpy(behavior->host, val);
755                 ok = true;
756             }
757         } else if (wordeq(key, "port")) {
758             ok = safe_strtol(val, &behavior->port);
759         } else if (wordeq(key, "bucket")) {
760             if (strlen(val) < sizeof(behavior->bucket)) {
761                 strcpy(behavior->bucket, val);
762                 ok = true;
763             }
764         } else if (wordeq(key, "port_listen")) {
765             ok = safe_strtol(val, &behavior->port_listen);
766         } else if (wordeq(key, "default_bucket_name")) {
767             if (strlen(val) < sizeof(behavior->default_bucket_name)) {
768                 strcpy(behavior->default_bucket_name, val);
769                 ok = true;
770             }
771         } else if (key[0] == '#') { /* Comment. */
772             ok = true;
773         } else {
774             if (settings.verbose > 1) {
775                 moxi_log_write("ERROR: unknown behavior key: %s\n", key);
776             }
777         }
778     }
779 
780     if (ok == false) {
781         moxi_log_write("ERROR: config error in key: %s value: %s\n",
782                        key, val);
783     }
784 }
785 
786 /**
787  * Size of array should be arr_size.
788  */
cproxy_copy_behaviors(int arr_size, proxy_behavior *arr)789 proxy_behavior *cproxy_copy_behaviors(int arr_size, proxy_behavior *arr) {
790     int arr_size_alloc = arr_size > 0 ? arr_size : 1;
791     proxy_behavior *rv = calloc(arr_size_alloc, sizeof(proxy_behavior));
792     if (rv != NULL) {
793         memcpy(rv, arr, arr_size * sizeof(proxy_behavior));
794     }
795     return rv;
796 }
797 
798 /**
799  * Size of x/y array should be x/y_size.
800  */
cproxy_equal_behaviors(int x_size, proxy_behavior *x, int y_size, proxy_behavior *y)801 bool cproxy_equal_behaviors(int x_size, proxy_behavior *x,
802                             int y_size, proxy_behavior *y) {
803     int i;
804     if (x_size != y_size) {
805         return false;
806     }
807 
808     if (x == NULL && y == NULL) {
809         return true;
810     }
811 
812     if (x == NULL || y == NULL) {
813         return false;
814     }
815 
816     for (i = 0; i < x_size; i++) {
817         if (cproxy_equal_behavior(&x[i], &y[i]) == false) {
818             if (settings.verbose > 1) {
819                 moxi_log_write("behaviors not equal (%d)\n", i);
820                 cproxy_dump_behavior(&x[i], "x", 0);
821                 cproxy_dump_behavior(&y[i], "y", 0);
822             }
823 
824             return false;
825         }
826     }
827 
828     return true;
829 }
830 
/** Returns true when both behaviors are NULL, or when both are
 *  non-NULL and byte-for-byte identical.
 *
 *  NOTE(review): memcmp() also compares any padding bytes inside
 *  proxy_behavior -- confirm that callers always start from
 *  fully zeroed or wholesale-copied structs.
 */
bool cproxy_equal_behavior(proxy_behavior *x,
                           proxy_behavior *y) {
    if (x == NULL || y == NULL) {
        /* Equal only when both are missing. */
        return x == y;
    }

    return memcmp(x, y, sizeof(*x)) == 0;
}
843 
/** Dumps behavior b at the given detail level through
 *  cproxy_dump_behavior_stderr(), with no opaque context.
 */
void cproxy_dump_behavior(proxy_behavior *b, char *prefix, int level) {
    const void *no_opaque = NULL;

    cproxy_dump_behavior_ex(b, prefix, level,
                            cproxy_dump_behavior_stderr, no_opaque);
}
848 
/** Formats the fields of behavior b as text and hands each one to
 *  the dump callback as (dump_opaque, prefix, key, formatted value).
 *
 *  The level selects how much is emitted: some fields are always
 *  dumped, most need level >= 1, and "cycle" needs level >= 2.
 *  Timeouts are reported in milliseconds.  The pwd field is not
 *  dumped.
 */
void cproxy_dump_behavior_ex(proxy_behavior *b, char *prefix, int level,
                             void (*dump)(const void *dump_opaque,
                                          const char *prefix,
                                          const char *key,
                                          const char *buf),
                             const void *dump_opaque) {
    char vbuf[8000];
    assert(b);
    assert(dump);

/* Formats one value into vbuf and emits it. */
#define vdump(key, vfmt, val) do {               \
        snprintf(vbuf, sizeof(vbuf), vfmt, val); \
        dump(dump_opaque, prefix, key, vbuf);    \
    } while (0)

/* A timeval as whole milliseconds; the casts make the result a long
 * to match "%ld", whatever the widths of time_t and suseconds_t.
 */
#define tv_msecs(tv) \
    ((long) (tv).tv_sec * 1000 + (long) (tv).tv_usec / 1000)

    if (level >= 2) {
        vdump("cycle", "%u", b->cycle);
    }
    if (level >= 1) {
        vdump("downstream_max", "%u", b->downstream_max);
        vdump("downstream_conn_max", "%u", b->downstream_conn_max);
    }

    vdump("downstream_weight",   "%u", b->downstream_weight);
    vdump("downstream_retry",    "%u", b->downstream_retry);
    vdump("downstream_protocol", "%d", b->downstream_protocol);
    vdump("downstream_timeout", "%ld", /* In millisecs. */
          tv_msecs(b->downstream_timeout));
    vdump("downstream_conn_queue_timeout", "%ld", /* In millisecs. */
          tv_msecs(b->downstream_conn_queue_timeout));

    if (level >= 1) {
        vdump("wait_queue_timeout", "%ld", /* In millisecs. */
              tv_msecs(b->wait_queue_timeout));
        vdump("connect_timeout", "%ld", /* In millisecs. */
              tv_msecs(b->connect_timeout));
        vdump("auth_timeout", "%ld", /* In millisecs. */
              tv_msecs(b->auth_timeout));
        vdump("time_stats", "%d", b->time_stats);
        vdump("mcs_opts", "%s", b->mcs_opts);
        vdump("connect_max_errors", "%u", b->connect_max_errors);
        vdump("connect_retry_interval", "%u", b->connect_retry_interval);
        vdump("front_cache_max", "%u", b->front_cache_max);
        vdump("front_cache_lifespan", "%u", b->front_cache_lifespan);
        vdump("front_cache_spec", "%s", b->front_cache_spec);
        vdump("front_cache_unspec", "%s", b->front_cache_unspec);
        vdump("key_stats_max", "%u", b->key_stats_max);
        vdump("key_stats_lifespan", "%u", b->key_stats_lifespan);
        vdump("key_stats_spec", "%s", b->key_stats_spec);
        vdump("key_stats_unspec", "%s", b->key_stats_unspec);
        vdump("optimize_set", "%s", b->optimize_set);
    }

    vdump("usr",    "%s", b->usr);
    vdump("host",   "%s", b->host);
    vdump("port",   "%d", b->port);
    vdump("bucket", "%s", b->bucket);

    if (level >= 1) {
        vdump("port_listen", "%d", b->port_listen);
        vdump("default_bucket_name", "%s", b->default_bucket_name);
    }

#undef tv_msecs
#undef vdump
}
918 
/** Dump callback that writes one "prefix key: val" line through
 *  moxi_log_write().  A NULL prefix is printed as an empty string;
 *  dump_opaque is unused.
 */
void cproxy_dump_behavior_stderr(const void *dump_opaque,
                                 const char *prefix,
                                 const char *key,
                                 const char *val) {
    const char *label = (prefix != NULL) ? prefix : "";

    (void) dump_opaque;

    assert(key);
    assert(val);

    moxi_log_write("%s %s: %s\n", label, key, val);
}
934 
935 /* --------------------------------------- */
936 
usec_now(void)937 uint64_t usec_now(void) {
938     struct timeval timer;
939     gettimeofday(&timer, NULL);
940     return ((timer.tv_sec - process_started) * 1000000) + timer.tv_usec;
941 }
942 
943 /* Time-sensitive callers can call it by hand with this,
944  * outside the normal subsecond timer
945  */
msec_set_current_time(void)946 void msec_set_current_time(void) {
947     struct timeval timer;
948     gettimeofday(&timer, NULL);
949     msec_current_time =
950         (timer.tv_sec - process_started) * 1000 + (timer.tv_usec / 1000);
951 }
952 
/** Timer callback that drives the millisecond clock.
 *
 *  Each call re-arms msec_clockevent on msec_clockevent_base to fire
 *  again after msec_cycle milliseconds, then refreshes
 *  msec_current_time.  Does nothing while msec_cycle <= 0.  The
 *  callback arguments are unused; cproxy_init() makes the first call
 *  by hand to start the cycle.
 */
static void msec_clock_handler(evutil_socket_t fd, short which, void *arg) {
    static bool initialized = false;
    struct timeval t;

    (void)fd;
    (void)which;
    (void)arg;

    if (msec_cycle <= 0) {
        return;
    }

    /* Subsecond resolution timer.  Split into seconds and
     * microseconds so that tv_usec stays below 1000000 (and the
     * multiplication stays in range) for cycles of a second or more.
     */
    t.tv_sec = msec_cycle / 1000;
    t.tv_usec = (msec_cycle % 1000) * 1000;

    if (initialized) {
        /* only delete the event if it's actually there. */
        evtimer_del(&msec_clockevent);
    } else {
        initialized = true;
    }

    evtimer_set(&msec_clockevent, msec_clock_handler, 0);
    event_base_set(msec_clockevent_base, &msec_clockevent);
    evtimer_add(&msec_clockevent, &t);

    msec_set_current_time();
}
982 
983 /* --------------------------------------- */
984 
/** Reads the whole file at path into a malloc()'ed, NUL-terminated
 *  buffer that the caller must free().
 *
 *  Returns NULL when the file cannot be opened, sized, or read in
 *  full, when it is empty, or when allocation fails.
 */
static char *readfile(char *path) {
    char *contents = NULL;
    long size;
    FILE *in = fopen(path, "r");

    if (in == NULL) {
        return NULL;
    }

    /* Seek to the end to learn the size, then rewind for the read. */
    if (fseek(in, 0, SEEK_END) != 0) {
        goto done;
    }

    size = ftell(in);
    if (size <= 0 || fseek(in, 0, SEEK_SET) != 0) {
        goto done;
    }

    contents = malloc(size + 1);
    if (contents == NULL) {
        goto done;
    }

    if (fread(contents, size, 1, in) != 1) {
        free(contents);
        contents = NULL;
        goto done;
    }

    contents[size] = '\0';

done:
    fclose(in);
    return contents;
}
1007