xref: /5.5.2/moxi/src/agent_config.c (revision d0366df5)
1/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2#include "src/config.h"
3#include <stdio.h>
4#include <stdlib.h>
5#include <string.h>
6#include <errno.h>
7#include <platform/cbassert.h>
8#include "memcached.h"
9#include "cproxy.h"
10#include "work.h"
11#include "agent.h"
12#include "log.h"
13#include "cJSON.h"
14
15/* Integration with libconflate. */
16
17static void update_ptd_config(void *data0, void *data1);
18static bool update_str_config(char **curr, char *next, char *descrip);
19
20static bool update_behaviors_config(proxy_behavior **curr,
21                                    int  *curr_num,
22                                    proxy_behavior  *next,
23                                    int   next_num,
24                                    char *descrip);
25
26void close_outdated_proxies(proxy_main *m, uint32_t new_config_ver);
27
28char *parse_kvs_servers(char *prefix,
29                        char *pool_name,
30                        kvpair_t *kvs,
31                        char **servers,
32                        proxy_behavior_pool *behavior_pool);
33
34char **parse_kvs_behavior(kvpair_t *kvs,
35                          char *prefix,
36                          char *name,
37                          proxy_behavior *behavior);
38
39static void cproxy_parse_json_auth(char *config,
40                                   char *name,
41                                   proxy_behavior_pool *bp);
42
43static void cproxy_init_null_bucket(proxy_main *m);
44
45static void cproxy_on_config(void *data0, void *data1);
46
47static void agent_logger(void *userdata,
48                         enum conflate_log_level lvl,
49                         const char *msg, ...)
50{
51    (void)userdata;
52    (void)lvl;
53    (void)msg;
54/* Issues compiling vfprintf(), so turn off this unused code path for now. */
55#undef AGENT_LOGGER
56#ifdef AGENT_LOGGER
57    char *n = NULL;
58    bool v = false;
59
60    switch(lvl) {
61    case FATAL: n = "FATAL"; v = settings.verbose > 0; break;
62    case ERROR: n = "ERROR"; v = settings.verbose > 0; break;
63    case WARN:  n = "WARN";  v = settings.verbose > 1; break;
64    case INFO:  n = "INFO";  v = settings.verbose > 1; break;
65    case DEBUG: n = "DEBUG"; v = settings.verbose > 2; break;
66    }
67    if (!v) {
68        return;
69    }
70
71    char fmt[strlen(msg) + 16];
72    snprintf(fmt, sizeof(fmt), "%s: %s\n", n, msg);
73
74    va_list ap;
75    va_start(ap, msg);
76    vfprintf(fmt, ap);
77    va_end(ap);
78#endif
79}
80
81static void init_extensions(void)
82{
83    conflate_register_mgmt_cb("client_stats", "Retrieve stats from moxi",
84                              on_conflate_get_stats);
85    conflate_register_mgmt_cb("reset_stats", "Reset moxi stats",
86                              on_conflate_reset_stats);
87    conflate_register_mgmt_cb("ping_test", "Perform a ping test",
88                              on_conflate_ping_test);
89}
90
91/** The cfg_str looks like...
92 *
93 *    apikey=jidname@jhostname%jpassword,config=config,host=host
94 *      or...
95 *    jidname@jhostname%jpassword,config=config,host=host
96 *
97 *  Only the apikey is needed, so it can also look like...
98 *
99 *    jidname@jhostname%jpassword
100 *
101 *  Or...
102 *
103 *    http://host:port/default/pools/bucketsStreamingConfig/default
104 *    url=http://host:port/default/pools/bucketsStreamingConfig/default
105 *    auth=,url=http://host:port/default/pools/bucketsStreamingConfig/default
106 *    auth=USER%PSWD,url=http://host:port/default/pools/bucketsStreamingConfig/default
107 *    auth=Administrator%password,url=http://host:port/default/pools/bucketsStreamingConfig/default
108 */
109int cproxy_init_agent(char *cfg_str,
110                      proxy_behavior behavior,
111                      int nthreads) {
112    int cfg_len;
113    char *buff;
114    char *next;
115    int rv = 0;
116
117    init_extensions();
118
119    if (cfg_str == NULL) {
120        moxi_log_write("ERROR: missing cfg\n");
121        if (ml->log_mode != ERRORLOG_STDERR) {
122            fprintf(stderr, "ERROR: missing cfg\n");
123        }
124        exit(EXIT_FAILURE);
125    }
126
127    cfg_len = (int)strlen(cfg_str);
128    if (cfg_len <= 0) {
129        moxi_log_write("ERROR: empty cfg\n");
130        if (ml->log_mode != ERRORLOG_STDERR) {
131            fprintf(stderr, "ERROR: empty cfg\n");
132        }
133        exit(EXIT_FAILURE);
134    }
135
136    if (strncmp(cfg_str, "apikey=", 7) == 0 ||
137        strncmp(cfg_str, "auth=", 5) == 0 ||
138        strncmp(cfg_str, "url=", 4) == 0) {
139        buff = trimstrdup(cfg_str);
140    } else {
141        buff = calloc(cfg_len + 50, sizeof(char));
142        if (buff != NULL) {
143            if (strncmp(cfg_str, "http://", 7) == 0) {
144                char *x;
145                snprintf(buff, cfg_len + 50, "url=%s", cfg_str);
146
147                /* Allow the user to specify multiple comma-separated URL's, */
148                /* which we auto-translate right now to the '|' separators */
149                /* that the rest of the code expects. */
150
151                for (x = buff; *x; x++) {
152                    if (*x == ',') {
153                        *x = '|';
154                    }
155                }
156            } else {
157                strcpy(buff, cfg_str);
158            }
159        }
160        buff = trimstr(buff);
161    }
162
163    next = buff;
164
165    while (next != NULL) {
166        char *jid    = behavior.usr;
167        char *jpw    = behavior.pwd;
168        char *jpwmem = NULL;
169        char *dbpath = NULL;
170        char *host   = NULL;
171        int dbpath_alloc = 0;
172
173        char *cur = trimstr(strsep(&next, ";"));
174        while (cur != NULL) {
175            char *key_val = trimstr(strsep(&cur, ",\r\n"));
176            if (key_val != NULL) {
177                char *key = trimstr(strsep(&key_val, "="));
178                char *val = trimstr(key_val);
179
180                bool handled = true;
181
182                if (key != NULL &&
183                    val != NULL) {
184                    if (wordeq(key, "apikey") ||
185                        wordeq(key, "auth")) {
186                        jid = strsep(&val, "%");
187                        jpw = val;
188                    } else if (wordeq(key, "config") ||
189                               wordeq(key, "dbpath")) {
190                        dbpath = val;
191                    } else if (wordeq(key, "host") ||
192                               wordeq(key, "url")) {
193                        host = val;
194                    } else {
195                        handled = false;
196                    }
197                } else {
198                    handled = false;
199                }
200
201                if (handled == false &&
202                    key != NULL &&
203                    key[0] != '#' &&
204                    key[0] != '\0') {
205                    if (settings.verbose > 0) {
206                        moxi_log_write("unknown configuration key: %s\n", key);
207                    }
208                }
209            }
210        }
211
212        if (jid == NULL) {
213            jid = "";
214        }
215
216        if (jpw == NULL) {
217            /* Handle if jid/jpw is in user:password@fqdn format */
218            /* instead of user@fqdn%password format. */
219
220            char *colon = strchr(jid, ':');
221            char *asign = strchr(jid, '@');
222            if (colon != NULL &&
223                asign != NULL &&
224                asign > colon) {
225                *asign = '\0';
226                jpw = jpwmem = strdup(colon + 1);
227                *asign = '@';
228                do {
229                    *colon = *asign;
230                    colon++;
231                    asign++;
232                } while (*asign != '\0');
233                *colon = '\0';
234            }
235        }
236
237        if (jpw == NULL) {
238            jpw = "";
239        }
240
241        dbpath_alloc = 0;
242        if (dbpath == NULL) {
243            dbpath_alloc = (int)(strlen(jid) + strlen(CONFLATE_DB_PATH) + 100);
244            dbpath = calloc(dbpath_alloc, 1);
245            if (dbpath != NULL) {
246                snprintf(dbpath, dbpath_alloc,
247                         CONFLATE_DB_PATH "/conflate-%s.cfg",
248                         (jid != NULL && strlen(jid) > 0 ? jid : "default"));
249
250            } else {
251                moxi_log_write("ERROR: conflate dbpath buf alloc\n");
252                exit(EXIT_FAILURE);
253            }
254        }
255
256        if (settings.verbose > 1) {
257            moxi_log_write("cproxy_init jid: %s host: %s\n", jid, host);
258        }
259
260        if (cproxy_init_agent_start(jid, jpw, dbpath, host,
261                                    behavior,
262                                    nthreads) != NULL) {
263            rv++;
264        }
265
266        if (dbpath_alloc > 0 &&
267            dbpath != NULL) {
268            free(dbpath);
269        }
270
271        if (jpwmem) {
272            free(jpwmem);
273        }
274    }
275
276    free(buff);
277
278    return rv;
279}
280
281proxy_main *cproxy_init_agent_start(char *jid,
282                                    char *jpw,
283                                    char *dbpath,
284                                    char *host,
285                                    proxy_behavior behavior,
286                                    int nthreads) {
287    proxy_main *m;
288    cb_assert(dbpath);
289
290    if (settings.verbose > 2) {
291        moxi_log_write("cproxy_init_agent_start\n");;
292    }
293
294    m = cproxy_gen_proxy_main(behavior, nthreads,
295                              PROXY_CONF_TYPE_DYNAMIC);
296    if (m != NULL) {
297        conflate_config_t config;
298        /* Create a NULL_BUCKET when we're not in "FIRST_BUCKET" mode. */
299
300        /* FIRST_BUCKET mode means clients start off in the first */
301        /* configured bucket (and this is usually the case for */
302        /* standalone moxi). */
303
304        /* Otherwise (when not in FIRST_BUCKET mode)... */
305        /* -- new clients start off in a configured, named default */
306        /* bucket (whose name is usually configured to be "default"), */
307        /* if it exists. */
308        /* -- if the named default bucket doesn't exist, new */
309        /* clients then start off in the NULL_BUCKET. */
310
311        if (strcmp(behavior.default_bucket_name, FIRST_BUCKET) != 0) {
312            if (settings.verbose > 2) {
313                moxi_log_write("initializing null bucket, default is: %s\n",
314                               behavior.default_bucket_name);
315            }
316
317            cproxy_init_null_bucket(m);
318        } else {
319            if (settings.verbose > 2) {
320                moxi_log_write("using first bucket\n");
321            }
322        }
323
324        memset(&config, 0, sizeof(config));
325
326        init_conflate(&config);
327
328        /* Different jid's possible for production, staging, etc. */
329        config.jid  = jid;  /* "customer@stevenmb.local" or */
330                            /* "Administrator" */
331        config.pass = jpw;  /* "password" */
332        config.host = host; /* "localhost" or */
333                            /* "http://x.com:8091" */
334                            /* "http://x.com:8091/pools/default/buckets/default" */
335        config.software   = PACKAGE;
336        config.version    = VERSION;
337        config.save_path  = dbpath;
338        config.userdata   = m;
339        config.new_config = on_conflate_new_config;
340        config.log        = agent_logger;
341
342        if (!config.host || config.host[0] == '\0') {
343            moxi_log_write("ERROR: missing -z configuration for url/host\n");
344            if (ml->log_mode != ERRORLOG_STDERR) {
345                fprintf(stderr, "ERROR: missing -z configuration for url/host\n");
346            }
347            exit(EXIT_FAILURE);
348        }
349
350        if (start_conflate(config)) {
351            if (settings.verbose > 2) {
352                moxi_log_write("cproxy_init_agent_start done\n");
353            }
354
355            return m;
356        }
357
358        free(m);
359    }
360
361    if (settings.verbose > 1) {
362        moxi_log_write("cproxy could not start conflate\n");
363    }
364
365    return NULL;
366}
367
368static void cproxy_init_null_bucket(proxy_main *m) {
369    proxy_behavior proxyb = m->behavior;
370
371    int pool_port = proxyb.port_listen;
372    int nodes_num = 0;
373
374    if (pool_port > 0) {
375        proxy_behavior_pool behavior_pool;
376        memset(&behavior_pool, 0, sizeof(behavior_pool));
377        behavior_pool.base = proxyb;
378        behavior_pool.num  = nodes_num;
379        behavior_pool.arr  = calloc(nodes_num + 1, sizeof(proxy_behavior));
380
381        if (behavior_pool.arr != NULL) {
382            cproxy_on_config_pool(m, NULL_BUCKET, pool_port,
383                                  "", 0, &behavior_pool);
384            free(behavior_pool.arr);
385        }
386    }
387}
388
389conflate_result on_conflate_new_config(void *userdata, kvpair_t *config) {
390    char **urlv;
391    char  *url;
392    char **contentsv;
393    char  *contents;
394    kvpair_t *copy;
395    LIBEVENT_THREAD *mthread;
396    proxy_main *m = userdata;
397    cb_assert(m != NULL);
398    cb_assert(config != NULL);
399
400    mthread = thread_by_index(0);
401    cb_assert(mthread != NULL);
402
403    if (settings.verbose > 0) {
404        moxi_log_write("configuration received\n");
405    }
406
407    urlv = get_key_values(config, "url"); /* NULL delimited array of char *. */
408    url  = urlv != NULL ? urlv[0] : NULL;
409    contentsv = get_key_values(config, "contents");
410    contents  = contentsv != NULL ? contentsv[0] : NULL;
411
412    if (url != NULL) {
413        char **http_code = get_key_values(config, "http_code");
414        if (http_code != NULL && atoi(http_code[0]) == 401) {
415            moxi_log_write("ERROR: Authentication failure\n");
416            exit(EXIT_FAILURE);
417        }
418        if (contents != NULL && strlen(contents) > 0) {
419            /* Must be a REST/JSON config.  Wastefully test parse it here, */
420            /* before we asynchronously invoke the real worker who can't */
421            /* respond nicely with an error code. */
422            bool ok = false;
423            cJSON *c = cJSON_Parse(contents);
424            if (c != NULL) {
425                ok = true;
426                cJSON_Delete(c);
427            }
428
429            if (!ok) {
430                moxi_log_write("ERROR: parse JSON failed, from REST server: %s, %s\n",
431                               url, contents);
432
433                return CONFLATE_ERROR_BAD_SOURCE;
434            }
435        }
436    }
437
438    copy = dup_kvpair(config);
439    if (copy != NULL) {
440        if (work_send(mthread->work_queue, cproxy_on_config, m, copy)) {
441            return CONFLATE_SUCCESS;
442        }
443
444        if (settings.verbose > 1) {
445            moxi_log_write("work_send failed\n");
446        }
447
448        return CONFLATE_ERROR;
449    }
450
451    if (settings.verbose > 1) {
452        moxi_log_write("agent_config ocnc failed dup_kvpair\n");
453    }
454
455    return CONFLATE_ERROR;
456}
457
458static bool cproxy_on_config_json_one(proxy_main *m, uint32_t new_config_ver,
459                                      char *config, char *name, char *src);
460
461static bool cproxy_on_config_json_one_vbucket(proxy_main *m,
462                                              uint32_t new_config_ver,
463                                              char *config,
464                                              char *name,
465                                              char *src);
466
467static bool cproxy_on_config_json_one_ketama(proxy_main *m,
468                                             uint32_t new_config_ver,
469                                             char *config,
470                                             char *name,
471                                             char *src);
472
473static bool cproxy_on_config_json_buckets(proxy_main *m, uint32_t new_config_ver,
474                                          cJSON *jBuckets, bool want_default,
475                                          char *src);
476
477static
478bool cproxy_on_config_json(proxy_main *m, uint32_t new_config_ver, char *config,
479                           char *src) {
480    bool rv = false;
481
482    cJSON *c = cJSON_Parse(config);
483    if (c != NULL) {
484        cJSON *jBuckets = cJSON_GetObjectItem(c, "buckets");
485        if (jBuckets != NULL &&
486            jBuckets->type == cJSON_Array) {
487            /* Make two passes through jBuckets, favoring any "default" */
488            /* bucket on the 1st pass, so the default bucket gets */
489            /* created earlier. */
490
491            bool rv1 = cproxy_on_config_json_buckets(m, new_config_ver, jBuckets,
492                                                     true, src);
493            bool rv2 = cproxy_on_config_json_buckets(m, new_config_ver, jBuckets,
494                                                     false, src);
495
496            rv = rv1 || rv2;
497        } else {
498            /* Just a single config. */
499
500            rv = cproxy_on_config_json_one(m, new_config_ver, config, "default", src);
501        }
502
503        cJSON_Delete(c);
504    } else {
505        moxi_log_write("ERROR: could not parse JSON from REST server: %s, %s\n",
506                       src, config);
507    }
508
509    return rv;
510}
511
512static
513bool cproxy_on_config_json_buckets(proxy_main *m, uint32_t new_config_ver,
514                                   cJSON *jBuckets, bool want_default,
515                                   char *src) {
516    bool rv = false;
517
518    int numBuckets = cJSON_GetArraySize(jBuckets);
519    int i;
520
521    for (i = 0; i < numBuckets; i++) {
522        cJSON *jBucket = cJSON_GetArrayItem(jBuckets, i);
523        if (jBucket != NULL &&
524            jBucket->type == cJSON_Object) {
525            char *name = "default";
526            bool is_default;
527
528            cJSON *jName = cJSON_GetObjectItem(jBucket, "name");
529            if (jName != NULL &&
530                jName->type == cJSON_String &&
531                jName->valuestring != NULL) {
532                name = jName->valuestring;
533            }
534
535            is_default = (strcmp(name, "default") == 0);
536            if (!(is_default ^ want_default)) { /* XOR. */
537                char *jBucketStr = cJSON_Print(jBucket);
538                if (jBucketStr != NULL) {
539                    rv = cproxy_on_config_json_one(m, new_config_ver,
540                                                   jBucketStr, name, src) || rv;
541                    free(jBucketStr);
542                }
543            }
544        }
545    }
546
547    return rv;
548}
549
550static
551bool cproxy_on_config_json_one(proxy_main *m, uint32_t new_config_ver,
552                               char *config, char *name,
553                               char *src) {
554    bool rv = false;
555    cb_assert(m != NULL);
556    cb_assert(config != NULL);
557    cb_assert(name != NULL);
558
559    /* Handle reconfiguration of a single proxy. */
560
561    if (m != NULL && config != NULL && strlen(config) > 0) {
562        cJSON *jConfig;
563
564        if (settings.verbose > 2) {
565            moxi_log_write("conjo contents config from %s: %s\n", src, config);
566        }
567
568        /* The config should be JSON that should look like... */
569
570        /* {"name":"default",                // The bucket name. */
571        /*  "nodeLocator":"ketama",          // Optional. */
572        /*  "saslPassword":"someSASLPwd", */
573        /*  "nodes":[{"hostname":"10.17.1.46","status":"healthy", */
574        /*            "version":"0.3.0_114_g31859fe","os":"i386-apple-darwin9.8.0", */
575        /*            "ports":{"proxy":11213,"direct":11212}}], */
576        /*  "buckets":{"uri":"/pools/default/buckets"}, */
577        /*  "controllers":{"ejectNode":{"uri":"/controller/ejectNode"}, */
578        /*  "testWorkload":{"uri":"/pools/default/controller/testWorkload"}}, */
579        /*  "stats":{"uri":"/pools/default/stats"}, */
580        /*  "vBucketServerMap":{ */
581        /*     "hashAlgorithm":"CRC", */
582        /*     "user":"optionalSASLUsr",     // Optional. */
583        /*     "password":"someSASLPwd",     // Optional. */
584        /*     "serverList":["10.17.1.46:11212"], */
585        /*     ...more json here...}} */
586
587        jConfig = cJSON_Parse(config);
588        if (jConfig != NULL) {
589            cJSON *jNodeLocator;
590            cJSON *jName = cJSON_GetObjectItem(jConfig, "name");
591            if (jName != NULL &&
592                jName->type == cJSON_String &&
593                jName->valuestring != NULL) {
594                name = jName->valuestring;
595            }
596
597            jNodeLocator = cJSON_GetObjectItem(jConfig, "nodeLocator");
598            if (jNodeLocator != NULL &&
599                jNodeLocator->type == cJSON_String &&
600                jNodeLocator->valuestring != NULL) {
601                if (strcmp(jNodeLocator->valuestring, "ketama") == 0) {
602                    rv = cproxy_on_config_json_one_ketama(m, new_config_ver,
603                                                          config, name, src);
604                    cJSON_Delete(jConfig);
605
606                    return rv;
607                }
608            }
609
610            rv = cproxy_on_config_json_one_vbucket(m, new_config_ver,
611                                                   config, name, src);
612            cJSON_Delete(jConfig);
613        }
614    } else {
615        if (settings.verbose > 1) {
616            moxi_log_write("ERROR: skipping empty config from %s\n", src);
617        }
618    }
619
620    return rv;
621}
622
623static
624bool cproxy_on_config_json_one_vbucket(proxy_main *m, uint32_t new_config_ver,
625                                       char *config, char *name,
626                                       char *src) {
627    bool rv = false;
628    VBUCKET_CONFIG_HANDLE vch;
629    cb_assert(m != NULL);
630
631    if (settings.verbose > 2) {
632        moxi_log_write("parsing config nodeLocator:vbucket\n");
633    }
634
635    vch = vbucket_config_parse_string(config);
636    if (vch) {
637        proxy_behavior proxyb;
638        int pool_port;
639        int nodes_num;
640
641        if (settings.verbose > 2) {
642            moxi_log_write("conc vbucket_config_parse_string: %d for %s\n",
643                           (vch != NULL), name);
644        }
645
646        proxyb = m->behavior;
647        strcpy(proxyb.nodeLocator, "vbucket");
648
649        pool_port = proxyb.port_listen;
650        nodes_num = vbucket_config_get_num_servers(vch);
651
652        if (settings.verbose > 2) {
653            moxi_log_write("conc pool_port: %d nodes_num: %d\n",
654                           pool_port, nodes_num);
655        }
656
657        if (pool_port > 0 && nodes_num > 0) {
658            proxy_behavior_pool behavior_pool;
659            memset(&behavior_pool, 0, sizeof(behavior_pool));
660
661            behavior_pool.base = proxyb;
662            behavior_pool.num  = nodes_num;
663            behavior_pool.arr  = calloc(nodes_num, sizeof(proxy_behavior));
664
665            if (behavior_pool.arr != NULL) {
666                int j = 0;
667                cproxy_parse_json_auth(config, name, &behavior_pool);
668
669                for (; j < nodes_num; j++) {
670                    /* Inherit default behavior. */
671                    const char *hostport;
672                    behavior_pool.arr[j] = behavior_pool.base;
673
674                    hostport = vbucket_config_get_server(vch, j);
675                    if (hostport != NULL &&
676                        strlen(hostport) > 0 &&
677                        strlen(hostport) < sizeof(behavior_pool.arr[j].host) - 1) {
678                        char *colon;
679
680                        strncpy(behavior_pool.arr[j].host,
681                                hostport,
682                                sizeof(behavior_pool.arr[j].host) - 1);
683                        behavior_pool.arr[j].host[sizeof(behavior_pool.arr[j].host) - 1] = '\0';
684
685                        colon = strchr(behavior_pool.arr[j].host, ':');
686                        if (colon != NULL) {
687                            *colon = '\0';
688                            behavior_pool.arr[j].port = atoi(colon + 1);
689                            if (behavior_pool.arr[j].port <= 0) {
690                                break;
691                            }
692                        } else {
693                            break;
694                        }
695                    } else {
696                        break;
697                    }
698                }
699
700                if (j >= nodes_num) {
701                    cproxy_on_config_pool(m, name, pool_port,
702                                          config, new_config_ver,
703                                          &behavior_pool);
704                    rv = true;
705                } else {
706                    if (settings.verbose > 1) {
707                        moxi_log_write("ERROR: error receiving host:port"
708                                       " from %s"
709                                       " for server config %d in %s\n",
710                                       src, j, config);
711                    }
712                }
713
714                free(behavior_pool.arr);
715            }
716        }
717
718        vbucket_config_destroy(vch);
719    } else {
720        moxi_log_write("ERROR: bad JSON configuration from %s: %s\n",
721                       src, vbucket_get_error());
722        if (ml->log_mode != ERRORLOG_STDERR) {
723            fprintf(stderr, "ERROR: bad JSON configuration from %s: %s\n",
724                    src, vbucket_get_error());
725        }
726
727        /* Bug 1961 - don't exit() as we might be in a multitenant use case. */
728
729        /* exit(EXIT_FAILURE); */
730    }
731
732    return rv;
733}
734
735static
736bool cproxy_on_config_json_one_ketama(proxy_main *m, uint32_t new_config_ver,
737                                      char *config, char *name,
738                                      char *src) {
739    bool rv = false;
740    cJSON *jConfig;
741    cJSON *jArr;
742    cJSON *jVBSM;
743
744    cb_assert(m != NULL);
745
746    if (settings.verbose > 2) {
747        moxi_log_write("parsing config nodeLocator:ketama\n");
748    }
749
750    /* First, try to iterate through jConfig.vBucketServerMap.serverList */
751    /* if it exists, otherwise iterate through jConfig.nodes. */
752
753    jConfig = cJSON_Parse(config);
754    if (jConfig == NULL) {
755        return false;
756    }
757
758    jArr = NULL;
759
760    jVBSM = cJSON_GetObjectItem(jConfig, "vBucketServerMap");
761    if (jVBSM != NULL) {
762        jArr = cJSON_GetObjectItem(jVBSM, "serverList");
763    }
764
765    if (jArr == NULL ||
766        jArr->type != cJSON_Array) {
767        jArr = cJSON_GetObjectItem(jConfig, "nodes");
768    }
769
770    if (jArr != NULL && jArr->type == cJSON_Array) {
771        int nodes_num = cJSON_GetArraySize(jArr);
772        if (nodes_num > 0) {
773            proxy_behavior proxyb = m->behavior;
774            proxy_behavior_pool behavior_pool;
775
776            strcpy(proxyb.nodeLocator, "ketama");
777
778            if (settings.verbose > 2) {
779                moxi_log_write("conjk nodes_num: %d\n", nodes_num);
780            }
781            memset(&behavior_pool, 0, sizeof(behavior_pool));
782            behavior_pool.base = proxyb;
783            behavior_pool.num  = nodes_num;
784            behavior_pool.arr  = calloc(nodes_num + 1, sizeof(proxy_behavior));
785
786            if (behavior_pool.arr != NULL) {
787                int curr = 0; /* Moves slower than j so we can skip unhealthy nodes. */
788
789                int j = 0;
790
791                cproxy_parse_json_auth(config, name, &behavior_pool);
792                for (; j < nodes_num; j++) {
793                    /* Inherit default behavior. */
794                    cJSON *jNode;
795
796                    behavior_pool.arr[curr] = behavior_pool.base;
797
798                    jNode = cJSON_GetArrayItem(jArr, j);
799                    if (jNode != NULL) {
800                        if (jNode->type == cJSON_String &&
801                            jNode->valuestring != NULL) {
802                            /* Should look like "host:port". */
803                            char *hostport = jNode->valuestring;
804
805                            if (strlen(hostport) > 0 &&
806                                strlen(hostport) < sizeof(behavior_pool.arr[curr].host) - 1) {
807                                char *colon;
808                                strncpy(behavior_pool.arr[curr].host,
809                                        hostport,
810                                        sizeof(behavior_pool.arr[curr].host) - 1);
811                                behavior_pool.arr[curr].host[sizeof(behavior_pool.arr[curr].host) - 1] = '\0';
812
813                                colon = strchr(behavior_pool.arr[curr].host, ':');
814                                if (colon != NULL) {
815                                    *colon = '\0';
816                                    behavior_pool.arr[curr].port = atoi(colon + 1);
817                                    if (behavior_pool.arr[curr].port > 0) {
818                                        curr++;
819                                    } else {
820                                        break;
821                                    }
822                                } else {
823                                    break;
824                                }
825                            } else {
826                                break;
827                            }
828                        } else if (jNode->type == cJSON_Object) {
829                            /* Should look like... { */
830                            /*   status: "healthy", */
831                            /*   hostname: "host", */
832                            /*   ports: { direct: port } */
833                            /* } */
834
835                            cJSON *jHostname;
836                            cJSON *jStatus = cJSON_GetObjectItem(jNode, "status");
837                            if (jStatus != NULL &&
838                                jStatus->type == cJSON_String &&
839                                jStatus->valuestring != NULL &&
840                                strcmp(jStatus->valuestring, "healthy") != 0) {
841                                /* Skip non-healthy node. */
842
843                                continue;
844                            }
845
846                            jHostname = cJSON_GetObjectItem(jNode, "hostname");
847                            if (jHostname != NULL &&
848                                jHostname->type == cJSON_String &&
849                                jHostname->valuestring != NULL &&
850                                strlen(jHostname->valuestring) < sizeof(behavior_pool.arr[curr].host) - 1) {
851                                cJSON *jPorts = cJSON_GetObjectItem(jNode, "ports");
852                                if (jPorts != NULL &&
853                                    jPorts->type == cJSON_Object) {
854                                    cJSON *jDirect = cJSON_GetObjectItem(jPorts, "direct");
855                                    if (jDirect != NULL &&
856                                        jDirect->type == cJSON_Number &&
857                                        jDirect->valueint > 0) {
858                                        char *colon;
859                                        strncpy(behavior_pool.arr[curr].host,
860                                                jHostname->valuestring,
861                                                sizeof(behavior_pool.arr[curr].host) - 1);
862                                        behavior_pool.arr[curr].host[sizeof(behavior_pool.arr[curr].host) - 1] = '\0';
863
864                                        /* The JSON might return a hostname that looks */
865                                        /* like "HOST:REST_PORT", so strip off the ":REST_PORT". */
866
867                                        colon = strchr(behavior_pool.arr[curr].host, ':');
868                                        if (colon != NULL) {
869                                            *colon = '\0';
870                                        }
871
872                                        behavior_pool.arr[curr].port = jDirect->valueint;
873
874                                        curr++;
875                                    } else {
876                                        break;
877                                    }
878                                } else {
879                                    break;
880                                }
881                            } else {
882                                break;
883                            }
884                        } else {
885                            break;
886                        }
887                    } else {
888                        break;
889                    }
890                }
891
892                if (j >= nodes_num && curr > 0) {
893                    int   config_len = 200;
894                    char *config_str;
895                    /* Some unhealthy nodes might have been skipped, */
896                    /* so curr might be <= behavior_pool.num. */
897
898                    behavior_pool.num = curr;
899
900                    /* Create a config string that libmemcached likes, */
901                    /* such as "HOST:PORT,HOST:PORT,HOST:PORT". */
902
903                    config_str = calloc(config_len, 1);
904
905                    if (config_str != NULL) {
906                        for (j = 0; j < behavior_pool.num; j++) {
907                            /* Grow config string for libmemcached. */
908
909                            char *config_end;
910                            int x = (int)(40 + /* For port and weight. */
911                                strlen(config_str) +
912                                strlen(behavior_pool.arr[j].host));
913                            if (config_len < x) {
914                                config_len = 2 * (config_len + x);
915                                config_str = realloc(config_str, config_len);
916                                if (config_str == NULL) {
917                                    break;
918                                }
919                            }
920
921                            config_end = config_str + strlen(config_str);
922                            if (config_end != config_str) {
923                                *config_end++ = ',';
924                            }
925
926                            if (strlen(behavior_pool.arr[j].host) > 0 &&
927                                behavior_pool.arr[j].port > 0) {
928                                snprintf(config_end,
929                                         config_len - (config_end - config_str),
930                                         "%s:%u",
931                                         behavior_pool.arr[j].host,
932                                         behavior_pool.arr[j].port);
933                            } else {
934                                if (settings.verbose > 1) {
935                                    moxi_log_write("ERROR: conjk missing host/port %d in %s from %s\n",
936                                                   j, name, src);
937                                }
938                            }
939
940                            if (behavior_pool.arr[j].downstream_weight > 0) {
941                                config_end = config_str + strlen(config_str);
942                                snprintf(config_end,
943                                         config_len - (config_end - config_str),
944                                         ":%u",
945                                         behavior_pool.arr[j].downstream_weight);
946                            }
947                        }
948
949                        if (config_str != NULL) {
950                            if (j >= behavior_pool.num) {
951                                cproxy_on_config_pool(m, name, proxyb.port_listen,
952                                                      config_str, new_config_ver,
953                                                      &behavior_pool);
954                                rv = true;
955                            }
956
957                            free(config_str);
958                        }
959                    } else {
960                        if (settings.verbose > 1) {
961                            moxi_log_write("ERROR: oom on jk re-config str\n");;
962                        }
963                    }
964                } else {
965                    if (settings.verbose > 1) {
966                        moxi_log_write("ERROR: conjk parse error for config %d from %s in %s\n",
967                                       j, src, config);
968                    }
969                }
970
971                free(behavior_pool.arr);
972            } else {
973                if (settings.verbose > 1) {
974                    moxi_log_write("ERROR: oom on jk re-config\n");;
975                }
976            }
977        } else {
978            if (settings.verbose > 1) {
979                moxi_log_write("ERROR: conjk empty serverList/nodes in re-config\n");;
980            }
981        }
982    } else {
983        if (settings.verbose > 1) {
984            moxi_log_write("ERROR: conjk no serverList/nodes in re-config\n");;
985        }
986    }
987
988    cJSON_Delete(jConfig);
989
990    return rv;
991}
992
993static void cproxy_parse_json_auth(char *config,
994                                   char *name,
995                                   proxy_behavior_pool *bp) {
996    cJSON *jConfig;
997    strncpy(bp->base.usr, name, sizeof(bp->base.usr) - 1);
998    bp->base.usr[sizeof(bp->base.usr) - 1] = '\0';
999
1000    jConfig = cJSON_Parse(config);
1001    if (jConfig != NULL) {
1002        cJSON *jPassword = cJSON_GetObjectItem(jConfig, "saslPassword");
1003        if (jPassword != NULL &&
1004            jPassword->type == cJSON_String &&
1005            jPassword->valuestring != NULL) {
1006            strncpy(bp->base.pwd,
1007                    jPassword->valuestring,
1008                    sizeof(bp->base.pwd) - 1);
1009            bp->base.pwd[sizeof(bp->base.pwd) - 1] = '\0';
1010        }
1011
1012        cJSON_Delete(jConfig);
1013    }
1014}
1015
1016static
1017void cproxy_on_config(void *data0, void *data1) {
1018    proxy_main *m = data0;
1019    kvpair_t *kvs = data1;
1020    uint32_t max_config_ver = 0;
1021    proxy *p;
1022    uint32_t new_config_ver;
1023    char **urlv;
1024    char  *url;
1025    char **contents;
1026
1027
1028    cb_assert(m);
1029    cb_assert(kvs);
1030    cb_assert(is_listen_thread());
1031
1032    m->stat_configs++;
1033
1034    cb_mutex_enter(&m->proxy_main_lock);
1035
1036    for (p = m->proxy_head; p != NULL; p = p->next) {
1037        cb_mutex_enter(&p->proxy_lock);
1038        if (max_config_ver < p->config_ver) {
1039            max_config_ver = p->config_ver;
1040        }
1041        cb_mutex_exit(&p->proxy_lock);
1042    }
1043
1044    cb_mutex_exit(&m->proxy_main_lock);
1045
1046    new_config_ver = max_config_ver + 1;
1047
1048    if (settings.verbose > 2) {
1049        moxi_log_write("conc new_config_ver %u\n", new_config_ver);
1050    }
1051
1052    urlv = get_key_values(kvs, "url"); /* NULL delimited array of char *. */
1053    url  = urlv != NULL ? (urlv[0] != NULL ? urlv[0] : "") : "";
1054
1055    contents = get_key_values(kvs, "contents");
1056    if (contents != NULL && contents[0] != NULL) {
1057        char *config = trimstrdup(contents[0]);
1058        if (config != NULL &&
1059            strlen(config) > 0) {
1060            cproxy_on_config_json(m, new_config_ver, config, url);
1061
1062            free(config);
1063        } else {
1064            moxi_log_write("ERROR: invalid, empty config from REST server %s\n",
1065                           url);
1066            goto fail;
1067        }
1068    } else {
1069        moxi_log_write("ERROR: invalid response from REST server %s\n",
1070                       url);
1071    }
1072
1073    /* If there were any proxies that weren't updated in the */
1074    /* previous loop, we need to shut them down.  We mark the */
1075    /* proxy->config as NULL, and cproxy_check_downstream_config() */
1076    /* will catch it. */
1077
1078    close_outdated_proxies(m, new_config_ver);
1079
1080    free_kvpair(kvs);
1081
1082    return;
1083
1084 fail:
1085    m->stat_config_fails++;
1086    free_kvpair(kvs);
1087
1088    if (settings.verbose > 1) {
1089        moxi_log_write("ERROR: conc failed config %"PRIu64"\n",
1090                       (uint64_t) m->stat_config_fails);
1091    }
1092}
1093
1094void close_outdated_proxies(proxy_main *m, uint32_t new_config_ver) {
1095    /* TODO: Close any listening conns for the proxy? */
1096    /* TODO: Close any upstream conns for the proxy? */
1097    /* TODO: We still need to free proxy memory, after all its */
1098    /*       proxy_td's and downstreams are closed, and no more */
1099    /*       upstreams are pointed at the proxy. */
1100
1101    proxy_behavior_pool empty_pool;
1102    proxy *p;
1103    memset(&empty_pool, 0, sizeof(proxy_behavior_pool));
1104
1105    empty_pool.base = m->behavior;
1106    empty_pool.num  = 0;
1107    empty_pool.arr  = NULL;
1108
1109    cb_mutex_enter(&m->proxy_main_lock);
1110
1111    for (p = m->proxy_head; p != NULL; p = p->next) {
1112        bool  down = false;
1113        int   port = 0;
1114        char *name = NULL;
1115
1116        cb_mutex_enter(&p->proxy_lock);
1117
1118        if (p->config_ver != new_config_ver) {
1119            down = true;
1120
1121            cb_assert(p->port > 0);
1122            cb_assert(p->name != NULL);
1123
1124            port = p->port;
1125            name = strdup(p->name);
1126        }
1127
1128        cb_mutex_exit(&p->proxy_lock);
1129
1130        cb_mutex_exit(&m->proxy_main_lock);
1131
1132        /* Note, we don't want to own the proxy_main_lock here */
1133        /* because cproxy_on_config_pool() may scatter/gather */
1134        /* calls against the worker threads, and the worked threads */
1135        /* should not deadlock if they need the proxy_main_lock. */
1136
1137        /* Also, check that we're not shutting down the NULL_BUCKET. */
1138
1139        /* Otherwise, passing in a NULL config string signals that */
1140        /* a bucket's proxy struct should be shut down. */
1141
1142        if (name != NULL) {
1143            if (down && (strcmp(NULL_BUCKET, name) != 0)) {
1144                cproxy_on_config_pool(m, name, port, NULL, new_config_ver,
1145                                      &empty_pool);
1146            }
1147
1148            free(name);
1149        }
1150
1151        cb_mutex_enter(&m->proxy_main_lock);
1152    }
1153
1154    cb_mutex_exit(&m->proxy_main_lock);
1155}
1156
1157/**
1158 * A name and port uniquely identify a proxy.
1159 */
1160void cproxy_on_config_pool(proxy_main *m,
1161                           char *name, int port,
1162                           char *config,
1163                           uint32_t config_ver,
1164                           proxy_behavior_pool *behavior_pool) {
1165    bool found = false;
1166    proxy *p;
1167
1168    cb_assert(m);
1169    cb_assert(name != NULL);
1170    cb_assert(port >= 0);
1171    cb_assert(is_listen_thread());
1172
1173    /* See if we've already got a proxy running with that name and port, */
1174    /* and create one if needed. */
1175
1176
1177    cb_mutex_enter(&m->proxy_main_lock);
1178
1179    p = m->proxy_head;
1180    while (p != NULL && !found) {
1181        cb_mutex_enter(&p->proxy_lock);
1182
1183        cb_assert(p->port > 0);
1184        cb_assert(p->name != NULL);
1185
1186        found = ((p->port == port) &&
1187                 (strcmp(p->name, name) == 0));
1188
1189        cb_mutex_exit(&p->proxy_lock);
1190
1191        if (found) {
1192            break;
1193        }
1194
1195        p = p->next;
1196    }
1197
1198    cb_mutex_exit(&m->proxy_main_lock);
1199
1200    if (p == NULL) {
1201        p = cproxy_create(m, name, port,
1202                          config,
1203                          config_ver,
1204                          behavior_pool,
1205                          m->nthreads);
1206        if (p != NULL) {
1207            int n;
1208
1209            cb_mutex_enter(&m->proxy_main_lock);
1210
1211            p->next = m->proxy_head;
1212            m->proxy_head = p;
1213
1214            cb_mutex_exit(&m->proxy_main_lock);
1215
1216            n = cproxy_listen(p);
1217            if (n > 0) {
1218                if (settings.verbose > 2) {
1219                    moxi_log_write(
1220                            "cproxy_listen success %u for %s to %s with %d conns\n",
1221                            p->port, p->name, p->config, n);
1222                }
1223                m->stat_proxy_starts++;
1224            } else {
1225                if (settings.verbose > 1) {
1226                    moxi_log_write("ERROR: cproxy_listen failed on %u to %s\n",
1227                            p->port, p->config);
1228                }
1229                m->stat_proxy_start_fails++;
1230            }
1231        } else {
1232            if (settings.verbose > 2) {
1233                moxi_log_write("ERROR: cproxy_create failed on %s, %d, %s\n",
1234                               name, port, config);
1235            }
1236        }
1237    } else {
1238        bool changed  = false;
1239        bool shutdown_flag = false;
1240        int i;
1241
1242        if (settings.verbose > 2) {
1243            moxi_log_write("conp existing config change %u\n",
1244                    p->port);
1245        }
1246
1247        cb_mutex_enter(&m->proxy_main_lock);
1248
1249        /* Turn off the front_cache while we're reconfiguring. */
1250
1251        mcache_stop(&p->front_cache);
1252        matcher_stop(&p->front_cache_matcher);
1253        matcher_stop(&p->front_cache_unmatcher);
1254
1255        matcher_stop(&p->optimize_set_matcher);
1256
1257        cb_mutex_enter(&p->proxy_lock);
1258
1259        if (settings.verbose > 2) {
1260            if (p->config && config &&
1261                strcmp(p->config, config) != 0) {
1262                moxi_log_write("conp config changed from %s to %s\n",
1263                        p->config, config);
1264            }
1265        }
1266
1267        changed =
1268            update_str_config(&p->config, config,
1269                              "conp config changed") ||
1270            changed;
1271
1272        changed =
1273            (cproxy_equal_behavior(&p->behavior_pool.base,
1274                                   &behavior_pool->base) == false) ||
1275            changed;
1276
1277        p->behavior_pool.base = behavior_pool->base;
1278
1279        changed =
1280            update_behaviors_config(&p->behavior_pool.arr,
1281                                    &p->behavior_pool.num,
1282                                    behavior_pool->arr,
1283                                    behavior_pool->num,
1284                                    "conp behaviors changed") ||
1285            changed;
1286
1287        if (p->config != NULL &&
1288            p->behavior_pool.arr != NULL) {
1289            m->stat_proxy_existings++;
1290        } else {
1291            m->stat_proxy_shutdowns++;
1292            shutdown_flag = true;
1293        }
1294
1295        cb_assert(config_ver != p->config_ver);
1296
1297        p->config_ver = config_ver;
1298
1299        cb_mutex_exit(&p->proxy_lock);
1300
1301        if (settings.verbose > 2) {
1302            moxi_log_write("conp changed %s, shutdown %s\n",
1303                    changed ? "true" : "false",
1304                    shutdown_flag ? "true" : "false");
1305        }
1306
1307        /* Restart the front_cache, if necessary. */
1308
1309        if (shutdown_flag == false) {
1310            if (behavior_pool->base.front_cache_max > 0 &&
1311                behavior_pool->base.front_cache_lifespan > 0) {
1312                mcache_start(&p->front_cache,
1313                             behavior_pool->base.front_cache_max);
1314
1315                if (strlen(behavior_pool->base.front_cache_spec) > 0) {
1316                    matcher_start(&p->front_cache_matcher,
1317                                  behavior_pool->base.front_cache_spec);
1318                }
1319
1320                if (strlen(behavior_pool->base.front_cache_unspec) > 0) {
1321                    matcher_start(&p->front_cache_unmatcher,
1322                                  behavior_pool->base.front_cache_unspec);
1323                }
1324            }
1325
1326            if (strlen(behavior_pool->base.optimize_set) > 0) {
1327                matcher_start(&p->optimize_set_matcher,
1328                              behavior_pool->base.optimize_set);
1329            }
1330        }
1331
1332        /* Send update across worker threads, avoiding locks. */
1333
1334        for (i = 1; i < m->nthreads; i++) {
1335            LIBEVENT_THREAD *t = thread_by_index(i);
1336            proxy_td *ptd;
1337
1338            cb_assert(t);
1339            cb_assert(t->work_queue);
1340
1341            ptd = &p->thread_data[i];
1342            if (t &&
1343                t->work_queue) {
1344                work_send(t->work_queue, update_ptd_config, ptd, NULL);
1345            }
1346        }
1347
1348        cb_mutex_exit(&m->proxy_main_lock);
1349
1350        if (settings.verbose > 2) {
1351            moxi_log_write("conp changed %s, %d\n",
1352                           changed ? "true" : "false", config_ver);
1353        }
1354    }
1355}
1356
1357/* ---------------------------------------------------------- */
1358
1359static void update_ptd_config(void *data0, void *data1) {
1360    proxy_td *ptd;
1361    proxy *p;
1362    bool changed = false;
1363    int  port;
1364    int  prev;
1365
1366    (void)data1;
1367
1368    ptd = data0;
1369    cb_assert(ptd);
1370
1371    p = ptd->proxy;
1372    cb_assert(p);
1373
1374    cb_assert(is_listen_thread() == false); /* Expecting a worker thread. */
1375
1376    cb_mutex_enter(&p->proxy_lock);
1377
1378    port = p->port;
1379    prev = ptd->config_ver;
1380
1381    if (ptd->config_ver != p->config_ver) {
1382        ptd->config_ver = p->config_ver;
1383
1384        changed =
1385            update_str_config(&ptd->config, p->config, NULL) ||
1386            changed;
1387
1388        ptd->behavior_pool.base = p->behavior_pool.base;
1389
1390        changed =
1391            update_behaviors_config(&ptd->behavior_pool.arr,
1392                                    &ptd->behavior_pool.num,
1393                                    p->behavior_pool.arr,
1394                                    p->behavior_pool.num,
1395                                    NULL) ||
1396            changed;
1397    }
1398
1399    cb_mutex_exit(&p->proxy_lock);
1400
1401    /* Restart the key_stats, if necessary. */
1402
1403    if (changed) {
1404        mcache_stop(&ptd->key_stats);
1405        matcher_stop(&ptd->key_stats_matcher);
1406        matcher_stop(&ptd->key_stats_unmatcher);
1407
1408        if (ptd->config != NULL) {
1409            if (ptd->behavior_pool.base.key_stats_max > 0 &&
1410                ptd->behavior_pool.base.key_stats_lifespan > 0) {
1411                mcache_start(&ptd->key_stats,
1412                             ptd->behavior_pool.base.key_stats_max);
1413
1414                if (strlen(ptd->behavior_pool.base.key_stats_spec) > 0) {
1415                    matcher_start(&ptd->key_stats_matcher,
1416                                  ptd->behavior_pool.base.key_stats_spec);
1417                }
1418
1419                if (strlen(ptd->behavior_pool.base.key_stats_unspec) > 0) {
1420                    matcher_start(&ptd->key_stats_unmatcher,
1421                                  ptd->behavior_pool.base.key_stats_unspec);
1422                }
1423            }
1424        }
1425
1426        if (settings.verbose > 2) {
1427            moxi_log_write("update_ptd_config %u, %u to %u\n",
1428                    port, prev, ptd->config_ver);
1429        }
1430    } else {
1431        if (settings.verbose > 2) {
1432            moxi_log_write("update_ptd_config %u, %u = %u no change\n",
1433                    port, prev, ptd->config_ver);
1434        }
1435    }
1436}
1437
1438/* ---------------------------------------------------------- */
1439
1440static bool update_str_config(char **curr, char *next, char *descrip) {
1441    bool rv = false;
1442
1443    if ((*curr != NULL) &&
1444        (next == NULL ||
1445         strcmp(*curr, next) != 0)) {
1446        free(*curr);
1447        *curr = NULL;
1448
1449        rv = true;
1450
1451        if (descrip != NULL &&
1452            settings.verbose > 2) {
1453            moxi_log_write("%s\n", descrip);
1454        }
1455    }
1456    if (*curr == NULL && next != NULL) {
1457        *curr = trimstrdup(next);
1458    }
1459
1460    return rv;
1461}
1462
1463static bool update_behaviors_config(proxy_behavior **curr,
1464                                    int  *curr_num,
1465                                    proxy_behavior  *next,
1466                                    int   next_num,
1467                                    char *descrip) {
1468    bool rv = false;
1469
1470    if ((*curr != NULL) &&
1471        (next == NULL ||
1472         cproxy_equal_behaviors(*curr_num,
1473                                *curr,
1474                                next_num,
1475                                next) == false)) {
1476        free(*curr);
1477        *curr = NULL;
1478        *curr_num = 0;
1479
1480        rv = true;
1481
1482        if (descrip != NULL &&
1483            settings.verbose > 2) {
1484            moxi_log_write("%s\n", descrip);
1485        }
1486    }
1487    if (*curr == NULL && next != NULL) {
1488        *curr = cproxy_copy_behaviors(next_num,
1489                                      next);
1490        *curr_num = next_num;
1491    }
1492
1493    return rv;
1494}
1495
1496/* ---------------------------------------------------------- */
1497
1498/**
1499 * Parse server-level behaviors from a pool into a given
1500 * array of behaviors, one entry for each server.
1501 *
1502 * An example prefix is "svr".
1503 */
1504char *parse_kvs_servers(char *prefix,
1505                        char *pool_name,
1506                        kvpair_t *kvs,
1507                        char **servers,
1508                        proxy_behavior_pool *behavior_pool) {
1509
1510    int   config_len = 200;
1511    char *config_str;
1512    int j;
1513
1514    cb_assert(prefix);
1515    cb_assert(pool_name);
1516    cb_assert(kvs);
1517    cb_assert(servers);
1518    cb_assert(behavior_pool);
1519    cb_assert(behavior_pool->arr);
1520
1521    if (behavior_pool->num <= 0) {
1522        return NULL;
1523    }
1524
1525    /* Create a config string that libmemcached likes. */
1526    /* See memcached_servers_parse(). */
1527
1528    config_str = calloc(config_len, 1);
1529
1530    for (j = 0; servers[j]; j++) {
1531        int x;
1532        char *config_end;
1533
1534        cb_assert(j < behavior_pool->num);
1535
1536        /* Inherit default behavior. */
1537
1538        behavior_pool->arr[j] = behavior_pool->base;
1539
1540        parse_kvs_behavior(kvs, prefix, servers[j],
1541                           &behavior_pool->arr[j]);
1542
1543        /* Grow config string for libmemcached. */
1544
1545        x = (int)(40 + /* For port and weight. */
1546            strlen(config_str) +
1547            strlen(behavior_pool->arr[j].host));
1548        if (config_len < x) {
1549            config_len = 2 * (config_len + x);
1550            config_str = realloc(config_str, config_len);
1551        }
1552
1553        config_end = config_str + strlen(config_str);
1554        if (config_end != config_str) {
1555            *config_end++ = ',';
1556        }
1557
1558        if (strlen(behavior_pool->arr[j].host) > 0 &&
1559            behavior_pool->arr[j].port > 0) {
1560            snprintf(config_end,
1561                     config_len - (config_end - config_str),
1562                     "%s:%u",
1563                     behavior_pool->arr[j].host,
1564                     behavior_pool->arr[j].port);
1565        } else {
1566            if (settings.verbose > 1) {
1567                moxi_log_write("ERROR: missing host:port for svr-%s in %s\n",
1568                        servers[j], pool_name);
1569            }
1570        }
1571
1572        if (behavior_pool->arr[j].downstream_weight > 0) {
1573            config_end = config_str + strlen(config_str);
1574            snprintf(config_end,
1575                     config_len - (config_end - config_str),
1576                     ":%u",
1577                     behavior_pool->arr[j].downstream_weight);
1578        }
1579
1580        if (settings.verbose > 2) {
1581            cproxy_dump_behavior(&behavior_pool->arr[j],
1582                                 "pks", 0);
1583        }
1584    }
1585
1586    return config_str;
1587}
1588
1589/* ---------------------------------------------------------- */
1590
1591/**
1592 * Parse a "[prefix]-[name]" configuration section into a behavior.
1593 */
1594char **parse_kvs_behavior(kvpair_t *kvs,
1595                          char *prefix,
1596                          char *name,
1597                          proxy_behavior *behavior) {
1598    char key[800];
1599    char **props;
1600    int k;
1601
1602    cb_assert(kvs);
1603    cb_assert(prefix);
1604    cb_assert(name);
1605    cb_assert(behavior);
1606
1607
1608    snprintf(key, sizeof(key), "%s-%s", prefix, name);
1609
1610    props = get_key_values(kvs, key);
1611    for (k = 0; props && props[k]; k++) {
1612        char *key_val = trimstrdup(props[k]);
1613        if (key_val != NULL) {
1614            cproxy_parse_behavior_key_val_str(key_val, behavior);
1615            free(key_val);
1616        }
1617    }
1618
1619    return props;
1620}
1621
1622/* ---------------------------------------------------------- */
1623
1624char **get_key_values(kvpair_t *kvs, char *key) {
1625    kvpair_t *x = find_kvpair(kvs, key);
1626    if (x != NULL) {
1627        return x->values;
1628    }
1629    return NULL;
1630}
1631