1 /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 
3 /**
4  * @copyright 2013 Couchbase, Inc.
5  *
6  * @author Filipe Manana  <filipe@couchbase.com>
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License"); you may not
9  * use this file except in compliance with the License. You may obtain a copy of
10  * the License at
11  *
12  *  http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16  * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
17  * License for the specific language governing permissions and limitations under
18  * the License.
19  **/
20 
21 #include "index_header.h"
22 #include "../bitfield.h"
23 #include <stdlib.h>
24 #include <stdio.h>
25 #include <string.h>
26 #include <snappy-c.h>
27 
28 #define BITMASK_BYTE_SIZE      (1024 / CHAR_BIT)
29 
30 #define dec_uint16(b) (decode_raw16(*((raw_16 *) b)))
31 #define dec_uint48(b) (decode_raw48(*((raw_48 *) b)))
32 #define dec_uint64(b) (decode_raw64(*((raw_64 *) b)))
33 
34 static size_t size_of_partition_versions(part_version_t *part_versions);
35 static void free_part_versions(part_version_t *part_versions);
36 
37 static void enc_uint16(uint16_t u, char **buf);
38 static void enc_uint48(uint64_t u, char **buf);
39 
40 static void enc_seq_list(const void *list, char **buf);
41 static void enc_part_seq_list(const void *list, char **buf);
42 static void enc_part_versions_list(const void *list, char **buf);
43 static int part_seq_cmp(const void *a, const void *b);
44 static int part_id_cmp(const void *a, const void *b);
45 static int part_versions_cmp(const void *a, const void *b);
46 
47 
decode_index_header(const char *bytes, size_t len, index_header_t **header)48 couchstore_error_t decode_index_header(const char *bytes,
49                                        size_t len,
50                                        index_header_t **header)
51 {
52     index_header_t *h = NULL;
53     char *b = NULL, *uncomp = NULL;
54     uint16_t num_seqs, i, j, sz, num_part_versions;
55     size_t uncompLen;
56 
57     /* First 16 bytes are md5 checksum (group signature). */
58     if (len <= 16) {
59         return COUCHSTORE_ERROR_CORRUPT;
60     }
61 
62     if (snappy_uncompressed_length(bytes + 16, len, &uncompLen) != SNAPPY_OK) {
63         return COUCHSTORE_ERROR_CORRUPT;
64     }
65 
66     b = uncomp = (char *) malloc(uncompLen);
67     if (b == NULL) {
68         return COUCHSTORE_ERROR_ALLOC_FAIL;
69     }
70 
71     if (snappy_uncompress(bytes + 16, len - 16, b, &uncompLen) != SNAPPY_OK) {
72         goto alloc_error;
73     }
74 
75     h = (index_header_t *) malloc(sizeof(index_header_t));
76     if (h == NULL) {
77         goto alloc_error;
78     }
79     h->seqs = NULL;
80     h->id_btree_state = NULL;
81     h->view_states = NULL;
82     h->replicas_on_transfer = NULL;
83     h->pending_transition.active = NULL;
84     h->pending_transition.passive = NULL;
85     h->pending_transition.unindexable = NULL;
86     h->unindexable_seqs = NULL;
87     memcpy(h->signature, bytes, 16);
88 
89     h->version = (uint8_t) b[0];
90     b += 1;
91 
92     h->num_partitions = dec_uint16(b);
93     b += 2;
94 
95     memcpy(&h->active_bitmask, b, BITMASK_BYTE_SIZE);
96     b += BITMASK_BYTE_SIZE;
97     memcpy(&h->passive_bitmask, b, BITMASK_BYTE_SIZE);
98     b += BITMASK_BYTE_SIZE;
99     memcpy(&h->cleanup_bitmask, b, BITMASK_BYTE_SIZE);
100     b += BITMASK_BYTE_SIZE;
101 
102     num_seqs = dec_uint16(b);
103     b += 2;
104 
105     h->seqs = sorted_list_create(part_seq_cmp);
106     if (h->seqs == NULL) {
107         goto alloc_error;
108     }
109 
110     for (i = 0; i < num_seqs; ++i) {
111         part_seq_t pseq;
112 
113         pseq.part_id = dec_uint16(b);
114         b += 2;
115         pseq.seq = dec_uint48(b);
116         b += 6;
117 
118         if (sorted_list_add(h->seqs, &pseq, sizeof(pseq)) != 0) {
119             goto alloc_error;
120         }
121     }
122 
123     sz = dec_uint16(b);
124     b += 2;
125     h->id_btree_state = read_root((void *) b, (int) sz);
126     b += sz;
127 
128     h->num_views = (uint8_t) b[0];
129     b += 1;
130 
131     h->view_states = (node_pointer **) malloc(sizeof(node_pointer *) * h->num_views);
132     if (h->view_states == NULL) {
133         goto alloc_error;
134     }
135 
136     for (i = 0; i < (uint16_t) h->num_views; ++i) {
137         sz = dec_uint16(b);
138         b += 2;
139         h->view_states[i] = read_root((void *) b, (int) sz);
140         b += sz;
141     }
142 
143     h->has_replica = b[0] == 0 ? 0 : 1;
144     b += 1;
145 
146     sz = dec_uint16(b);
147     b += 2;
148     h->replicas_on_transfer = sorted_list_create(part_id_cmp);
149     if (h->replicas_on_transfer == NULL) {
150         goto alloc_error;
151     }
152 
153     for (i = 0; i < sz; ++i) {
154         uint16_t part_id = dec_uint16(b);
155         b += 2;
156 
157         if (sorted_list_add(h->replicas_on_transfer, &part_id, sizeof(part_id)) != 0) {
158             goto alloc_error;
159         }
160     }
161 
162     sz = dec_uint16(b);
163     b += 2;
164 
165     h->pending_transition.active = sorted_list_create(part_id_cmp);
166     if (h->pending_transition.active == NULL) {
167         goto alloc_error;
168     }
169 
170     for (i = 0; i < sz; ++i) {
171         uint16_t part_id = dec_uint16(b);
172         b += 2;
173 
174         if (sorted_list_add(h->pending_transition.active,
175                             &part_id, sizeof(part_id)) != 0) {
176             goto alloc_error;
177         }
178     }
179 
180     sz = dec_uint16(b);
181     b += 2;
182 
183     h->pending_transition.passive = sorted_list_create(part_id_cmp);
184     if (h->pending_transition.passive == NULL) {
185         goto alloc_error;
186     }
187 
188     for (i = 0; i < sz; ++i) {
189         uint16_t part_id = dec_uint16(b);
190         b += 2;
191 
192         if (sorted_list_add(h->pending_transition.passive,
193                             &part_id, sizeof(part_id)) != 0) {
194             goto alloc_error;
195         }
196     }
197 
198     sz = dec_uint16(b);
199     b += 2;
200 
201     h->pending_transition.unindexable = sorted_list_create(part_id_cmp);
202     if (h->pending_transition.unindexable == NULL) {
203         goto alloc_error;
204     }
205 
206     for (i = 0; i < sz; ++i) {
207         uint16_t part_id = dec_uint16(b);
208         b += 2;
209 
210         if (sorted_list_add(h->pending_transition.unindexable,
211                             &part_id, sizeof(part_id)) != 0) {
212             goto alloc_error;
213         }
214     }
215 
216     num_seqs = dec_uint16(b);
217     b += 2;
218 
219     h->unindexable_seqs = sorted_list_create(part_seq_cmp);
220     if (h->unindexable_seqs == NULL) {
221         goto alloc_error;
222     }
223 
224     for (i = 0; i < num_seqs; ++i) {
225         part_seq_t pseq;
226 
227         pseq.part_id = dec_uint16(b);
228         b += 2;
229         pseq.seq = dec_uint48(b);
230         b += 6;
231 
232         if (sorted_list_add(h->unindexable_seqs, &pseq, sizeof(pseq)) != 0) {
233             goto alloc_error;
234         }
235     }
236 
237     if (h->version >= 2) {
238         num_part_versions = dec_uint16(b);
239         b += 2;
240 
241         h->part_versions = sorted_list_create(part_versions_cmp);
242         if (h->part_versions == NULL) {
243             goto alloc_error;
244         }
245 
246         for (i = 0; i < num_part_versions; ++i) {
247             part_version_t pver;
248 
249             pver.part_id = dec_uint16(b);
250             b += 2;
251             pver.num_failover_log = dec_uint16(b);
252             b += 2;
253             pver.failover_log = (failover_log_t *) malloc(
254                 sizeof(failover_log_t) * pver.num_failover_log);
255 
256             if (pver.failover_log == NULL) {
257                 goto alloc_error;
258             }
259 
260             for (j = 0; j < pver.num_failover_log; ++j) {
261                 memcpy(&pver.failover_log[j].uuid, b, 8);
262                 b += 8;
263                 pver.failover_log[j].seq = dec_uint64(b);
264                 b += 8;
265             }
266             if (sorted_list_add(h->part_versions, &pver, sizeof(pver)) != 0) {
267                 free(pver.failover_log);
268                 goto alloc_error;
269             }
270         }
271     }
272 
273     free(uncomp);
274     *header = h;
275 
276     return COUCHSTORE_SUCCESS;
277 
278  alloc_error:
279     free_index_header(h);
280     free(uncomp);
281     return COUCHSTORE_ERROR_ALLOC_FAIL;
282 }
283 
284 
/* Compute the serialized size, in bytes, of the partition versions list:
 * a 2 byte entry count, then per partition a 2 byte id, a 2 byte failover
 * log count and 16 bytes (uuid + seq) per failover log entry. */
static size_t size_of_partition_versions(part_version_t *part_versions) {
    void *it = sorted_list_iterator(part_versions);
    part_version_t *pver;
    /* Start with the 2 bytes that hold the number of partition versions. */
    size_t total = 2;

    for (pver = sorted_list_next(it); pver != NULL; pver = sorted_list_next(it)) {
        /* partition ID (2) + failover log count (2) + 16 per log entry */
        total += 4 + (size_t) pver->num_failover_log * 16;
    }
    sorted_list_free_iterator(it);

    return total;
}
300 
/**
 * Serialize a view index header.
 *
 * Produces the 16 byte MD5 signature followed by the snappy-compressed
 * field payload (the exact inverse of decode_index_header). The function
 * first computes the uncompressed size, then writes all fields into a
 * temporary buffer, and finally compresses that buffer.
 *
 * @param header      Header to serialize (not modified).
 * @param buffer      Out parameter; on success receives a malloc'ed buffer
 *                    the caller must free().
 * @param buffer_size Out parameter; size of *buffer in bytes.
 * @return COUCHSTORE_SUCCESS on success, COUCHSTORE_ERROR_ALLOC_FAIL on
 *         allocation failure (also currently returned on compression
 *         failure - see TODO below).
 */
couchstore_error_t encode_index_header(const index_header_t *header,
                                       char **buffer,
                                       size_t *buffer_size)
{
    char *buf = NULL, *b = NULL;
    size_t sz = 0;
    uint16_t id_btree_state_size;
    int i;
    size_t comp_size;
    char *comp;
    snappy_status res;

    /* Pass 1: compute the exact uncompressed payload size. Every item
     * added here must match one write in pass 2 below, in the same order. */
    sz += 1;                     /* version */
    sz += 2;                     /* number of partitions */
    sz += 3 * BITMASK_BYTE_SIZE; /* active/passive/cleanup bitmasks */
    /* seqs */
    sz += 2;
    sz += sorted_list_size(header->seqs) * (2 + 6);
    /* id btree state */
    sz += 2;
    if (header->id_btree_state != NULL) {
        sz += sizeof(raw_btree_root);
        sz += header->id_btree_state->reduce_value.size;
    }
    /* view btree states */
    sz += 1;
    for (i = 0; i < header->num_views; ++i) {
        sz += 2;
        if (header->view_states[i] != NULL) {
            sz += sizeof(raw_btree_root);
            sz += header->view_states[i]->reduce_value.size;
        }
    }
    /* has_replicas */
    sz += 1;
    /* replicas_on_transfer */
    sz += 2;
    sz += sorted_list_size(header->replicas_on_transfer) * 2;
    /* pending transition active */
    sz += 2;
    sz += sorted_list_size(header->pending_transition.active) * 2;
    /* pending transition passive */
    sz += 2;
    sz += sorted_list_size(header->pending_transition.passive) * 2;
    /* pending transition unindexable */
    sz += 2;
    sz += sorted_list_size(header->pending_transition.unindexable) * 2;
    /* unindexable seqs */
    sz += 2;
    sz += sorted_list_size(header->unindexable_seqs) * (2 + 6);
    /* partition versions */
    if (header->version >= 2) {
        sz += size_of_partition_versions(header->part_versions);
    }

    b = buf = (char *) malloc(sz);
    if (buf == NULL) {
        goto alloc_error;
    }

    /* Pass 2: write the fields, advancing b through buf. */
    b[0] = (char) header->version;
    b += 1;

    enc_uint16(header->num_partitions, &b);

    memcpy(b, &header->active_bitmask, BITMASK_BYTE_SIZE);
    b += BITMASK_BYTE_SIZE;
    memcpy(b, &header->passive_bitmask, BITMASK_BYTE_SIZE);
    b += BITMASK_BYTE_SIZE;
    memcpy(b, &header->cleanup_bitmask, BITMASK_BYTE_SIZE);
    b += BITMASK_BYTE_SIZE;

    enc_part_seq_list(header->seqs, &b);

    /* A NULL root is encoded as a zero size prefix with no payload. */
    if (header->id_btree_state != NULL) {
        id_btree_state_size = (uint16_t) sizeof(raw_btree_root);
        id_btree_state_size += (uint16_t) header->id_btree_state->reduce_value.size;
    } else {
        id_btree_state_size = 0;
    }
    enc_uint16(id_btree_state_size, &b);

    encode_root(b, header->id_btree_state);
    b += id_btree_state_size;

    b[0] = (char) header->num_views;
    b += 1;
    for (i = 0; i < header->num_views; ++i) {
        uint16_t view_state_size = 0;

        if (header->view_states[i] != NULL) {
            view_state_size = (uint16_t) sizeof(raw_btree_root);
            view_state_size += (uint16_t) header->view_states[i]->reduce_value.size;
        }
        enc_uint16(view_state_size, &b);

        encode_root(b, header->view_states[i]);
        b += view_state_size;
    }

    b[0] = (char) (header->has_replica ? 1 : 0);
    b += 1;

    /* These are plain partition-id lists (2 bytes per entry). */
    enc_seq_list(header->replicas_on_transfer, &b);
    enc_seq_list(header->pending_transition.active, &b);
    enc_seq_list(header->pending_transition.passive, &b);
    enc_seq_list(header->pending_transition.unindexable, &b);
    enc_part_seq_list(header->unindexable_seqs, &b);

    if (header->version >= 2) {
        enc_part_versions_list(header->part_versions, &b);
    }

    /* Compress the payload; the first 16 bytes are reserved for the
     * signature, so compression output starts at comp + 16. */
    comp_size = snappy_max_compressed_length(sz);
    comp = (char *) malloc(16 + comp_size);

    if (comp == NULL) {
        goto alloc_error;
    }

    res = snappy_compress(buf, sz, comp + 16, &comp_size);

    if (res != SNAPPY_OK) {
        /* TODO: a new error for couchstore_error_t */
        free(comp);
        goto alloc_error;
    }

    memcpy(comp, header->signature, 16);
    *buffer = comp;
    *buffer_size = 16 + comp_size;
    free(buf);

    return COUCHSTORE_SUCCESS;

 alloc_error:
    free(buf);
    *buffer = NULL;
    *buffer_size = 0;
    return COUCHSTORE_ERROR_ALLOC_FAIL;
}
442 
443 
/* Release a header allocated by decode_index_header(). Safe to call with
 * NULL and with partially initialized headers (all list/pointer members
 * are expected to be either valid or NULL). */
void free_index_header(index_header_t *header)
{
    int view;

    if (header == NULL) {
        return;
    }

    sorted_list_free(header->seqs);
    free(header->id_btree_state);

    if (header->view_states != NULL) {
        for (view = 0; view < header->num_views; ++view) {
            free(header->view_states[view]);
        }
        free(header->view_states);
    }

    sorted_list_free(header->replicas_on_transfer);
    sorted_list_free(header->pending_transition.active);
    sorted_list_free(header->pending_transition.passive);
    sorted_list_free(header->pending_transition.unindexable);
    sorted_list_free(header->unindexable_seqs);

    /* part_versions only exists for version >= 2 headers. */
    if (header->version >= 2) {
        free_part_versions(header->part_versions);
    }

    free(header);
}
473 
/* Free a partition versions list, including each entry's owned
 * failover_log array. Tolerates a NULL list so it is safe to call from
 * error-unwind paths where the list was never created. */
static void free_part_versions(part_version_t *part_versions) {
    void *it;
    part_version_t *pver;

    if (part_versions == NULL) {
        return;
    }

    it = sorted_list_iterator(part_versions);
    pver = sorted_list_next(it);
    while (pver != NULL) {
        free(pver->failover_log);
        pver = sorted_list_next(it);
    }
    sorted_list_free_iterator(it);
    sorted_list_free(part_versions);
}
485 
486 
enc_uint16(uint16_t u, char **buf)487 static void enc_uint16(uint16_t u, char **buf)
488 {
489     raw_16 r = encode_raw16(u);
490     memcpy(*buf, &r, 2);
491     *buf += 2;
492 }
493 
494 
enc_uint48(uint64_t u, char **buf)495 static void enc_uint48(uint64_t u, char **buf)
496 {
497     raw_48 r;
498     encode_raw48(u, &r);
499     memcpy(*buf, &r, 6);
500     *buf += 6;
501 }
502 
503 
enc_uint64(uint64_t u, char **buf)504 static void enc_uint64(uint64_t u, char **buf)
505 {
506     raw_64 r = encode_raw64(u);
507     memcpy(*buf, &r, 8);
508     *buf += 8;
509 }
510 
511 
enc_seq_list(const void *list, char **buf)512 static void enc_seq_list(const void *list, char **buf)
513 {
514     void *it = sorted_list_iterator(list);
515     uint16_t *seq = NULL;
516 
517     enc_uint16((uint16_t) sorted_list_size(list), buf);
518     seq = sorted_list_next(it);
519     while (seq != NULL) {
520         enc_uint16(*seq, buf);
521         seq = sorted_list_next(it);
522     }
523     sorted_list_free_iterator(it);
524 }
525 
526 
enc_part_seq_list(const void *list, char **buf)527 static void enc_part_seq_list(const void *list, char **buf)
528 {
529     void *it = sorted_list_iterator(list);
530     part_seq_t *pseq = NULL;
531 
532     enc_uint16((uint16_t) sorted_list_size(list), buf);
533     pseq = sorted_list_next(it);
534     while (pseq != NULL) {
535         enc_uint16(pseq->part_id, buf);
536         enc_uint48(pseq->seq, buf);
537         pseq = sorted_list_next(it);
538     }
539     sorted_list_free_iterator(it);
540 }
541 
542 
enc_part_versions_list(const void *list, char **buf)543 static void enc_part_versions_list(const void *list, char **buf)
544 {
545     void *it = sorted_list_iterator(list);
546     part_version_t *pver = NULL;
547     uint16_t i;
548 
549     enc_uint16((uint16_t) sorted_list_size(list), buf);
550     pver = sorted_list_next(it);
551     while (pver != NULL) {
552         enc_uint16(pver->part_id, buf);
553         enc_uint16(pver->num_failover_log, buf);
554         for (i = 0; i < pver->num_failover_log; ++i) {
555             memcpy(*buf, &(pver->failover_log[i].uuid), 8);
556             *buf += 8;
557             enc_uint64(pver->failover_log[i].seq, buf);
558         }
559         pver = sorted_list_next(it);
560     }
561     sorted_list_free_iterator(it);
562 }
563 
564 
part_seq_cmp(const void *a, const void *b)565 static int part_seq_cmp(const void *a, const void *b)
566 {
567     return ((part_seq_t *) a)->part_id - ((part_seq_t *) b)->part_id;
568 }
569 
570 
/* Order raw 16-bit partition ids. The values are 16-bit, so the int
 * subtraction cannot overflow. */
static int part_id_cmp(const void *a, const void *b)
{
    uint16_t id_a = *((uint16_t *) a);
    uint16_t id_b = *((uint16_t *) b);

    return (int) id_a - (int) id_b;
}
575 
576 
part_versions_cmp(const void *a, const void *b)577 static int part_versions_cmp(const void *a, const void *b)
578 {
579     return ((part_version_t *) a)->part_id - ((part_version_t *) b)->part_id;
580 }
581