1 /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2
3 /**
4 * @copyright 2013 Couchbase, Inc.
5 *
6 * @author Filipe Manana <filipe@couchbase.com>
7 *
8 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
9 * use this file except in compliance with the License. You may obtain a copy of
10 * the License at
11 *
12 * http://www.apache.org/licenses/LICENSE-2.0
13 *
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
17 * License for the specific language governing permissions and limitations under
18 * the License.
19 **/
20
21 #include "index_header.h"
22 #include "../bitfield.h"
23 #include <stdlib.h>
24 #include <stdio.h>
25 #include <string.h>
26 #include <snappy-c.h>
27
28 #define BITMASK_BYTE_SIZE (1024 / CHAR_BIT)
29
30 #define dec_uint16(b) (decode_raw16(*((raw_16 *) b)))
31 #define dec_uint48(b) (decode_raw48(*((raw_48 *) b)))
32 #define dec_uint64(b) (decode_raw64(*((raw_64 *) b)))
33
34 static size_t size_of_partition_versions(part_version_t *part_versions);
35 static void free_part_versions(part_version_t *part_versions);
36
37 static void enc_uint16(uint16_t u, char **buf);
38 static void enc_uint48(uint64_t u, char **buf);
39
40 static void enc_seq_list(const void *list, char **buf);
41 static void enc_part_seq_list(const void *list, char **buf);
42 static void enc_part_versions_list(const void *list, char **buf);
43 static int part_seq_cmp(const void *a, const void *b);
44 static int part_id_cmp(const void *a, const void *b);
45 static int part_versions_cmp(const void *a, const void *b);
46
47
decode_index_header(const char *bytes, size_t len, index_header_t **header)48 couchstore_error_t decode_index_header(const char *bytes,
49 size_t len,
50 index_header_t **header)
51 {
52 index_header_t *h = NULL;
53 char *b = NULL, *uncomp = NULL;
54 uint16_t num_seqs, i, j, sz, num_part_versions;
55 size_t uncompLen;
56
57 /* First 16 bytes are md5 checksum (group signature). */
58 if (len <= 16) {
59 return COUCHSTORE_ERROR_CORRUPT;
60 }
61
62 if (snappy_uncompressed_length(bytes + 16, len, &uncompLen) != SNAPPY_OK) {
63 return COUCHSTORE_ERROR_CORRUPT;
64 }
65
66 b = uncomp = (char *) malloc(uncompLen);
67 if (b == NULL) {
68 return COUCHSTORE_ERROR_ALLOC_FAIL;
69 }
70
71 if (snappy_uncompress(bytes + 16, len - 16, b, &uncompLen) != SNAPPY_OK) {
72 goto alloc_error;
73 }
74
75 h = (index_header_t *) malloc(sizeof(index_header_t));
76 if (h == NULL) {
77 goto alloc_error;
78 }
79 h->seqs = NULL;
80 h->id_btree_state = NULL;
81 h->view_states = NULL;
82 h->replicas_on_transfer = NULL;
83 h->pending_transition.active = NULL;
84 h->pending_transition.passive = NULL;
85 h->pending_transition.unindexable = NULL;
86 h->unindexable_seqs = NULL;
87 memcpy(h->signature, bytes, 16);
88
89 h->version = (uint8_t) b[0];
90 b += 1;
91
92 h->num_partitions = dec_uint16(b);
93 b += 2;
94
95 memcpy(&h->active_bitmask, b, BITMASK_BYTE_SIZE);
96 b += BITMASK_BYTE_SIZE;
97 memcpy(&h->passive_bitmask, b, BITMASK_BYTE_SIZE);
98 b += BITMASK_BYTE_SIZE;
99 memcpy(&h->cleanup_bitmask, b, BITMASK_BYTE_SIZE);
100 b += BITMASK_BYTE_SIZE;
101
102 num_seqs = dec_uint16(b);
103 b += 2;
104
105 h->seqs = sorted_list_create(part_seq_cmp);
106 if (h->seqs == NULL) {
107 goto alloc_error;
108 }
109
110 for (i = 0; i < num_seqs; ++i) {
111 part_seq_t pseq;
112
113 pseq.part_id = dec_uint16(b);
114 b += 2;
115 pseq.seq = dec_uint48(b);
116 b += 6;
117
118 if (sorted_list_add(h->seqs, &pseq, sizeof(pseq)) != 0) {
119 goto alloc_error;
120 }
121 }
122
123 sz = dec_uint16(b);
124 b += 2;
125 h->id_btree_state = read_root((void *) b, (int) sz);
126 b += sz;
127
128 h->num_views = (uint8_t) b[0];
129 b += 1;
130
131 h->view_states = (node_pointer **) malloc(sizeof(node_pointer *) * h->num_views);
132 if (h->view_states == NULL) {
133 goto alloc_error;
134 }
135
136 for (i = 0; i < (uint16_t) h->num_views; ++i) {
137 sz = dec_uint16(b);
138 b += 2;
139 h->view_states[i] = read_root((void *) b, (int) sz);
140 b += sz;
141 }
142
143 h->has_replica = b[0] == 0 ? 0 : 1;
144 b += 1;
145
146 sz = dec_uint16(b);
147 b += 2;
148 h->replicas_on_transfer = sorted_list_create(part_id_cmp);
149 if (h->replicas_on_transfer == NULL) {
150 goto alloc_error;
151 }
152
153 for (i = 0; i < sz; ++i) {
154 uint16_t part_id = dec_uint16(b);
155 b += 2;
156
157 if (sorted_list_add(h->replicas_on_transfer, &part_id, sizeof(part_id)) != 0) {
158 goto alloc_error;
159 }
160 }
161
162 sz = dec_uint16(b);
163 b += 2;
164
165 h->pending_transition.active = sorted_list_create(part_id_cmp);
166 if (h->pending_transition.active == NULL) {
167 goto alloc_error;
168 }
169
170 for (i = 0; i < sz; ++i) {
171 uint16_t part_id = dec_uint16(b);
172 b += 2;
173
174 if (sorted_list_add(h->pending_transition.active,
175 &part_id, sizeof(part_id)) != 0) {
176 goto alloc_error;
177 }
178 }
179
180 sz = dec_uint16(b);
181 b += 2;
182
183 h->pending_transition.passive = sorted_list_create(part_id_cmp);
184 if (h->pending_transition.passive == NULL) {
185 goto alloc_error;
186 }
187
188 for (i = 0; i < sz; ++i) {
189 uint16_t part_id = dec_uint16(b);
190 b += 2;
191
192 if (sorted_list_add(h->pending_transition.passive,
193 &part_id, sizeof(part_id)) != 0) {
194 goto alloc_error;
195 }
196 }
197
198 sz = dec_uint16(b);
199 b += 2;
200
201 h->pending_transition.unindexable = sorted_list_create(part_id_cmp);
202 if (h->pending_transition.unindexable == NULL) {
203 goto alloc_error;
204 }
205
206 for (i = 0; i < sz; ++i) {
207 uint16_t part_id = dec_uint16(b);
208 b += 2;
209
210 if (sorted_list_add(h->pending_transition.unindexable,
211 &part_id, sizeof(part_id)) != 0) {
212 goto alloc_error;
213 }
214 }
215
216 num_seqs = dec_uint16(b);
217 b += 2;
218
219 h->unindexable_seqs = sorted_list_create(part_seq_cmp);
220 if (h->unindexable_seqs == NULL) {
221 goto alloc_error;
222 }
223
224 for (i = 0; i < num_seqs; ++i) {
225 part_seq_t pseq;
226
227 pseq.part_id = dec_uint16(b);
228 b += 2;
229 pseq.seq = dec_uint48(b);
230 b += 6;
231
232 if (sorted_list_add(h->unindexable_seqs, &pseq, sizeof(pseq)) != 0) {
233 goto alloc_error;
234 }
235 }
236
237 if (h->version >= 2) {
238 num_part_versions = dec_uint16(b);
239 b += 2;
240
241 h->part_versions = sorted_list_create(part_versions_cmp);
242 if (h->part_versions == NULL) {
243 goto alloc_error;
244 }
245
246 for (i = 0; i < num_part_versions; ++i) {
247 part_version_t pver;
248
249 pver.part_id = dec_uint16(b);
250 b += 2;
251 pver.num_failover_log = dec_uint16(b);
252 b += 2;
253 pver.failover_log = (failover_log_t *) malloc(
254 sizeof(failover_log_t) * pver.num_failover_log);
255
256 if (pver.failover_log == NULL) {
257 goto alloc_error;
258 }
259
260 for (j = 0; j < pver.num_failover_log; ++j) {
261 memcpy(&pver.failover_log[j].uuid, b, 8);
262 b += 8;
263 pver.failover_log[j].seq = dec_uint64(b);
264 b += 8;
265 }
266 if (sorted_list_add(h->part_versions, &pver, sizeof(pver)) != 0) {
267 free(pver.failover_log);
268 goto alloc_error;
269 }
270 }
271 }
272
273 free(uncomp);
274 *header = h;
275
276 return COUCHSTORE_SUCCESS;
277
278 alloc_error:
279 free_index_header(h);
280 free(uncomp);
281 return COUCHSTORE_ERROR_ALLOC_FAIL;
282 }
283
284
/*
 * Computes the number of bytes needed to serialize the given sorted list
 * of partition versions (count field plus one record per partition).
 */
static size_t size_of_partition_versions(part_version_t *part_versions) {
    part_version_t *entry;
    void *iter = sorted_list_iterator(part_versions);
    /* uint16 holding the number of partition versions */
    size_t total = 2;

    for (entry = sorted_list_next(iter);
         entry != NULL;
         entry = sorted_list_next(iter)) {
        /* uint16 partition ID + uint16 failover log count, followed by
           16 bytes (uuid + seq) per failover log entry */
        total += 2 + 2 + entry->num_failover_log * 16;
    }
    sorted_list_free_iterator(iter);

    return total;
}
300
/**
 * Serializes an index header: computes the exact uncompressed payload size
 * (the size accounting below mirrors the write sequence statement for
 * statement - keep both in sync), writes the payload, snappy-compresses it
 * and prefixes the result with the 16 byte MD5 group signature.
 *
 * On success sets *buffer (heap allocated, caller must free()) and
 * *buffer_size and returns COUCHSTORE_SUCCESS. On any failure sets them to
 * NULL/0 and returns COUCHSTORE_ERROR_ALLOC_FAIL.
 */
couchstore_error_t encode_index_header(const index_header_t *header,
                                       char **buffer,
                                       size_t *buffer_size)
{
    char *buf = NULL, *b = NULL;
    size_t sz = 0;
    uint16_t id_btree_state_size;
    int i;
    size_t comp_size;
    char *comp;
    snappy_status res;

    /* --- Phase 1: compute the uncompressed payload size. --- */
    sz += 1; /* version */
    sz += 2; /* number of partitions */
    sz += 3 * BITMASK_BYTE_SIZE; /* active/passive/cleanup bitmasks */
    /* seqs */
    sz += 2;
    sz += sorted_list_size(header->seqs) * (2 + 6);
    /* id btree state */
    sz += 2;
    if (header->id_btree_state != NULL) {
        sz += sizeof(raw_btree_root);
        sz += header->id_btree_state->reduce_value.size;
    }
    /* view btree states */
    sz += 1;
    for (i = 0; i < header->num_views; ++i) {
        sz += 2;
        if (header->view_states[i] != NULL) {
            sz += sizeof(raw_btree_root);
            sz += header->view_states[i]->reduce_value.size;
        }
    }
    /* has_replicas */
    sz += 1;
    /* replicas_on_transfer */
    sz += 2;
    sz += sorted_list_size(header->replicas_on_transfer) * 2;
    /* pending transition active */
    sz += 2;
    sz += sorted_list_size(header->pending_transition.active) * 2;
    /* pending transition passive */
    sz += 2;
    sz += sorted_list_size(header->pending_transition.passive) * 2;
    /* pending transition unindexable */
    sz += 2;
    sz += sorted_list_size(header->pending_transition.unindexable) * 2;
    /* unindexable seqs */
    sz += 2;
    sz += sorted_list_size(header->unindexable_seqs) * (2 + 6);
    /* partition versions (only present in version 2+ headers) */
    if (header->version >= 2) {
        sz += size_of_partition_versions(header->part_versions);
    }

    /* --- Phase 2: write the payload, advancing b through buf. --- */
    b = buf = (char *) malloc(sz);
    if (buf == NULL) {
        goto alloc_error;
    }

    b[0] = (char) header->version;
    b += 1;

    enc_uint16(header->num_partitions, &b);

    memcpy(b, &header->active_bitmask, BITMASK_BYTE_SIZE);
    b += BITMASK_BYTE_SIZE;
    memcpy(b, &header->passive_bitmask, BITMASK_BYTE_SIZE);
    b += BITMASK_BYTE_SIZE;
    memcpy(b, &header->cleanup_bitmask, BITMASK_BYTE_SIZE);
    b += BITMASK_BYTE_SIZE;

    enc_part_seq_list(header->seqs, &b);

    /* Size-prefixed id btree root (0 size when there is no root). */
    if (header->id_btree_state != NULL) {
        id_btree_state_size = (uint16_t) sizeof(raw_btree_root);
        id_btree_state_size += (uint16_t) header->id_btree_state->reduce_value.size;
    } else {
        id_btree_state_size = 0;
    }
    enc_uint16(id_btree_state_size, &b);

    encode_root(b, header->id_btree_state);
    b += id_btree_state_size;

    /* One size-prefixed btree root per view. */
    b[0] = (char) header->num_views;
    b += 1;
    for (i = 0; i < header->num_views; ++i) {
        uint16_t view_state_size = 0;

        if (header->view_states[i] != NULL) {
            view_state_size = (uint16_t) sizeof(raw_btree_root);
            view_state_size += (uint16_t) header->view_states[i]->reduce_value.size;
        }
        enc_uint16(view_state_size, &b);

        encode_root(b, header->view_states[i]);
        b += view_state_size;
    }

    b[0] = (char) (header->has_replica ? 1 : 0);
    b += 1;

    enc_seq_list(header->replicas_on_transfer, &b);
    enc_seq_list(header->pending_transition.active, &b);
    enc_seq_list(header->pending_transition.passive, &b);
    enc_seq_list(header->pending_transition.unindexable, &b);
    enc_part_seq_list(header->unindexable_seqs, &b);

    if (header->version >= 2) {
        enc_part_versions_list(header->part_versions, &b);
    }

    /* --- Phase 3: compress and prepend the signature. --- */
    comp_size = snappy_max_compressed_length(sz);
    comp = (char *) malloc(16 + comp_size);

    if (comp == NULL) {
        goto alloc_error;
    }

    res = snappy_compress(buf, sz, comp + 16, &comp_size);

    if (res != SNAPPY_OK) {
        /* TODO: a new error for couchstore_error_t */
        free(comp);
        goto alloc_error;
    }

    memcpy(comp, header->signature, 16);
    *buffer = comp;
    *buffer_size = 16 + comp_size;
    free(buf);

    return COUCHSTORE_SUCCESS;

alloc_error:
    free(buf);
    *buffer = NULL;
    *buffer_size = 0;
    return COUCHSTORE_ERROR_ALLOC_FAIL;
}
442
443
/*
 * Releases an index header and everything it owns. A NULL header is a
 * no-op, so it may be called from any error path of the decoder.
 */
void free_index_header(index_header_t *header)
{
    int view;

    if (header == NULL) {
        return;
    }

    /* Per-partition sequence list and id btree root. */
    sorted_list_free(header->seqs);
    free(header->id_btree_state);

    /* View btree roots; the array itself may be NULL if decoding failed
       before it was allocated. */
    if (header->view_states != NULL) {
        for (view = 0; view < header->num_views; ++view) {
            free(header->view_states[view]);
        }
        free(header->view_states);
    }

    sorted_list_free(header->replicas_on_transfer);
    sorted_list_free(header->pending_transition.active);
    sorted_list_free(header->pending_transition.passive);
    sorted_list_free(header->pending_transition.unindexable);
    sorted_list_free(header->unindexable_seqs);

    /* Partition versions are only present in version 2+ headers. */
    if (header->version >= 2) {
        free_part_versions(header->part_versions);
    }

    free(header);
}
473
/*
 * Frees a partition versions list, including each entry's heap-allocated
 * failover log array, and then the list itself. A NULL list is a no-op,
 * which makes this safe on decoder error paths where the list may never
 * have been created.
 */
static void free_part_versions(part_version_t *part_versions) {
    void *it;
    part_version_t *pver;

    if (part_versions == NULL) {
        return;
    }

    it = sorted_list_iterator(part_versions);
    pver = sorted_list_next(it);
    while (pver != NULL) {
        free(pver->failover_log);
        pver = sorted_list_next(it);
    }
    sorted_list_free_iterator(it);
    sorted_list_free(part_versions);
}
485
486
/* Writes u as a 2 byte big-endian value at *buf and advances *buf past it. */
static void enc_uint16(uint16_t u, char **buf)
{
    raw_16 encoded = encode_raw16(u);

    memcpy(*buf, &encoded, 2);
    *buf += 2;
}
493
494
/* Writes u as a 6 byte big-endian value at *buf and advances *buf past it. */
static void enc_uint48(uint64_t u, char **buf)
{
    raw_48 encoded;

    encode_raw48(u, &encoded);
    memcpy(*buf, &encoded, 6);
    *buf += 6;
}
502
503
/* Writes u as an 8 byte big-endian value at *buf and advances *buf past it. */
static void enc_uint64(uint64_t u, char **buf)
{
    raw_64 encoded = encode_raw64(u);

    memcpy(*buf, &encoded, 8);
    *buf += 8;
}
510
511
enc_seq_list(const void *list, char **buf)512 static void enc_seq_list(const void *list, char **buf)
513 {
514 void *it = sorted_list_iterator(list);
515 uint16_t *seq = NULL;
516
517 enc_uint16((uint16_t) sorted_list_size(list), buf);
518 seq = sorted_list_next(it);
519 while (seq != NULL) {
520 enc_uint16(*seq, buf);
521 seq = sorted_list_next(it);
522 }
523 sorted_list_free_iterator(it);
524 }
525
526
enc_part_seq_list(const void *list, char **buf)527 static void enc_part_seq_list(const void *list, char **buf)
528 {
529 void *it = sorted_list_iterator(list);
530 part_seq_t *pseq = NULL;
531
532 enc_uint16((uint16_t) sorted_list_size(list), buf);
533 pseq = sorted_list_next(it);
534 while (pseq != NULL) {
535 enc_uint16(pseq->part_id, buf);
536 enc_uint48(pseq->seq, buf);
537 pseq = sorted_list_next(it);
538 }
539 sorted_list_free_iterator(it);
540 }
541
542
enc_part_versions_list(const void *list, char **buf)543 static void enc_part_versions_list(const void *list, char **buf)
544 {
545 void *it = sorted_list_iterator(list);
546 part_version_t *pver = NULL;
547 uint16_t i;
548
549 enc_uint16((uint16_t) sorted_list_size(list), buf);
550 pver = sorted_list_next(it);
551 while (pver != NULL) {
552 enc_uint16(pver->part_id, buf);
553 enc_uint16(pver->num_failover_log, buf);
554 for (i = 0; i < pver->num_failover_log; ++i) {
555 memcpy(*buf, &(pver->failover_log[i].uuid), 8);
556 *buf += 8;
557 enc_uint64(pver->failover_log[i].seq, buf);
558 }
559 pver = sorted_list_next(it);
560 }
561 sorted_list_free_iterator(it);
562 }
563
564
part_seq_cmp(const void *a, const void *b)565 static int part_seq_cmp(const void *a, const void *b)
566 {
567 return ((part_seq_t *) a)->part_id - ((part_seq_t *) b)->part_id;
568 }
569
570
part_id_cmp(const void *a, const void *b)571 static int part_id_cmp(const void *a, const void *b)
572 {
573 return *((uint16_t *) a) - *((uint16_t *) b);
574 }
575
576
part_versions_cmp(const void *a, const void *b)577 static int part_versions_cmp(const void *a, const void *b)
578 {
579 return ((part_version_t *) a)->part_id - ((part_version_t *) b)->part_id;
580 }
581