1 /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 
3 /**
4  * @copyright 2013 Couchbase, Inc.
5  *
6  * @author Filipe Manana  <filipe@couchbase.com>
7  * @author Fulu Li        <fulu@couchbase.com>
8  *
9  * Licensed under the Apache License, Version 2.0 (the "License"); you may not
10  * use this file except in compliance with the License. You may obtain a copy of
11  * the License at
12  *
13  *  http://www.apache.org/licenses/LICENSE-2.0
14  *
15  * Unless required by applicable law or agreed to in writing, software
16  * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
17  * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
18  * License for the specific language governing permissions and limitations under
19  * the License.
20  **/
21 
22 #include "../bitfield.h"
23 #include <stdlib.h>
24 #include <stdio.h>
25 #include <string.h>
26 #include <inttypes.h>
27 #include <assert.h>
28 #include "bitmap.h"
29 #include "keys.h"
30 #include "reductions.h"
31 #include "values.h"
32 #include "reducers.h"
33 #include "../couch_btree.h"
34 
35 typedef enum {
36     VIEW_REDUCER_SUCCESS                = 0,
37     VIEW_REDUCER_ERROR_NOT_A_NUMBER     = 1,
38     VIEW_REDUCER_ERROR_BAD_STATS_OBJECT = 2
39 } builtin_reducer_error_t;
40 
41 static const char *builtin_reducer_error_msg[] = {
42     NULL,
43     "Value is not a number",
44     "Invalid _stats JSON object"
45 };
46 
47 typedef struct {
48     void  *parent_ctx;
49     void  *mapreduce_ctx;
50 } reducer_ctx_t;
51 
52 typedef couchstore_error_t (*reducer_fn_t)(const mapreduce_json_list_t *,
53                                            const mapreduce_json_list_t *,
54                                            reducer_ctx_t *,
55                                            sized_buf *);
56 
57 typedef struct {
58     /* For builtin reduce operations that fail, this is the Key of the KV pair
59        that caused the failure. For example, for a _sum reducer, if there's
60        a value that is not a number, the reducer will abort and populate the
61        field 'error_key' with the value's corresponding key and set 'error' to
62        VIEW_REDUCER_ERROR_NOT_A_NUMBER. */
63     const char              *error_key;
64     builtin_reducer_error_t  builtin_error;
65 
66     /* For custom (JavaScript) reduce operations, this is the error message received
67        from the JavaScript engine. */
68     char                    *error_msg;
69     mapreduce_error_t        mapreduce_error;
70 
71     unsigned                 num_reducers;
72     reducer_fn_t             *reducers;
73     reducer_ctx_t            *reducer_contexts;
74 } reducer_private_t;
75 
76 
77 #define DOUBLE_FMT "%.15g"
78 #define scan_stats(buf, sum, count, min, max, sumsqr) \
79         sscanf(buf, "{\"sum\":%lg,\"count\":%"SCNu64",\"min\":%lg,\"max\":%lg,\"sumsqr\":%lg}",\
80                &sum, &count, &min, &max, &sumsqr)
81 
82 #define sprint_stats(buf, sum, count, min, max, sumsqr) \
83         sprintf(buf, "{\"sum\":%g,\"count\":%"PRIu64",\"min\":%g,\"max\":%g,\"sumsqr\":%g}",\
84                 sum, count, min, max, sumsqr)
85 
86 static void free_key_excluding_elements(view_btree_key_t *key);
87 static void free_json_key_list(mapreduce_json_list_t *list);
88 
89 
json_to_str(const mapreduce_json_t *buf, char str[32])90 static int json_to_str(const mapreduce_json_t *buf, char str[32])
91 {
92     if (buf->length < 1 || buf->length > 31) {
93         return 0;
94     }
95     memcpy(str, buf->json, buf->length);
96     str[buf->length] = '\0';
97 
98     return 1;
99 }
100 
101 
json_to_double(const mapreduce_json_t *buf, double *out_num)102 static int json_to_double(const mapreduce_json_t *buf, double *out_num)
103 {
104     char str[32];
105     char *end;
106 
107     if (!json_to_str(buf, str)) {
108         return 0;
109     }
110     *out_num = strtod(str, &end);
111 
112     return ((end > str) && (*end == '\0'));
113 }
114 
115 
json_to_uint64(const mapreduce_json_t *buf, uint64_t *out_num)116 static int json_to_uint64(const mapreduce_json_t *buf, uint64_t *out_num)
117 {
118     char str[32];
119     char *end;
120 
121     if (!json_to_str(buf, str)) {
122         return 0;
123     }
124     *out_num = strtoull(str, &end, 10);
125 
126     return ((end > str) && (*end == '\0'));
127 }
128 
129 
view_id_btree_reduce(char *dst, size_t *size_r, const nodelist *leaflist, int count, void *ctx)130 couchstore_error_t view_id_btree_reduce(char *dst,
131                                         size_t *size_r,
132                                         const nodelist *leaflist,
133                                         int count,
134                                         void *ctx)
135 {
136     view_id_btree_reduction_t *r = NULL;
137     uint64_t subtree_count = 0;
138     couchstore_error_t errcode = COUCHSTORE_SUCCESS;
139     const nodelist *i;
140 
141     (void) ctx;
142     r = (view_id_btree_reduction_t *) malloc(sizeof(view_id_btree_reduction_t));
143     if (r == NULL) {
144         errcode = COUCHSTORE_ERROR_ALLOC_FAIL;
145         goto alloc_error;
146     }
147     memset(&r->partitions_bitmap, 0, sizeof(bitmap_t));
148 
149     for (i = leaflist; i != NULL && count > 0; i = i->next, count--) {
150         view_id_btree_value_t *v = NULL;
151         errcode = decode_view_id_btree_value(i->data.buf, i->data.size, &v);
152         if (errcode != COUCHSTORE_SUCCESS) {
153             goto alloc_error;
154         }
155         set_bit(&r->partitions_bitmap, v->partition);
156         subtree_count++;
157         free_view_id_btree_value(v);
158     }
159     r->kv_count = subtree_count;
160     errcode = encode_view_id_btree_reduction(r, dst, size_r);
161 
162 alloc_error:
163     free_view_id_btree_reduction(r);
164 
165     return errcode;
166 }
167 
168 
view_id_btree_rereduce(char *dst, size_t *size_r, const nodelist *itmlist, int count, void *ctx)169 couchstore_error_t view_id_btree_rereduce(char *dst,
170                                           size_t *size_r,
171                                           const nodelist *itmlist,
172                                           int count,
173                                           void *ctx)
174 {
175     view_id_btree_reduction_t *r = NULL;
176     uint64_t subtree_count = 0;
177     couchstore_error_t errcode = COUCHSTORE_SUCCESS;
178     const nodelist *i;
179 
180     (void) ctx;
181     r = (view_id_btree_reduction_t *) malloc(sizeof(view_id_btree_reduction_t));
182     if (r == NULL) {
183         errcode = COUCHSTORE_ERROR_ALLOC_FAIL;
184         goto alloc_error;
185     }
186     memset(&r->partitions_bitmap, 0, sizeof(bitmap_t));
187     for (i = itmlist; i != NULL && count > 0; i = i->next, count--) {
188         view_id_btree_reduction_t *r2 = NULL;
189         errcode = decode_view_id_btree_reduction(i->pointer->reduce_value.buf, &r2);
190         if (errcode != COUCHSTORE_SUCCESS) {
191             goto alloc_error;
192         }
193         union_bitmaps(&r->partitions_bitmap, &r2->partitions_bitmap);
194         subtree_count += r2->kv_count;
195         free_view_id_btree_reduction(r2);
196     }
197     r->kv_count = subtree_count;
198     errcode = encode_view_id_btree_reduction(r, dst, size_r);
199 
200 alloc_error:
201     free_view_id_btree_reduction(r);
202 
203     return errcode;
204 }
205 
206 
builtin_sum_reducer(const mapreduce_json_list_t *keys, const mapreduce_json_list_t *values, reducer_ctx_t *ctx, sized_buf *buf)207 static couchstore_error_t builtin_sum_reducer(const mapreduce_json_list_t *keys,
208                                               const mapreduce_json_list_t *values,
209                                               reducer_ctx_t *ctx,
210                                               sized_buf *buf)
211 {
212     double n, sum = 0.0;
213     int i, size;
214     char red[512];
215 
216     for (i = 0; i < values->length; ++i) {
217         mapreduce_json_t *value = &values->values[i];
218 
219         if (json_to_double(value, &n)) {
220             sum += n;
221         } else {
222             reducer_private_t *priv = (reducer_private_t *) ctx->parent_ctx;
223 
224             priv->builtin_error = VIEW_REDUCER_ERROR_NOT_A_NUMBER;
225             if (keys == NULL) {
226                 /* rereduce */
227                 return COUCHSTORE_ERROR_REDUCER_FAILURE;
228             } else {
229                 /* reduce */
230                 mapreduce_json_t *key = &keys->values[i];
231                 char *error_key = (char *) malloc(key->length + 1);
232 
233                 if (error_key == NULL) {
234                     return COUCHSTORE_ERROR_ALLOC_FAIL;
235                 }
236                 memcpy(error_key, key->json, key->length);
237                 error_key[key->length] = '\0';
238                 priv->error_key = (const char *) error_key;
239 
240                 return COUCHSTORE_ERROR_REDUCER_FAILURE;
241             }
242         }
243     }
244 
245     size = sprintf(red, DOUBLE_FMT, sum);
246     assert(size > 0);
247     buf->buf = (char *) malloc(size);
248     if (buf->buf == NULL) {
249         return COUCHSTORE_ERROR_ALLOC_FAIL;
250     }
251     memcpy(buf->buf, red, size);
252     buf->size = size;
253 
254     return COUCHSTORE_SUCCESS;
255 }
256 
257 
builtin_count_reducer(const mapreduce_json_list_t *keys, const mapreduce_json_list_t *values, reducer_ctx_t *ctx, sized_buf *buf)258 static couchstore_error_t builtin_count_reducer(const mapreduce_json_list_t *keys,
259                                                 const mapreduce_json_list_t *values,
260                                                 reducer_ctx_t *ctx,
261                                                 sized_buf *buf)
262 {
263     uint64_t count = 0, n;
264     int i, size;
265     char red[512];
266 
267     for (i = 0; i < values->length; ++i) {
268         if (keys == NULL) {
269             /* rereduce */
270             mapreduce_json_t *value = &values->values[i];
271 
272             if (json_to_uint64(value, &n)) {
273                 count += n;
274             } else {
275                 reducer_private_t *priv = (reducer_private_t *) ctx->parent_ctx;
276                 priv->builtin_error = VIEW_REDUCER_ERROR_NOT_A_NUMBER;
277                 return COUCHSTORE_ERROR_REDUCER_FAILURE;
278             }
279         } else {
280             /* reduce */
281             count++;
282         }
283     }
284 
285     size = sprintf(red, "%"PRIu64, count);
286     assert(size > 0);
287     buf->buf = (char *) malloc(size);
288     if (buf->buf == NULL) {
289         return COUCHSTORE_ERROR_ALLOC_FAIL;
290     }
291     memcpy(buf->buf, red, size);
292     buf->size = size;
293 
294     return COUCHSTORE_SUCCESS;
295 }
296 
297 
builtin_stats_reducer(const mapreduce_json_list_t *keys, const mapreduce_json_list_t *values, reducer_ctx_t *ctx, sized_buf *buf)298 static couchstore_error_t builtin_stats_reducer(const mapreduce_json_list_t *keys,
299                                                 const mapreduce_json_list_t *values,
300                                                 reducer_ctx_t *ctx,
301                                                 sized_buf *buf)
302 {
303     double n;
304     int i, size, scanned;
305     stats_t s, reduced;
306     char red[4096];
307 
308     memset(&s, 0, sizeof(s));
309 
310     for (i = 0; i < values->length; ++i) {
311         mapreduce_json_t *value = &values->values[i];
312 
313         if (keys == NULL) {
314             /* rereduce */
315             char *value_buf = (char *) malloc(value->length + 1);
316 
317             if (value_buf == NULL) {
318                 return COUCHSTORE_ERROR_ALLOC_FAIL;
319             }
320 
321             memcpy(value_buf, value->json, value->length);
322             value_buf[value->length] = '\0';
323             scanned = scan_stats(value_buf,
324                                  reduced.sum, reduced.count, reduced.min,
325                                  reduced.max, reduced.sumsqr);
326             free(value_buf);
327             if (scanned == 5) {
328                 if (reduced.min < s.min || s.count == 0) {
329                     s.min = reduced.min;
330                 }
331                 if (reduced.max > s.max || s.count == 0) {
332                     s.max = reduced.max;
333                 }
334                 s.count += reduced.count;
335                 s.sum += reduced.sum;
336                 s.sumsqr += reduced.sumsqr;
337             } else {
338                 reducer_private_t *priv = (reducer_private_t *) ctx->parent_ctx;
339                 priv->builtin_error = VIEW_REDUCER_ERROR_BAD_STATS_OBJECT;
340                 return COUCHSTORE_ERROR_REDUCER_FAILURE;
341             }
342         } else {
343             /* reduce */
344             if (json_to_double(value, &n)) {
345                 s.sum += n;
346                 s.sumsqr += n * n;
347                 if (s.count++ == 0) {
348                     s.min = s.max = n;
349                 } else if (n > s.max) {
350                     s.max = n;
351                 } else if (n < s.min) {
352                     s.min = n;
353                 }
354             } else {
355                 reducer_private_t *priv = (reducer_private_t *) ctx->parent_ctx;
356 
357                 priv->builtin_error = VIEW_REDUCER_ERROR_NOT_A_NUMBER;
358                 if (keys == NULL) {
359                     /* rereduce */
360                     return COUCHSTORE_ERROR_REDUCER_FAILURE;
361                 } else {
362                     /* reduce */
363                     mapreduce_json_t *key = &keys->values[i];
364                     char *error_key = (char *) malloc(key->length + 1);
365 
366                     if (error_key == NULL) {
367                         return COUCHSTORE_ERROR_ALLOC_FAIL;
368                     }
369                     memcpy(error_key, key->json, key->length);
370                     error_key[key->length] = '\0';
371                     priv->error_key = (const char *) error_key;
372 
373                     return COUCHSTORE_ERROR_REDUCER_FAILURE;
374                 }
375             }
376         }
377     }
378 
379     size = sprint_stats(red, s.sum, s.count, s.min, s.max, s.sumsqr);
380     assert(size > 0);
381     buf->buf = (char *) malloc(size);
382     if (buf->buf == NULL) {
383         return COUCHSTORE_ERROR_ALLOC_FAIL;
384     }
385     memcpy(buf->buf, red, size);
386     buf->size = size;
387 
388     return COUCHSTORE_SUCCESS;
389 }
390 
391 
js_reducer(const mapreduce_json_list_t *keys, const mapreduce_json_list_t *values, reducer_ctx_t *ctx, sized_buf *buf)392 static couchstore_error_t js_reducer(const mapreduce_json_list_t *keys,
393                                      const mapreduce_json_list_t *values,
394                                      reducer_ctx_t *ctx,
395                                      sized_buf *buf)
396 {
397     mapreduce_error_t ret;
398     char *error_msg = NULL;
399     reducer_private_t *priv = (reducer_private_t *) ctx->parent_ctx;
400 
401     if (keys == NULL) {
402         /* rereduce */
403         mapreduce_json_t *result = NULL;
404 
405         ret = mapreduce_rereduce(ctx->mapreduce_ctx, 1, values, &result, &error_msg);
406         if (ret != MAPREDUCE_SUCCESS) {
407             priv->error_msg = error_msg;
408             priv->mapreduce_error = ret;
409             return COUCHSTORE_ERROR_REDUCER_FAILURE;
410         }
411         buf->buf = (char *) malloc(result->length);
412         if (buf->buf == NULL) {
413             free(result->json);
414             free(result);
415             return COUCHSTORE_ERROR_ALLOC_FAIL;
416         }
417         buf->size = result->length;
418         memcpy(buf->buf, result->json, result->length);
419         free(result->json);
420         free(result);
421     } else {
422         /* reduce */
423         mapreduce_json_list_t *results = NULL;
424 
425         ret = mapreduce_reduce_all(ctx->mapreduce_ctx,
426                                    keys,
427                                    values,
428                                    &results,
429                                    &error_msg);
430         if (ret != MAPREDUCE_SUCCESS) {
431             priv->error_msg = error_msg;
432             priv->mapreduce_error = ret;
433             return COUCHSTORE_ERROR_REDUCER_FAILURE;
434         }
435         assert(results->length == 1);
436         buf->buf = (char *) malloc(results->values[0].length);
437         if (buf->buf == NULL) {
438             mapreduce_free_json_list(results);
439             return COUCHSTORE_ERROR_ALLOC_FAIL;
440         }
441         buf->size = results->values[0].length;
442         memcpy(buf->buf, results->values[0].json, results->values[0].length);
443         mapreduce_free_json_list(results);
444     }
445 
446     return COUCHSTORE_SUCCESS;
447 }
448 
449 
free_key_excluding_elements(view_btree_key_t *key)450 static void free_key_excluding_elements(view_btree_key_t *key)
451 {
452     if (key != NULL) {
453         free(key->doc_id.buf);
454         free(key);
455     }
456 }
457 
458 
free_json_key_list(mapreduce_json_list_t *list)459 static void free_json_key_list(mapreduce_json_list_t *list)
460 {
461     int i;
462 
463     if (list == NULL) {
464         return;
465     }
466 
467     for (i = list->length - 1; i > 0; --i) {
468         if (list->values[i].json == list->values[i - 1].json) {
469             list->values[i].length = 0;
470         }
471     }
472 
473     for (i = 0; i < list->length; ++i) {
474         if (list->values[i].length != 0) {
475             free(list->values[i].json);
476         }
477     }
478     free(list->values);
479     free(list);
480 }
481 
482 
make_view_reducer_ctx(const char *functions[], unsigned num_functions, char **error_msg)483 view_reducer_ctx_t *make_view_reducer_ctx(const char *functions[],
484                                           unsigned num_functions,
485                                           char **error_msg)
486 {
487     unsigned i;
488     reducer_private_t *priv = calloc(1, sizeof(*priv));
489     view_reducer_ctx_t *ctx = calloc(1, sizeof(*ctx));
490 
491     if (ctx == NULL || priv == NULL) {
492         goto error;
493     }
494 
495     priv->num_reducers = num_functions;
496     priv->reducers = calloc(num_functions, sizeof(reducer_fn_t));
497     if (priv->reducers == NULL) {
498         goto error;
499     }
500 
501     priv->reducer_contexts = calloc(num_functions, sizeof(reducer_ctx_t));
502     if (priv->reducer_contexts == NULL) {
503         goto error;
504     }
505 
506     for (i = 0; i < num_functions; ++i) {
507         priv->reducer_contexts[i].parent_ctx = (void *) priv;
508 
509         if (strcmp(functions[i], "_count") == 0) {
510             priv->reducers[i] = builtin_count_reducer;
511         } else if (strcmp(functions[i], "_sum") == 0) {
512             priv->reducers[i] = builtin_sum_reducer;
513         } else if (strcmp(functions[i], "_stats") == 0) {
514             priv->reducers[i] = builtin_stats_reducer;
515         } else {
516             mapreduce_error_t mapred_error;
517             void *mapred_ctx = NULL;
518 
519             priv->reducers[i] = js_reducer;
520             /* TODO: use single reduce context for all JS functions */
521             mapred_error = mapreduce_start_reduce_context(&functions[i], 1,
522                                                           &mapred_ctx,
523                                                           error_msg);
524             if (mapred_error != MAPREDUCE_SUCCESS) {
525                 goto error;
526             }
527             priv->reducer_contexts[i].mapreduce_ctx = mapred_ctx;
528         }
529     }
530 
531     priv->builtin_error = VIEW_REDUCER_SUCCESS;
532     priv->mapreduce_error = MAPREDUCE_SUCCESS;
533     ctx->private = priv;
534     *error_msg = NULL;
535 
536     return ctx;
537 
538 error:
539     if (priv != NULL) {
540         if (priv->reducer_contexts != NULL) {
541             for (i = 0; i < num_functions; ++i) {
542                 mapreduce_free_context(priv->reducer_contexts[i].mapreduce_ctx);
543             }
544             free(priv->reducer_contexts);
545         }
546         free(priv->reducers);
547         free(priv);
548     }
549     free(ctx);
550 
551     return NULL;
552 }
553 
554 
free_view_reducer_ctx(view_reducer_ctx_t *ctx)555 void free_view_reducer_ctx(view_reducer_ctx_t *ctx)
556 {
557     unsigned i;
558     reducer_private_t *priv;
559 
560     if (ctx == NULL) {
561         return;
562     }
563 
564     priv = (reducer_private_t *) ctx->private;
565     for (i = 0; i < priv->num_reducers; ++i) {
566         mapreduce_free_context(priv->reducer_contexts[i].mapreduce_ctx);
567     }
568     free(priv->reducer_contexts);
569     free(priv->reducers);
570     free(priv);
571     free((void *) ctx->error);
572     free(ctx);
573 }
574 
575 
add_error_message(view_reducer_ctx_t *red_ctx, int rereduce)576 static void add_error_message(view_reducer_ctx_t *red_ctx, int rereduce)
577 {
578    reducer_private_t *priv = (reducer_private_t *) red_ctx->private;
579    char *error_msg;
580 
581    if (red_ctx->error != NULL) {
582        free((void *) red_ctx->error);
583        red_ctx->error = NULL;
584    }
585 
586    if (priv->builtin_error != VIEW_REDUCER_SUCCESS) {
587        const char *base_msg = builtin_reducer_error_msg[priv->builtin_error];
588 
589        assert(priv->mapreduce_error == MAPREDUCE_SUCCESS);
590        if (!rereduce && (priv->error_key != NULL)) {
591            error_msg = (char *) malloc(strlen(base_msg) + 12 + strlen(priv->error_key));
592            assert(error_msg != NULL);
593            sprintf(error_msg, "%s (key %s)", base_msg, priv->error_key);
594        } else {
595            error_msg = strdup(base_msg);
596            assert(error_msg != NULL);
597        }
598        if (priv->error_key != NULL) {
599            free((void *) priv->error_key);
600            priv->error_key = NULL;
601        }
602    } else {
603        assert(priv->mapreduce_error != MAPREDUCE_SUCCESS);
604        if (priv->error_msg != NULL) {
605            error_msg = priv->error_msg;
606        } else {
607            if (priv->mapreduce_error == MAPREDUCE_TIMEOUT) {
608                error_msg = strdup("function timeout");
609                assert(error_msg != NULL);
610            } else {
611                error_msg = malloc(64);
612                assert(error_msg != NULL);
613                sprintf(error_msg, "mapreduce error: %d", priv->mapreduce_error);
614            }
615        }
616    }
617 
618    red_ctx->error = (const char *) error_msg;
619 }
620 
621 
view_btree_reduce(char *dst, size_t *size_r, const nodelist *leaflist, int count, void *ctx)622 couchstore_error_t view_btree_reduce(char *dst,
623                                      size_t *size_r,
624                                      const nodelist *leaflist,
625                                      int count,
626                                      void *ctx)
627 {
628     view_reducer_ctx_t *red_ctx = (view_reducer_ctx_t *) ctx;
629     reducer_private_t *priv = (reducer_private_t *) red_ctx->private;
630     unsigned i;
631     reducer_fn_t reducer;
632     view_btree_reduction_t *red = NULL;
633     const nodelist *n;
634     int c;
635     couchstore_error_t ret = COUCHSTORE_SUCCESS;
636     mapreduce_json_list_t *key_list = NULL;
637     mapreduce_json_list_t *value_list = NULL;
638     view_btree_value_t **values = NULL;
639 
640     values = (view_btree_value_t **) calloc(count, sizeof(view_btree_value_t *));
641     red = (view_btree_reduction_t *) calloc(1, sizeof(*red));
642     key_list = (mapreduce_json_list_t *) calloc(1, sizeof(*key_list));
643     value_list = (mapreduce_json_list_t *) calloc(1, sizeof(*value_list));
644 
645     if (values == NULL || red == NULL || key_list == NULL || value_list == NULL) {
646         ret = COUCHSTORE_ERROR_ALLOC_FAIL;
647         goto out;
648     }
649 
650     for (n = leaflist, c = 0; n != NULL && c < count; n = n->next, ++c) {
651         view_btree_value_t *v = NULL;
652 
653         ret = decode_view_btree_value(n->data.buf, n->data.size, &v);
654         if (ret != COUCHSTORE_SUCCESS) {
655             goto out;
656         }
657         set_bit(&red->partitions_bitmap, v->partition);
658         red->kv_count += v->num_values;
659         values[c] = v;
660     }
661 
662     value_list->values = (mapreduce_json_t *) calloc(red->kv_count,
663                                                      sizeof(mapreduce_json_t));
664     if (value_list->values == NULL) {
665         ret = COUCHSTORE_ERROR_ALLOC_FAIL;
666         goto out;
667     }
668     key_list->values = (mapreduce_json_t *) calloc(red->kv_count,
669                                                    sizeof(mapreduce_json_t));
670     if (key_list->values == NULL) {
671         ret = COUCHSTORE_ERROR_ALLOC_FAIL;
672         goto out;
673     }
674 
675     for (n = leaflist, c = 0; n != NULL && c < count; n = n->next, ++c) {
676         view_btree_value_t *v = values[c];
677         view_btree_key_t *k = NULL;
678 
679         ret = decode_view_btree_key(n->key.buf, n->key.size, &k);
680         if (ret != COUCHSTORE_SUCCESS) {
681             free_view_btree_value(v);
682             goto out;
683         }
684         for (i = 0; i < v->num_values; ++i) {
685             value_list->values[value_list->length].length = v->values[i].size;
686             value_list->values[value_list->length].json = v->values[i].buf;
687             value_list->length++;
688             key_list->values[key_list->length].length = k->json_key.size;
689             key_list->values[key_list->length].json = k->json_key.buf;
690             key_list->length++;
691         }
692         free_key_excluding_elements(k);
693     }
694 
695     if (priv->num_reducers > 0) {
696         red->reduce_values = (sized_buf *) calloc(priv->num_reducers, sizeof(sized_buf));
697         if (red->reduce_values == NULL) {
698             ret = COUCHSTORE_ERROR_ALLOC_FAIL;
699             goto out;
700         }
701     }
702 
703     red->num_values = priv->num_reducers;
704     for (i = 0; i < priv->num_reducers; ++i) {
705         sized_buf buf;
706         reducer = priv->reducers[i];
707 
708         ret = (*reducer)(key_list, value_list, &priv->reducer_contexts[i], &buf);
709         if (ret != COUCHSTORE_SUCCESS) {
710             add_error_message(red_ctx, 0);
711             goto out;
712         }
713 
714         red->reduce_values[i] = buf;
715     }
716 
717     ret = encode_view_btree_reduction(red, dst, size_r);
718 
719  out:
720     if (red != NULL) {
721         for (i = 0; i < red->num_values; ++i) {
722             free(red->reduce_values[i].buf);
723         }
724         free(red->reduce_values);
725         free(red);
726     }
727     free_json_key_list(key_list);
728     if (values != NULL) {
729         for (c = 0; c < count; ++c) {
730             free_view_btree_value(values[c]);
731         }
732         free(values);
733     }
734     if (value_list != NULL) {
735         free(value_list->values);
736         free(value_list);
737     }
738 
739     return ret;
740 }
741 
742 
view_btree_rereduce(char *dst, size_t *size_r, const nodelist *leaflist, int count, void *ctx)743 couchstore_error_t view_btree_rereduce(char *dst,
744                                        size_t *size_r,
745                                        const nodelist *leaflist,
746                                        int count,
747                                        void *ctx)
748 {
749     view_reducer_ctx_t *red_ctx = (view_reducer_ctx_t *) ctx;
750     reducer_private_t *priv = (reducer_private_t *) red_ctx->private;
751     unsigned i;
752     reducer_fn_t reducer;
753     view_btree_reduction_t *red = NULL;
754     const nodelist *n;
755     int c;
756     couchstore_error_t ret = COUCHSTORE_SUCCESS;
757     mapreduce_json_list_t *value_list = NULL;
758     view_btree_reduction_t **reductions = NULL;
759 
760     reductions = (view_btree_reduction_t **) calloc(count, sizeof(view_btree_reduction_t *));
761     red = (view_btree_reduction_t *) calloc(1, sizeof(*red));
762 
763     if (reductions == NULL || red == NULL) {
764         ret = COUCHSTORE_ERROR_ALLOC_FAIL;
765         goto out;
766     }
767 
768     red->num_values = priv->num_reducers;
769     red->reduce_values = (sized_buf *) calloc(red->num_values, sizeof(sized_buf));
770     value_list = (mapreduce_json_list_t *) calloc(1, sizeof(*value_list));
771 
772     if (red->reduce_values == NULL || value_list == NULL) {
773         ret = COUCHSTORE_ERROR_ALLOC_FAIL;
774         goto out;
775     }
776 
777     for (n = leaflist, c = 0; n != NULL && c < count; n = n->next, ++c) {
778         view_btree_reduction_t *r = NULL;
779 
780         ret = decode_view_btree_reduction(n->pointer->reduce_value.buf,
781                                           n->pointer->reduce_value.size, &r);
782         if (ret != COUCHSTORE_SUCCESS) {
783             goto out;
784         }
785 
786         union_bitmaps(&red->partitions_bitmap, &r->partitions_bitmap);
787         assert(r->num_values == priv->num_reducers);
788         red->kv_count += r->kv_count;
789         reductions[c] = r;
790     }
791 
792     if (priv->num_reducers > 0) {
793         value_list->values = (mapreduce_json_t *) calloc(count,
794                                                          sizeof(mapreduce_json_t));
795         if (value_list->values == NULL) {
796             ret = COUCHSTORE_ERROR_ALLOC_FAIL;
797             goto out;
798         }
799     }
800 
801     for (i = 0; i < priv->num_reducers; ++i) {
802         sized_buf buf;
803 
804         for (n = leaflist, c = 0; n != NULL && c < count; n = n->next, ++c) {
805             view_btree_reduction_t *r = reductions[c];
806 
807             value_list->values[value_list->length].json = r->reduce_values[i].buf;
808             value_list->values[value_list->length].length = r->reduce_values[i].size;
809             value_list->length++;
810         }
811 
812         reducer = priv->reducers[i];
813         ret = (*reducer)(NULL, value_list, &priv->reducer_contexts[i], &buf);
814         if (ret != COUCHSTORE_SUCCESS) {
815             add_error_message(red_ctx, 1);
816             goto out;
817         }
818 
819         value_list->length = 0;
820         red->reduce_values[i] = buf;
821     }
822 
823     ret = encode_view_btree_reduction(red, dst, size_r);
824 
825  out:
826     if (red != NULL) {
827         for (i = 0; i < red->num_values; ++i) {
828             free(red->reduce_values[i].buf);
829         }
830         free(red->reduce_values);
831         free(red);
832     }
833     if (reductions != NULL) {
834         for (c = 0; c < count; ++c) {
835             free_view_btree_reduction(reductions[c]);
836         }
837         free(reductions);
838     }
839     if (value_list != NULL) {
840         free(value_list->values);
841         free(value_list);
842     }
843 
844     return ret;
845 }
846