1 /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 #include "src/config.h"
3 #include <stdio.h>
4 #include <stdlib.h>
5 #include <string.h>
6 #include <errno.h>
7 #include <assert.h>
8 #include "memcached.h"
9 #include "cproxy.h"
10 #include "work.h"
11 #include "agent.h"
12 #include "log.h"
13 #include "cJSON.h"
14 
15 /* Integration with libconflate. */
16 
/* Forward declarations for routines that are referenced before their */
/* definitions.  Those marked static have internal linkage; the others */
/* have external linkage. */

static void update_ptd_config(void *data0, void *data1);
static bool update_str_config(char **curr, char *next, char *descrip);

static bool update_behaviors_config(proxy_behavior **curr,
                                    int  *curr_num,
                                    proxy_behavior  *next,
                                    int   next_num,
                                    char *descrip);

void close_outdated_proxies(proxy_main *m, uint32_t new_config_ver);

char *parse_kvs_servers(char *prefix,
                        char *pool_name,
                        kvpair_t *kvs,
                        char **servers,
                        proxy_behavior_pool *behavior_pool);

char **parse_kvs_behavior(kvpair_t *kvs,
                          char *prefix,
                          char *name,
                          proxy_behavior *behavior);

static void cproxy_parse_json_auth(char *config,
                                   char *name,
                                   proxy_behavior_pool *bp);

/* Sets up the NULL_BUCKET pool; called from cproxy_init_agent_start(). */
static void cproxy_init_null_bucket(proxy_main *m);

/* Work callback queued by on_conflate_new_config() via work_send(); it is */
/* handed the proxy_main as data0 and a duplicated kvpair_t as data1. */
static void cproxy_on_config(void *data0, void *data1);
46 
/**
 * Logging callback installed as config.log for libconflate.
 *
 * The formatting path below is compiled out (AGENT_LOGGER is explicitly
 * undefined), so at present every message is silently discarded.
 *
 * @param userdata  opaque caller pointer; unused.
 * @param lvl       severity of the message.
 * @param msg       printf-style format string, followed by its arguments.
 */
static void agent_logger(void *userdata,
                         enum conflate_log_level lvl,
                         const char *msg, ...)
{
    (void)userdata;
    (void)lvl;
    (void)msg;
/* This code path stays disabled.  The original compile problem was that */
/* vfprintf() was called without its FILE * stream argument; that is fixed */
/* below.  Re-enabling the path also requires <stdarg.h> for va_list. */
#undef AGENT_LOGGER
#ifdef AGENT_LOGGER
    const char *n = NULL;
    bool v = false;

    switch(lvl) {
    case FATAL: n = "FATAL"; v = settings.verbose > 0; break;
    case ERROR: n = "ERROR"; v = settings.verbose > 0; break;
    case WARN:  n = "WARN";  v = settings.verbose > 1; break;
    case INFO:  n = "INFO";  v = settings.verbose > 1; break;
    case DEBUG: n = "DEBUG"; v = settings.verbose > 2; break;
    default:    break; /* Unknown level: v stays false, nothing is printed. */
    }
    if (!v) {
        return;
    }

    /* Prefix the caller's format with the level name; 16 extra bytes */
    /* cover the longest name, ": ", the newline and the terminator. */
    char fmt[strlen(msg) + 16];
    snprintf(fmt, sizeof(fmt), "%s: %s\n", n, msg);

    va_list ap;
    va_start(ap, msg);
    vfprintf(stderr, fmt, ap);
    va_end(ap);
#endif
}
80 
/* Registers moxi's management callbacks with libconflate.  Each call */
/* supplies a command name, a human-readable description and the handler */
/* to invoke for that command. */
static void init_extensions(void)
{
    conflate_register_mgmt_cb("client_stats", "Retrieve stats from moxi",
                              on_conflate_get_stats);
    conflate_register_mgmt_cb("reset_stats", "Reset moxi stats",
                              on_conflate_reset_stats);
    conflate_register_mgmt_cb("ping_test", "Perform a ping test",
                              on_conflate_ping_test);
}
90 
91 /** The cfg_str looks like...
92  *
93  *    apikey=jidname@jhostname%jpassword,config=config,host=host
94  *      or...
95  *    jidname@jhostname%jpassword,config=config,host=host
96  *
97  *  Only the apikey is needed, so it can also look like...
98  *
99  *    jidname@jhostname%jpassword
100  *
101  *  Or...
102  *
103  *    http://host:port/default/pools/bucketsStreamingConfig/default
104  *    url=http://host:port/default/pools/bucketsStreamingConfig/default
105  *    auth=,url=http://host:port/default/pools/bucketsStreamingConfig/default
106  *    auth=USER%PSWD,url=http://host:port/default/pools/bucketsStreamingConfig/default
107  *    auth=Administrator%password,url=http://host:port/default/pools/bucketsStreamingConfig/default
108  */
cproxy_init_agent(char *cfg_str, proxy_behavior behavior, int nthreads)109 int cproxy_init_agent(char *cfg_str,
110                       proxy_behavior behavior,
111                       int nthreads) {
112     int cfg_len;
113     char *buff;
114     char *next;
115     int rv = 0;
116 
117     init_extensions();
118 
119     if (cfg_str == NULL) {
120         moxi_log_write("ERROR: missing cfg\n");
121         if (ml->log_mode != ERRORLOG_STDERR) {
122             fprintf(stderr, "ERROR: missing cfg\n");
123         }
124         exit(EXIT_FAILURE);
125     }
126 
127     cfg_len = (int)strlen(cfg_str);
128     if (cfg_len <= 0) {
129         moxi_log_write("ERROR: empty cfg\n");
130         if (ml->log_mode != ERRORLOG_STDERR) {
131             fprintf(stderr, "ERROR: empty cfg\n");
132         }
133         exit(EXIT_FAILURE);
134     }
135 
136     if (strncmp(cfg_str, "apikey=", 7) == 0 ||
137         strncmp(cfg_str, "auth=", 5) == 0 ||
138         strncmp(cfg_str, "url=", 4) == 0) {
139         buff = trimstrdup(cfg_str);
140     } else {
141         buff = calloc(cfg_len + 50, sizeof(char));
142         if (buff != NULL) {
143             if (strncmp(cfg_str, "http://", 7) == 0) {
144                 char *x;
145                 snprintf(buff, cfg_len + 50, "url=%s", cfg_str);
146 
147                 /* Allow the user to specify multiple comma-separated URL's, */
148                 /* which we auto-translate right now to the '|' separators */
149                 /* that the rest of the code expects. */
150 
151                 for (x = buff; *x; x++) {
152                     if (*x == ',') {
153                         *x = '|';
154                     }
155                 }
156             } else {
157                 strcpy(buff, cfg_str);
158             }
159         }
160         buff = trimstr(buff);
161     }
162 
163     next = buff;
164 
165     while (next != NULL) {
166         char *jid    = behavior.usr;
167         char *jpw    = behavior.pwd;
168         char *jpwmem = NULL;
169         char *dbpath = NULL;
170         char *host   = NULL;
171         int dbpath_alloc = 0;
172 
173         char *cur = trimstr(strsep(&next, ";"));
174         while (cur != NULL) {
175             char *key_val = trimstr(strsep(&cur, ",\r\n"));
176             if (key_val != NULL) {
177                 char *key = trimstr(strsep(&key_val, "="));
178                 char *val = trimstr(key_val);
179 
180                 bool handled = true;
181 
182                 if (key != NULL &&
183                     val != NULL) {
184                     if (wordeq(key, "apikey") ||
185                         wordeq(key, "auth")) {
186                         jid = strsep(&val, "%");
187                         jpw = val;
188                     } else if (wordeq(key, "config") ||
189                                wordeq(key, "dbpath")) {
190                         dbpath = val;
191                     } else if (wordeq(key, "host") ||
192                                wordeq(key, "url")) {
193                         host = val;
194                     } else {
195                         handled = false;
196                     }
197                 } else {
198                     handled = false;
199                 }
200 
201                 if (handled == false &&
202                     key != NULL &&
203                     key[0] != '#' &&
204                     key[0] != '\0') {
205                     if (settings.verbose > 0) {
206                         moxi_log_write("unknown configuration key: %s\n", key);
207                     }
208                 }
209             }
210         }
211 
212         if (jid == NULL) {
213             jid = "";
214         }
215 
216         if (jpw == NULL) {
217             /* Handle if jid/jpw is in user:password@fqdn format */
218             /* instead of user@fqdn%password format. */
219 
220             char *colon = strchr(jid, ':');
221             char *asign = strchr(jid, '@');
222             if (colon != NULL &&
223                 asign != NULL &&
224                 asign > colon) {
225                 *asign = '\0';
226                 jpw = jpwmem = strdup(colon + 1);
227                 *asign = '@';
228                 do {
229                     *colon = *asign;
230                     colon++;
231                     asign++;
232                 } while (*asign != '\0');
233                 *colon = '\0';
234             }
235         }
236 
237         if (jpw == NULL) {
238             jpw = "";
239         }
240 
241         dbpath_alloc = 0;
242         if (dbpath == NULL) {
243             dbpath_alloc = (int)(strlen(jid) + strlen(CONFLATE_DB_PATH) + 100);
244             dbpath = calloc(dbpath_alloc, 1);
245             if (dbpath != NULL) {
246                 snprintf(dbpath, dbpath_alloc,
247                          CONFLATE_DB_PATH "/conflate-%s.cfg",
248                          (jid != NULL && strlen(jid) > 0 ? jid : "default"));
249 
250             } else {
251                 moxi_log_write("ERROR: conflate dbpath buf alloc\n");
252                 exit(EXIT_FAILURE);
253             }
254         }
255 
256         if (settings.verbose > 1) {
257             moxi_log_write("cproxy_init jid: %s host: %s\n", jid, host);
258         }
259 
260         if (cproxy_init_agent_start(jid, jpw, dbpath, host,
261                                     behavior,
262                                     nthreads) != NULL) {
263             rv++;
264         }
265 
266         if (dbpath_alloc > 0 &&
267             dbpath != NULL) {
268             free(dbpath);
269         }
270 
271         if (jpwmem) {
272             free(jpwmem);
273         }
274     }
275 
276     free(buff);
277 
278     return rv;
279 }
280 
cproxy_init_agent_start(char *jid, char *jpw, char *dbpath, char *host, proxy_behavior behavior, int nthreads)281 proxy_main *cproxy_init_agent_start(char *jid,
282                                     char *jpw,
283                                     char *dbpath,
284                                     char *host,
285                                     proxy_behavior behavior,
286                                     int nthreads) {
287     proxy_main *m;
288     assert(dbpath);
289 
290     if (settings.verbose > 2) {
291         moxi_log_write("cproxy_init_agent_start\n");;
292     }
293 
294     m = cproxy_gen_proxy_main(behavior, nthreads,
295                               PROXY_CONF_TYPE_DYNAMIC);
296     if (m != NULL) {
297         conflate_config_t config;
298         /* Create a NULL_BUCKET when we're not in "FIRST_BUCKET" mode. */
299 
300         /* FIRST_BUCKET mode means clients start off in the first */
301         /* configured bucket (and this is usually the case for */
302         /* standalone moxi). */
303 
304         /* Otherwise (when not in FIRST_BUCKET mode)... */
305         /* -- new clients start off in a configured, named default */
306         /* bucket (whose name is usually configured to be "default"), */
307         /* if it exists. */
308         /* -- if the named default bucket doesn't exist, new */
309         /* clients then start off in the NULL_BUCKET. */
310 
311         if (strcmp(behavior.default_bucket_name, FIRST_BUCKET) != 0) {
312             if (settings.verbose > 2) {
313                 moxi_log_write("initializing null bucket, default is: %s\n",
314                                behavior.default_bucket_name);
315             }
316 
317             cproxy_init_null_bucket(m);
318         } else {
319             if (settings.verbose > 2) {
320                 moxi_log_write("using first bucket\n");
321             }
322         }
323 
324         memset(&config, 0, sizeof(config));
325 
326         init_conflate(&config);
327 
328         /* Different jid's possible for production, staging, etc. */
329         config.jid  = jid;  /* "customer@stevenmb.local" or */
330                             /* "Administrator" */
331         config.pass = jpw;  /* "password" */
332         config.host = host; /* "localhost" or */
333                             /* "http://x.com:8091" */
334                             /* "http://x.com:8091/pools/default/buckets/default" */
335         config.software   = PACKAGE;
336         config.version    = VERSION;
337         config.save_path  = dbpath;
338         config.userdata   = m;
339         config.new_config = on_conflate_new_config;
340         config.log        = agent_logger;
341 
342         if (!config.host || config.host[0] == '\0') {
343             moxi_log_write("ERROR: missing -z configuration for url/host\n");
344             if (ml->log_mode != ERRORLOG_STDERR) {
345                 fprintf(stderr, "ERROR: missing -z configuration for url/host\n");
346             }
347             exit(EXIT_FAILURE);
348         }
349 
350         if (start_conflate(config)) {
351             if (settings.verbose > 2) {
352                 moxi_log_write("cproxy_init_agent_start done\n");
353             }
354 
355             return m;
356         }
357 
358         free(m);
359     }
360 
361     if (settings.verbose > 1) {
362         moxi_log_write("cproxy could not start conflate\n");
363     }
364 
365     return NULL;
366 }
367 
/**
 * Registers the NULL_BUCKET pool on the proxy's listen port.
 *
 * The pool has zero nodes and an empty config string, and is submitted
 * with config version 0.  Nothing happens when the listen port is not
 * positive or when the behavior array cannot be allocated.
 *
 * @param m  proxy_main whose behavior supplies the listen port and the
 *           pool's base behavior.
 */
static void cproxy_init_null_bucket(proxy_main *m) {
    proxy_behavior_pool pool;
    int listen_port = m->behavior.port_listen;

    if (listen_port <= 0) {
        return;
    }

    memset(&pool, 0, sizeof(pool));
    pool.base = m->behavior;
    pool.num  = 0;

    /* One element is allocated even though the pool reports zero nodes. */
    pool.arr  = calloc(1, sizeof(proxy_behavior));
    if (pool.arr == NULL) {
        return;
    }

    cproxy_on_config_pool(m, NULL_BUCKET, listen_port,
                          "", 0, &pool);
    free(pool.arr);
}
388 
on_conflate_new_config(void *userdata, kvpair_t *config)389 conflate_result on_conflate_new_config(void *userdata, kvpair_t *config) {
390     char **urlv;
391     char  *url;
392     char **contentsv;
393     char  *contents;
394     kvpair_t *copy;
395     LIBEVENT_THREAD *mthread;
396     proxy_main *m = userdata;
397     assert(m != NULL);
398     assert(config != NULL);
399 
400     mthread = thread_by_index(0);
401     assert(mthread != NULL);
402 
403     if (settings.verbose > 0) {
404         moxi_log_write("configuration received\n");
405     }
406 
407     urlv = get_key_values(config, "url"); /* NULL delimited array of char *. */
408     url  = urlv != NULL ? urlv[0] : NULL;
409     contentsv = get_key_values(config, "contents");
410     contents  = contentsv != NULL ? contentsv[0] : NULL;
411 
412     if (url != NULL) {
413         char **http_code = get_key_values(config, "http_code");
414         if (http_code != NULL && atoi(http_code[0]) == 401) {
415             moxi_log_write("ERROR: Authentication failure\n");
416             exit(EXIT_FAILURE);
417         }
418         if (contents != NULL && strlen(contents) > 0) {
419             /* Must be a REST/JSON config.  Wastefully test parse it here, */
420             /* before we asynchronously invoke the real worker who can't */
421             /* respond nicely with an error code. */
422             bool ok = false;
423             cJSON *c = cJSON_Parse(contents);
424             if (c != NULL) {
425                 ok = true;
426                 cJSON_Delete(c);
427             }
428 
429             if (!ok) {
430                 moxi_log_write("ERROR: parse JSON failed, from REST server: %s, %s\n",
431                                url, contents);
432 
433                 return CONFLATE_ERROR_BAD_SOURCE;
434             }
435         }
436     }
437 
438     copy = dup_kvpair(config);
439     if (copy != NULL) {
440         if (work_send(mthread->work_queue, cproxy_on_config, m, copy)) {
441             return CONFLATE_SUCCESS;
442         }
443 
444         if (settings.verbose > 1) {
445             moxi_log_write("work_send failed\n");
446         }
447 
448         return CONFLATE_ERROR;
449     }
450 
451     if (settings.verbose > 1) {
452         moxi_log_write("agent_config ocnc failed dup_kvpair\n");
453     }
454 
455     return CONFLATE_ERROR;
456 }
457 
/* Forward declarations for the JSON configuration handlers below. */

/* Handles one bucket's JSON config, dispatching on its "nodeLocator". */
static bool cproxy_on_config_json_one(proxy_main *m, uint32_t new_config_ver,
                                      char *config, char *name, char *src);

/* Handler for configs whose nodeLocator is not "ketama". */
static bool cproxy_on_config_json_one_vbucket(proxy_main *m,
                                              uint32_t new_config_ver,
                                              char *config,
                                              char *name,
                                              char *src);

/* Handler for configs whose nodeLocator is "ketama". */
static bool cproxy_on_config_json_one_ketama(proxy_main *m,
                                             uint32_t new_config_ver,
                                             char *config,
                                             char *name,
                                             char *src);

/* Walks a "buckets" array, handling either the bucket named "default" */
/* or all the others, depending on want_default. */
static bool cproxy_on_config_json_buckets(proxy_main *m, uint32_t new_config_ver,
                                          cJSON *jBuckets, bool want_default,
                                          char *src);
476 
477 static
cproxy_on_config_json(proxy_main *m, uint32_t new_config_ver, char *config, char *src)478 bool cproxy_on_config_json(proxy_main *m, uint32_t new_config_ver, char *config,
479                            char *src) {
480     bool rv = false;
481 
482     cJSON *c = cJSON_Parse(config);
483     if (c != NULL) {
484         cJSON *jBuckets = cJSON_GetObjectItem(c, "buckets");
485         if (jBuckets != NULL &&
486             jBuckets->type == cJSON_Array) {
487             /* Make two passes through jBuckets, favoring any "default" */
488             /* bucket on the 1st pass, so the default bucket gets */
489             /* created earlier. */
490 
491             bool rv1 = cproxy_on_config_json_buckets(m, new_config_ver, jBuckets,
492                                                      true, src);
493             bool rv2 = cproxy_on_config_json_buckets(m, new_config_ver, jBuckets,
494                                                      false, src);
495 
496             rv = rv1 || rv2;
497         } else {
498             /* Just a single config. */
499 
500             rv = cproxy_on_config_json_one(m, new_config_ver, config, "default", src);
501         }
502 
503         cJSON_Delete(c);
504     } else {
505         moxi_log_write("ERROR: could not parse JSON from REST server: %s, %s\n",
506                        src, config);
507     }
508 
509     return rv;
510 }
511 
512 static
cproxy_on_config_json_buckets(proxy_main *m, uint32_t new_config_ver, cJSON *jBuckets, bool want_default, char *src)513 bool cproxy_on_config_json_buckets(proxy_main *m, uint32_t new_config_ver,
514                                    cJSON *jBuckets, bool want_default,
515                                    char *src) {
516     bool rv = false;
517 
518     int numBuckets = cJSON_GetArraySize(jBuckets);
519     int i;
520 
521     for (i = 0; i < numBuckets; i++) {
522         cJSON *jBucket = cJSON_GetArrayItem(jBuckets, i);
523         if (jBucket != NULL &&
524             jBucket->type == cJSON_Object) {
525             char *name = "default";
526             bool is_default;
527 
528             cJSON *jName = cJSON_GetObjectItem(jBucket, "name");
529             if (jName != NULL &&
530                 jName->type == cJSON_String &&
531                 jName->valuestring != NULL) {
532                 name = jName->valuestring;
533             }
534 
535             is_default = (strcmp(name, "default") == 0);
536             if (!(is_default ^ want_default)) { /* XOR. */
537                 char *jBucketStr = cJSON_Print(jBucket);
538                 if (jBucketStr != NULL) {
539                     rv = cproxy_on_config_json_one(m, new_config_ver,
540                                                    jBucketStr, name, src) || rv;
541                     free(jBucketStr);
542                 }
543             }
544         }
545     }
546 
547     return rv;
548 }
549 
550 static
cproxy_on_config_json_one(proxy_main *m, uint32_t new_config_ver, char *config, char *name, char *src)551 bool cproxy_on_config_json_one(proxy_main *m, uint32_t new_config_ver,
552                                char *config, char *name,
553                                char *src) {
554     bool rv = false;
555     assert(m != NULL);
556     assert(config != NULL);
557     assert(name != NULL);
558 
559     /* Handle reconfiguration of a single proxy. */
560 
561     if (m != NULL && config != NULL && strlen(config) > 0) {
562         cJSON *jConfig;
563 
564         if (settings.verbose > 2) {
565             moxi_log_write("conjo contents config from %s: %s\n", src, config);
566         }
567 
568         /* The config should be JSON that should look like... */
569 
570         /* {"name":"default",                // The bucket name. */
571         /*  "nodeLocator":"ketama",          // Optional. */
572         /*  "saslPassword":"someSASLPwd", */
573         /*  "nodes":[{"hostname":"10.17.1.46","status":"healthy", */
574         /*            "version":"0.3.0_114_g31859fe","os":"i386-apple-darwin9.8.0", */
575         /*            "ports":{"proxy":11213,"direct":11212}}], */
576         /*  "buckets":{"uri":"/pools/default/buckets"}, */
577         /*  "controllers":{"ejectNode":{"uri":"/controller/ejectNode"}, */
578         /*  "testWorkload":{"uri":"/pools/default/controller/testWorkload"}}, */
579         /*  "stats":{"uri":"/pools/default/stats"}, */
580         /*  "vBucketServerMap":{ */
581         /*     "hashAlgorithm":"CRC", */
582         /*     "user":"optionalSASLUsr",     // Optional. */
583         /*     "password":"someSASLPwd",     // Optional. */
584         /*     "serverList":["10.17.1.46:11212"], */
585         /*     ...more json here...}} */
586 
587         jConfig = cJSON_Parse(config);
588         if (jConfig != NULL) {
589             cJSON *jNodeLocator;
590             cJSON *jName = cJSON_GetObjectItem(jConfig, "name");
591             if (jName != NULL &&
592                 jName->type == cJSON_String &&
593                 jName->valuestring != NULL) {
594                 name = jName->valuestring;
595             }
596 
597             jNodeLocator = cJSON_GetObjectItem(jConfig, "nodeLocator");
598             if (jNodeLocator != NULL &&
599                 jNodeLocator->type == cJSON_String &&
600                 jNodeLocator->valuestring != NULL) {
601                 if (strcmp(jNodeLocator->valuestring, "ketama") == 0) {
602                     rv = cproxy_on_config_json_one_ketama(m, new_config_ver,
603                                                           config, name, src);
604                     cJSON_Delete(jConfig);
605 
606                     return rv;
607                 }
608             }
609 
610             rv = cproxy_on_config_json_one_vbucket(m, new_config_ver,
611                                                    config, name, src);
612             cJSON_Delete(jConfig);
613         }
614     } else {
615         if (settings.verbose > 1) {
616             moxi_log_write("ERROR: skipping empty config from %s\n", src);
617         }
618     }
619 
620     return rv;
621 }
622 
623 static
cproxy_on_config_json_one_vbucket(proxy_main *m, uint32_t new_config_ver, char *config, char *name, char *src)624 bool cproxy_on_config_json_one_vbucket(proxy_main *m, uint32_t new_config_ver,
625                                        char *config, char *name,
626                                        char *src) {
627     bool rv = false;
628     VBUCKET_CONFIG_HANDLE vch;
629     assert(m != NULL);
630 
631     if (settings.verbose > 2) {
632         moxi_log_write("parsing config nodeLocator:vbucket\n");
633     }
634 
635     vch = vbucket_config_parse_string(config);
636     if (vch) {
637         proxy_behavior proxyb;
638         int pool_port;
639         int nodes_num;
640 
641         if (settings.verbose > 2) {
642             moxi_log_write("conc vbucket_config_parse_string: %d for %s\n",
643                            (vch != NULL), name);
644         }
645 
646         proxyb = m->behavior;
647         strcpy(proxyb.nodeLocator, "vbucket");
648 
649         pool_port = proxyb.port_listen;
650         nodes_num = vbucket_config_get_num_servers(vch);
651 
652         if (settings.verbose > 2) {
653             moxi_log_write("conc pool_port: %d nodes_num: %d\n",
654                            pool_port, nodes_num);
655         }
656 
657         if (pool_port > 0 && nodes_num > 0) {
658             proxy_behavior_pool behavior_pool;
659             memset(&behavior_pool, 0, sizeof(behavior_pool));
660 
661             behavior_pool.base = proxyb;
662             behavior_pool.num  = nodes_num;
663             behavior_pool.arr  = calloc(nodes_num, sizeof(proxy_behavior));
664 
665             if (behavior_pool.arr != NULL) {
666                 int j = 0;
667                 cproxy_parse_json_auth(config, name, &behavior_pool);
668 
669                 for (; j < nodes_num; j++) {
670                     /* Inherit default behavior. */
671                     const char *hostport;
672                     behavior_pool.arr[j] = behavior_pool.base;
673 
674                     hostport = vbucket_config_get_server(vch, j);
675                     if (hostport != NULL &&
676                         strlen(hostport) > 0 &&
677                         strlen(hostport) < sizeof(behavior_pool.arr[j].host) - 1) {
678                         char *colon;
679 
680                         strncpy(behavior_pool.arr[j].host,
681                                 hostport,
682                                 sizeof(behavior_pool.arr[j].host) - 1);
683                         behavior_pool.arr[j].host[sizeof(behavior_pool.arr[j].host) - 1] = '\0';
684 
685                         colon = strchr(behavior_pool.arr[j].host, ':');
686                         if (colon != NULL) {
687                             *colon = '\0';
688                             behavior_pool.arr[j].port = atoi(colon + 1);
689                             if (behavior_pool.arr[j].port <= 0) {
690                                 break;
691                             }
692                         } else {
693                             break;
694                         }
695                     } else {
696                         break;
697                     }
698                 }
699 
700                 if (j >= nodes_num) {
701                     cproxy_on_config_pool(m, name, pool_port,
702                                           config, new_config_ver,
703                                           &behavior_pool);
704                     rv = true;
705                 } else {
706                     if (settings.verbose > 1) {
707                         moxi_log_write("ERROR: error receiving host:port"
708                                        " from %s"
709                                        " for server config %d in %s\n",
710                                        src, j, config);
711                     }
712                 }
713 
714                 free(behavior_pool.arr);
715             }
716         }
717 
718         vbucket_config_destroy(vch);
719     } else {
720         moxi_log_write("ERROR: bad JSON configuration from %s: %s\n",
721                        src, vbucket_get_error());
722         if (ml->log_mode != ERRORLOG_STDERR) {
723             fprintf(stderr, "ERROR: bad JSON configuration from %s: %s\n",
724                     src, vbucket_get_error());
725         }
726 
727         /* Bug 1961 - don't exit() as we might be in a multitenant use case. */
728 
729         /* exit(EXIT_FAILURE); */
730     }
731 
732     return rv;
733 }
734 
735 static
cproxy_on_config_json_one_ketama(proxy_main *m, uint32_t new_config_ver, char *config, char *name, char *src)736 bool cproxy_on_config_json_one_ketama(proxy_main *m, uint32_t new_config_ver,
737                                       char *config, char *name,
738                                       char *src) {
739     bool rv = false;
740     cJSON *jConfig;
741     cJSON *jArr;
742     cJSON *jVBSM;
743 
744     assert(m != NULL);
745 
746     if (settings.verbose > 2) {
747         moxi_log_write("parsing config nodeLocator:ketama\n");
748     }
749 
750     /* First, try to iterate through jConfig.vBucketServerMap.serverList */
751     /* if it exists, otherwise iterate through jConfig.nodes. */
752 
753     jConfig = cJSON_Parse(config);
754     if (jConfig == NULL) {
755         return false;
756     }
757 
758     jArr = NULL;
759 
760     jVBSM = cJSON_GetObjectItem(jConfig, "vBucketServerMap");
761     if (jVBSM != NULL) {
762         jArr = cJSON_GetObjectItem(jVBSM, "serverList");
763     }
764 
765     if (jArr == NULL ||
766         jArr->type != cJSON_Array) {
767         jArr = cJSON_GetObjectItem(jConfig, "nodes");
768     }
769 
770     if (jArr != NULL && jArr->type == cJSON_Array) {
771         int nodes_num = cJSON_GetArraySize(jArr);
772         if (nodes_num > 0) {
773             proxy_behavior proxyb = m->behavior;
774             proxy_behavior_pool behavior_pool;
775 
776             strcpy(proxyb.nodeLocator, "ketama");
777 
778             if (settings.verbose > 2) {
779                 moxi_log_write("conjk nodes_num: %d\n", nodes_num);
780             }
781             memset(&behavior_pool, 0, sizeof(behavior_pool));
782             behavior_pool.base = proxyb;
783             behavior_pool.num  = nodes_num;
784             behavior_pool.arr  = calloc(nodes_num + 1, sizeof(proxy_behavior));
785 
786             if (behavior_pool.arr != NULL) {
787                 int curr = 0; /* Moves slower than j so we can skip unhealthy nodes. */
788 
789                 int j = 0;
790 
791                 cproxy_parse_json_auth(config, name, &behavior_pool);
792                 for (; j < nodes_num; j++) {
793                     /* Inherit default behavior. */
794                     cJSON *jNode;
795 
796                     behavior_pool.arr[curr] = behavior_pool.base;
797 
798                     jNode = cJSON_GetArrayItem(jArr, j);
799                     if (jNode != NULL) {
800                         if (jNode->type == cJSON_String &&
801                             jNode->valuestring != NULL) {
802                             /* Should look like "host:port". */
803                             char *hostport = jNode->valuestring;
804 
805                             if (strlen(hostport) > 0 &&
806                                 strlen(hostport) < sizeof(behavior_pool.arr[curr].host) - 1) {
807                                 char *colon;
808                                 strncpy(behavior_pool.arr[curr].host,
809                                         hostport,
810                                         sizeof(behavior_pool.arr[curr].host) - 1);
811                                 behavior_pool.arr[curr].host[sizeof(behavior_pool.arr[curr].host) - 1] = '\0';
812 
813                                 colon = strchr(behavior_pool.arr[curr].host, ':');
814                                 if (colon != NULL) {
815                                     *colon = '\0';
816                                     behavior_pool.arr[curr].port = atoi(colon + 1);
817                                     if (behavior_pool.arr[curr].port > 0) {
818                                         curr++;
819                                     } else {
820                                         break;
821                                     }
822                                 } else {
823                                     break;
824                                 }
825                             } else {
826                                 break;
827                             }
828                         } else if (jNode->type == cJSON_Object) {
829                             /* Should look like... { */
830                             /*   status: "healthy", */
831                             /*   hostname: "host", */
832                             /*   ports: { direct: port } */
833                             /* } */
834 
835                             cJSON *jHostname;
836                             cJSON *jStatus = cJSON_GetObjectItem(jNode, "status");
837                             if (jStatus != NULL &&
838                                 jStatus->type == cJSON_String &&
839                                 jStatus->valuestring != NULL &&
840                                 strcmp(jStatus->valuestring, "healthy") != 0) {
841                                 /* Skip non-healthy node. */
842 
843                                 continue;
844                             }
845 
846                             jHostname = cJSON_GetObjectItem(jNode, "hostname");
847                             if (jHostname != NULL &&
848                                 jHostname->type == cJSON_String &&
849                                 jHostname->valuestring != NULL &&
850                                 strlen(jHostname->valuestring) < sizeof(behavior_pool.arr[curr].host) - 1) {
851                                 cJSON *jPorts = cJSON_GetObjectItem(jNode, "ports");
852                                 if (jPorts != NULL &&
853                                     jPorts->type == cJSON_Object) {
854                                     cJSON *jDirect = cJSON_GetObjectItem(jPorts, "direct");
855                                     if (jDirect != NULL &&
856                                         jDirect->type == cJSON_Number &&
857                                         jDirect->valueint > 0) {
858                                         char *colon;
859                                         strncpy(behavior_pool.arr[curr].host,
860                                                 jHostname->valuestring,
861                                                 sizeof(behavior_pool.arr[curr].host) - 1);
862                                         behavior_pool.arr[curr].host[sizeof(behavior_pool.arr[curr].host) - 1] = '\0';
863 
864                                         /* The JSON might return a hostname that looks */
865                                         /* like "HOST:REST_PORT", so strip off the ":REST_PORT". */
866 
867                                         colon = strchr(behavior_pool.arr[curr].host, ':');
868                                         if (colon != NULL) {
869                                             *colon = '\0';
870                                         }
871 
872                                         behavior_pool.arr[curr].port = jDirect->valueint;
873 
874                                         curr++;
875                                     } else {
876                                         break;
877                                     }
878                                 } else {
879                                     break;
880                                 }
881                             } else {
882                                 break;
883                             }
884                         } else {
885                             break;
886                         }
887                     } else {
888                         break;
889                     }
890                 }
891 
892                 if (j >= nodes_num && curr > 0) {
893                     int   config_len = 200;
894                     char *config_str;
895                     /* Some unhealthy nodes might have been skipped, */
896                     /* so curr might be <= behavior_pool.num. */
897 
898                     behavior_pool.num = curr;
899 
900                     /* Create a config string that libmemcached likes, */
901                     /* such as "HOST:PORT,HOST:PORT,HOST:PORT". */
902 
903                     config_str = calloc(config_len, 1);
904 
905                     if (config_str != NULL) {
906                         for (j = 0; j < behavior_pool.num; j++) {
907                             /* Grow config string for libmemcached. */
908 
909                             char *config_end;
910                             int x = (int)(40 + /* For port and weight. */
911                                 strlen(config_str) +
912                                 strlen(behavior_pool.arr[j].host));
913                             if (config_len < x) {
914                                 config_len = 2 * (config_len + x);
915                                 config_str = realloc(config_str, config_len);
916                                 if (config_str == NULL) {
917                                     break;
918                                 }
919                             }
920 
921                             config_end = config_str + strlen(config_str);
922                             if (config_end != config_str) {
923                                 *config_end++ = ',';
924                             }
925 
926                             if (strlen(behavior_pool.arr[j].host) > 0 &&
927                                 behavior_pool.arr[j].port > 0) {
928                                 snprintf(config_end,
929                                          config_len - (config_end - config_str),
930                                          "%s:%u",
931                                          behavior_pool.arr[j].host,
932                                          behavior_pool.arr[j].port);
933                             } else {
934                                 if (settings.verbose > 1) {
935                                     moxi_log_write("ERROR: conjk missing host/port %d in %s from %s\n",
936                                                    j, name, src);
937                                 }
938                             }
939 
940                             if (behavior_pool.arr[j].downstream_weight > 0) {
941                                 config_end = config_str + strlen(config_str);
942                                 snprintf(config_end,
943                                          config_len - (config_end - config_str),
944                                          ":%u",
945                                          behavior_pool.arr[j].downstream_weight);
946                             }
947                         }
948 
949                         if (config_str != NULL) {
950                             if (j >= behavior_pool.num) {
951                                 cproxy_on_config_pool(m, name, proxyb.port_listen,
952                                                       config_str, new_config_ver,
953                                                       &behavior_pool);
954                                 rv = true;
955                             }
956 
957                             free(config_str);
958                         }
959                     } else {
960                         if (settings.verbose > 1) {
961                             moxi_log_write("ERROR: oom on jk re-config str\n");;
962                         }
963                     }
964                 } else {
965                     if (settings.verbose > 1) {
966                         moxi_log_write("ERROR: conjk parse error for config %d from %s in %s\n",
967                                        j, src, config);
968                     }
969                 }
970 
971                 free(behavior_pool.arr);
972             } else {
973                 if (settings.verbose > 1) {
974                     moxi_log_write("ERROR: oom on jk re-config\n");;
975                 }
976             }
977         } else {
978             if (settings.verbose > 1) {
979                 moxi_log_write("ERROR: conjk empty serverList/nodes in re-config\n");;
980             }
981         }
982     } else {
983         if (settings.verbose > 1) {
984             moxi_log_write("ERROR: conjk no serverList/nodes in re-config\n");;
985         }
986     }
987 
988     cJSON_Delete(jConfig);
989 
990     return rv;
991 }
992 
/**
 * Fill in the SASL credentials of bp->base from a JSON config.
 *
 * The user name is always taken from name.  The password is taken
 * from the top-level "saslPassword" string of the JSON document, when
 * the document parses and that item is a string; otherwise
 * bp->base.pwd is left untouched.  Both values are truncated to fit
 * their destination buffers and are always NUL-terminated.
 *
 * @param config JSON text to parse.
 * @param name   value copied into bp->base.usr.
 * @param bp     behavior pool whose base credentials are updated.
 */
static void cproxy_parse_json_auth(char *config,
                                   char *name,
                                   proxy_behavior_pool *bp) {
    const size_t usr_max = sizeof(bp->base.usr) - 1;
    const size_t pwd_max = sizeof(bp->base.pwd) - 1;
    cJSON *root;
    cJSON *pass;

    strncpy(bp->base.usr, name, usr_max);
    bp->base.usr[usr_max] = '\0';

    root = cJSON_Parse(config);
    if (root == NULL) {
        /* Unparseable config: only the user name gets set. */
        return;
    }

    pass = cJSON_GetObjectItem(root, "saslPassword");
    if (pass != NULL &&
        pass->type == cJSON_String &&
        pass->valuestring != NULL) {
        strncpy(bp->base.pwd, pass->valuestring, pwd_max);
        bp->base.pwd[pwd_max] = '\0';
    }

    cJSON_Delete(root);
}
1015 
1016 static
cproxy_on_config(void *data0, void *data1)1017 void cproxy_on_config(void *data0, void *data1) {
1018     proxy_main *m = data0;
1019     kvpair_t *kvs = data1;
1020     uint32_t max_config_ver = 0;
1021     proxy *p;
1022     uint32_t new_config_ver;
1023     char **urlv;
1024     char  *url;
1025     char **contents;
1026 
1027 
1028     assert(m);
1029     assert(kvs);
1030     assert(is_listen_thread());
1031 
1032     m->stat_configs++;
1033 
1034     cb_mutex_enter(&m->proxy_main_lock);
1035 
1036     for (p = m->proxy_head; p != NULL; p = p->next) {
1037         cb_mutex_enter(&p->proxy_lock);
1038         if (max_config_ver < p->config_ver) {
1039             max_config_ver = p->config_ver;
1040         }
1041         cb_mutex_exit(&p->proxy_lock);
1042     }
1043 
1044     cb_mutex_exit(&m->proxy_main_lock);
1045 
1046     new_config_ver = max_config_ver + 1;
1047 
1048     if (settings.verbose > 2) {
1049         moxi_log_write("conc new_config_ver %u\n", new_config_ver);
1050     }
1051 
1052     urlv = get_key_values(kvs, "url"); /* NULL delimited array of char *. */
1053     url  = urlv != NULL ? (urlv[0] != NULL ? urlv[0] : "") : "";
1054 
1055     contents = get_key_values(kvs, "contents");
1056     if (contents != NULL && contents[0] != NULL) {
1057         char *config = trimstrdup(contents[0]);
1058         if (config != NULL &&
1059             strlen(config) > 0) {
1060             cproxy_on_config_json(m, new_config_ver, config, url);
1061 
1062             free(config);
1063         } else {
1064             moxi_log_write("ERROR: invalid, empty config from REST server %s\n",
1065                            url);
1066             goto fail;
1067         }
1068     } else {
1069         moxi_log_write("ERROR: invalid response from REST server %s\n",
1070                        url);
1071     }
1072 
1073     /* If there were any proxies that weren't updated in the */
1074     /* previous loop, we need to shut them down.  We mark the */
1075     /* proxy->config as NULL, and cproxy_check_downstream_config() */
1076     /* will catch it. */
1077 
1078     close_outdated_proxies(m, new_config_ver);
1079 
1080     free_kvpair(kvs);
1081 
1082     return;
1083 
1084  fail:
1085     m->stat_config_fails++;
1086     free_kvpair(kvs);
1087 
1088     if (settings.verbose > 1) {
1089         moxi_log_write("ERROR: conc failed config %"PRIu64"\n",
1090                        (uint64_t) m->stat_config_fails);
1091     }
1092 }
1093 
/**
 * Shut down every proxy whose config_ver differs from new_config_ver.
 *
 * Each stale proxy (other than the NULL_BUCKET one) is reconfigured
 * through cproxy_on_config_pool() with a NULL config string and an
 * empty behavior pool, which is the signal to shut that proxy down.
 *
 * @param m              the proxy_main whose proxy list is walked.
 * @param new_config_ver the version that up-to-date proxies carry.
 */
void close_outdated_proxies(proxy_main *m, uint32_t new_config_ver) {
    /* TODO: Close any listening conns for the proxy? */
    /* TODO: Close any upstream conns for the proxy? */
    /* TODO: We still need to free proxy memory, after all its */
    /*       proxy_td's and downstreams are closed, and no more */
    /*       upstreams are pointed at the proxy. */

    proxy_behavior_pool no_servers;
    proxy *cur;

    memset(&no_servers, 0, sizeof(no_servers));

    no_servers.base = m->behavior;
    no_servers.num  = 0;
    no_servers.arr  = NULL;

    cb_mutex_enter(&m->proxy_main_lock);

    cur = m->proxy_head;
    while (cur != NULL) {
        /* A non-NULL stale_name doubles as the "this proxy is */
        /* outdated" marker; it is a private copy taken under the */
        /* proxy's lock. */

        int   stale_port = 0;
        char *stale_name = NULL;

        cb_mutex_enter(&cur->proxy_lock);

        if (cur->config_ver != new_config_ver) {
            assert(cur->port > 0);
            assert(cur->name != NULL);

            stale_port = cur->port;
            stale_name = strdup(cur->name);
        }

        cb_mutex_exit(&cur->proxy_lock);

        /* Note, we don't want to own the proxy_main_lock here */
        /* because cproxy_on_config_pool() may scatter/gather */
        /* calls against the worker threads, and the worker threads */
        /* should not deadlock if they need the proxy_main_lock. */

        cb_mutex_exit(&m->proxy_main_lock);

        if (stale_name != NULL) {
            /* Never shut down the NULL_BUCKET.  For anything else, */
            /* passing in a NULL config string signals that the */
            /* bucket's proxy struct should be shut down. */

            if (strcmp(NULL_BUCKET, stale_name) != 0) {
                cproxy_on_config_pool(m, stale_name, stale_port,
                                      NULL, new_config_ver,
                                      &no_servers);
            }

            free(stale_name);
        }

        /* Re-take the list lock before following the next link. */

        cb_mutex_enter(&m->proxy_main_lock);
        cur = cur->next;
    }

    cb_mutex_exit(&m->proxy_main_lock);
}
1156 
1157 /**
1158  * A name and port uniquely identify a proxy.
1159  */
cproxy_on_config_pool(proxy_main *m, char *name, int port, char *config, uint32_t config_ver, proxy_behavior_pool *behavior_pool)1160 void cproxy_on_config_pool(proxy_main *m,
1161                            char *name, int port,
1162                            char *config,
1163                            uint32_t config_ver,
1164                            proxy_behavior_pool *behavior_pool) {
1165     bool found = false;
1166     proxy *p;
1167 
1168     assert(m);
1169     assert(name != NULL);
1170     assert(port >= 0);
1171     assert(is_listen_thread());
1172 
1173     /* See if we've already got a proxy running with that name and port, */
1174     /* and create one if needed. */
1175 
1176 
1177     cb_mutex_enter(&m->proxy_main_lock);
1178 
1179     p = m->proxy_head;
1180     while (p != NULL && !found) {
1181         cb_mutex_enter(&p->proxy_lock);
1182 
1183         assert(p->port > 0);
1184         assert(p->name != NULL);
1185 
1186         found = ((p->port == port) &&
1187                  (strcmp(p->name, name) == 0));
1188 
1189         cb_mutex_exit(&p->proxy_lock);
1190 
1191         if (found) {
1192             break;
1193         }
1194 
1195         p = p->next;
1196     }
1197 
1198     cb_mutex_exit(&m->proxy_main_lock);
1199 
1200     if (p == NULL) {
1201         p = cproxy_create(m, name, port,
1202                           config,
1203                           config_ver,
1204                           behavior_pool,
1205                           m->nthreads);
1206         if (p != NULL) {
1207             int n;
1208 
1209             cb_mutex_enter(&m->proxy_main_lock);
1210 
1211             p->next = m->proxy_head;
1212             m->proxy_head = p;
1213 
1214             cb_mutex_exit(&m->proxy_main_lock);
1215 
1216             n = cproxy_listen(p);
1217             if (n > 0) {
1218                 if (settings.verbose > 2) {
1219                     moxi_log_write(
1220                             "cproxy_listen success %u for %s to %s with %d conns\n",
1221                             p->port, p->name, p->config, n);
1222                 }
1223                 m->stat_proxy_starts++;
1224             } else {
1225                 if (settings.verbose > 1) {
1226                     moxi_log_write("ERROR: cproxy_listen failed on %u to %s\n",
1227                             p->port, p->config);
1228                 }
1229                 m->stat_proxy_start_fails++;
1230             }
1231         } else {
1232             if (settings.verbose > 2) {
1233                 moxi_log_write("ERROR: cproxy_create failed on %s, %d, %s\n",
1234                                name, port, config);
1235             }
1236         }
1237     } else {
1238         bool changed  = false;
1239         bool shutdown_flag = false;
1240         int i;
1241 
1242         if (settings.verbose > 2) {
1243             moxi_log_write("conp existing config change %u\n",
1244                     p->port);
1245         }
1246 
1247         cb_mutex_enter(&m->proxy_main_lock);
1248 
1249         /* Turn off the front_cache while we're reconfiguring. */
1250 
1251         mcache_stop(&p->front_cache);
1252         matcher_stop(&p->front_cache_matcher);
1253         matcher_stop(&p->front_cache_unmatcher);
1254 
1255         matcher_stop(&p->optimize_set_matcher);
1256 
1257         cb_mutex_enter(&p->proxy_lock);
1258 
1259         if (settings.verbose > 2) {
1260             if (p->config && config &&
1261                 strcmp(p->config, config) != 0) {
1262                 moxi_log_write("conp config changed from %s to %s\n",
1263                         p->config, config);
1264             }
1265         }
1266 
1267         changed =
1268             update_str_config(&p->config, config,
1269                               "conp config changed") ||
1270             changed;
1271 
1272         changed =
1273             (cproxy_equal_behavior(&p->behavior_pool.base,
1274                                    &behavior_pool->base) == false) ||
1275             changed;
1276 
1277         p->behavior_pool.base = behavior_pool->base;
1278 
1279         changed =
1280             update_behaviors_config(&p->behavior_pool.arr,
1281                                     &p->behavior_pool.num,
1282                                     behavior_pool->arr,
1283                                     behavior_pool->num,
1284                                     "conp behaviors changed") ||
1285             changed;
1286 
1287         if (p->config != NULL &&
1288             p->behavior_pool.arr != NULL) {
1289             m->stat_proxy_existings++;
1290         } else {
1291             m->stat_proxy_shutdowns++;
1292             shutdown_flag = true;
1293         }
1294 
1295         assert(config_ver != p->config_ver);
1296 
1297         p->config_ver = config_ver;
1298 
1299         cb_mutex_exit(&p->proxy_lock);
1300 
1301         if (settings.verbose > 2) {
1302             moxi_log_write("conp changed %s, shutdown %s\n",
1303                     changed ? "true" : "false",
1304                     shutdown_flag ? "true" : "false");
1305         }
1306 
1307         /* Restart the front_cache, if necessary. */
1308 
1309         if (shutdown_flag == false) {
1310             if (behavior_pool->base.front_cache_max > 0 &&
1311                 behavior_pool->base.front_cache_lifespan > 0) {
1312                 mcache_start(&p->front_cache,
1313                              behavior_pool->base.front_cache_max);
1314 
1315                 if (strlen(behavior_pool->base.front_cache_spec) > 0) {
1316                     matcher_start(&p->front_cache_matcher,
1317                                   behavior_pool->base.front_cache_spec);
1318                 }
1319 
1320                 if (strlen(behavior_pool->base.front_cache_unspec) > 0) {
1321                     matcher_start(&p->front_cache_unmatcher,
1322                                   behavior_pool->base.front_cache_unspec);
1323                 }
1324             }
1325 
1326             if (strlen(behavior_pool->base.optimize_set) > 0) {
1327                 matcher_start(&p->optimize_set_matcher,
1328                               behavior_pool->base.optimize_set);
1329             }
1330         }
1331 
1332         /* Send update across worker threads, avoiding locks. */
1333 
1334         for (i = 1; i < m->nthreads; i++) {
1335             LIBEVENT_THREAD *t = thread_by_index(i);
1336             proxy_td *ptd;
1337 
1338             assert(t);
1339             assert(t->work_queue);
1340 
1341             ptd = &p->thread_data[i];
1342             if (t &&
1343                 t->work_queue) {
1344                 work_send(t->work_queue, update_ptd_config, ptd, NULL);
1345             }
1346         }
1347 
1348         cb_mutex_exit(&m->proxy_main_lock);
1349 
1350         if (settings.verbose > 2) {
1351             moxi_log_write("conp changed %s, %d\n",
1352                            changed ? "true" : "false", config_ver);
1353         }
1354     }
1355 }
1356 
1357 /* ---------------------------------------------------------- */
1358 
update_ptd_config(void *data0, void *data1)1359 static void update_ptd_config(void *data0, void *data1) {
1360     proxy_td *ptd;
1361     proxy *p;
1362     bool changed = false;
1363     int  port;
1364     int  prev;
1365 
1366     (void)data1;
1367 
1368     ptd = data0;
1369     assert(ptd);
1370 
1371     p = ptd->proxy;
1372     assert(p);
1373 
1374     assert(is_listen_thread() == false); /* Expecting a worker thread. */
1375 
1376     cb_mutex_enter(&p->proxy_lock);
1377 
1378     port = p->port;
1379     prev = ptd->config_ver;
1380 
1381     if (ptd->config_ver != p->config_ver) {
1382         ptd->config_ver = p->config_ver;
1383 
1384         changed =
1385             update_str_config(&ptd->config, p->config, NULL) ||
1386             changed;
1387 
1388         ptd->behavior_pool.base = p->behavior_pool.base;
1389 
1390         changed =
1391             update_behaviors_config(&ptd->behavior_pool.arr,
1392                                     &ptd->behavior_pool.num,
1393                                     p->behavior_pool.arr,
1394                                     p->behavior_pool.num,
1395                                     NULL) ||
1396             changed;
1397     }
1398 
1399     cb_mutex_exit(&p->proxy_lock);
1400 
1401     /* Restart the key_stats, if necessary. */
1402 
1403     if (changed) {
1404         mcache_stop(&ptd->key_stats);
1405         matcher_stop(&ptd->key_stats_matcher);
1406         matcher_stop(&ptd->key_stats_unmatcher);
1407 
1408         if (ptd->config != NULL) {
1409             if (ptd->behavior_pool.base.key_stats_max > 0 &&
1410                 ptd->behavior_pool.base.key_stats_lifespan > 0) {
1411                 mcache_start(&ptd->key_stats,
1412                              ptd->behavior_pool.base.key_stats_max);
1413 
1414                 if (strlen(ptd->behavior_pool.base.key_stats_spec) > 0) {
1415                     matcher_start(&ptd->key_stats_matcher,
1416                                   ptd->behavior_pool.base.key_stats_spec);
1417                 }
1418 
1419                 if (strlen(ptd->behavior_pool.base.key_stats_unspec) > 0) {
1420                     matcher_start(&ptd->key_stats_unmatcher,
1421                                   ptd->behavior_pool.base.key_stats_unspec);
1422                 }
1423             }
1424         }
1425 
1426         if (settings.verbose > 2) {
1427             moxi_log_write("update_ptd_config %u, %u to %u\n",
1428                     port, prev, ptd->config_ver);
1429         }
1430     } else {
1431         if (settings.verbose > 2) {
1432             moxi_log_write("update_ptd_config %u, %u = %u no change\n",
1433                     port, prev, ptd->config_ver);
1434         }
1435     }
1436 }
1437 
1438 /* ---------------------------------------------------------- */
1439 
update_str_config(char **curr, char *next, char *descrip)1440 static bool update_str_config(char **curr, char *next, char *descrip) {
1441     bool rv = false;
1442 
1443     if ((*curr != NULL) &&
1444         (next == NULL ||
1445          strcmp(*curr, next) != 0)) {
1446         free(*curr);
1447         *curr = NULL;
1448 
1449         rv = true;
1450 
1451         if (descrip != NULL &&
1452             settings.verbose > 2) {
1453             moxi_log_write("%s\n", descrip);
1454         }
1455     }
1456     if (*curr == NULL && next != NULL) {
1457         *curr = trimstrdup(next);
1458     }
1459 
1460     return rv;
1461 }
1462 
/**
 * Make the (*curr, *curr_num) behavior array a copy of
 * (next, next_num), replacing the old array only when it differs.
 *
 * If *curr is set and next is NULL or cproxy_equal_behaviors()
 * reports a difference, *curr is freed and cleared and *curr_num is
 * reset to 0.  Afterwards, if *curr is NULL and next is not, *curr
 * receives cproxy_copy_behaviors(next_num, next).
 *
 * @param curr     in/out: the owned behavior array.
 * @param curr_num in/out: number of entries in *curr.
 * @param next     the desired array, or NULL to clear.
 * @param next_num number of entries in next.
 * @param descrip  optional message logged (at verbose > 2) when an
 *                 existing array is discarded; may be NULL.
 * @return true only when an existing array was discarded.  Note that
 *         filling a previously NULL *curr returns false.
 */
static bool update_behaviors_config(proxy_behavior **curr,
                                    int  *curr_num,
                                    proxy_behavior  *next,
                                    int   next_num,
                                    char *descrip) {
    bool rv = false;

    if ((*curr != NULL) &&
        (next == NULL ||
         cproxy_equal_behaviors(*curr_num,
                                *curr,
                                next_num,
                                next) == false)) {
        free(*curr);
        *curr = NULL;
        *curr_num = 0;

        rv = true;

        if (descrip != NULL &&
            settings.verbose > 2) {
            moxi_log_write("%s\n", descrip);
        }
    }
    if (*curr == NULL && next != NULL) {
        *curr = cproxy_copy_behaviors(next_num,
                                      next);

        /* Keep the count in step with the array: if the copy came */
        /* back NULL (presumably an allocation failure -- TODO */
        /* confirm), do not advertise entries that do not exist. */

        *curr_num = (*curr != NULL) ? next_num : 0;
    }

    return rv;
}
1495 
1496 /* ---------------------------------------------------------- */
1497 
1498 /**
1499  * Parse server-level behaviors from a pool into a given
1500  * array of behaviors, one entry for each server.
1501  *
1502  * An example prefix is "svr".
1503  */
parse_kvs_servers(char *prefix, char *pool_name, kvpair_t *kvs, char **servers, proxy_behavior_pool *behavior_pool)1504 char *parse_kvs_servers(char *prefix,
1505                         char *pool_name,
1506                         kvpair_t *kvs,
1507                         char **servers,
1508                         proxy_behavior_pool *behavior_pool) {
1509 
1510     int   config_len = 200;
1511     char *config_str;
1512     int j;
1513 
1514     assert(prefix);
1515     assert(pool_name);
1516     assert(kvs);
1517     assert(servers);
1518     assert(behavior_pool);
1519     assert(behavior_pool->arr);
1520 
1521     if (behavior_pool->num <= 0) {
1522         return NULL;
1523     }
1524 
1525     /* Create a config string that libmemcached likes. */
1526     /* See memcached_servers_parse(). */
1527 
1528     config_str = calloc(config_len, 1);
1529 
1530     for (j = 0; servers[j]; j++) {
1531         int x;
1532         char *config_end;
1533 
1534         assert(j < behavior_pool->num);
1535 
1536         /* Inherit default behavior. */
1537 
1538         behavior_pool->arr[j] = behavior_pool->base;
1539 
1540         parse_kvs_behavior(kvs, prefix, servers[j],
1541                            &behavior_pool->arr[j]);
1542 
1543         /* Grow config string for libmemcached. */
1544 
1545         x = (int)(40 + /* For port and weight. */
1546             strlen(config_str) +
1547             strlen(behavior_pool->arr[j].host));
1548         if (config_len < x) {
1549             config_len = 2 * (config_len + x);
1550             config_str = realloc(config_str, config_len);
1551         }
1552 
1553         config_end = config_str + strlen(config_str);
1554         if (config_end != config_str) {
1555             *config_end++ = ',';
1556         }
1557 
1558         if (strlen(behavior_pool->arr[j].host) > 0 &&
1559             behavior_pool->arr[j].port > 0) {
1560             snprintf(config_end,
1561                      config_len - (config_end - config_str),
1562                      "%s:%u",
1563                      behavior_pool->arr[j].host,
1564                      behavior_pool->arr[j].port);
1565         } else {
1566             if (settings.verbose > 1) {
1567                 moxi_log_write("ERROR: missing host:port for svr-%s in %s\n",
1568                         servers[j], pool_name);
1569             }
1570         }
1571 
1572         if (behavior_pool->arr[j].downstream_weight > 0) {
1573             config_end = config_str + strlen(config_str);
1574             snprintf(config_end,
1575                      config_len - (config_end - config_str),
1576                      ":%u",
1577                      behavior_pool->arr[j].downstream_weight);
1578         }
1579 
1580         if (settings.verbose > 2) {
1581             cproxy_dump_behavior(&behavior_pool->arr[j],
1582                                  "pks", 0);
1583         }
1584     }
1585 
1586     return config_str;
1587 }
1588 
1589 /* ---------------------------------------------------------- */
1590 
1591 /**
1592  * Parse a "[prefix]-[name]" configuration section into a behavior.
1593  */
parse_kvs_behavior(kvpair_t *kvs, char *prefix, char *name, proxy_behavior *behavior)1594 char **parse_kvs_behavior(kvpair_t *kvs,
1595                           char *prefix,
1596                           char *name,
1597                           proxy_behavior *behavior) {
1598     char key[800];
1599     char **props;
1600     int k;
1601 
1602     assert(kvs);
1603     assert(prefix);
1604     assert(name);
1605     assert(behavior);
1606 
1607 
1608     snprintf(key, sizeof(key), "%s-%s", prefix, name);
1609 
1610     props = get_key_values(kvs, key);
1611     for (k = 0; props && props[k]; k++) {
1612         char *key_val = trimstrdup(props[k]);
1613         if (key_val != NULL) {
1614             cproxy_parse_behavior_key_val_str(key_val, behavior);
1615             free(key_val);
1616         }
1617     }
1618 
1619     return props;
1620 }
1621 
1622 /* ---------------------------------------------------------- */
1623 
/**
 * Look up key in kvs via find_kvpair().
 *
 * @return the matching pair's values member (not a copy), or NULL
 *         when no pair is found.
 */
char **get_key_values(kvpair_t *kvs, char *key) {
    kvpair_t *match = find_kvpair(kvs, key);

    return (match != NULL) ? match->values : NULL;
}
1631