1 /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2010 NorthScale, Inc.
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17 #include <assert.h>
18 #include <stdio.h>
19 #include <stdlib.h>
20 #include <string.h>
21 #include <strings.h>
22 #include <errno.h>
23 
24 #include "cJSON.h"
25 #include "hash.h"
26 #include <libvbucket/vbucket.h>
27 
28 #define MAX_CONFIG_SIZE 100 * 1048576
29 #define MAX_VBUCKETS 65536
30 #define MAX_REPLICAS 4
31 #define MAX_AUTHORITY_SIZE 100
32 #define STRINGIFY_(X) #X
33 #define STRINGIFY(X) STRINGIFY_(X)
34 
35 struct server_st {
36     char *authority;        /* host:port */
37     char *rest_api_authority;
38     char *couchdb_api_base;
39     int config_node;        /* non-zero if server struct describes node,
40                                which is listening */
41 };
42 
43 struct vbucket_st {
44     int servers[MAX_REPLICAS + 1];
45 };
46 
47 struct continuum_item_st {
48     uint32_t index;     /* server index */
49     uint32_t point;     /* point on the ketama continuum */
50 };
51 
52 struct vbucket_config_st {
53     char *errmsg;
54     VBUCKET_DISTRIBUTION_TYPE distribution;
55     int num_vbuckets;
56     int mask;
57     int num_servers;
58     int num_replicas;
59     char *user;
60     char *password;
61     int num_continuum;                      /* count of continuum points */
62     struct continuum_item_st *continuum;    /* ketama continuum */
63     struct server_st *servers;
64     struct vbucket_st *fvbuckets;
65     struct vbucket_st *vbuckets;
66     const char *localhost;              /* replacement for $HOST placeholder */
67     size_t nlocalhost;
68 };
69 
70 static char *errstr = NULL;
71 
vbucket_get_errornull72 const char *vbucket_get_error() {
73     return errstr;
74 }
75 
continuum_item_cmp(const void *t1, const void *t2)76 static int continuum_item_cmp(const void *t1, const void *t2)
77 {
78     const struct continuum_item_st *ct1 = t1, *ct2 = t2;
79 
80     if (ct1->point == ct2->point) {
81         return 0;
82     } else if (ct1->point > ct2->point) {
83         return 1;
84     } else {
85         return -1;
86     }
87 }
88 
update_ketama_continuum(VBUCKET_CONFIG_HANDLE vb)89 static void update_ketama_continuum(VBUCKET_CONFIG_HANDLE vb)
90 {
91     char host[MAX_AUTHORITY_SIZE+10] = "";
92     int nhost;
93     int pp, hh, ss, nn;
94     unsigned char digest[16];
95     struct continuum_item_st *new_continuum, *old_continuum;
96 
97     new_continuum = calloc(160 * vb->num_servers,
98                            sizeof(struct continuum_item_st));
99 
100     /* 40 hashes, 4 numbers per hash = 160 points per server */
101     for (ss = 0, pp = 0; ss < vb->num_servers; ++ss) {
102         /* we can add more points to server which have more memory */
103         for (hh = 0; hh < 40; ++hh) {
104             nhost = snprintf(host, MAX_AUTHORITY_SIZE+10, "%s-%u",
105                              vb->servers[ss].authority, hh);
106             hash_md5(host, nhost, digest);
107             for (nn = 0; nn < 4; ++nn, ++pp) {
108                 new_continuum[pp].index = ss;
109                 new_continuum[pp].point = ((uint32_t) (digest[3 + nn * 4] & 0xFF) << 24)
110                                         | ((uint32_t) (digest[2 + nn * 4] & 0xFF) << 16)
111                                         | ((uint32_t) (digest[1 + nn * 4] & 0xFF) << 8)
112                                         | (digest[0 + nn * 4] & 0xFF);
113             }
114         }
115     }
116 
117     qsort(new_continuum, pp, sizeof(struct continuum_item_st), continuum_item_cmp);
118 
119     old_continuum = vb->continuum;
120     vb->continuum = new_continuum;
121     vb->num_continuum = pp;
122     if (old_continuum) {
123         free(old_continuum);
124     }
125 }
126 
vbucket_config_destroy(VBUCKET_CONFIG_HANDLE vb)127 void vbucket_config_destroy(VBUCKET_CONFIG_HANDLE vb) {
128     int i;
129     for (i = 0; i < vb->num_servers; ++i) {
130         free(vb->servers[i].authority);
131         free(vb->servers[i].rest_api_authority);
132         free(vb->servers[i].couchdb_api_base);
133     }
134     free(vb->servers);
135     free(vb->user);
136     free(vb->password);
137     free(vb->fvbuckets);
138     free(vb->vbuckets);
139     free(vb->continuum);
140     free(vb->errmsg);
141     memset(vb, 0xff, sizeof(struct vbucket_config_st));
142     free(vb);
143 }
144 
substitute_localhost_marker(struct vbucket_config_st *vb, char *input)145 static char *substitute_localhost_marker(struct vbucket_config_st *vb, char *input)
146 {
147     char *placeholder;
148     char *result = input;
149     size_t ninput = strlen(input);
150     if (vb->localhost && (placeholder = strstr(input, "$HOST"))) {
151         size_t nprefix = placeholder - input;
152         size_t off = 0;
153         result = calloc(ninput + vb->nlocalhost - 5, sizeof(char));
154         if (!result) {
155             return NULL;
156         }
157         memcpy(result, input, nprefix);
158         off += nprefix;
159         memcpy(result + off, vb->localhost, vb->nlocalhost);
160         off += vb->nlocalhost;
161         memcpy(result + off, input + nprefix + 5, ninput - (nprefix + 5));
162         free(input);
163     }
164     return result;
165 }
166 
populate_servers(struct vbucket_config_st *vb, cJSON *c)167 static int populate_servers(struct vbucket_config_st *vb, cJSON *c) {
168     int i;
169 
170     vb->servers = calloc(vb->num_servers, sizeof(struct server_st));
171     if (vb->servers == NULL) {
172         vbucket_config_destroy(vb);
173         vb->errmsg = strdup("Failed to allocate servers array");
174         return -1;
175     }
176     for (i = 0; i < vb->num_servers; ++i) {
177         char *server;
178         cJSON *jServer = cJSON_GetArrayItem(c, i);
179         if (jServer == NULL || jServer->type != cJSON_String) {
180             vb->errmsg = strdup("Expected array of strings for serverList");
181             return -1;
182         }
183         server = strdup(jServer->valuestring);
184         if (server == NULL) {
185             vb->errmsg = strdup("Failed to allocate storage for server string");
186             return -1;
187         }
188         server = substitute_localhost_marker(vb, server);
189         if (server == NULL) {
190             vb->errmsg = strdup("Failed to allocate storage for server string during $HOST substitution");
191             return -1;
192         }
193         vb->servers[i].authority = server;
194     }
195     return 0;
196 }
197 
get_node_authority(struct vbucket_config_st *vb, cJSON *node, char **out, size_t nbuf)198 static int get_node_authority(struct vbucket_config_st *vb, cJSON *node, char **out, size_t nbuf)
199 {
200     cJSON *json;
201     char *hostname = NULL, *colon = NULL;
202     int port = -1;
203     char *buf = *out;
204 
205     json = cJSON_GetObjectItem(node, "hostname");
206     if (json == NULL || json->type != cJSON_String) {
207         vb->errmsg = strdup("Expected string for node's hostname");
208         return -1;
209     }
210     hostname = json->valuestring;
211     json = cJSON_GetObjectItem(node, "ports");
212     if (json == NULL || json->type != cJSON_Object) {
213         vb->errmsg = strdup("Expected json object for node's ports");
214         return -1;
215     }
216     json = cJSON_GetObjectItem(json, "direct");
217     if (json == NULL || json->type != cJSON_Number) {
218         vb->errmsg = strdup("Expected number for node's direct port");
219         return -1;
220     }
221     port = json->valueint;
222 
223     snprintf(buf, nbuf - 7, "%s", hostname);
224     colon = strchr(buf, ':');
225     if (!colon) {
226         colon = buf + strlen(buf);
227     }
228     snprintf(colon, 7, ":%d", port);
229 
230     buf = substitute_localhost_marker(vb, buf);
231     if (buf == NULL) {
232         vb->errmsg = strdup("Failed to allocate storage for authority string during $HOST substitution");
233         return -1;
234     }
235     *out = buf;
236     return 0;
237 }
238 
lookup_server_struct(struct vbucket_config_st *vb, cJSON *c)239 static int lookup_server_struct(struct vbucket_config_st *vb, cJSON *c) {
240     char *authority = NULL;
241     int idx = -1, ii;
242 
243     authority = calloc(MAX_AUTHORITY_SIZE, sizeof(char));
244     if (authority == NULL) {
245         vb->errmsg = strdup("Failed to allocate storage for authority string");
246         return -1;
247     }
248     if (get_node_authority(vb, c, &authority, MAX_AUTHORITY_SIZE) < 0) {
249         free(authority);
250         return -1;
251     }
252 
253     for (ii = 0; ii < vb->num_servers; ++ii) {
254         if (strcmp(vb->servers[ii].authority, authority) == 0) {
255             idx = ii;
256             break;
257         }
258     }
259 
260     free(authority);
261     return idx;
262 }
263 
update_server_info(struct vbucket_config_st *vb, cJSON *config)264 static int update_server_info(struct vbucket_config_st *vb, cJSON *config) {
265     int idx, ii;
266     cJSON *node, *json;
267 
268     for (ii = 0; ii < cJSON_GetArraySize(config); ++ii) {
269         node = cJSON_GetArrayItem(config, ii);
270         if (node) {
271             if (node->type != cJSON_Object) {
272                 vb->errmsg = strdup("Expected json object for nodes array item");
273                 return -1;
274             }
275 
276             if ((idx = lookup_server_struct(vb, node)) >= 0) {
277                 json = cJSON_GetObjectItem(node, "couchApiBase");
278                 if (json != NULL) {
279                     char *value = strdup(json->valuestring);
280                     if (value == NULL) {
281                         vb->errmsg = strdup("Failed to allocate storage for couchApiBase string");
282                         return -1;
283                     }
284                     value = substitute_localhost_marker(vb, value);
285                     if (value == NULL) {
286                         vb->errmsg = strdup("Failed to allocate storage for hostname string during $HOST substitution");
287                         return -1;
288                     }
289                     vb->servers[idx].couchdb_api_base = value;
290                 }
291                 json = cJSON_GetObjectItem(node, "hostname");
292                 if (json != NULL) {
293                     char *value = strdup(json->valuestring);
294                     if (value == NULL) {
295                         vb->errmsg = strdup("Failed to allocate storage for hostname string");
296                         return -1;
297                     }
298                     value = substitute_localhost_marker(vb, value);
299                     if (value == NULL) {
300                         vb->errmsg = strdup("Failed to allocate storage for hostname string during $HOST substitution");
301                         return -1;
302                     }
303                     vb->servers[idx].rest_api_authority = value;
304                 }
305                 json = cJSON_GetObjectItem(node, "thisNode");
306                 if (json != NULL && json->type == cJSON_True) {
307                     vb->servers[idx].config_node = 1;
308                 }
309             }
310         }
311     }
312     return 0;
313 }
314 
populate_buckets(struct vbucket_config_st *vb, cJSON *c, int is_forward)315 static int populate_buckets(struct vbucket_config_st *vb, cJSON *c, int is_forward)
316 {
317     int i, j;
318     struct vbucket_st *vb_map = NULL;
319 
320     if (is_forward) {
321         if (!(vb->fvbuckets = vb_map = calloc(vb->num_vbuckets, sizeof(struct vbucket_st)))) {
322             vb->errmsg = strdup("Failed to allocate storage for forward vbucket map");
323             return -1;
324         }
325     } else {
326         if (!(vb->vbuckets = vb_map = calloc(vb->num_vbuckets, sizeof(struct vbucket_st)))) {
327             vb->errmsg = strdup("Failed to allocate storage for vbucket map");
328             return -1;
329         }
330     }
331 
332     for (i = 0; i < vb->num_vbuckets; ++i) {
333         cJSON *jBucket = cJSON_GetArrayItem(c, i);
334         if (jBucket == NULL || jBucket->type != cJSON_Array ||
335             cJSON_GetArraySize(jBucket) != vb->num_replicas + 1) {
336             vb->errmsg = strdup("Expected array of arrays each with numReplicas + 1 ints for vBucketMap");
337             return -1;
338         }
339         for (j = 0; j < vb->num_replicas + 1; ++j) {
340             cJSON *jServerId = cJSON_GetArrayItem(jBucket, j);
341             if (jServerId == NULL || jServerId->type != cJSON_Number ||
342                 jServerId->valueint < -1 || jServerId->valueint >= vb->num_servers) {
343                 vb->errmsg = strdup("Server ID must be >= -1 and < num_servers");
344                 return -1;
345             }
346             vb_map[i].servers[j] = jServerId->valueint;
347         }
348     }
349     return 0;
350 }
351 
parse_vbucket_config(VBUCKET_CONFIG_HANDLE vb, cJSON *c)352 static int parse_vbucket_config(VBUCKET_CONFIG_HANDLE vb, cJSON *c)
353 {
354     cJSON *json, *config;
355 
356     config = cJSON_GetObjectItem(c, "vBucketServerMap");
357     if (config == NULL || config->type != cJSON_Object) {
358         /* seems like config without envelop, try to parse it */
359         config = c;
360     }
361 
362     json = cJSON_GetObjectItem(config, "numReplicas");
363     if (json == NULL || json->type != cJSON_Number ||
364         json->valueint > MAX_REPLICAS) {
365         vb->errmsg = strdup("Expected number <= " STRINGIFY(MAX_REPLICAS) " for numReplicas");
366         return -1;
367     }
368     vb->num_replicas = json->valueint;
369 
370     json = cJSON_GetObjectItem(config, "serverList");
371     if (json == NULL || json->type != cJSON_Array) {
372         vb->errmsg = strdup("Expected array for serverList");
373         return -1;
374     }
375     vb->num_servers = cJSON_GetArraySize(json);
376     if (vb->num_servers == 0) {
377         vb->errmsg = strdup("Empty serverList");
378         return -1;
379     }
380     if (populate_servers(vb, json) != 0) {
381         return -1;
382     }
383     /* optionally update server info using envelop (couchdb_api_base etc.) */
384     json = cJSON_GetObjectItem(c, "nodes");
385     if (json) {
386         if (json->type != cJSON_Array) {
387             vb->errmsg = strdup("Expected array for nodes");
388             return -1;
389         }
390         if (update_server_info(vb, json) != 0) {
391             return -1;
392         }
393     }
394 
395     json = cJSON_GetObjectItem(config, "vBucketMap");
396     if (json == NULL || json->type != cJSON_Array) {
397         vb->errmsg = strdup("Expected array for vBucketMap");
398         return -1;
399     }
400     vb->num_vbuckets = cJSON_GetArraySize(json);
401     if (vb->num_vbuckets == 0) {
402         vb->errmsg = strdup("No vBuckets available; service maybe still initializing");
403         return -1;
404     }
405     if ((vb->num_vbuckets & (vb->num_vbuckets - 1)) != 0) {
406         vb->errmsg = strdup("Number of vBuckets must be a power of two > 0 and <= " STRINGIFY(MAX_VBUCKETS));
407         return -1;
408     }
409     vb->mask = vb->num_vbuckets - 1;
410     if (populate_buckets(vb, json, 0) != 0) {
411         return -1;
412     }
413 
414     /* vbucket forward map could possibly be null */
415     json = cJSON_GetObjectItem(config, "vBucketMapForward");
416     if (json) {
417         if (json->type != cJSON_Array) {
418             vb->errmsg = strdup("Expected array for vBucketMapForward");
419             return -1;
420         }
421         if (populate_buckets(vb, json, 1) !=0) {
422             return -1;
423         }
424     }
425 
426     return 0;
427 }
428 
server_cmp(const void *s1, const void *s2)429 static int server_cmp(const void *s1, const void *s2)
430 {
431     return strcmp(((const struct server_st *)s1)->authority,
432                   ((const struct server_st *)s2)->authority);
433 }
434 
parse_ketama_config(VBUCKET_CONFIG_HANDLE vb, cJSON *config)435 static int parse_ketama_config(VBUCKET_CONFIG_HANDLE vb, cJSON *config)
436 {
437     cJSON *json, *node, *hostname;
438     char *buf;
439     int ii;
440 
441     json = cJSON_GetObjectItem(config, "nodes");
442     if (json == NULL || json->type != cJSON_Array) {
443         vb->errmsg = strdup("Expected array for nodes");
444         return -1;
445     }
446 
447     vb->num_servers = cJSON_GetArraySize(json);
448     if (vb->num_servers == 0) {
449         vb->errmsg = strdup("Empty serverList");
450         return -1;
451     }
452     vb->servers = calloc(vb->num_servers, sizeof(struct server_st));
453     for (ii = 0; ii < vb->num_servers; ++ii) {
454         node = cJSON_GetArrayItem(json, ii);
455         if (node == NULL || node->type != cJSON_Object) {
456             vb->errmsg = strdup("Expected object for nodes array item");
457             return -1;
458         }
459         buf = calloc(MAX_AUTHORITY_SIZE, sizeof(char));
460         if (buf == NULL) {
461             vb->errmsg = strdup("Failed to allocate storage for node authority");
462             return -1;
463         }
464         if (get_node_authority(vb, node, &buf, MAX_AUTHORITY_SIZE) < 0) {
465             return -1;
466         }
467         vb->servers[ii].authority = buf;
468         hostname = cJSON_GetObjectItem(node, "hostname");
469         if (hostname == NULL || hostname->type != cJSON_String) {
470             vb->errmsg = strdup("Expected string for node's hostname");
471             return -1;
472         }
473         buf = strdup(hostname->valuestring);
474         if (buf == NULL) {
475             vb->errmsg = strdup("Failed to allocate storage for hostname string");
476             return -1;
477         }
478         buf = substitute_localhost_marker(vb, buf);
479         if (buf == NULL) {
480             vb->errmsg = strdup("Failed to allocate storage for hostname string during $HOST substitution");
481             return -1;
482         }
483         vb->servers[ii].rest_api_authority = buf;
484     }
485     qsort(vb->servers, vb->num_servers, sizeof(struct server_st), server_cmp);
486 
487     update_ketama_continuum(vb);
488     return 0;
489 }
490 
parse_cjson(VBUCKET_CONFIG_HANDLE handle, cJSON *config)491 static int parse_cjson(VBUCKET_CONFIG_HANDLE handle, cJSON *config)
492 {
493     cJSON *json;
494 
495     /* set optional credentials */
496     json = cJSON_GetObjectItem(config, "name");
497     if (json != NULL && json->type == cJSON_String && strcmp(json->valuestring, "default") != 0) {
498         handle->user = strdup(json->valuestring);
499     }
500     json = cJSON_GetObjectItem(config, "saslPassword");
501     if (json != NULL && json->type == cJSON_String) {
502         handle->password = strdup(json->valuestring);
503     }
504 
505     /* by default it uses vbucket distribution to map keys to servers */
506     handle->distribution = VBUCKET_DISTRIBUTION_VBUCKET;
507 
508     json = cJSON_GetObjectItem(config, "nodeLocator");
509     if (json == NULL) {
510         /* special case: it migth be config without envelope */
511         if (parse_vbucket_config(handle, config) == -1) {
512             return -1;
513         }
514     } else if (json->type == cJSON_String) {
515         if (strcmp(json->valuestring, "vbucket") == 0) {
516             handle->distribution = VBUCKET_DISTRIBUTION_VBUCKET;
517             if (parse_vbucket_config(handle, config) == -1) {
518                 return -1;
519             }
520         } else if (strcmp(json->valuestring, "ketama") == 0) {
521             handle->distribution = VBUCKET_DISTRIBUTION_KETAMA;
522             if (parse_ketama_config(handle, config) == -1) {
523                 return -1;
524             }
525         }
526     } else {
527         handle->errmsg = strdup("Expected string for nodeLocator");
528         return -1;
529     }
530 
531     return 0;
532 }
533 
parse_from_memory(VBUCKET_CONFIG_HANDLE handle, const char *data)534 static int parse_from_memory(VBUCKET_CONFIG_HANDLE handle, const char *data)
535 {
536     int ret;
537     cJSON *c = cJSON_Parse(data);
538     if (c == NULL) {
539         handle->errmsg = strdup("Failed to parse data. Invalid JSON?");
540         return -1;
541     }
542 
543     ret = parse_cjson(handle, c);
544 
545     cJSON_Delete(c);
546     return ret;
547 }
548 
do_read_file(FILE *fp, char *data, size_t size)549 static int do_read_file(FILE *fp, char *data, size_t size)
550 {
551     size_t offset = 0;
552     size_t nread;
553 
554     do {
555         nread = fread(data + offset, 1, size, fp);
556         if (nread != (size_t)-1 && nread != 0) {
557             offset += nread;
558             size -= nread;
559         } else {
560             return -1;
561         }
562     } while (size > 0);
563 
564     return 0;
565 }
566 
parse_from_file(VBUCKET_CONFIG_HANDLE handle, const char *filename)567 static int parse_from_file(VBUCKET_CONFIG_HANDLE handle, const char *filename)
568 {
569     long size;
570     char *data;
571     int ret;
572     FILE *f = fopen(filename, "rb");
573     if (f == NULL) {
574         char msg[1024];
575         snprintf(msg, sizeof(msg), "Unable to open file \"%s\": %s", filename,
576                  strerror(errno));
577         handle->errmsg = strdup(msg);
578         return -1;
579     }
580     fseek(f, 0, SEEK_END);
581     size = ftell(f);
582     fseek(f, 0, SEEK_SET);
583     if (size > MAX_CONFIG_SIZE) {
584         char msg[1024];
585         snprintf(msg, sizeof(msg), "File too large: \"%s\"", filename);
586         handle->errmsg = strdup(msg);
587         fclose(f);
588         return -1;
589     }
590     data = calloc(size+1, sizeof(char));
591     if (data == NULL) {
592         char msg[1024];
593         snprintf(msg, sizeof(msg), "Failed to allocate buffer to read: \"%s\"", filename);
594         handle->errmsg = strdup(msg);
595         fclose(f);
596         return -1;
597     }
598     if (do_read_file(f, data, size) == -1) {
599         char msg[1024];
600         snprintf(msg, sizeof(msg), "Failed to read entire file: \"%s\": %s",
601                  filename, strerror(errno));
602         handle->errmsg = strdup(msg);
603         fclose(f);
604         free(data);
605         return -1;
606     }
607 
608     fclose(f);
609     ret = parse_from_memory(handle, data);
610     free(data);
611     return ret;
612 }
613 
vbucket_config_create(void)614 VBUCKET_CONFIG_HANDLE vbucket_config_create(void)
615 {
616     return calloc(1, sizeof(struct vbucket_config_st));
617 }
618 
vbucket_config_parse2(VBUCKET_CONFIG_HANDLE handle, vbucket_source_t data_source, const char *data, const char *peername)619 int vbucket_config_parse2(VBUCKET_CONFIG_HANDLE handle,
620                           vbucket_source_t data_source,
621                           const char *data,
622                           const char *peername)
623 {
624     handle->localhost = peername;
625     handle->nlocalhost = peername ? strlen(peername) : 0;
626     if (data_source == LIBVBUCKET_SOURCE_FILE) {
627         return parse_from_file(handle, data);
628     } else {
629         return parse_from_memory(handle, data);
630     }
631 }
632 
vbucket_config_parse(VBUCKET_CONFIG_HANDLE handle, vbucket_source_t data_source, const char *data)633 int vbucket_config_parse(VBUCKET_CONFIG_HANDLE handle,
634                          vbucket_source_t data_source,
635                          const char *data)
636 {
637     return vbucket_config_parse2(handle, data_source, data, "localhost");
638 }
639 
vbucket_get_error_message(VBUCKET_CONFIG_HANDLE handle)640 const char *vbucket_get_error_message(VBUCKET_CONFIG_HANDLE handle)
641 {
642     return handle->errmsg;
643 }
644 
backwards_compat(vbucket_source_t source, const char *data)645 static VBUCKET_CONFIG_HANDLE backwards_compat(vbucket_source_t source, const char *data)
646 {
647     VBUCKET_CONFIG_HANDLE ret = vbucket_config_create();
648     if (ret == NULL) {
649         return NULL;
650     }
651 
652     if (vbucket_config_parse(ret, source, data) != 0) {
653         errstr = strdup(ret->errmsg);
654         vbucket_config_destroy(ret);
655         ret = NULL;
656     }
657 
658     return ret;
659 }
660 
vbucket_config_parse_file(const char *filename)661 VBUCKET_CONFIG_HANDLE vbucket_config_parse_file(const char *filename)
662 {
663     return backwards_compat(LIBVBUCKET_SOURCE_FILE, filename);
664 }
665 
vbucket_config_parse_string(const char *data)666 VBUCKET_CONFIG_HANDLE vbucket_config_parse_string(const char *data)
667 {
668     return backwards_compat(LIBVBUCKET_SOURCE_MEMORY, data);
669 }
670 
vbucket_map(VBUCKET_CONFIG_HANDLE vb, const void *key, size_t nkey, int *vbucket_id, int *server_idx)671 int vbucket_map(VBUCKET_CONFIG_HANDLE vb, const void *key, size_t nkey,
672                 int *vbucket_id, int *server_idx)
673 {
674     uint32_t digest, mid, prev;
675     struct continuum_item_st *beginp, *endp, *midp, *highp, *lowp;
676 
677     if (vb->distribution == VBUCKET_DISTRIBUTION_KETAMA) {
678         assert(vb->continuum);
679         if (vbucket_id) {
680             *vbucket_id = 0;
681         }
682         digest = hash_ketama(key, nkey);
683         beginp = lowp = vb->continuum;
684         endp = highp = vb->continuum + vb->num_continuum;
685 
686         /* divide and conquer array search to find server with next biggest
687          * point after what this key hashes to */
688         while (1)
689         {
690             /* pick the middle point */
691             midp = lowp + (highp - lowp) / 2;
692 
693             if (midp == endp) {
694                 /* if at the end, roll back to zeroth */
695                 *server_idx = beginp->index;
696                 break;
697             }
698 
699             mid = midp->point;
700             prev = (midp == beginp) ? 0 : (midp-1)->point;
701 
702             if (digest <= mid && digest > prev) {
703                 /* we found nearest server */
704                 *server_idx = midp->index;
705                 break;
706             }
707 
708             /* adjust the limits */
709             if (mid < digest) {
710                 lowp = midp + 1;
711             } else {
712                 highp = midp - 1;
713             }
714 
715             if (lowp > highp) {
716                 *server_idx = beginp->index;
717                 break;
718             }
719         }
720     } else {
721         *vbucket_id = vbucket_get_vbucket_by_key(vb, key, nkey);
722         *server_idx = vbucket_get_master(vb, *vbucket_id);
723     }
724     return 0;
725 }
726 
727 
vbucket_config_get_num_replicas(VBUCKET_CONFIG_HANDLE vb)728 int vbucket_config_get_num_replicas(VBUCKET_CONFIG_HANDLE vb) {
729     return vb->num_replicas;
730 }
731 
vbucket_config_get_num_vbuckets(VBUCKET_CONFIG_HANDLE vb)732 int vbucket_config_get_num_vbuckets(VBUCKET_CONFIG_HANDLE vb) {
733     return vb->num_vbuckets;
734 }
735 
vbucket_config_get_num_servers(VBUCKET_CONFIG_HANDLE vb)736 int vbucket_config_get_num_servers(VBUCKET_CONFIG_HANDLE vb) {
737     return vb->num_servers;
738 }
739 
vbucket_config_get_couch_api_base(VBUCKET_CONFIG_HANDLE vb, int i)740 const char *vbucket_config_get_couch_api_base(VBUCKET_CONFIG_HANDLE vb, int i) {
741     return vb->servers[i].couchdb_api_base;
742 }
743 
vbucket_config_get_rest_api_server(VBUCKET_CONFIG_HANDLE vb, int i)744 const char *vbucket_config_get_rest_api_server(VBUCKET_CONFIG_HANDLE vb, int i) {
745     return vb->servers[i].rest_api_authority;
746 }
747 
vbucket_config_is_config_node(VBUCKET_CONFIG_HANDLE vb, int i)748 int vbucket_config_is_config_node(VBUCKET_CONFIG_HANDLE vb, int i) {
749     return vb->servers[i].config_node;
750 }
751 
vbucket_config_get_distribution_type(VBUCKET_CONFIG_HANDLE vb)752 VBUCKET_DISTRIBUTION_TYPE vbucket_config_get_distribution_type(VBUCKET_CONFIG_HANDLE vb) {
753     return vb->distribution;
754 }
755 
vbucket_config_get_server(VBUCKET_CONFIG_HANDLE vb, int i)756 const char *vbucket_config_get_server(VBUCKET_CONFIG_HANDLE vb, int i) {
757     return vb->servers[i].authority;
758 }
759 
vbucket_config_get_user(VBUCKET_CONFIG_HANDLE vb)760 const char *vbucket_config_get_user(VBUCKET_CONFIG_HANDLE vb) {
761     return vb->user;
762 }
763 
vbucket_config_get_password(VBUCKET_CONFIG_HANDLE vb)764 const char *vbucket_config_get_password(VBUCKET_CONFIG_HANDLE vb) {
765     return vb->password;
766 }
767 
vbucket_get_vbucket_by_key(VBUCKET_CONFIG_HANDLE vb, const void *key, size_t nkey)768 int vbucket_get_vbucket_by_key(VBUCKET_CONFIG_HANDLE vb, const void *key, size_t nkey) {
769     /* call crc32 directly here it could be changed to some more general
770      * function when vbucket distribution will support multiple hashing
771      * algorithms */
772     uint32_t digest = hash_crc32(key, nkey);
773     return digest & vb->mask;
774 }
775 
vbucket_get_master(VBUCKET_CONFIG_HANDLE vb, int vbucket)776 int vbucket_get_master(VBUCKET_CONFIG_HANDLE vb, int vbucket) {
777     return vb->vbuckets[vbucket].servers[0];
778 }
779 
vbucket_get_replica(VBUCKET_CONFIG_HANDLE vb, int vbucket, int i)780 int vbucket_get_replica(VBUCKET_CONFIG_HANDLE vb, int vbucket, int i) {
781     int idx = i + 1;
782     if (idx < vb->num_servers) {
783         return vb->vbuckets[vbucket].servers[idx];
784     } else {
785         return -1;
786     }
787 }
788 
vbucket_found_incorrect_master(VBUCKET_CONFIG_HANDLE vb, int vbucket, int wrongserver)789 int vbucket_found_incorrect_master(VBUCKET_CONFIG_HANDLE vb, int vbucket,
790                                    int wrongserver) {
791     int mappedServer = vb->vbuckets[vbucket].servers[0];
792     int rv = mappedServer;
793     /*
794      * if a forward table exists, then return the vbucket id from the forward table
795      * and update that information in the current table. We also need to Update the
796      * replica information for that vbucket
797      */
798     if (vb->fvbuckets) {
799         int i = 0;
800         rv = vb->vbuckets[vbucket].servers[0] = vb->fvbuckets[vbucket].servers[0];
801         for (i = 0; i < vb->num_replicas; i++) {
802             vb->vbuckets[vbucket].servers[i+1] = vb->fvbuckets[vbucket].servers[i+1];
803         }
804     } else if (mappedServer == wrongserver) {
805         rv = (rv + 1) % vb->num_servers;
806         vb->vbuckets[vbucket].servers[0] = rv;
807     }
808 
809     return rv;
810 }
811 
compute_vb_list_diff(VBUCKET_CONFIG_HANDLE from, VBUCKET_CONFIG_HANDLE to, char **out)812 static void compute_vb_list_diff(VBUCKET_CONFIG_HANDLE from,
813                                  VBUCKET_CONFIG_HANDLE to,
814                                  char **out) {
815     int offset = 0;
816     int i, j;
817     for (i = 0; i < to->num_servers; i++) {
818         int found = 0;
819         const char *sn = vbucket_config_get_server(to, i);
820         for (j = 0; !found && j < from->num_servers; j++) {
821             const char *sn2 = vbucket_config_get_server(from, j);
822             found |= (strcmp(sn2, sn) == 0);
823         }
824         if (!found) {
825             out[offset] = strdup(sn);
826             assert(out[offset]);
827             ++offset;
828         }
829     }
830 }
831 
vbucket_compare(VBUCKET_CONFIG_HANDLE from, VBUCKET_CONFIG_HANDLE to)832 VBUCKET_CONFIG_DIFF* vbucket_compare(VBUCKET_CONFIG_HANDLE from,
833                                      VBUCKET_CONFIG_HANDLE to) {
834     VBUCKET_CONFIG_DIFF *rv = calloc(1, sizeof(VBUCKET_CONFIG_DIFF));
835     int num_servers = (from->num_servers > to->num_servers
836                        ? from->num_servers : to->num_servers) + 1;
837     assert(rv);
838     rv->servers_added = calloc(num_servers, sizeof(char*));
839     rv->servers_removed = calloc(num_servers, sizeof(char*));
840 
841     /* Compute the added and removed servers */
842     compute_vb_list_diff(from, to, rv->servers_added);
843     compute_vb_list_diff(to, from, rv->servers_removed);
844 
845     /* Verify the servers are equal in their positions */
846     if (to->num_servers == from->num_servers) {
847         int i;
848         for (i = 0; i < from->num_servers; i++) {
849             rv->sequence_changed |= (0 != strcmp(vbucket_config_get_server(from, i),
850                                                  vbucket_config_get_server(to, i)));
851 
852         }
853     } else {
854         /* Just say yes */
855         rv->sequence_changed = 1;
856     }
857 
858     /* Consider the sequence changed if the auth credentials changed */
859     if (from->user != NULL && to->user != NULL) {
860         rv->sequence_changed |= (strcmp(from->user, to->user) != 0);
861     } else {
862         rv->sequence_changed |= ((from->user != NULL) ^ (to->user != NULL));
863     }
864 
865     if (from->password != NULL && to->password != NULL) {
866         rv->sequence_changed |= (strcmp(from->password, to->password) != 0);
867     } else {
868         rv->sequence_changed |= ((from->password != NULL) ^ (to->password != NULL));
869     }
870 
871     /* Count the number of vbucket differences */
872     if (to->num_vbuckets == from->num_vbuckets) {
873         int i;
874         for (i = 0; i < to->num_vbuckets; i++) {
875             rv->n_vb_changes += (vbucket_get_master(from, i)
876                                  == vbucket_get_master(to, i)) ? 0 : 1;
877         }
878     } else {
879         rv->n_vb_changes = -1;
880     }
881 
882     return rv;
883 }
884 
free_array_helper(char **l)885 static void free_array_helper(char **l) {
886     int i;
887     for (i = 0; l[i]; i++) {
888         free(l[i]);
889     }
890     free(l);
891 }
892 
vbucket_free_diff(VBUCKET_CONFIG_DIFF *diff)893 void vbucket_free_diff(VBUCKET_CONFIG_DIFF *diff) {
894     assert(diff);
895     free_array_helper(diff->servers_added);
896     free_array_helper(diff->servers_removed);
897     free(diff);
898 }
899