xref: /5.5.2/moxi/vbucket/vbucket.c (revision d0366df5)
1/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2010 NorthScale, Inc.
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17#include <platform/cbassert.h>
18#include <stdio.h>
19#include <stdlib.h>
20#include <string.h>
21#include <strings.h>
22#include <errno.h>
23
24#include "cJSON.h"
25#include "hash.h"
26#include <libvbucket/vbucket.h>
27
/* Upper bound, in bytes, on a configuration file read from disk.
 * Parenthesized so the macro expands safely inside larger expressions. */
#define MAX_CONFIG_SIZE (100 * 1048576)
/* Largest vBucket count named in the vBucketMap error message. */
#define MAX_VBUCKETS 65536
/* Largest accepted numReplicas; struct vbucket_st holds MAX_REPLICAS + 1 slots. */
#define MAX_REPLICAS 4
/* Size of the buffers used to build "host:port" authority strings. */
#define MAX_AUTHORITY_SIZE 100
/* Two-level stringification so that a macro argument is expanded first. */
#define STRINGIFY_(X) #X
#define STRINGIFY(X) STRINGIFY_(X)
34
/* One entry of vbucket_config_st.servers. */
struct server_st {
    /* "host:port" string identifying the server */
    char *authority;
    /* copy of the node's "hostname" value */
    char *rest_api_authority;
    /* copy of the node's "couchApiBase" value */
    char *couchdb_api_base;
    /* non-zero if this struct describes the node which is listening
     * (set when the node's "thisNode" field is true) */
    int config_node;
};
42
43struct vbucket_st {
44    int servers[MAX_REPLICAS + 1];
45};
46
/* One point of the ketama continuum. */
struct continuum_item_st {
    /* index of the server owning this point */
    uint32_t index;
    /* position of the point on the continuum */
    uint32_t point;
};
51
52struct vbucket_config_st {
53    char *errmsg;
54    VBUCKET_DISTRIBUTION_TYPE distribution;
55    int num_vbuckets;
56    int mask;
57    int num_servers;
58    int num_replicas;
59    char *user;
60    char *password;
61    int num_continuum;                      /* count of continuum points */
62    struct continuum_item_st *continuum;    /* ketama continuum */
63    struct server_st *servers;
64    struct vbucket_st *fvbuckets;
65    struct vbucket_st *vbuckets;
66    const char *localhost;              /* replacement for $HOST placeholder */
67    size_t nlocalhost;
68};
69
/* Message saved by the legacy parse helpers when parsing fails;
 * NULL until such a failure has been recorded. */
static char *errstr = NULL;

/* Return the message currently stored in errstr (may be NULL). */
const char *vbucket_get_error() {
    const char *message = errstr;
    return message;
}
75
76static int continuum_item_cmp(const void *t1, const void *t2)
77{
78    const struct continuum_item_st *ct1 = t1, *ct2 = t2;
79
80    if (ct1->point == ct2->point) {
81        return 0;
82    } else if (ct1->point > ct2->point) {
83        return 1;
84    } else {
85        return -1;
86    }
87}
88
89static void update_ketama_continuum(VBUCKET_CONFIG_HANDLE vb)
90{
91    char host[MAX_AUTHORITY_SIZE+10] = "";
92    int nhost;
93    int pp, hh, ss, nn;
94    unsigned char digest[16];
95    struct continuum_item_st *new_continuum, *old_continuum;
96
97    new_continuum = calloc(160 * vb->num_servers,
98                           sizeof(struct continuum_item_st));
99
100    /* 40 hashes, 4 numbers per hash = 160 points per server */
101    for (ss = 0, pp = 0; ss < vb->num_servers; ++ss) {
102        /* we can add more points to server which have more memory */
103        for (hh = 0; hh < 40; ++hh) {
104            nhost = snprintf(host, MAX_AUTHORITY_SIZE+10, "%s-%u",
105                             vb->servers[ss].authority, hh);
106            hash_md5(host, nhost, digest);
107            for (nn = 0; nn < 4; ++nn, ++pp) {
108                new_continuum[pp].index = ss;
109                new_continuum[pp].point = ((uint32_t) (digest[3 + nn * 4] & 0xFF) << 24)
110                                        | ((uint32_t) (digest[2 + nn * 4] & 0xFF) << 16)
111                                        | ((uint32_t) (digest[1 + nn * 4] & 0xFF) << 8)
112                                        | (digest[0 + nn * 4] & 0xFF);
113            }
114        }
115    }
116
117    qsort(new_continuum, pp, sizeof(struct continuum_item_st), continuum_item_cmp);
118
119    old_continuum = vb->continuum;
120    vb->continuum = new_continuum;
121    vb->num_continuum = pp;
122    if (old_continuum) {
123        free(old_continuum);
124    }
125}
126
127void vbucket_config_destroy(VBUCKET_CONFIG_HANDLE vb) {
128    int i;
129    for (i = 0; i < vb->num_servers; ++i) {
130        free(vb->servers[i].authority);
131        free(vb->servers[i].rest_api_authority);
132        free(vb->servers[i].couchdb_api_base);
133    }
134    free(vb->servers);
135    free(vb->user);
136    free(vb->password);
137    free(vb->fvbuckets);
138    free(vb->vbuckets);
139    free(vb->continuum);
140    free(vb->errmsg);
141    memset(vb, 0xff, sizeof(struct vbucket_config_st));
142    free(vb);
143}
144
145static char *substitute_localhost_marker(struct vbucket_config_st *vb, char *input)
146{
147    char *placeholder;
148    char *result = input;
149    size_t ninput = strlen(input);
150    if (vb->localhost && (placeholder = strstr(input, "$HOST"))) {
151        size_t nprefix = placeholder - input;
152        size_t off = 0;
153        result = calloc(ninput + vb->nlocalhost - 5, sizeof(char));
154        if (!result) {
155            return NULL;
156        }
157        memcpy(result, input, nprefix);
158        off += nprefix;
159        memcpy(result + off, vb->localhost, vb->nlocalhost);
160        off += vb->nlocalhost;
161        memcpy(result + off, input + nprefix + 5, ninput - (nprefix + 5));
162        free(input);
163    }
164    return result;
165}
166
167static int populate_servers(struct vbucket_config_st *vb, cJSON *c) {
168    int i;
169
170    vb->servers = calloc(vb->num_servers, sizeof(struct server_st));
171    if (vb->servers == NULL) {
172        vbucket_config_destroy(vb);
173        vb->errmsg = strdup("Failed to allocate servers array");
174        return -1;
175    }
176    for (i = 0; i < vb->num_servers; ++i) {
177        char *server;
178        cJSON *jServer = cJSON_GetArrayItem(c, i);
179        if (jServer == NULL || jServer->type != cJSON_String) {
180            vb->errmsg = strdup("Expected array of strings for serverList");
181            return -1;
182        }
183        server = strdup(jServer->valuestring);
184        if (server == NULL) {
185            vb->errmsg = strdup("Failed to allocate storage for server string");
186            return -1;
187        }
188        server = substitute_localhost_marker(vb, server);
189        if (server == NULL) {
190            vb->errmsg = strdup("Failed to allocate storage for server string during $HOST substitution");
191            return -1;
192        }
193        vb->servers[i].authority = server;
194    }
195    return 0;
196}
197
198static int get_node_authority(struct vbucket_config_st *vb, cJSON *node, char **out, size_t nbuf)
199{
200    cJSON *json;
201    char *hostname = NULL, *colon = NULL;
202    int port = -1;
203    char *buf = *out;
204
205    json = cJSON_GetObjectItem(node, "hostname");
206    if (json == NULL || json->type != cJSON_String) {
207        vb->errmsg = strdup("Expected string for node's hostname");
208        return -1;
209    }
210    hostname = json->valuestring;
211    json = cJSON_GetObjectItem(node, "ports");
212    if (json == NULL || json->type != cJSON_Object) {
213        vb->errmsg = strdup("Expected json object for node's ports");
214        return -1;
215    }
216    json = cJSON_GetObjectItem(json, "direct");
217    if (json == NULL || json->type != cJSON_Number) {
218        vb->errmsg = strdup("Expected number for node's direct port");
219        return -1;
220    }
221    port = json->valueint;
222
223    snprintf(buf, nbuf - 7, "%s", hostname);
224    colon = strchr(buf, ':');
225    if (!colon) {
226        colon = buf + strlen(buf);
227    }
228    snprintf(colon, 7, ":%d", port);
229
230    buf = substitute_localhost_marker(vb, buf);
231    if (buf == NULL) {
232        vb->errmsg = strdup("Failed to allocate storage for authority string during $HOST substitution");
233        return -1;
234    }
235    *out = buf;
236    return 0;
237}
238
239static int lookup_server_struct(struct vbucket_config_st *vb, cJSON *c) {
240    char *authority = NULL;
241    int idx = -1, ii;
242
243    authority = calloc(MAX_AUTHORITY_SIZE, sizeof(char));
244    if (authority == NULL) {
245        vb->errmsg = strdup("Failed to allocate storage for authority string");
246        return -1;
247    }
248    if (get_node_authority(vb, c, &authority, MAX_AUTHORITY_SIZE) < 0) {
249        free(authority);
250        return -1;
251    }
252
253    for (ii = 0; ii < vb->num_servers; ++ii) {
254        if (strcmp(vb->servers[ii].authority, authority) == 0) {
255            idx = ii;
256            break;
257        }
258    }
259
260    free(authority);
261    return idx;
262}
263
264static int update_server_info(struct vbucket_config_st *vb, cJSON *config) {
265    int idx, ii;
266    cJSON *node, *json;
267
268    for (ii = 0; ii < cJSON_GetArraySize(config); ++ii) {
269        node = cJSON_GetArrayItem(config, ii);
270        if (node) {
271            if (node->type != cJSON_Object) {
272                vb->errmsg = strdup("Expected json object for nodes array item");
273                return -1;
274            }
275
276            if ((idx = lookup_server_struct(vb, node)) >= 0) {
277                json = cJSON_GetObjectItem(node, "couchApiBase");
278                if (json != NULL) {
279                    char *value = strdup(json->valuestring);
280                    if (value == NULL) {
281                        vb->errmsg = strdup("Failed to allocate storage for couchApiBase string");
282                        return -1;
283                    }
284                    value = substitute_localhost_marker(vb, value);
285                    if (value == NULL) {
286                        vb->errmsg = strdup("Failed to allocate storage for hostname string during $HOST substitution");
287                        return -1;
288                    }
289                    vb->servers[idx].couchdb_api_base = value;
290                }
291                json = cJSON_GetObjectItem(node, "hostname");
292                if (json != NULL) {
293                    char *value = strdup(json->valuestring);
294                    if (value == NULL) {
295                        vb->errmsg = strdup("Failed to allocate storage for hostname string");
296                        return -1;
297                    }
298                    value = substitute_localhost_marker(vb, value);
299                    if (value == NULL) {
300                        vb->errmsg = strdup("Failed to allocate storage for hostname string during $HOST substitution");
301                        return -1;
302                    }
303                    vb->servers[idx].rest_api_authority = value;
304                }
305                json = cJSON_GetObjectItem(node, "thisNode");
306                if (json != NULL && json->type == cJSON_True) {
307                    vb->servers[idx].config_node = 1;
308                }
309            }
310        }
311    }
312    return 0;
313}
314
315static int populate_buckets(struct vbucket_config_st *vb, cJSON *c, int is_forward)
316{
317    int i, j;
318    struct vbucket_st *vb_map = NULL;
319
320    if (is_forward) {
321        if (!(vb->fvbuckets = vb_map = calloc(vb->num_vbuckets, sizeof(struct vbucket_st)))) {
322            vb->errmsg = strdup("Failed to allocate storage for forward vbucket map");
323            return -1;
324        }
325    } else {
326        if (!(vb->vbuckets = vb_map = calloc(vb->num_vbuckets, sizeof(struct vbucket_st)))) {
327            vb->errmsg = strdup("Failed to allocate storage for vbucket map");
328            return -1;
329        }
330    }
331
332    for (i = 0; i < vb->num_vbuckets; ++i) {
333        cJSON *jBucket = cJSON_GetArrayItem(c, i);
334        if (jBucket == NULL || jBucket->type != cJSON_Array ||
335            cJSON_GetArraySize(jBucket) != vb->num_replicas + 1) {
336            vb->errmsg = strdup("Expected array of arrays each with numReplicas + 1 ints for vBucketMap");
337            return -1;
338        }
339        for (j = 0; j < vb->num_replicas + 1; ++j) {
340            cJSON *jServerId = cJSON_GetArrayItem(jBucket, j);
341            if (jServerId == NULL || jServerId->type != cJSON_Number ||
342                jServerId->valueint < -1 || jServerId->valueint >= vb->num_servers) {
343                vb->errmsg = strdup("Server ID must be >= -1 and < num_servers");
344                return -1;
345            }
346            vb_map[i].servers[j] = jServerId->valueint;
347        }
348    }
349    return 0;
350}
351
352static int parse_vbucket_config(VBUCKET_CONFIG_HANDLE vb, cJSON *c)
353{
354    cJSON *json, *config;
355
356    config = cJSON_GetObjectItem(c, "vBucketServerMap");
357    if (config == NULL || config->type != cJSON_Object) {
358        /* seems like config without envelop, try to parse it */
359        config = c;
360    }
361
362    json = cJSON_GetObjectItem(config, "numReplicas");
363    if (json == NULL || json->type != cJSON_Number ||
364        json->valueint > MAX_REPLICAS) {
365        vb->errmsg = strdup("Expected number <= " STRINGIFY(MAX_REPLICAS) " for numReplicas");
366        return -1;
367    }
368    vb->num_replicas = json->valueint;
369
370    json = cJSON_GetObjectItem(config, "serverList");
371    if (json == NULL || json->type != cJSON_Array) {
372        vb->errmsg = strdup("Expected array for serverList");
373        return -1;
374    }
375    vb->num_servers = cJSON_GetArraySize(json);
376    if (vb->num_servers == 0) {
377        vb->errmsg = strdup("Empty serverList");
378        return -1;
379    }
380    if (populate_servers(vb, json) != 0) {
381        return -1;
382    }
383    /* optionally update server info using envelop (couchdb_api_base etc.) */
384    json = cJSON_GetObjectItem(c, "nodes");
385    if (json) {
386        if (json->type != cJSON_Array) {
387            vb->errmsg = strdup("Expected array for nodes");
388            return -1;
389        }
390        if (update_server_info(vb, json) != 0) {
391            return -1;
392        }
393    }
394
395    json = cJSON_GetObjectItem(config, "vBucketMap");
396    if (json == NULL || json->type != cJSON_Array) {
397        vb->errmsg = strdup("Expected array for vBucketMap");
398        return -1;
399    }
400    vb->num_vbuckets = cJSON_GetArraySize(json);
401    if (vb->num_vbuckets == 0) {
402        vb->errmsg = strdup("No vBuckets available; service maybe still initializing");
403        return -1;
404    }
405    if ((vb->num_vbuckets & (vb->num_vbuckets - 1)) != 0) {
406        vb->errmsg = strdup("Number of vBuckets must be a power of two > 0 and <= " STRINGIFY(MAX_VBUCKETS));
407        return -1;
408    }
409    vb->mask = vb->num_vbuckets - 1;
410    if (populate_buckets(vb, json, 0) != 0) {
411        return -1;
412    }
413
414    /* vbucket forward map could possibly be null */
415    json = cJSON_GetObjectItem(config, "vBucketMapForward");
416    if (json) {
417        if (json->type != cJSON_Array) {
418            vb->errmsg = strdup("Expected array for vBucketMapForward");
419            return -1;
420        }
421        if (populate_buckets(vb, json, 1) !=0) {
422            return -1;
423        }
424    }
425
426    return 0;
427}
428
429static int server_cmp(const void *s1, const void *s2)
430{
431    return strcmp(((const struct server_st *)s1)->authority,
432                  ((const struct server_st *)s2)->authority);
433}
434
435static int parse_ketama_config(VBUCKET_CONFIG_HANDLE vb, cJSON *config)
436{
437    cJSON *json, *node, *hostname;
438    char *buf;
439    int ii;
440
441    json = cJSON_GetObjectItem(config, "nodes");
442    if (json == NULL || json->type != cJSON_Array) {
443        vb->errmsg = strdup("Expected array for nodes");
444        return -1;
445    }
446
447    vb->num_servers = cJSON_GetArraySize(json);
448    if (vb->num_servers == 0) {
449        vb->errmsg = strdup("Empty serverList");
450        return -1;
451    }
452    vb->servers = calloc(vb->num_servers, sizeof(struct server_st));
453    for (ii = 0; ii < vb->num_servers; ++ii) {
454        node = cJSON_GetArrayItem(json, ii);
455        if (node == NULL || node->type != cJSON_Object) {
456            vb->errmsg = strdup("Expected object for nodes array item");
457            return -1;
458        }
459        buf = calloc(MAX_AUTHORITY_SIZE, sizeof(char));
460        if (buf == NULL) {
461            vb->errmsg = strdup("Failed to allocate storage for node authority");
462            return -1;
463        }
464        if (get_node_authority(vb, node, &buf, MAX_AUTHORITY_SIZE) < 0) {
465            return -1;
466        }
467        vb->servers[ii].authority = buf;
468        hostname = cJSON_GetObjectItem(node, "hostname");
469        if (hostname == NULL || hostname->type != cJSON_String) {
470            vb->errmsg = strdup("Expected string for node's hostname");
471            return -1;
472        }
473        buf = strdup(hostname->valuestring);
474        if (buf == NULL) {
475            vb->errmsg = strdup("Failed to allocate storage for hostname string");
476            return -1;
477        }
478        buf = substitute_localhost_marker(vb, buf);
479        if (buf == NULL) {
480            vb->errmsg = strdup("Failed to allocate storage for hostname string during $HOST substitution");
481            return -1;
482        }
483        vb->servers[ii].rest_api_authority = buf;
484    }
485    qsort(vb->servers, vb->num_servers, sizeof(struct server_st), server_cmp);
486
487    update_ketama_continuum(vb);
488    return 0;
489}
490
491static int parse_cjson(VBUCKET_CONFIG_HANDLE handle, cJSON *config)
492{
493    cJSON *json;
494
495    /* set optional credentials */
496    json = cJSON_GetObjectItem(config, "name");
497    if (json != NULL && json->type == cJSON_String && strcmp(json->valuestring, "default") != 0) {
498        handle->user = strdup(json->valuestring);
499    }
500    json = cJSON_GetObjectItem(config, "saslPassword");
501    if (json != NULL && json->type == cJSON_String) {
502        handle->password = strdup(json->valuestring);
503    }
504
505    /* by default it uses vbucket distribution to map keys to servers */
506    handle->distribution = VBUCKET_DISTRIBUTION_VBUCKET;
507
508    json = cJSON_GetObjectItem(config, "nodeLocator");
509    if (json == NULL) {
510        /* special case: it migth be config without envelope */
511        if (parse_vbucket_config(handle, config) == -1) {
512            return -1;
513        }
514    } else if (json->type == cJSON_String) {
515        if (strcmp(json->valuestring, "vbucket") == 0) {
516            handle->distribution = VBUCKET_DISTRIBUTION_VBUCKET;
517            if (parse_vbucket_config(handle, config) == -1) {
518                return -1;
519            }
520        } else if (strcmp(json->valuestring, "ketama") == 0) {
521            handle->distribution = VBUCKET_DISTRIBUTION_KETAMA;
522            if (parse_ketama_config(handle, config) == -1) {
523                return -1;
524            }
525        }
526    } else {
527        handle->errmsg = strdup("Expected string for nodeLocator");
528        return -1;
529    }
530
531    return 0;
532}
533
534static int parse_from_memory(VBUCKET_CONFIG_HANDLE handle, const char *data)
535{
536    int ret;
537    cJSON *c = cJSON_Parse(data);
538    if (c == NULL) {
539        handle->errmsg = strdup("Failed to parse data. Invalid JSON?");
540        return -1;
541    }
542
543    ret = parse_cjson(handle, c);
544
545    cJSON_Delete(c);
546    return ret;
547}
548
/*
 * Read exactly size bytes from fp into data.
 *
 * Returns 0 once all size bytes have been read (trivially so when size is
 * 0), or -1 if fread() stops delivering data before that.
 */
static int do_read_file(FILE *fp, char *data, size_t size)
{
    size_t offset = 0;

    while (size > 0) {
        /* fread() signals both end-of-file and errors with a short count */
        size_t nread = fread(data + offset, 1, size, fp);
        if (nread == 0) {
            return -1;
        }
        offset += nread;
        size -= nread;
    }

    return 0;
}
566
567static int parse_from_file(VBUCKET_CONFIG_HANDLE handle, const char *filename)
568{
569    long size;
570    char *data;
571    int ret;
572    FILE *f = fopen(filename, "rb");
573    if (f == NULL) {
574        char msg[1024];
575        snprintf(msg, sizeof(msg), "Unable to open file \"%s\": %s", filename,
576                 strerror(errno));
577        handle->errmsg = strdup(msg);
578        return -1;
579    }
580    fseek(f, 0, SEEK_END);
581    size = ftell(f);
582    fseek(f, 0, SEEK_SET);
583    if (size > MAX_CONFIG_SIZE) {
584        char msg[1024];
585        snprintf(msg, sizeof(msg), "File too large: \"%s\"", filename);
586        handle->errmsg = strdup(msg);
587        fclose(f);
588        return -1;
589    }
590    data = calloc(size+1, sizeof(char));
591    if (data == NULL) {
592        char msg[1024];
593        snprintf(msg, sizeof(msg), "Failed to allocate buffer to read: \"%s\"", filename);
594        handle->errmsg = strdup(msg);
595        fclose(f);
596        return -1;
597    }
598    if (do_read_file(f, data, size) == -1) {
599        char msg[1024];
600        snprintf(msg, sizeof(msg), "Failed to read entire file: \"%s\": %s",
601                 filename, strerror(errno));
602        handle->errmsg = strdup(msg);
603        fclose(f);
604        free(data);
605        return -1;
606    }
607
608    fclose(f);
609    ret = parse_from_memory(handle, data);
610    free(data);
611    return ret;
612}
613
614VBUCKET_CONFIG_HANDLE vbucket_config_create(void)
615{
616    return calloc(1, sizeof(struct vbucket_config_st));
617}
618
619int vbucket_config_parse2(VBUCKET_CONFIG_HANDLE handle,
620                          vbucket_source_t data_source,
621                          const char *data,
622                          const char *peername)
623{
624    handle->localhost = peername;
625    handle->nlocalhost = peername ? strlen(peername) : 0;
626    if (data_source == LIBVBUCKET_SOURCE_FILE) {
627        return parse_from_file(handle, data);
628    } else {
629        return parse_from_memory(handle, data);
630    }
631}
632
633int vbucket_config_parse(VBUCKET_CONFIG_HANDLE handle,
634                         vbucket_source_t data_source,
635                         const char *data)
636{
637    return vbucket_config_parse2(handle, data_source, data, "localhost");
638}
639
640const char *vbucket_get_error_message(VBUCKET_CONFIG_HANDLE handle)
641{
642    return handle->errmsg;
643}
644
645static VBUCKET_CONFIG_HANDLE backwards_compat(vbucket_source_t source, const char *data)
646{
647    VBUCKET_CONFIG_HANDLE ret = vbucket_config_create();
648    if (ret == NULL) {
649        return NULL;
650    }
651
652    if (vbucket_config_parse(ret, source, data) != 0) {
653        errstr = strdup(ret->errmsg);
654        vbucket_config_destroy(ret);
655        ret = NULL;
656    }
657
658    return ret;
659}
660
661VBUCKET_CONFIG_HANDLE vbucket_config_parse_file(const char *filename)
662{
663    return backwards_compat(LIBVBUCKET_SOURCE_FILE, filename);
664}
665
666VBUCKET_CONFIG_HANDLE vbucket_config_parse_string(const char *data)
667{
668    return backwards_compat(LIBVBUCKET_SOURCE_MEMORY, data);
669}
670
671int vbucket_map(VBUCKET_CONFIG_HANDLE vb, const void *key, size_t nkey,
672                int *vbucket_id, int *server_idx)
673{
674    uint32_t digest, mid, prev;
675    struct continuum_item_st *beginp, *endp, *midp, *highp, *lowp;
676
677    if (vb->distribution == VBUCKET_DISTRIBUTION_KETAMA) {
678        cb_assert(vb->continuum);
679        if (vbucket_id) {
680            *vbucket_id = 0;
681        }
682        digest = hash_ketama(key, nkey);
683        beginp = lowp = vb->continuum;
684        endp = highp = vb->continuum + vb->num_continuum;
685
686        /* divide and conquer array search to find server with next biggest
687         * point after what this key hashes to */
688        while (1)
689        {
690            /* pick the middle point */
691            midp = lowp + (highp - lowp) / 2;
692
693            if (midp == endp) {
694                /* if at the end, roll back to zeroth */
695                *server_idx = beginp->index;
696                break;
697            }
698
699            mid = midp->point;
700            prev = (midp == beginp) ? 0 : (midp-1)->point;
701
702            if (digest <= mid && digest > prev) {
703                /* we found nearest server */
704                *server_idx = midp->index;
705                break;
706            }
707
708            /* adjust the limits */
709            if (mid < digest) {
710                lowp = midp + 1;
711            } else {
712                highp = midp - 1;
713            }
714
715            if (lowp > highp) {
716                *server_idx = beginp->index;
717                break;
718            }
719        }
720    } else {
721        *vbucket_id = vbucket_get_vbucket_by_key(vb, key, nkey);
722        *server_idx = vbucket_get_master(vb, *vbucket_id);
723    }
724    return 0;
725}
726
727
728int vbucket_config_get_num_replicas(VBUCKET_CONFIG_HANDLE vb) {
729    return vb->num_replicas;
730}
731
732int vbucket_config_get_num_vbuckets(VBUCKET_CONFIG_HANDLE vb) {
733    return vb->num_vbuckets;
734}
735
736int vbucket_config_get_num_servers(VBUCKET_CONFIG_HANDLE vb) {
737    return vb->num_servers;
738}
739
740const char *vbucket_config_get_couch_api_base(VBUCKET_CONFIG_HANDLE vb, int i) {
741    return vb->servers[i].couchdb_api_base;
742}
743
744const char *vbucket_config_get_rest_api_server(VBUCKET_CONFIG_HANDLE vb, int i) {
745    return vb->servers[i].rest_api_authority;
746}
747
748int vbucket_config_is_config_node(VBUCKET_CONFIG_HANDLE vb, int i) {
749    return vb->servers[i].config_node;
750}
751
752VBUCKET_DISTRIBUTION_TYPE vbucket_config_get_distribution_type(VBUCKET_CONFIG_HANDLE vb) {
753    return vb->distribution;
754}
755
756const char *vbucket_config_get_server(VBUCKET_CONFIG_HANDLE vb, int i) {
757    return vb->servers[i].authority;
758}
759
760const char *vbucket_config_get_user(VBUCKET_CONFIG_HANDLE vb) {
761    return vb->user;
762}
763
764const char *vbucket_config_get_password(VBUCKET_CONFIG_HANDLE vb) {
765    return vb->password;
766}
767
768int vbucket_get_vbucket_by_key(VBUCKET_CONFIG_HANDLE vb, const void *key, size_t nkey) {
769    /* call crc32 directly here it could be changed to some more general
770     * function when vbucket distribution will support multiple hashing
771     * algorithms */
772    uint32_t digest = hash_crc32(key, nkey);
773    return digest & vb->mask;
774}
775
776int vbucket_get_master(VBUCKET_CONFIG_HANDLE vb, int vbucket) {
777    return vb->vbuckets[vbucket].servers[0];
778}
779
780int vbucket_get_replica(VBUCKET_CONFIG_HANDLE vb, int vbucket, int i) {
781    int idx = i + 1;
782    if (idx < vb->num_servers) {
783        return vb->vbuckets[vbucket].servers[idx];
784    } else {
785        return -1;
786    }
787}
788
789int vbucket_found_incorrect_master(VBUCKET_CONFIG_HANDLE vb, int vbucket,
790                                   int wrongserver) {
791    int mappedServer = vb->vbuckets[vbucket].servers[0];
792    int rv = mappedServer;
793    /*
794     * if a forward table exists, then return the vbucket id from the forward table
795     * and update that information in the current table. We also need to Update the
796     * replica information for that vbucket
797     */
798    if (vb->fvbuckets) {
799        int i = 0;
800        rv = vb->vbuckets[vbucket].servers[0] = vb->fvbuckets[vbucket].servers[0];
801        for (i = 0; i < vb->num_replicas; i++) {
802            vb->vbuckets[vbucket].servers[i+1] = vb->fvbuckets[vbucket].servers[i+1];
803        }
804    } else if (mappedServer == wrongserver) {
805        rv = (rv + 1) % vb->num_servers;
806        vb->vbuckets[vbucket].servers[0] = rv;
807    }
808
809    return rv;
810}
811
812static void compute_vb_list_diff(VBUCKET_CONFIG_HANDLE from,
813                                 VBUCKET_CONFIG_HANDLE to,
814                                 char **out) {
815    int offset = 0;
816    int i, j;
817    for (i = 0; i < to->num_servers; i++) {
818        int found = 0;
819        const char *sn = vbucket_config_get_server(to, i);
820        for (j = 0; !found && j < from->num_servers; j++) {
821            const char *sn2 = vbucket_config_get_server(from, j);
822            found |= (strcmp(sn2, sn) == 0);
823        }
824        if (!found) {
825            out[offset] = strdup(sn);
826            cb_assert(out[offset]);
827            ++offset;
828        }
829    }
830}
831
832VBUCKET_CONFIG_DIFF* vbucket_compare(VBUCKET_CONFIG_HANDLE from,
833                                     VBUCKET_CONFIG_HANDLE to) {
834    VBUCKET_CONFIG_DIFF *rv = calloc(1, sizeof(VBUCKET_CONFIG_DIFF));
835    int num_servers = (from->num_servers > to->num_servers
836                       ? from->num_servers : to->num_servers) + 1;
837    cb_assert(rv);
838    rv->servers_added = calloc(num_servers, sizeof(char*));
839    rv->servers_removed = calloc(num_servers, sizeof(char*));
840
841    /* Compute the added and removed servers */
842    compute_vb_list_diff(from, to, rv->servers_added);
843    compute_vb_list_diff(to, from, rv->servers_removed);
844
845    /* Verify the servers are equal in their positions */
846    if (to->num_servers == from->num_servers) {
847        int i;
848        for (i = 0; i < from->num_servers; i++) {
849            rv->sequence_changed |= (0 != strcmp(vbucket_config_get_server(from, i),
850                                                 vbucket_config_get_server(to, i)));
851
852        }
853    } else {
854        /* Just say yes */
855        rv->sequence_changed = 1;
856    }
857
858    /* Consider the sequence changed if the auth credentials changed */
859    if (from->user != NULL && to->user != NULL) {
860        rv->sequence_changed |= (strcmp(from->user, to->user) != 0);
861    } else {
862        rv->sequence_changed |= ((from->user != NULL) ^ (to->user != NULL));
863    }
864
865    if (from->password != NULL && to->password != NULL) {
866        rv->sequence_changed |= (strcmp(from->password, to->password) != 0);
867    } else {
868        rv->sequence_changed |= ((from->password != NULL) ^ (to->password != NULL));
869    }
870
871    /* Count the number of vbucket differences */
872    if (to->num_vbuckets == from->num_vbuckets) {
873        int i;
874        for (i = 0; i < to->num_vbuckets; i++) {
875            rv->n_vb_changes += (vbucket_get_master(from, i)
876                                 == vbucket_get_master(to, i)) ? 0 : 1;
877        }
878    } else {
879        rv->n_vb_changes = -1;
880    }
881
882    return rv;
883}
884
/*
 * Free a NULL-terminated array of heap-allocated strings together with the
 * array itself. A NULL array is accepted and ignored.
 */
static void free_array_helper(char **l) {
    int i;
    if (l == NULL) {
        return;
    }
    for (i = 0; l[i]; i++) {
        free(l[i]);
    }
    free(l);
}
892
893void vbucket_free_diff(VBUCKET_CONFIG_DIFF *diff) {
894    cb_assert(diff);
895    free_array_helper(diff->servers_added);
896    free_array_helper(diff->servers_removed);
897    free(diff);
898}
899