1 /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2
3 /**
4 * @copyright 2013 Couchbase, Inc.
5 *
6 * @author Filipe Manana <filipe@couchbase.com>
7 * @author Fulu Li <fulu@couchbase.com>
8 *
9 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
10 * use this file except in compliance with the License. You may obtain a copy of
11 * the License at
12 *
13 * http://www.apache.org/licenses/LICENSE-2.0
14 *
15 * Unless required by applicable law or agreed to in writing, software
16 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
17 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
18 * License for the specific language governing permissions and limitations under
19 * the License.
20 **/
21
22 #include "../bitfield.h"
23 #include <stdlib.h>
24 #include <stdio.h>
25 #include <string.h>
26 #include <inttypes.h>
27 #include <assert.h>
28 #include "bitmap.h"
29 #include "keys.h"
30 #include "reductions.h"
31 #include "values.h"
32 #include "reducers.h"
33 #include "../couch_btree.h"
34
35 typedef enum {
36 VIEW_REDUCER_SUCCESS = 0,
37 VIEW_REDUCER_ERROR_NOT_A_NUMBER = 1,
38 VIEW_REDUCER_ERROR_BAD_STATS_OBJECT = 2
39 } builtin_reducer_error_t;
40
41 static const char *builtin_reducer_error_msg[] = {
42 NULL,
43 "Value is not a number",
44 "Invalid _stats JSON object"
45 };
46
47 typedef struct {
48 void *parent_ctx;
49 void *mapreduce_ctx;
50 } reducer_ctx_t;
51
52 typedef couchstore_error_t (*reducer_fn_t)(const mapreduce_json_list_t *,
53 const mapreduce_json_list_t *,
54 reducer_ctx_t *,
55 sized_buf *);
56
57 typedef struct {
58 /* For builtin reduce operations that fail, this is the Key of the KV pair
59 that caused the failure. For example, for a _sum reducer, if there's
60 a value that is not a number, the reducer will abort and populate the
61 field 'error_key' with the value's corresponding key and set 'error' to
62 VIEW_REDUCER_ERROR_NOT_A_NUMBER. */
63 const char *error_key;
64 builtin_reducer_error_t builtin_error;
65
66 /* For custom (JavaScript) reduce operations, this is the error message received
67 from the JavaScript engine. */
68 char *error_msg;
69 mapreduce_error_t mapreduce_error;
70
71 unsigned num_reducers;
72 reducer_fn_t *reducers;
73 reducer_ctx_t *reducer_contexts;
74 } reducer_private_t;
75
76
77 #define DOUBLE_FMT "%.15g"
78 #define scan_stats(buf, sum, count, min, max, sumsqr) \
79 sscanf(buf, "{\"sum\":%lg,\"count\":%"SCNu64",\"min\":%lg,\"max\":%lg,\"sumsqr\":%lg}",\
80 &sum, &count, &min, &max, &sumsqr)
81
82 #define sprint_stats(buf, sum, count, min, max, sumsqr) \
83 sprintf(buf, "{\"sum\":%g,\"count\":%"PRIu64",\"min\":%g,\"max\":%g,\"sumsqr\":%g}",\
84 sum, count, min, max, sumsqr)
85
86 static void free_key_excluding_elements(view_btree_key_t *key);
87 static void free_json_key_list(mapreduce_json_list_t *list);
88
89
json_to_str(const mapreduce_json_t *buf, char str[32])90 static int json_to_str(const mapreduce_json_t *buf, char str[32])
91 {
92 if (buf->length < 1 || buf->length > 31) {
93 return 0;
94 }
95 memcpy(str, buf->json, buf->length);
96 str[buf->length] = '\0';
97
98 return 1;
99 }
100
101
json_to_double(const mapreduce_json_t *buf, double *out_num)102 static int json_to_double(const mapreduce_json_t *buf, double *out_num)
103 {
104 char str[32];
105 char *end;
106
107 if (!json_to_str(buf, str)) {
108 return 0;
109 }
110 *out_num = strtod(str, &end);
111
112 return ((end > str) && (*end == '\0'));
113 }
114
115
json_to_uint64(const mapreduce_json_t *buf, uint64_t *out_num)116 static int json_to_uint64(const mapreduce_json_t *buf, uint64_t *out_num)
117 {
118 char str[32];
119 char *end;
120
121 if (!json_to_str(buf, str)) {
122 return 0;
123 }
124 *out_num = strtoull(str, &end, 10);
125
126 return ((end > str) && (*end == '\0'));
127 }
128
129
view_id_btree_reduce(char *dst, size_t *size_r, const nodelist *leaflist, int count, void *ctx)130 couchstore_error_t view_id_btree_reduce(char *dst,
131 size_t *size_r,
132 const nodelist *leaflist,
133 int count,
134 void *ctx)
135 {
136 view_id_btree_reduction_t *r = NULL;
137 uint64_t subtree_count = 0;
138 couchstore_error_t errcode = COUCHSTORE_SUCCESS;
139 const nodelist *i;
140
141 (void) ctx;
142 r = (view_id_btree_reduction_t *) malloc(sizeof(view_id_btree_reduction_t));
143 if (r == NULL) {
144 errcode = COUCHSTORE_ERROR_ALLOC_FAIL;
145 goto alloc_error;
146 }
147 memset(&r->partitions_bitmap, 0, sizeof(bitmap_t));
148
149 for (i = leaflist; i != NULL && count > 0; i = i->next, count--) {
150 view_id_btree_value_t *v = NULL;
151 errcode = decode_view_id_btree_value(i->data.buf, i->data.size, &v);
152 if (errcode != COUCHSTORE_SUCCESS) {
153 goto alloc_error;
154 }
155 set_bit(&r->partitions_bitmap, v->partition);
156 subtree_count++;
157 free_view_id_btree_value(v);
158 }
159 r->kv_count = subtree_count;
160 errcode = encode_view_id_btree_reduction(r, dst, size_r);
161
162 alloc_error:
163 free_view_id_btree_reduction(r);
164
165 return errcode;
166 }
167
168
view_id_btree_rereduce(char *dst, size_t *size_r, const nodelist *itmlist, int count, void *ctx)169 couchstore_error_t view_id_btree_rereduce(char *dst,
170 size_t *size_r,
171 const nodelist *itmlist,
172 int count,
173 void *ctx)
174 {
175 view_id_btree_reduction_t *r = NULL;
176 uint64_t subtree_count = 0;
177 couchstore_error_t errcode = COUCHSTORE_SUCCESS;
178 const nodelist *i;
179
180 (void) ctx;
181 r = (view_id_btree_reduction_t *) malloc(sizeof(view_id_btree_reduction_t));
182 if (r == NULL) {
183 errcode = COUCHSTORE_ERROR_ALLOC_FAIL;
184 goto alloc_error;
185 }
186 memset(&r->partitions_bitmap, 0, sizeof(bitmap_t));
187 for (i = itmlist; i != NULL && count > 0; i = i->next, count--) {
188 view_id_btree_reduction_t *r2 = NULL;
189 errcode = decode_view_id_btree_reduction(i->pointer->reduce_value.buf, &r2);
190 if (errcode != COUCHSTORE_SUCCESS) {
191 goto alloc_error;
192 }
193 union_bitmaps(&r->partitions_bitmap, &r2->partitions_bitmap);
194 subtree_count += r2->kv_count;
195 free_view_id_btree_reduction(r2);
196 }
197 r->kv_count = subtree_count;
198 errcode = encode_view_id_btree_reduction(r, dst, size_r);
199
200 alloc_error:
201 free_view_id_btree_reduction(r);
202
203 return errcode;
204 }
205
206
builtin_sum_reducer(const mapreduce_json_list_t *keys, const mapreduce_json_list_t *values, reducer_ctx_t *ctx, sized_buf *buf)207 static couchstore_error_t builtin_sum_reducer(const mapreduce_json_list_t *keys,
208 const mapreduce_json_list_t *values,
209 reducer_ctx_t *ctx,
210 sized_buf *buf)
211 {
212 double n, sum = 0.0;
213 int i, size;
214 char red[512];
215
216 for (i = 0; i < values->length; ++i) {
217 mapreduce_json_t *value = &values->values[i];
218
219 if (json_to_double(value, &n)) {
220 sum += n;
221 } else {
222 reducer_private_t *priv = (reducer_private_t *) ctx->parent_ctx;
223
224 priv->builtin_error = VIEW_REDUCER_ERROR_NOT_A_NUMBER;
225 if (keys == NULL) {
226 /* rereduce */
227 return COUCHSTORE_ERROR_REDUCER_FAILURE;
228 } else {
229 /* reduce */
230 mapreduce_json_t *key = &keys->values[i];
231 char *error_key = (char *) malloc(key->length + 1);
232
233 if (error_key == NULL) {
234 return COUCHSTORE_ERROR_ALLOC_FAIL;
235 }
236 memcpy(error_key, key->json, key->length);
237 error_key[key->length] = '\0';
238 priv->error_key = (const char *) error_key;
239
240 return COUCHSTORE_ERROR_REDUCER_FAILURE;
241 }
242 }
243 }
244
245 size = sprintf(red, DOUBLE_FMT, sum);
246 assert(size > 0);
247 buf->buf = (char *) malloc(size);
248 if (buf->buf == NULL) {
249 return COUCHSTORE_ERROR_ALLOC_FAIL;
250 }
251 memcpy(buf->buf, red, size);
252 buf->size = size;
253
254 return COUCHSTORE_SUCCESS;
255 }
256
257
builtin_count_reducer(const mapreduce_json_list_t *keys, const mapreduce_json_list_t *values, reducer_ctx_t *ctx, sized_buf *buf)258 static couchstore_error_t builtin_count_reducer(const mapreduce_json_list_t *keys,
259 const mapreduce_json_list_t *values,
260 reducer_ctx_t *ctx,
261 sized_buf *buf)
262 {
263 uint64_t count = 0, n;
264 int i, size;
265 char red[512];
266
267 for (i = 0; i < values->length; ++i) {
268 if (keys == NULL) {
269 /* rereduce */
270 mapreduce_json_t *value = &values->values[i];
271
272 if (json_to_uint64(value, &n)) {
273 count += n;
274 } else {
275 reducer_private_t *priv = (reducer_private_t *) ctx->parent_ctx;
276 priv->builtin_error = VIEW_REDUCER_ERROR_NOT_A_NUMBER;
277 return COUCHSTORE_ERROR_REDUCER_FAILURE;
278 }
279 } else {
280 /* reduce */
281 count++;
282 }
283 }
284
285 size = sprintf(red, "%"PRIu64, count);
286 assert(size > 0);
287 buf->buf = (char *) malloc(size);
288 if (buf->buf == NULL) {
289 return COUCHSTORE_ERROR_ALLOC_FAIL;
290 }
291 memcpy(buf->buf, red, size);
292 buf->size = size;
293
294 return COUCHSTORE_SUCCESS;
295 }
296
297
builtin_stats_reducer(const mapreduce_json_list_t *keys, const mapreduce_json_list_t *values, reducer_ctx_t *ctx, sized_buf *buf)298 static couchstore_error_t builtin_stats_reducer(const mapreduce_json_list_t *keys,
299 const mapreduce_json_list_t *values,
300 reducer_ctx_t *ctx,
301 sized_buf *buf)
302 {
303 double n;
304 int i, size, scanned;
305 stats_t s, reduced;
306 char red[4096];
307
308 memset(&s, 0, sizeof(s));
309
310 for (i = 0; i < values->length; ++i) {
311 mapreduce_json_t *value = &values->values[i];
312
313 if (keys == NULL) {
314 /* rereduce */
315 char *value_buf = (char *) malloc(value->length + 1);
316
317 if (value_buf == NULL) {
318 return COUCHSTORE_ERROR_ALLOC_FAIL;
319 }
320
321 memcpy(value_buf, value->json, value->length);
322 value_buf[value->length] = '\0';
323 scanned = scan_stats(value_buf,
324 reduced.sum, reduced.count, reduced.min,
325 reduced.max, reduced.sumsqr);
326 free(value_buf);
327 if (scanned == 5) {
328 if (reduced.min < s.min || s.count == 0) {
329 s.min = reduced.min;
330 }
331 if (reduced.max > s.max || s.count == 0) {
332 s.max = reduced.max;
333 }
334 s.count += reduced.count;
335 s.sum += reduced.sum;
336 s.sumsqr += reduced.sumsqr;
337 } else {
338 reducer_private_t *priv = (reducer_private_t *) ctx->parent_ctx;
339 priv->builtin_error = VIEW_REDUCER_ERROR_BAD_STATS_OBJECT;
340 return COUCHSTORE_ERROR_REDUCER_FAILURE;
341 }
342 } else {
343 /* reduce */
344 if (json_to_double(value, &n)) {
345 s.sum += n;
346 s.sumsqr += n * n;
347 if (s.count++ == 0) {
348 s.min = s.max = n;
349 } else if (n > s.max) {
350 s.max = n;
351 } else if (n < s.min) {
352 s.min = n;
353 }
354 } else {
355 reducer_private_t *priv = (reducer_private_t *) ctx->parent_ctx;
356
357 priv->builtin_error = VIEW_REDUCER_ERROR_NOT_A_NUMBER;
358 if (keys == NULL) {
359 /* rereduce */
360 return COUCHSTORE_ERROR_REDUCER_FAILURE;
361 } else {
362 /* reduce */
363 mapreduce_json_t *key = &keys->values[i];
364 char *error_key = (char *) malloc(key->length + 1);
365
366 if (error_key == NULL) {
367 return COUCHSTORE_ERROR_ALLOC_FAIL;
368 }
369 memcpy(error_key, key->json, key->length);
370 error_key[key->length] = '\0';
371 priv->error_key = (const char *) error_key;
372
373 return COUCHSTORE_ERROR_REDUCER_FAILURE;
374 }
375 }
376 }
377 }
378
379 size = sprint_stats(red, s.sum, s.count, s.min, s.max, s.sumsqr);
380 assert(size > 0);
381 buf->buf = (char *) malloc(size);
382 if (buf->buf == NULL) {
383 return COUCHSTORE_ERROR_ALLOC_FAIL;
384 }
385 memcpy(buf->buf, red, size);
386 buf->size = size;
387
388 return COUCHSTORE_SUCCESS;
389 }
390
391
js_reducer(const mapreduce_json_list_t *keys, const mapreduce_json_list_t *values, reducer_ctx_t *ctx, sized_buf *buf)392 static couchstore_error_t js_reducer(const mapreduce_json_list_t *keys,
393 const mapreduce_json_list_t *values,
394 reducer_ctx_t *ctx,
395 sized_buf *buf)
396 {
397 mapreduce_error_t ret;
398 char *error_msg = NULL;
399 reducer_private_t *priv = (reducer_private_t *) ctx->parent_ctx;
400
401 if (keys == NULL) {
402 /* rereduce */
403 mapreduce_json_t *result = NULL;
404
405 ret = mapreduce_rereduce(ctx->mapreduce_ctx, 1, values, &result, &error_msg);
406 if (ret != MAPREDUCE_SUCCESS) {
407 priv->error_msg = error_msg;
408 priv->mapreduce_error = ret;
409 return COUCHSTORE_ERROR_REDUCER_FAILURE;
410 }
411 buf->buf = (char *) malloc(result->length);
412 if (buf->buf == NULL) {
413 free(result->json);
414 free(result);
415 return COUCHSTORE_ERROR_ALLOC_FAIL;
416 }
417 buf->size = result->length;
418 memcpy(buf->buf, result->json, result->length);
419 free(result->json);
420 free(result);
421 } else {
422 /* reduce */
423 mapreduce_json_list_t *results = NULL;
424
425 ret = mapreduce_reduce_all(ctx->mapreduce_ctx,
426 keys,
427 values,
428 &results,
429 &error_msg);
430 if (ret != MAPREDUCE_SUCCESS) {
431 priv->error_msg = error_msg;
432 priv->mapreduce_error = ret;
433 return COUCHSTORE_ERROR_REDUCER_FAILURE;
434 }
435 assert(results->length == 1);
436 buf->buf = (char *) malloc(results->values[0].length);
437 if (buf->buf == NULL) {
438 mapreduce_free_json_list(results);
439 return COUCHSTORE_ERROR_ALLOC_FAIL;
440 }
441 buf->size = results->values[0].length;
442 memcpy(buf->buf, results->values[0].json, results->values[0].length);
443 mapreduce_free_json_list(results);
444 }
445
446 return COUCHSTORE_SUCCESS;
447 }
448
449
free_key_excluding_elements(view_btree_key_t *key)450 static void free_key_excluding_elements(view_btree_key_t *key)
451 {
452 if (key != NULL) {
453 free(key->doc_id.buf);
454 free(key);
455 }
456 }
457
458
free_json_key_list(mapreduce_json_list_t *list)459 static void free_json_key_list(mapreduce_json_list_t *list)
460 {
461 int i;
462
463 if (list == NULL) {
464 return;
465 }
466
467 for (i = list->length - 1; i > 0; --i) {
468 if (list->values[i].json == list->values[i - 1].json) {
469 list->values[i].length = 0;
470 }
471 }
472
473 for (i = 0; i < list->length; ++i) {
474 if (list->values[i].length != 0) {
475 free(list->values[i].json);
476 }
477 }
478 free(list->values);
479 free(list);
480 }
481
482
make_view_reducer_ctx(const char *functions[], unsigned num_functions, char **error_msg)483 view_reducer_ctx_t *make_view_reducer_ctx(const char *functions[],
484 unsigned num_functions,
485 char **error_msg)
486 {
487 unsigned i;
488 reducer_private_t *priv = calloc(1, sizeof(*priv));
489 view_reducer_ctx_t *ctx = calloc(1, sizeof(*ctx));
490
491 if (ctx == NULL || priv == NULL) {
492 goto error;
493 }
494
495 priv->num_reducers = num_functions;
496 priv->reducers = calloc(num_functions, sizeof(reducer_fn_t));
497 if (priv->reducers == NULL) {
498 goto error;
499 }
500
501 priv->reducer_contexts = calloc(num_functions, sizeof(reducer_ctx_t));
502 if (priv->reducer_contexts == NULL) {
503 goto error;
504 }
505
506 for (i = 0; i < num_functions; ++i) {
507 priv->reducer_contexts[i].parent_ctx = (void *) priv;
508
509 if (strcmp(functions[i], "_count") == 0) {
510 priv->reducers[i] = builtin_count_reducer;
511 } else if (strcmp(functions[i], "_sum") == 0) {
512 priv->reducers[i] = builtin_sum_reducer;
513 } else if (strcmp(functions[i], "_stats") == 0) {
514 priv->reducers[i] = builtin_stats_reducer;
515 } else {
516 mapreduce_error_t mapred_error;
517 void *mapred_ctx = NULL;
518
519 priv->reducers[i] = js_reducer;
520 /* TODO: use single reduce context for all JS functions */
521 mapred_error = mapreduce_start_reduce_context(&functions[i], 1,
522 &mapred_ctx,
523 error_msg);
524 if (mapred_error != MAPREDUCE_SUCCESS) {
525 goto error;
526 }
527 priv->reducer_contexts[i].mapreduce_ctx = mapred_ctx;
528 }
529 }
530
531 priv->builtin_error = VIEW_REDUCER_SUCCESS;
532 priv->mapreduce_error = MAPREDUCE_SUCCESS;
533 ctx->private = priv;
534 *error_msg = NULL;
535
536 return ctx;
537
538 error:
539 if (priv != NULL) {
540 if (priv->reducer_contexts != NULL) {
541 for (i = 0; i < num_functions; ++i) {
542 mapreduce_free_context(priv->reducer_contexts[i].mapreduce_ctx);
543 }
544 free(priv->reducer_contexts);
545 }
546 free(priv->reducers);
547 free(priv);
548 }
549 free(ctx);
550
551 return NULL;
552 }
553
554
free_view_reducer_ctx(view_reducer_ctx_t *ctx)555 void free_view_reducer_ctx(view_reducer_ctx_t *ctx)
556 {
557 unsigned i;
558 reducer_private_t *priv;
559
560 if (ctx == NULL) {
561 return;
562 }
563
564 priv = (reducer_private_t *) ctx->private;
565 for (i = 0; i < priv->num_reducers; ++i) {
566 mapreduce_free_context(priv->reducer_contexts[i].mapreduce_ctx);
567 }
568 free(priv->reducer_contexts);
569 free(priv->reducers);
570 free(priv);
571 free((void *) ctx->error);
572 free(ctx);
573 }
574
575
add_error_message(view_reducer_ctx_t *red_ctx, int rereduce)576 static void add_error_message(view_reducer_ctx_t *red_ctx, int rereduce)
577 {
578 reducer_private_t *priv = (reducer_private_t *) red_ctx->private;
579 char *error_msg;
580
581 if (red_ctx->error != NULL) {
582 free((void *) red_ctx->error);
583 red_ctx->error = NULL;
584 }
585
586 if (priv->builtin_error != VIEW_REDUCER_SUCCESS) {
587 const char *base_msg = builtin_reducer_error_msg[priv->builtin_error];
588
589 assert(priv->mapreduce_error == MAPREDUCE_SUCCESS);
590 if (!rereduce && (priv->error_key != NULL)) {
591 error_msg = (char *) malloc(strlen(base_msg) + 12 + strlen(priv->error_key));
592 assert(error_msg != NULL);
593 sprintf(error_msg, "%s (key %s)", base_msg, priv->error_key);
594 } else {
595 error_msg = strdup(base_msg);
596 assert(error_msg != NULL);
597 }
598 if (priv->error_key != NULL) {
599 free((void *) priv->error_key);
600 priv->error_key = NULL;
601 }
602 } else {
603 assert(priv->mapreduce_error != MAPREDUCE_SUCCESS);
604 if (priv->error_msg != NULL) {
605 error_msg = priv->error_msg;
606 } else {
607 if (priv->mapreduce_error == MAPREDUCE_TIMEOUT) {
608 error_msg = strdup("function timeout");
609 assert(error_msg != NULL);
610 } else {
611 error_msg = malloc(64);
612 assert(error_msg != NULL);
613 sprintf(error_msg, "mapreduce error: %d", priv->mapreduce_error);
614 }
615 }
616 }
617
618 red_ctx->error = (const char *) error_msg;
619 }
620
621
view_btree_reduce(char *dst, size_t *size_r, const nodelist *leaflist, int count, void *ctx)622 couchstore_error_t view_btree_reduce(char *dst,
623 size_t *size_r,
624 const nodelist *leaflist,
625 int count,
626 void *ctx)
627 {
628 view_reducer_ctx_t *red_ctx = (view_reducer_ctx_t *) ctx;
629 reducer_private_t *priv = (reducer_private_t *) red_ctx->private;
630 unsigned i;
631 reducer_fn_t reducer;
632 view_btree_reduction_t *red = NULL;
633 const nodelist *n;
634 int c;
635 couchstore_error_t ret = COUCHSTORE_SUCCESS;
636 mapreduce_json_list_t *key_list = NULL;
637 mapreduce_json_list_t *value_list = NULL;
638 view_btree_value_t **values = NULL;
639
640 values = (view_btree_value_t **) calloc(count, sizeof(view_btree_value_t *));
641 red = (view_btree_reduction_t *) calloc(1, sizeof(*red));
642 key_list = (mapreduce_json_list_t *) calloc(1, sizeof(*key_list));
643 value_list = (mapreduce_json_list_t *) calloc(1, sizeof(*value_list));
644
645 if (values == NULL || red == NULL || key_list == NULL || value_list == NULL) {
646 ret = COUCHSTORE_ERROR_ALLOC_FAIL;
647 goto out;
648 }
649
650 for (n = leaflist, c = 0; n != NULL && c < count; n = n->next, ++c) {
651 view_btree_value_t *v = NULL;
652
653 ret = decode_view_btree_value(n->data.buf, n->data.size, &v);
654 if (ret != COUCHSTORE_SUCCESS) {
655 goto out;
656 }
657 set_bit(&red->partitions_bitmap, v->partition);
658 red->kv_count += v->num_values;
659 values[c] = v;
660 }
661
662 value_list->values = (mapreduce_json_t *) calloc(red->kv_count,
663 sizeof(mapreduce_json_t));
664 if (value_list->values == NULL) {
665 ret = COUCHSTORE_ERROR_ALLOC_FAIL;
666 goto out;
667 }
668 key_list->values = (mapreduce_json_t *) calloc(red->kv_count,
669 sizeof(mapreduce_json_t));
670 if (key_list->values == NULL) {
671 ret = COUCHSTORE_ERROR_ALLOC_FAIL;
672 goto out;
673 }
674
675 for (n = leaflist, c = 0; n != NULL && c < count; n = n->next, ++c) {
676 view_btree_value_t *v = values[c];
677 view_btree_key_t *k = NULL;
678
679 ret = decode_view_btree_key(n->key.buf, n->key.size, &k);
680 if (ret != COUCHSTORE_SUCCESS) {
681 free_view_btree_value(v);
682 goto out;
683 }
684 for (i = 0; i < v->num_values; ++i) {
685 value_list->values[value_list->length].length = v->values[i].size;
686 value_list->values[value_list->length].json = v->values[i].buf;
687 value_list->length++;
688 key_list->values[key_list->length].length = k->json_key.size;
689 key_list->values[key_list->length].json = k->json_key.buf;
690 key_list->length++;
691 }
692 free_key_excluding_elements(k);
693 }
694
695 if (priv->num_reducers > 0) {
696 red->reduce_values = (sized_buf *) calloc(priv->num_reducers, sizeof(sized_buf));
697 if (red->reduce_values == NULL) {
698 ret = COUCHSTORE_ERROR_ALLOC_FAIL;
699 goto out;
700 }
701 }
702
703 red->num_values = priv->num_reducers;
704 for (i = 0; i < priv->num_reducers; ++i) {
705 sized_buf buf;
706 reducer = priv->reducers[i];
707
708 ret = (*reducer)(key_list, value_list, &priv->reducer_contexts[i], &buf);
709 if (ret != COUCHSTORE_SUCCESS) {
710 add_error_message(red_ctx, 0);
711 goto out;
712 }
713
714 red->reduce_values[i] = buf;
715 }
716
717 ret = encode_view_btree_reduction(red, dst, size_r);
718
719 out:
720 if (red != NULL) {
721 for (i = 0; i < red->num_values; ++i) {
722 free(red->reduce_values[i].buf);
723 }
724 free(red->reduce_values);
725 free(red);
726 }
727 free_json_key_list(key_list);
728 if (values != NULL) {
729 for (c = 0; c < count; ++c) {
730 free_view_btree_value(values[c]);
731 }
732 free(values);
733 }
734 if (value_list != NULL) {
735 free(value_list->values);
736 free(value_list);
737 }
738
739 return ret;
740 }
741
742
view_btree_rereduce(char *dst, size_t *size_r, const nodelist *leaflist, int count, void *ctx)743 couchstore_error_t view_btree_rereduce(char *dst,
744 size_t *size_r,
745 const nodelist *leaflist,
746 int count,
747 void *ctx)
748 {
749 view_reducer_ctx_t *red_ctx = (view_reducer_ctx_t *) ctx;
750 reducer_private_t *priv = (reducer_private_t *) red_ctx->private;
751 unsigned i;
752 reducer_fn_t reducer;
753 view_btree_reduction_t *red = NULL;
754 const nodelist *n;
755 int c;
756 couchstore_error_t ret = COUCHSTORE_SUCCESS;
757 mapreduce_json_list_t *value_list = NULL;
758 view_btree_reduction_t **reductions = NULL;
759
760 reductions = (view_btree_reduction_t **) calloc(count, sizeof(view_btree_reduction_t *));
761 red = (view_btree_reduction_t *) calloc(1, sizeof(*red));
762
763 if (reductions == NULL || red == NULL) {
764 ret = COUCHSTORE_ERROR_ALLOC_FAIL;
765 goto out;
766 }
767
768 red->num_values = priv->num_reducers;
769 red->reduce_values = (sized_buf *) calloc(red->num_values, sizeof(sized_buf));
770 value_list = (mapreduce_json_list_t *) calloc(1, sizeof(*value_list));
771
772 if (red->reduce_values == NULL || value_list == NULL) {
773 ret = COUCHSTORE_ERROR_ALLOC_FAIL;
774 goto out;
775 }
776
777 for (n = leaflist, c = 0; n != NULL && c < count; n = n->next, ++c) {
778 view_btree_reduction_t *r = NULL;
779
780 ret = decode_view_btree_reduction(n->pointer->reduce_value.buf,
781 n->pointer->reduce_value.size, &r);
782 if (ret != COUCHSTORE_SUCCESS) {
783 goto out;
784 }
785
786 union_bitmaps(&red->partitions_bitmap, &r->partitions_bitmap);
787 assert(r->num_values == priv->num_reducers);
788 red->kv_count += r->kv_count;
789 reductions[c] = r;
790 }
791
792 if (priv->num_reducers > 0) {
793 value_list->values = (mapreduce_json_t *) calloc(count,
794 sizeof(mapreduce_json_t));
795 if (value_list->values == NULL) {
796 ret = COUCHSTORE_ERROR_ALLOC_FAIL;
797 goto out;
798 }
799 }
800
801 for (i = 0; i < priv->num_reducers; ++i) {
802 sized_buf buf;
803
804 for (n = leaflist, c = 0; n != NULL && c < count; n = n->next, ++c) {
805 view_btree_reduction_t *r = reductions[c];
806
807 value_list->values[value_list->length].json = r->reduce_values[i].buf;
808 value_list->values[value_list->length].length = r->reduce_values[i].size;
809 value_list->length++;
810 }
811
812 reducer = priv->reducers[i];
813 ret = (*reducer)(NULL, value_list, &priv->reducer_contexts[i], &buf);
814 if (ret != COUCHSTORE_SUCCESS) {
815 add_error_message(red_ctx, 1);
816 goto out;
817 }
818
819 value_list->length = 0;
820 red->reduce_values[i] = buf;
821 }
822
823 ret = encode_view_btree_reduction(red, dst, size_r);
824
825 out:
826 if (red != NULL) {
827 for (i = 0; i < red->num_values; ++i) {
828 free(red->reduce_values[i].buf);
829 }
830 free(red->reduce_values);
831 free(red);
832 }
833 if (reductions != NULL) {
834 for (c = 0; c < count; ++c) {
835 free_view_btree_reduction(reductions[c]);
836 }
837 free(reductions);
838 }
839 if (value_list != NULL) {
840 free(value_list->values);
841 free(value_list);
842 }
843
844 return ret;
845 }
846