xref: /3.0.2-MP2/couchstore/src/views/view_group.c (revision ea765eb9)
1/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2
3/**
4 * @copyright 2013 Couchbase, Inc.
5 *
6 * @author Filipe Manana  <filipe@couchbase.com>
7 *
8 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
9 * use this file except in compliance with the License. You may obtain a copy of
10 * the License at
11 *
12 *  http://www.apache.org/licenses/LICENSE-2.0
13 *
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
17 * License for the specific language governing permissions and limitations under
18 * the License.
19 **/
20
21#include "config.h"
22#include <assert.h>
23#include <stdlib.h>
24#include <string.h>
25#include "view_group.h"
26#include "reducers.h"
27#include "reductions.h"
28#include "values.h"
29#include "purgers.h"
30#include "util.h"
31#include "file_sorter.h"
32#include "../arena.h"
33#include "../couch_btree.h"
34#include "../internal.h"
35#include "../util.h"
36
37#define VIEW_KV_CHUNK_THRESHOLD (7 * 1024)
38#define VIEW_KP_CHUNK_THRESHOLD (6 * 1024)
39#define MAX_HEADER_SIZE         (64 * 1024)
40#define MAX_ACTIONS_SIZE        (2 * 1024 * 1024)
41
42static couchstore_error_t read_btree_info(view_group_info_t *info,
43                                          FILE *in_stream,
44                                          FILE *error_stream);
45
46static couchstore_error_t read_spatial_info(view_group_info_t *info,
47                                            FILE *in_stream,
48                                            FILE *error_stream);
49
50static couchstore_error_t open_view_group_file(const char *path,
51                                               couchstore_open_flags open_flags,
52                                               tree_file *file);
53
54static couchstore_error_t build_btree(const char *source_file,
55                                      tree_file *dest_file,
56                                      compare_info *cmp,
57                                      reduce_fn reduce_fun,
58                                      reduce_fn rereduce_fun,
59                                      const char *tmpdir,
60                                      sort_record_fn sort_fun,
61                                      void *reduce_ctx,
62                                      node_pointer **out_root);
63
64static couchstore_error_t build_id_btree(const char *source_file,
65                                         tree_file *dest_file,
66                                         const char *tmpdir,
67                                         node_pointer **out_root);
68
69static couchstore_error_t build_view_btree(const char *source_file,
70                                           const view_btree_info_t *info,
71                                           tree_file *dest_file,
72                                           const char *tmpdir,
73                                           node_pointer **out_root,
74                                           view_error_t *error_info);
75
76static void close_view_group_file(view_group_info_t *info);
77
78static int read_record(FILE *f, arena *a, sized_buf *k, sized_buf *v,
79                                                        uint8_t *op);
80
81static couchstore_error_t update_btree(const char *source_file,
82                                       tree_file *dest_file,
83                                       const node_pointer *root,
84                                       size_t batch_size,
85                                       compare_info *cmp,
86                                       reduce_fn reduce_fun,
87                                       reduce_fn rereduce_fun,
88                                       purge_kv_fn purge_kv,
89                                       purge_kp_fn purge_kp,
90                                       view_reducer_ctx_t *red_ctx,
91                                       view_purger_ctx_t *purge_ctx,
92                                       uint64_t *inserted,
93                                       uint64_t *removed,
94                                       node_pointer **out_root);
95
96static couchstore_error_t update_id_btree(const char *source_file,
97                                         tree_file *dest_file,
98                                         const node_pointer *root,
99                                         size_t batch_size,
100                                         view_purger_ctx_t *purge_ctx,
101                                         uint64_t *inserted,
102                                         uint64_t *removed,
103                                         node_pointer **out_root);
104
105static couchstore_error_t update_view_btree(const char *source_file,
106                                            const view_btree_info_t *info,
107                                            tree_file *dest_file,
108                                            const node_pointer *root,
109                                            size_t batch_size,
110                                            view_purger_ctx_t *purge_ctx,
111                                            uint64_t *inserted,
112                                            uint64_t *removed,
113                                            node_pointer **out_root,
114                                            view_error_t *error_info);
115
116static couchstore_error_t compact_view_fetchcb(couchfile_lookup_request *rq,
117                                        const sized_buf *k,
118                                        const sized_buf *v);
119
120static couchstore_error_t compact_btree(tree_file *source,
121                                 tree_file *target,
122                                 const node_pointer *root,
123                                 compare_info *cmp,
124                                 reduce_fn reduce_fun,
125                                 reduce_fn rereduce_fun,
126                                 compact_filter_fn filter_fun,
127                                 view_reducer_ctx_t *red_ctx,
128                                 const bitmap_t *filterbm,
129                                 compactor_stats_t *stats,
130                                 node_pointer **out_root);
131
132static couchstore_error_t compact_id_btree(tree_file *source,
133                                    tree_file *target,
134                                    const node_pointer *root,
135                                    const bitmap_t *filterbm,
136                                    compactor_stats_t *stats,
137                                    node_pointer **out_root);
138
139static couchstore_error_t compact_view_btree(tree_file *source,
140                                      tree_file *target,
141                                      const view_btree_info_t *info,
142                                      const node_pointer *root,
143                                      const bitmap_t *filterbm,
144                                      compactor_stats_t *stats,
145                                      node_pointer **out_root,
146                                      view_error_t *error_info);
147
148LIBCOUCHSTORE_API
149view_group_info_t *couchstore_read_view_group_info(FILE *in_stream,
150                                                   FILE *error_stream)
151{
152    view_group_info_t *info;
153    char buf[4096];
154    char *dup;
155    couchstore_error_t ret;
156    uint64_t type;
157
158    info = (view_group_info_t *) calloc(1, sizeof(*info));
159    if (info == NULL) {
160        fprintf(error_stream, "Memory allocation failure\n");
161        goto out_error;
162    }
163
164    type = couchstore_read_int(in_stream, buf, sizeof(buf), &ret);
165    if (ret != COUCHSTORE_SUCCESS) {
166        fprintf(stderr, "Error reading view file type\n");
167        goto out_error;
168    }
169    switch (type) {
170    case VIEW_INDEX_TYPE_MAPREDUCE:
171        info->type = VIEW_INDEX_TYPE_MAPREDUCE;
172        break;
173    case VIEW_INDEX_TYPE_SPATIAL:
174        info->type = VIEW_INDEX_TYPE_SPATIAL;
175        break;
176    default:
177        fprintf(stderr, "Invalid view file type: %"PRIu64"\n", type);
178        goto out_error;
179    }
180
181    if (couchstore_read_line(in_stream, buf, sizeof(buf)) != buf) {
182        fprintf(stderr, "Error reading source index file path\n");
183        goto out_error;
184    }
185    dup = strdup(buf);
186    if (dup == NULL) {
187        fprintf(error_stream, "Memory allocation failure\n");
188        goto out_error;
189    }
190    info->filepath = (const char *) dup;
191
192    info->header_pos = couchstore_read_int(in_stream, buf,
193                                                      sizeof(buf),
194                                                      &ret);
195    if (ret != COUCHSTORE_SUCCESS) {
196        fprintf(error_stream, "Error reading header position\n");
197        goto out_error;
198    }
199
200    info->num_btrees = couchstore_read_int(in_stream, buf,
201                                                      sizeof(buf),
202                                                      &ret);
203    if (ret != COUCHSTORE_SUCCESS) {
204        fprintf(error_stream, "Error reading number of btrees\n");
205        goto out_error;
206    }
207
208    switch (info->type) {
209    case VIEW_INDEX_TYPE_MAPREDUCE:
210        ret = read_btree_info(info, in_stream, error_stream);
211        break;
212    case VIEW_INDEX_TYPE_SPATIAL:
213        ret = read_spatial_info(info, in_stream, error_stream);
214        break;
215    }
216    if (ret != COUCHSTORE_SUCCESS) {
217        goto out_error;
218    }
219
220    return info;
221
222out_error:
223    couchstore_free_view_group_info(info);
224
225    return NULL;
226}
227
228
229/* Read in the information about the mapreduce view indexes */
230static couchstore_error_t read_btree_info(view_group_info_t *info,
231                                          FILE *in_stream,
232                                          FILE *error_stream)
233{
234    char buf[4096];
235    char *dup;
236    int i, j;
237    int reduce_len;
238    couchstore_error_t ret;
239
240    info->view_infos.btree = (view_btree_info_t *)
241        calloc(info->num_btrees, sizeof(view_btree_info_t));
242    if (info->view_infos.btree == NULL) {
243        fprintf(error_stream, "Memory allocation failure on btree infos\n");
244        info->num_btrees = 0;
245        return COUCHSTORE_ERROR_ALLOC_FAIL;
246    }
247
248    for (i = 0; i < info->num_btrees; ++i) {
249        view_btree_info_t *bti = &info->view_infos.btree[i];
250
251        bti->view_id = i;
252        bti->num_reducers = couchstore_read_int(in_stream,
253                                                buf,
254                                                sizeof(buf),
255                                                &ret);
256
257        if (ret != COUCHSTORE_SUCCESS) {
258            fprintf(error_stream,
259                    "Error reading number of reducers for btree %d\n", i);
260            return COUCHSTORE_ERROR_INVALID_ARGUMENTS;
261        }
262
263        bti->names = (const char **) calloc(bti->num_reducers, sizeof(char *));
264        if (bti->names == NULL) {
265            fprintf(error_stream,
266                    "Memory allocation failure on btree %d reducer names\n",
267                    i);
268            bti->num_reducers = 0;
269            return COUCHSTORE_ERROR_ALLOC_FAIL;
270        }
271
272        bti->reducers = (const char **) calloc(bti->num_reducers, sizeof(char *));
273        if (bti->reducers == NULL) {
274            fprintf(error_stream,
275                    "Memory allocation failure on btree %d reducers\n", i);
276            bti->num_reducers = 0;
277            free(bti->names);
278            return COUCHSTORE_ERROR_ALLOC_FAIL;
279        }
280
281        for (j = 0; j < bti->num_reducers; ++j) {
282            if (couchstore_read_line(in_stream, buf, sizeof(buf)) != buf) {
283                fprintf(error_stream,
284                        "Error reading btree %d view %d name\n", i, j);
285                return COUCHSTORE_ERROR_INVALID_ARGUMENTS;
286            }
287            dup = strdup(buf);
288            if (dup == NULL) {
289                fprintf(error_stream,
290                        "Memory allocation failure on btree %d "
291                        "view %d name\n", i, j);
292                return COUCHSTORE_ERROR_ALLOC_FAIL;
293            }
294            bti->names[j] = (const char *) dup;
295
296            reduce_len = couchstore_read_int(in_stream,
297                                             buf,
298                                             sizeof(buf),
299                                             &ret);
300            if (ret != COUCHSTORE_SUCCESS) {
301                fprintf(error_stream,
302                        "Error reading btree %d view %d "
303                        "reduce function size\n", i, j);
304                return COUCHSTORE_ERROR_INVALID_ARGUMENTS;
305            }
306
307            dup = (char *) malloc(reduce_len + 1);
308            if (dup == NULL) {
309                fprintf(error_stream,
310                        "Memory allocation failure on btree %d "
311                        "view %d reducer\n", i, j);
312                return COUCHSTORE_ERROR_ALLOC_FAIL;
313            }
314
315            if (fread(dup, reduce_len, 1, in_stream) != 1) {
316                fprintf(error_stream,
317                        "Error reading btree %d view %d reducer\n", i, j);
318                free(dup);
319                return COUCHSTORE_ERROR_INVALID_ARGUMENTS;
320            }
321            dup[reduce_len] = '\0';
322            bti->reducers[j] = (const char *) dup;
323        }
324    }
325    return COUCHSTORE_SUCCESS;
326}
327
328
329/* Read in the information about the spatial view indexes */
330static couchstore_error_t read_spatial_info(view_group_info_t *info,
331                                            FILE *in_stream,
332                                            FILE *error_stream)
333{
334    char buf[24];
335    int i;
336    couchstore_error_t ret;
337
338    info->view_infos.spatial = (view_spatial_info_t *)
339        calloc(info->num_btrees, sizeof(view_spatial_info_t));
340    if (info->view_infos.spatial == NULL) {
341        fprintf(error_stream, "Memory allocation failure on spatial infos\n");
342        info->num_btrees = 0;
343        return COUCHSTORE_ERROR_ALLOC_FAIL;
344    }
345
346    for (i = 0; i < info->num_btrees; ++i) {
347        view_spatial_info_t *si = &info->view_infos.spatial[i];
348
349        si->dimension = couchstore_read_int(in_stream, buf, sizeof(buf), &ret);
350
351        if (ret != COUCHSTORE_SUCCESS) {
352            fprintf(error_stream,
353                    "Error reading the dimension of spatial view %d\n", i);
354            return COUCHSTORE_ERROR_INVALID_ARGUMENTS;
355        }
356
357        si->mbb = (double *) calloc(si->dimension * 2, sizeof(double));
358        if (si->mbb == NULL) {
359            fprintf(error_stream,
360                    "Memory allocation failure on spatial view %d\n", i);
361            si->dimension = 0;
362            return COUCHSTORE_ERROR_ALLOC_FAIL;
363        }
364
365        if (fread(si->mbb, sizeof(double), si->dimension * 2, in_stream) <
366                si->dimension * 2) {
367            fprintf(error_stream, "Error reading mbb of view %d\n", i);
368            return COUCHSTORE_ERROR_INVALID_ARGUMENTS;
369        }
370    }
371    return COUCHSTORE_SUCCESS;
372}
373
374
375LIBCOUCHSTORE_API
376void couchstore_free_view_group_info(view_group_info_t *info)
377{
378    int i, j;
379
380    if (info == NULL)
381        return;
382
383    close_view_group_file(info);
384
385    switch (info->type) {
386    case VIEW_INDEX_TYPE_MAPREDUCE:
387        for (i = 0; i < info->num_btrees; ++i) {
388            view_btree_info_t vi = info->view_infos.btree[i];
389
390            for (j = 0; j < vi.num_reducers; ++j) {
391                free((void *) vi.names[j]);
392                free((void *) vi.reducers[j]);
393            }
394            free(vi.names);
395            free(vi.reducers);
396        }
397        free(info->view_infos.btree);
398        break;
399    case VIEW_INDEX_TYPE_SPATIAL:
400        for (i = 0; i < info->num_btrees; ++i) {
401            free((void *) info->view_infos.spatial[i].mbb);
402        }
403        free(info->view_infos.spatial);
404        break;
405    }
406    free((void *) info->filepath);
407    free(info);
408}
409
410
411static void close_view_group_file(view_group_info_t *info)
412{
413    if (info->file.ops != NULL) {
414        info->file.ops->close(NULL, info->file.handle);
415        info->file.ops->destructor(NULL, info->file.handle);
416        info->file.ops = NULL;
417        info->file.handle = NULL;
418    }
419    free((void *) info->file.path);
420    info->file.path = NULL;
421}
422
423
424static couchstore_error_t open_view_group_file(const char *path,
425                                               couchstore_open_flags open_flags,
426                                               tree_file *file)
427{
428    couchstore_error_t ret;
429    const couch_file_ops *file_ops = couchstore_get_default_file_ops();
430    int flags = 0;
431
432    if ((open_flags & COUCHSTORE_OPEN_FLAG_RDONLY) &&
433        (open_flags & COUCHSTORE_OPEN_FLAG_CREATE)) {
434        return COUCHSTORE_ERROR_INVALID_ARGUMENTS;
435    }
436
437    if (open_flags & COUCHSTORE_OPEN_FLAG_RDONLY) {
438        flags |= O_RDONLY;
439    } else {
440        flags |= O_RDWR;
441    }
442
443    if (open_flags & COUCHSTORE_OPEN_FLAG_CREATE) {
444        flags |= O_CREAT;
445    }
446
447    ret = tree_file_open(file, path, flags, file_ops);
448
449    return ret;
450}
451
452
453LIBCOUCHSTORE_API
454couchstore_error_t couchstore_build_view_group(view_group_info_t *info,
455                                               const char *id_records_file,
456                                               const char *kv_records_files[],
457                                               const char *dst_file,
458                                               const char *tmpdir,
459                                               uint64_t *header_pos,
460                                               view_error_t *error_info)
461{
462    couchstore_error_t ret;
463    tree_file index_file;
464    index_header_t *header = NULL;
465    node_pointer *id_root = NULL;
466    node_pointer **view_roots = NULL;
467    int i;
468
469    error_info->view_name = NULL;
470    error_info->error_msg = NULL;
471    index_file.handle = NULL;
472    index_file.ops = NULL;
473    index_file.path = NULL;
474
475    view_roots = (node_pointer **) calloc(
476        info->num_btrees, sizeof(node_pointer *));
477    if (view_roots == NULL) {
478        return COUCHSTORE_ERROR_ALLOC_FAIL;
479    }
480
481    ret = open_view_group_file(info->filepath,
482                               COUCHSTORE_OPEN_FLAG_RDONLY,
483                               &info->file);
484    if (ret != COUCHSTORE_SUCCESS) {
485        goto out;
486    }
487
488    ret = read_view_group_header(info, &header);
489    if (ret != COUCHSTORE_SUCCESS) {
490        goto out;
491    }
492    assert(info->num_btrees == header->num_views);
493
494    ret = open_view_group_file(dst_file,
495                               COUCHSTORE_OPEN_FLAG_CREATE,
496                               &index_file);
497    if (ret != COUCHSTORE_SUCCESS) {
498        goto out;
499    }
500
501    ret = build_id_btree(id_records_file, &index_file, tmpdir, &id_root);
502    if (ret != COUCHSTORE_SUCCESS) {
503        goto out;
504    }
505
506    free(header->id_btree_state);
507    header->id_btree_state = id_root;
508    id_root = NULL;
509
510    for (i = 0; i < info->num_btrees; ++i) {
511        ret = build_view_btree(kv_records_files[i],
512                               &info->view_infos.btree[i],
513                               &index_file,
514                               tmpdir,
515                               &view_roots[i],
516                               error_info);
517        if (ret != COUCHSTORE_SUCCESS) {
518            goto out;
519        }
520
521        free(header->view_states[i]);
522        header->view_states[i] = view_roots[i];
523        view_roots[i] = NULL;
524    }
525
526    ret = write_view_group_header(&index_file, header_pos, header);
527    if (ret != COUCHSTORE_SUCCESS) {
528        goto out;
529    }
530
531    ret = COUCHSTORE_SUCCESS;
532
533out:
534    free_index_header(header);
535    close_view_group_file(info);
536    tree_file_close(&index_file);
537    free(id_root);
538    if (view_roots != NULL) {
539        for (i = 0; i < info->num_btrees; ++i) {
540            free(view_roots[i]);
541        }
542        free(view_roots);
543    }
544
545    return ret;
546}
547
548
549/*
550 * Similar to util.c:read_view_record(), but it uses arena allocator, which is
551 * required for the existing semantics/api of btree bottom-up build in
552 * src/btree_modify.cc.
553 */
554static int read_record(FILE *f, arena *a, sized_buf *k, sized_buf *v,
555                                                        uint8_t *op)
556{
557    uint16_t klen;
558    uint8_t oplen = 0;
559    uint32_t vlen, len;
560
561    if (fread(&len, sizeof(len), 1, f) != 1) {
562        if (feof(f)) {
563            return 0;
564        } else {
565            return COUCHSTORE_ERROR_READ;
566        }
567    }
568
569    /* For incremental update, read optype */
570    if (op != NULL) {
571        if (fread(op, sizeof(uint8_t), 1, f) != 1) {
572            return COUCHSTORE_ERROR_READ;
573        }
574
575        oplen = 1;
576    }
577
578    if (fread(&klen, sizeof(klen), 1, f) != 1) {
579        return COUCHSTORE_ERROR_READ;
580    }
581
582    klen = ntohs(klen);
583    vlen = len - sizeof(klen) - klen - oplen;
584
585    k->size = klen;
586    k->buf = (char *) arena_alloc(a, k->size);
587    if (k->buf == NULL) {
588        return COUCHSTORE_ERROR_ALLOC_FAIL;
589    }
590
591    v->size = vlen;
592    /* Handle zero len vals */
593    if (v->size) {
594        v->buf = (char *) arena_alloc(a, v->size);
595        if (v->buf == NULL) {
596            return COUCHSTORE_ERROR_ALLOC_FAIL;
597        }
598    } else {
599        v->buf = NULL;
600    }
601
602    if (fread(k->buf, k->size, 1, f) != 1) {
603        return FILE_MERGER_ERROR_FILE_READ;
604    }
605
606    if (v->size && fread(v->buf, v->size, 1, f) != 1) {
607        return FILE_MERGER_ERROR_FILE_READ;
608    }
609
610    return len;
611}
612
613
614static int id_btree_cmp(const sized_buf *key1, const sized_buf *key2)
615{
616    return view_id_cmp(key1, key2, NULL);
617}
618
619
620static int view_btree_cmp(const sized_buf *key1, const sized_buf *key2)
621{
622    return view_key_cmp(key1, key2, NULL);
623}
624
625
626/*
627 * For initial btree build, feed the btree builder as soon as
628 * sorted records are available.
629 */
630static file_merger_error_t build_btree_record_callback(void *buf, void *ctx)
631{
632    int ret;
633    sized_buf *k, *v;
634    view_file_merge_record_t *rec = (view_file_merge_record_t *) buf;
635    view_file_merge_ctx_t *merge_ctx = (view_file_merge_ctx_t *) ctx;
636    view_btree_builder_ctx_t *build_ctx =
637                    (view_btree_builder_ctx_t *) merge_ctx->user_ctx;
638    sized_buf src_k, src_v;
639
640    src_k.size = rec->ksize;
641    src_k.buf = VIEW_RECORD_KEY(rec);
642    src_v.size = rec->vsize;
643    src_v.buf = VIEW_RECORD_VAL(rec);
644
645    k = arena_copy_buf(build_ctx->transient_arena, &src_k);
646    v = arena_copy_buf(build_ctx->transient_arena, &src_v);
647    ret = mr_push_item(k, v, build_ctx->modify_result);
648
649    if (ret != COUCHSTORE_SUCCESS) {
650        return ret;
651    }
652
653    if (build_ctx->modify_result->count == 0) {
654        arena_free_all(build_ctx->transient_arena);
655    }
656
657    return ret;
658}
659
660
661static couchstore_error_t build_btree(const char *source_file,
662                                      tree_file *dest_file,
663                                      compare_info *cmp,
664                                      reduce_fn reduce_fun,
665                                      reduce_fn rereduce_fun,
666                                      const char *tmpdir,
667                                      sort_record_fn sort_fun,
668                                      void *reduce_ctx,
669                                      node_pointer **out_root)
670{
671    couchstore_error_t ret = COUCHSTORE_SUCCESS;
672    arena *transient_arena = new_arena(0);
673    arena *persistent_arena = new_arena(0);
674    couchfile_modify_result *mr;
675    view_btree_builder_ctx_t build_ctx;
676
677    if (transient_arena == NULL || persistent_arena == NULL) {
678        ret = COUCHSTORE_ERROR_ALLOC_FAIL;
679        goto out;
680    }
681
682    mr = new_btree_modres(persistent_arena,
683                          transient_arena,
684                          dest_file,
685                          cmp,
686                          reduce_fun,
687                          rereduce_fun,
688                          reduce_ctx,
689                          VIEW_KV_CHUNK_THRESHOLD + (VIEW_KV_CHUNK_THRESHOLD / 3),
690                          VIEW_KP_CHUNK_THRESHOLD + (VIEW_KP_CHUNK_THRESHOLD / 3));
691    if (mr == NULL) {
692        ret = COUCHSTORE_ERROR_ALLOC_FAIL;
693        goto out;
694    }
695
696    build_ctx.transient_arena = transient_arena;
697    build_ctx.modify_result = mr;
698
699    ret = (couchstore_error_t) sort_fun(source_file,
700                                        tmpdir,
701                                        build_btree_record_callback, &build_ctx);
702    if (ret != COUCHSTORE_SUCCESS) {
703        goto out;
704    }
705
706    *out_root = complete_new_btree(mr, &ret);
707    if (ret != COUCHSTORE_SUCCESS) {
708        goto out;
709    }
710
711    /* Don't care about success/failure. Erlang side will eventually delete it. */
712    remove(source_file);
713
714out:
715    if (transient_arena != NULL) {
716        delete_arena(transient_arena);
717    }
718    if (persistent_arena != NULL) {
719        delete_arena(persistent_arena);
720    }
721
722    return ret;
723}
724
725
726static couchstore_error_t build_id_btree(const char *source_file,
727                                         tree_file *dest_file,
728                                         const char *tmpdir,
729                                         node_pointer **out_root)
730{
731    couchstore_error_t ret;
732    compare_info cmp;
733
734    cmp.compare = id_btree_cmp;
735
736    ret = build_btree(source_file,
737                      dest_file,
738                      &cmp,
739                      view_id_btree_reduce,
740                      view_id_btree_rereduce,
741                      tmpdir,
742                      sort_view_ids_file,
743                      NULL,
744                      out_root);
745
746    return ret;
747}
748
749
750static couchstore_error_t build_view_btree(const char *source_file,
751                                           const view_btree_info_t *info,
752                                           tree_file *dest_file,
753                                           const char *tmpdir,
754                                           node_pointer **out_root,
755                                           view_error_t *error_info)
756{
757    couchstore_error_t ret = COUCHSTORE_SUCCESS;
758    compare_info cmp;
759    view_reducer_ctx_t *red_ctx = NULL;
760    char *error_msg = NULL;
761
762    cmp.compare = view_btree_cmp;
763    red_ctx = make_view_reducer_ctx(info->reducers,
764                                    info->num_reducers,
765                                    &error_msg);
766    if (red_ctx == NULL) {
767        set_error_info(info, (const char *) error_msg, ret, error_info);
768        return COUCHSTORE_ERROR_REDUCER_FAILURE;
769    }
770
771    ret = build_btree(source_file,
772                      dest_file,
773                      &cmp,
774                      view_btree_reduce,
775                      view_btree_rereduce,
776                      tmpdir,
777                      sort_view_kvs_file,
778                      red_ctx,
779                      out_root);
780
781    if (ret != COUCHSTORE_SUCCESS) {
782        set_error_info(info, red_ctx->error, ret, error_info);
783    }
784
785    free_view_reducer_ctx(red_ctx);
786
787    return ret;
788}
789
790
791couchstore_error_t read_view_group_header(view_group_info_t *info,
792                                          index_header_t **header)
793{
794    couchstore_error_t ret;
795    cs_off_t pos = info->header_pos;
796    tree_file *file = &info->file;
797    char *header_buf = NULL;
798    int header_len;
799    char buf;
800
801    if (file->handle == NULL) {
802        return COUCHSTORE_ERROR_FILE_CLOSED;
803    }
804
805    if (info->file.ops->pread(NULL, file->handle, &buf, 1, pos) != 1) {
806        return COUCHSTORE_ERROR_READ;
807    }
808    if (buf == 0) {
809        return COUCHSTORE_ERROR_NO_HEADER;
810    } else if (buf != 1) {
811        return COUCHSTORE_ERROR_CORRUPT;
812    }
813
814    header_len = pread_header(file, pos, &header_buf, MAX_HEADER_SIZE);
815    if (header_len < 0) {
816        return (couchstore_error_t) header_len;
817    }
818
819    ret = decode_index_header(header_buf, (size_t) header_len, header);
820    free(header_buf);
821
822    return ret;
823}
824
825couchstore_error_t write_view_group_header(tree_file *file,
826                                           uint64_t *pos,
827                                           const index_header_t *header)
828{
829    couchstore_error_t ret;
830    sized_buf buf = { NULL, 0 };
831    cs_off_t p;
832
833    if (file->handle == NULL) {
834        return COUCHSTORE_ERROR_FILE_CLOSED;
835    }
836
837    ret = encode_index_header(header, &buf.buf, &buf.size);
838    if (ret != COUCHSTORE_SUCCESS) {
839        return ret;
840    }
841
842    ret = write_header(file, &buf, &p);
843    if (ret != COUCHSTORE_SUCCESS) {
844        goto out;
845    }
846
847    assert(p >= 0);
848    *pos = (uint64_t) p;
849
850out:
851    free(buf.buf);
852
853    return ret;
854}
855
856
857static couchstore_error_t view_id_bitmask(const node_pointer *root, bitmap_t *bm)
858{
859    view_id_btree_reduction_t *r = NULL;
860    couchstore_error_t errcode;
861    if (root == NULL) {
862        return COUCHSTORE_SUCCESS;
863    }
864
865    errcode = decode_view_id_btree_reduction(root->reduce_value.buf, &r);
866    if (errcode != COUCHSTORE_SUCCESS) {
867        goto cleanup;
868    }
869
870    union_bitmaps(bm, &r->partitions_bitmap);
871
872cleanup:
873    free_view_id_btree_reduction(r);
874    return errcode;
875}
876
877
878static couchstore_error_t view_bitmask(const node_pointer *root, bitmap_t *bm)
879{
880    view_btree_reduction_t *r = NULL;
881    couchstore_error_t errcode;
882    if (root == NULL) {
883        return COUCHSTORE_SUCCESS;
884    }
885
886    errcode = decode_view_btree_reduction(root->reduce_value.buf,
887                                          root->reduce_value.size, &r);
888    if (errcode != COUCHSTORE_SUCCESS) {
889        goto cleanup;
890    }
891
892    union_bitmaps(bm, &r->partitions_bitmap);
893
894cleanup:
895    free_view_btree_reduction(r);
896    return errcode;
897}
898
899
900static couchstore_error_t cleanup_btree(tree_file *file,
901                                        node_pointer *root,
902                                        compare_info *cmp,
903                                        reduce_fn reduce,
904                                        reduce_fn rereduce,
905                                        purge_kv_fn purge_kv,
906                                        purge_kp_fn purge_kp,
907                                        view_purger_ctx_t *purge_ctx,
908                                        view_reducer_ctx_t *red_ctx,
909                                        node_pointer **out_root)
910{
911    couchstore_error_t errcode;
912    couchfile_modify_request rq;
913
914    rq.cmp = *cmp;
915    rq.file = file;
916    rq.actions = NULL;
917    rq.num_actions = 0;
918    rq.reduce = reduce;
919    rq.rereduce = rereduce;
920    rq.compacting = 0;
921    rq.kv_chunk_threshold = VIEW_KV_CHUNK_THRESHOLD;
922    rq.kp_chunk_threshold = VIEW_KP_CHUNK_THRESHOLD;
923    rq.purge_kp = purge_kp;
924    rq.purge_kv = purge_kv;
925    rq.enable_purging = 1;
926    rq.guided_purge_ctx = purge_ctx;
927    rq.user_reduce_ctx = red_ctx;
928
929    *out_root = guided_purge_btree(&rq, root, &errcode);
930
931    return errcode;
932}
933
934static couchstore_error_t cleanup_id_btree(tree_file *file,
935                                           node_pointer *root,
936                                           node_pointer **out_root,
937                                           view_purger_ctx_t *purge_ctx,
938                                           view_error_t *error_info)
939{
940    couchstore_error_t ret;
941    compare_info cmp;
942
943    cmp.compare = id_btree_cmp;
944
945    ret = cleanup_btree(file,
946                        root,
947                        &cmp,
948                        view_id_btree_reduce,
949                        view_id_btree_rereduce,
950                        view_id_btree_purge_kv,
951                        view_id_btree_purge_kp,
952                        purge_ctx,
953                        NULL,
954                        out_root);
955
956    return ret;
957}
958
959
960static couchstore_error_t cleanup_view_btree(tree_file *file,
961                                             node_pointer *root,
962                                             const view_btree_info_t *info,
963                                             node_pointer **out_root,
964                                             view_purger_ctx_t *purge_ctx,
965                                             view_error_t *error_info)
966{
967    couchstore_error_t ret = COUCHSTORE_SUCCESS;
968    compare_info cmp;
969    view_reducer_ctx_t *red_ctx = NULL;
970    char *error_msg = NULL;
971
972    cmp.compare = view_btree_cmp;
973    red_ctx = make_view_reducer_ctx(info->reducers,
974                                    info->num_reducers,
975                                    &error_msg);
976    if (red_ctx == NULL) {
977        set_error_info(info, (const char *) error_msg, ret, error_info);
978        return COUCHSTORE_ERROR_REDUCER_FAILURE;
979    }
980
981    ret = cleanup_btree(file,
982                        root,
983                        &cmp,
984                        view_btree_reduce,
985                        view_btree_rereduce,
986                        view_btree_purge_kv,
987                        view_btree_purge_kp,
988                        purge_ctx,
989                        red_ctx,
990                        out_root);
991
992    if (ret != COUCHSTORE_SUCCESS) {
993        char *error_msg = NULL;
994
995        if (red_ctx->error != NULL) {
996            error_msg = strdup(red_ctx->error);
997        } else {
998            error_msg = (char *) malloc(64);
999            if (error_msg != NULL) {
1000                sprintf(error_msg, "%d", ret);
1001            }
1002        }
1003        set_error_info(info, (const char *) error_msg, ret, error_info);
1004    }
1005
1006    free_view_reducer_ctx(red_ctx);
1007
1008    return ret;
1009}
1010
1011
1012LIBCOUCHSTORE_API
1013couchstore_error_t couchstore_cleanup_view_group(view_group_info_t *info,
1014                                                 uint64_t *header_pos,
1015                                                 uint64_t *purge_count,
1016                                                 view_error_t *error_info)
1017{
1018    couchstore_error_t ret;
1019    tree_file index_file;
1020    index_header_t *header = NULL;
1021    node_pointer *id_root = NULL;
1022    node_pointer **view_roots = NULL;
1023    view_purger_ctx_t purge_ctx;
1024    bitmap_t bm_cleanup;
1025    int i;
1026
1027    memset(&bm_cleanup, 0, sizeof(bm_cleanup));
1028    error_info->view_name = NULL;
1029    error_info->error_msg = NULL;
1030    index_file.handle = NULL;
1031    index_file.ops = NULL;
1032    index_file.path = NULL;
1033
1034    view_roots = (node_pointer **) calloc(
1035        info->num_btrees, sizeof(node_pointer *));
1036    if (view_roots == NULL) {
1037        return COUCHSTORE_ERROR_ALLOC_FAIL;
1038    }
1039
1040    /* Read info from current index viewgroup file */
1041    ret = open_view_group_file(info->filepath,
1042                               COUCHSTORE_OPEN_FLAG_RDONLY,
1043                               &info->file);
1044    if (ret != COUCHSTORE_SUCCESS) {
1045        goto cleanup;
1046    }
1047
1048    ret = read_view_group_header(info, &header);
1049    if (ret != COUCHSTORE_SUCCESS) {
1050        goto cleanup;
1051    }
1052    assert(info->num_btrees == header->num_views);
1053
1054    /* Setup purger context */
1055    purge_ctx.count = 0;
1056    purge_ctx.cbitmask = header->cleanup_bitmask;
1057
1058    ret = open_view_group_file(info->filepath,
1059                               0,
1060                               &index_file);
1061    if (ret != COUCHSTORE_SUCCESS) {
1062        goto cleanup;
1063    }
1064
1065    index_file.pos = index_file.ops->goto_eof(&index_file.lastError,
1066                                                         index_file.handle);
1067
1068    /* Cleanup id_bree */
1069    ret = cleanup_id_btree(&index_file, header->id_btree_state, &id_root,
1070                                                                &purge_ctx,
1071                                                                error_info);
1072    if (ret != COUCHSTORE_SUCCESS) {
1073        goto cleanup;
1074    }
1075
1076    if (header->id_btree_state != id_root) {
1077        free(header->id_btree_state);
1078    }
1079
1080    header->id_btree_state = id_root;
1081    view_id_bitmask(id_root, &bm_cleanup);
1082    id_root = NULL;
1083
1084    /* Cleanup all view btrees */
1085    for (i = 0; i < info->num_btrees; ++i) {
1086        ret = cleanup_view_btree(&index_file,
1087                                 (node_pointer *) header->view_states[i],
1088                                 &info->view_infos.btree[i],
1089                                 &view_roots[i],
1090                                 &purge_ctx,
1091                                 error_info);
1092
1093        if (ret != COUCHSTORE_SUCCESS) {
1094            goto cleanup;
1095        }
1096
1097        if (header->view_states[i] != view_roots[i]) {
1098            free(header->view_states[i]);
1099        }
1100
1101        header->view_states[i] = view_roots[i];
1102        view_bitmask(view_roots[i], &bm_cleanup);
1103        view_roots[i] = NULL;
1104    }
1105
1106    /* Set resulting cleanup bitmask */
1107    /* TODO: This code can be removed, if we do not plan for cleanup STOP command */
1108    intersect_bitmaps(&bm_cleanup, &purge_ctx.cbitmask);
1109    header->cleanup_bitmask = bm_cleanup;
1110
1111    /* Update header with new btree infos */
1112    ret = write_view_group_header(&index_file, header_pos, header);
1113    if (ret != COUCHSTORE_SUCCESS) {
1114        goto cleanup;
1115    }
1116
1117    *purge_count = purge_ctx.count;
1118    ret = COUCHSTORE_SUCCESS;
1119
1120cleanup:
1121    free_index_header(header);
1122    close_view_group_file(info);
1123    tree_file_close(&index_file);
1124    free(id_root);
1125    if (view_roots != NULL) {
1126        for (i = 0; i < info->num_btrees; ++i) {
1127            free(view_roots[i]);
1128        }
1129        free(view_roots);
1130    }
1131
1132    return ret;
1133}
1134
1135static couchstore_error_t update_btree(const char *source_file,
1136                                       tree_file *dest_file,
1137                                       const node_pointer *root,
1138                                       size_t batch_size,
1139                                       compare_info *cmp,
1140                                       reduce_fn reduce_fun,
1141                                       reduce_fn rereduce_fun,
1142                                       purge_kv_fn purge_kv,
1143                                       purge_kp_fn purge_kp,
1144                                       view_reducer_ctx_t *red_ctx,
1145                                       view_purger_ctx_t *purge_ctx,
1146                                       uint64_t *inserted,
1147                                       uint64_t *removed,
1148                                       node_pointer **out_root)
1149{
1150    couchstore_error_t ret = COUCHSTORE_SUCCESS;
1151    couchfile_modify_request rq;
1152    node_pointer *newroot = (node_pointer *) root;
1153    arena *transient_arena = new_arena(0);
1154    FILE *f = NULL;
1155    couchfile_modify_action *actions = NULL;
1156    sized_buf *keybufs = NULL, *valbufs = NULL;
1157    size_t bufsize = 0;
1158    int last_record = 0;
1159    bitmap_t empty_bm;
1160    int max_actions = MAX_ACTIONS_SIZE /
1161                (sizeof(couchfile_modify_action) + 2 * sizeof(sized_buf));
1162
1163    memset(&empty_bm, 0, sizeof(empty_bm));
1164
1165    if (transient_arena == NULL) {
1166        ret = COUCHSTORE_ERROR_ALLOC_FAIL;
1167        goto cleanup;
1168    }
1169
1170
1171    actions = (couchfile_modify_action *) calloc(
1172                                            max_actions,
1173                                            sizeof(couchfile_modify_action));
1174    if (actions == NULL) {
1175        ret = COUCHSTORE_ERROR_ALLOC_FAIL;
1176        goto cleanup;
1177    }
1178
1179    keybufs = (sized_buf *) calloc(max_actions, sizeof(sized_buf));
1180    if (keybufs == NULL) {
1181        ret = COUCHSTORE_ERROR_ALLOC_FAIL;
1182        goto cleanup;
1183    }
1184
1185    valbufs = (sized_buf *) calloc(max_actions, sizeof(sized_buf));
1186    if (valbufs == NULL) {
1187        ret = COUCHSTORE_ERROR_ALLOC_FAIL;
1188        goto cleanup;
1189    }
1190
1191    rq.cmp = *cmp;
1192    rq.file = dest_file;
1193    rq.actions = actions;
1194    rq.num_actions = 0;
1195    rq.reduce = reduce_fun;
1196    rq.rereduce = rereduce_fun;
1197    rq.compacting = 0;
1198    rq.kv_chunk_threshold = VIEW_KV_CHUNK_THRESHOLD;
1199    rq.kp_chunk_threshold = VIEW_KP_CHUNK_THRESHOLD;
1200    rq.purge_kp = purge_kp;
1201    rq.purge_kv = purge_kv;
1202    rq.guided_purge_ctx = purge_ctx;
1203    rq.enable_purging = 1;
1204    rq.user_reduce_ctx = red_ctx;
1205
1206    /* If cleanup bitmask is empty, no need to try purging */
1207    if (is_equal_bitmap(&empty_bm, &purge_ctx->cbitmask)) {
1208        rq.enable_purging = 0;
1209    }
1210
1211    f = fopen(source_file, "rb");
1212    if (f == NULL) {
1213        ret = COUCHSTORE_ERROR_OPEN_FILE;
1214        goto cleanup;
1215    }
1216
1217    while (!last_record) {
1218        int read_ret;
1219        uint8_t op;
1220
1221        read_ret = read_record(f, transient_arena, &keybufs[rq.num_actions],
1222                                                   &valbufs[rq.num_actions],
1223                                                   &op);
1224        if (read_ret == 0) {
1225            last_record = 1;
1226            goto flush;
1227        } else if (read_ret < 0) {
1228            ret = (couchstore_error_t) read_ret;
1229            goto cleanup;
1230        }
1231
1232        /* Add action */
1233        actions[rq.num_actions].type = op;
1234        actions[rq.num_actions].key = &keybufs[rq.num_actions];
1235        actions[rq.num_actions].value.data = &valbufs[rq.num_actions];
1236
1237        if (inserted && op == ACTION_INSERT) {
1238            (*inserted)++;
1239        } else if (removed && op == ACTION_REMOVE) {
1240            (*removed)++;
1241        }
1242
1243        bufsize += keybufs[rq.num_actions].size +
1244                   valbufs[rq.num_actions].size +
1245                   sizeof(uint8_t);
1246        rq.num_actions++;
1247
1248flush:
1249        if (rq.num_actions && (last_record || bufsize > batch_size ||
1250                               rq.num_actions == max_actions)) {
1251            newroot = modify_btree(&rq, newroot, &ret);
1252            if (ret != COUCHSTORE_SUCCESS) {
1253                goto cleanup;
1254            }
1255
1256            rq.num_actions = 0;
1257            bufsize = 0;
1258            arena_free_all(transient_arena);
1259        }
1260    }
1261
1262    *out_root = newroot;
1263
1264cleanup:
1265    free(actions);
1266    free(keybufs);
1267    free(valbufs);
1268
1269    if (f != NULL) {
1270        fclose(f);
1271    }
1272
1273    if (transient_arena != NULL) {
1274        delete_arena(transient_arena);
1275    }
1276
1277    return ret;
1278}
1279
1280static couchstore_error_t update_id_btree(const char *source_file,
1281                                         tree_file *dest_file,
1282                                         const node_pointer *root,
1283                                         size_t batch_size,
1284                                         view_purger_ctx_t *purge_ctx,
1285                                         uint64_t *inserted,
1286                                         uint64_t *removed,
1287                                         node_pointer **out_root)
1288{
1289    couchstore_error_t ret;
1290    compare_info cmp;
1291
1292    cmp.compare = id_btree_cmp;
1293
1294    ret = update_btree(source_file,
1295                      dest_file,
1296                      root,
1297                      batch_size,
1298                      &cmp,
1299                      view_id_btree_reduce,
1300                      view_id_btree_rereduce,
1301                      view_id_btree_purge_kv,
1302                      view_id_btree_purge_kp,
1303                      NULL,
1304                      purge_ctx,
1305                      inserted,
1306                      removed,
1307                      out_root);
1308
1309    return ret;
1310}
1311
1312
1313static couchstore_error_t update_view_btree(const char *source_file,
1314                                            const view_btree_info_t *info,
1315                                            tree_file *dest_file,
1316                                            const node_pointer *root,
1317                                            size_t batch_size,
1318                                            view_purger_ctx_t *purge_ctx,
1319                                            uint64_t *inserted,
1320                                            uint64_t *removed,
1321                                            node_pointer **out_root,
1322                                            view_error_t *error_info)
1323{
1324    couchstore_error_t ret = COUCHSTORE_SUCCESS;
1325    compare_info cmp;
1326    view_reducer_ctx_t *red_ctx = NULL;
1327    char *error_msg = NULL;
1328
1329    cmp.compare = view_btree_cmp;
1330    red_ctx = make_view_reducer_ctx(info->reducers,
1331                                    info->num_reducers,
1332                                    &error_msg);
1333    if (red_ctx == NULL) {
1334        set_error_info(info, (const char *) error_msg, ret, error_info);
1335        return COUCHSTORE_ERROR_REDUCER_FAILURE;
1336    }
1337
1338    ret = update_btree(source_file,
1339                      dest_file,
1340                      root,
1341                      batch_size,
1342                      &cmp,
1343                      view_btree_reduce,
1344                      view_btree_rereduce,
1345                      view_btree_purge_kv,
1346                      view_btree_purge_kp,
1347                      red_ctx,
1348                      purge_ctx,
1349                      inserted,
1350                      removed,
1351                      out_root);
1352
1353    if (ret != COUCHSTORE_SUCCESS) {
1354        set_error_info(info, red_ctx->error, ret, error_info);
1355    }
1356
1357    free_view_reducer_ctx(red_ctx);
1358
1359    return ret;
1360}
1361
1362LIBCOUCHSTORE_API
1363couchstore_error_t couchstore_update_view_group(view_group_info_t *info,
1364                                               const char *id_records_file,
1365                                               const char *kv_records_files[],
1366                                               size_t batch_size,
1367                                               const sized_buf *header_buf,
1368                                               int is_sorted,
1369                                               const char *tmp_dir,
1370                                               view_group_update_stats_t *stats,
1371                                               sized_buf *header_outbuf,
1372                                               view_error_t *error_info)
1373{
1374    couchstore_error_t ret;
1375    tree_file index_file;
1376    index_header_t *header = NULL;
1377    node_pointer *id_root = NULL;
1378    node_pointer **view_roots = NULL;
1379    view_purger_ctx_t purge_ctx;
1380    bitmap_t bm_cleanup;
1381    int i;
1382
1383    ret = decode_index_header(header_buf->buf, header_buf->size, &header);
1384    if (ret < 0) {
1385        goto cleanup;
1386    }
1387
1388    memset(&bm_cleanup, 0, sizeof(bm_cleanup));
1389
1390    error_info->view_name = NULL;
1391    error_info->error_msg = NULL;
1392    index_file.handle = NULL;
1393    index_file.ops = NULL;
1394    index_file.path = NULL;
1395
1396    view_roots = (node_pointer **) calloc(info->num_btrees,
1397                                          sizeof(node_pointer *));
1398    if (view_roots == NULL) {
1399        ret = COUCHSTORE_ERROR_ALLOC_FAIL;
1400        goto cleanup;
1401    }
1402
1403    assert(info->num_btrees == header->num_views);
1404
1405    /* Setup purger context */
1406    purge_ctx.count = 0;
1407    purge_ctx.cbitmask = header->cleanup_bitmask;
1408
1409    ret = open_view_group_file(info->filepath,
1410                               0,
1411                               &index_file);
1412    if (ret != COUCHSTORE_SUCCESS) {
1413        goto cleanup;
1414    }
1415
1416    /* Move pos to end of file */
1417    index_file.pos = index_file.ops->goto_eof(&index_file.lastError,
1418                                              index_file.handle);
1419
1420    if (!is_sorted) {
1421        ret = (couchstore_error_t) sort_view_ids_ops_file(id_records_file, tmp_dir);
1422        if (ret != COUCHSTORE_SUCCESS) {
1423            char buf[1024];
1424            snprintf(buf, sizeof(buf),
1425                    "Error sorting records file: %s", id_records_file);
1426            error_info->error_msg = strdup(buf);
1427            error_info->view_name = (const char *) strdup("id_btree");
1428            goto cleanup;
1429        }
1430    }
1431
1432    ret = update_id_btree(id_records_file, &index_file,
1433                                           header->id_btree_state,
1434                                           batch_size,
1435                                           &purge_ctx,
1436                                           &stats->ids_inserted,
1437                                           &stats->ids_removed,
1438                                           &id_root);
1439    if (ret != COUCHSTORE_SUCCESS) {
1440        goto cleanup;
1441    }
1442
1443
1444    if (header->id_btree_state != id_root) {
1445        free(header->id_btree_state);
1446    }
1447
1448    header->id_btree_state = id_root;
1449    view_id_bitmask(id_root, &bm_cleanup);
1450    id_root = NULL;
1451
1452    for (i = 0; i < info->num_btrees; ++i) {
1453        if (!is_sorted) {
1454            ret = (couchstore_error_t) sort_view_kvs_ops_file(kv_records_files[i], tmp_dir);
1455            if (ret != COUCHSTORE_SUCCESS) {
1456                char buf[1024];
1457                snprintf(buf, sizeof(buf),
1458                        "Error sorting records file: %s", kv_records_files[i]);
1459                set_error_info(&info->view_infos.btree[i], buf, ret, error_info);
1460                goto cleanup;
1461            }
1462        }
1463
1464        ret = update_view_btree(kv_records_files[i],
1465                                &info->view_infos.btree[i],
1466                                &index_file,
1467                                header->view_states[i],
1468                                batch_size,
1469                                &purge_ctx,
1470                                &stats->kvs_inserted,
1471                                &stats->kvs_removed,
1472                                &view_roots[i],
1473                                error_info);
1474
1475        if (ret != COUCHSTORE_SUCCESS) {
1476            goto cleanup;
1477        }
1478
1479        if (header->view_states[i] != view_roots[i]) {
1480            free(header->view_states[i]);
1481        }
1482
1483        header->view_states[i] = view_roots[i];
1484        view_bitmask(view_roots[i], &bm_cleanup);
1485        view_roots[i] = NULL;
1486    }
1487
1488    /* Set resulting cleanup bitmask */
1489    intersect_bitmaps(&bm_cleanup, &purge_ctx.cbitmask);
1490    header->cleanup_bitmask = bm_cleanup;
1491    stats->purged = purge_ctx.count;
1492
1493    ret = encode_index_header(header, &header_outbuf->buf, &header_outbuf->size);
1494    if (ret != COUCHSTORE_SUCCESS) {
1495        goto cleanup;
1496    }
1497
1498    ret = COUCHSTORE_SUCCESS;
1499
1500cleanup:
1501    free_index_header(header);
1502    close_view_group_file(info);
1503    tree_file_close(&index_file);
1504    free(id_root);
1505    if (view_roots != NULL) {
1506        for (i = 0; i < info->num_btrees; ++i) {
1507            free(view_roots[i]);
1508        }
1509        free(view_roots);
1510    }
1511
1512    return ret;
1513}
1514
1515/* Add the kv pair to modify result */
1516static couchstore_error_t compact_view_fetchcb(couchfile_lookup_request *rq,
1517                                        const sized_buf *k,
1518                                        const sized_buf *v)
1519{
1520    int ret;
1521    sized_buf *k_c, *v_c;
1522    view_compact_ctx_t *ctx = (view_compact_ctx_t *) rq->callback_ctx;
1523    compactor_stats_t *stats = ctx->stats;
1524
1525    if (k == NULL || v == NULL) {
1526        return COUCHSTORE_ERROR_READ;
1527    }
1528
1529    if (ctx->filter_fun) {
1530        ret = ctx->filter_fun(k, v, ctx->filterbm);
1531        if (ret < 0) {
1532            return (couchstore_error_t) ret;
1533        } else if (ret) {
1534            return COUCHSTORE_SUCCESS;
1535        }
1536    }
1537
1538    k_c = arena_copy_buf(ctx->transient_arena, k);
1539    v_c = arena_copy_buf(ctx->transient_arena, v);
1540    ret = mr_push_item(k_c, v_c, ctx->mr);
1541    if (ret != COUCHSTORE_SUCCESS) {
1542        return ret;
1543    }
1544
1545    if (stats) {
1546        stats->inserted++;
1547        if (stats->update_fun) {
1548            stats->update_fun(stats->freq, stats->inserted);
1549        }
1550    }
1551
1552    if (ctx->mr->count == 0) {
1553        arena_free_all(ctx->transient_arena);
1554    }
1555
1556    return (couchstore_error_t) ret;
1557}
1558
1559static couchstore_error_t compact_btree(tree_file *source,
1560                                 tree_file *target,
1561                                 const node_pointer *root,
1562                                 compare_info *cmp,
1563                                 reduce_fn reduce_fun,
1564                                 reduce_fn rereduce_fun,
1565                                 compact_filter_fn filter_fun,
1566                                 view_reducer_ctx_t *red_ctx,
1567                                 const bitmap_t *filterbm,
1568                                 compactor_stats_t *stats,
1569                                 node_pointer **out_root)
1570{
1571    couchstore_error_t ret = COUCHSTORE_SUCCESS;
1572    arena *transient_arena;
1573    arena *persistent_arena;
1574    couchfile_modify_result *modify_result;
1575    couchfile_lookup_request lookup_rq;
1576    view_compact_ctx_t compact_ctx;
1577    sized_buf nullkey = {NULL, 0};
1578    sized_buf *lowkeys = &nullkey;
1579
1580    if (!root) {
1581        return COUCHSTORE_SUCCESS;
1582    }
1583
1584    transient_arena = new_arena(0);
1585    persistent_arena = new_arena(0);
1586
1587    if (transient_arena == NULL || persistent_arena == NULL) {
1588        ret = COUCHSTORE_ERROR_ALLOC_FAIL;
1589        goto cleanup;
1590    }
1591
1592    /* Create new btree on new file */
1593    modify_result = new_btree_modres(persistent_arena,
1594                          transient_arena,
1595                          target,
1596                          cmp,
1597                          reduce_fun,
1598                          rereduce_fun,
1599                          red_ctx,
1600                          VIEW_KV_CHUNK_THRESHOLD + (VIEW_KV_CHUNK_THRESHOLD / 3),
1601                          VIEW_KP_CHUNK_THRESHOLD + (VIEW_KP_CHUNK_THRESHOLD / 3));
1602    if (modify_result == NULL) {
1603        ret = COUCHSTORE_ERROR_ALLOC_FAIL;
1604        goto cleanup;
1605    }
1606
1607    compact_ctx.filter_fun = NULL;
1608    compact_ctx.mr = modify_result;
1609    compact_ctx.transient_arena = transient_arena;
1610    compact_ctx.stats = stats;
1611
1612    if (filterbm) {
1613        compact_ctx.filterbm = filterbm;
1614        compact_ctx.filter_fun = filter_fun;
1615    }
1616
1617    lookup_rq.cmp.compare = cmp->compare;
1618    lookup_rq.file = source;
1619    lookup_rq.num_keys = 1;
1620    lookup_rq.keys = &lowkeys;
1621    lookup_rq.callback_ctx = &compact_ctx;
1622    lookup_rq.fetch_callback = compact_view_fetchcb;
1623    lookup_rq.node_callback = NULL;
1624    lookup_rq.fold = 1;
1625
1626    ret = btree_lookup(&lookup_rq, root->pointer);
1627    if (ret != COUCHSTORE_SUCCESS) {
1628        goto cleanup;
1629    }
1630
1631    *out_root = complete_new_btree(modify_result, &ret);
1632
1633cleanup:
1634    if (transient_arena != NULL) {
1635        delete_arena(transient_arena);
1636    }
1637
1638    if (persistent_arena != NULL) {
1639        delete_arena(persistent_arena);
1640    }
1641
1642    return ret;
1643}
1644
1645static couchstore_error_t compact_id_btree(tree_file *source,
1646                                    tree_file *target,
1647                                    const node_pointer *root,
1648                                    const bitmap_t *filterbm,
1649                                    compactor_stats_t *stats,
1650                                    node_pointer **out_root)
1651{
1652    couchstore_error_t ret;
1653    compare_info cmp;
1654
1655    cmp.compare = id_btree_cmp;
1656
1657    ret = compact_btree(source,
1658                        target,
1659                        root,
1660                        &cmp,
1661                        view_id_btree_reduce,
1662                        view_id_btree_rereduce,
1663                        view_id_btree_filter,
1664                        NULL,
1665                        filterbm,
1666                        stats,
1667                        out_root);
1668
1669    return ret;
1670}
1671
1672static couchstore_error_t compact_view_btree(tree_file *source,
1673                                      tree_file *target,
1674                                      const view_btree_info_t *info,
1675                                      const node_pointer *root,
1676                                      const bitmap_t *filterbm,
1677                                      compactor_stats_t *stats,
1678                                      node_pointer **out_root,
1679                                      view_error_t *error_info)
1680{
1681    couchstore_error_t ret = COUCHSTORE_SUCCESS;
1682    compare_info cmp;
1683    view_reducer_ctx_t *red_ctx = NULL;
1684    char *error_msg = NULL;
1685
1686    cmp.compare = view_btree_cmp;
1687    red_ctx = make_view_reducer_ctx(info->reducers,
1688                                    info->num_reducers,
1689                                    &error_msg);
1690    if (red_ctx == NULL) {
1691        set_error_info(info, (const char *) error_msg, ret, error_info);
1692        return COUCHSTORE_ERROR_REDUCER_FAILURE;
1693    }
1694
1695    ret = compact_btree(source,
1696                        target,
1697                        root,
1698                        &cmp,
1699                        view_btree_reduce,
1700                        view_btree_rereduce,
1701                        view_btree_filter,
1702                        red_ctx,
1703                        filterbm,
1704                        stats,
1705                        out_root);
1706
1707    if (ret != COUCHSTORE_SUCCESS) {
1708        set_error_info(info, red_ctx->error, ret, error_info);
1709    }
1710
1711    free_view_reducer_ctx(red_ctx);
1712
1713    return ret;
1714}
1715
1716LIBCOUCHSTORE_API
1717couchstore_error_t couchstore_compact_view_group(view_group_info_t *info,
1718                                                 const char *target_file,
1719                                                 const sized_buf *header_buf,
1720                                                 compactor_stats_t *stats,
1721                                                 sized_buf *header_outbuf,
1722                                                 view_error_t *error_info)
1723{
1724    couchstore_error_t ret;
1725    tree_file index_file;
1726    tree_file compact_file;
1727    index_header_t *header = NULL;
1728    node_pointer *id_root = NULL;
1729    node_pointer **view_roots = NULL;
1730    bitmap_t *filterbm = NULL;
1731    bitmap_t emptybm;
1732    int i;
1733
1734    memset(&emptybm, 0, sizeof(bitmap_t));
1735    error_info->view_name = NULL;
1736    error_info->error_msg = NULL;
1737    index_file.handle = NULL;
1738    index_file.ops = NULL;
1739    index_file.path = NULL;
1740    compact_file.handle = NULL;
1741    compact_file.ops = NULL;
1742    compact_file.path = NULL;
1743
1744    ret = decode_index_header(header_buf->buf, header_buf->size, &header);
1745    if (ret < 0) {
1746        goto cleanup;
1747    }
1748
1749    /* Set filter bitmask if required */
1750    if (!is_equal_bitmap(&emptybm, &header->cleanup_bitmask)) {
1751        filterbm = &header->cleanup_bitmask;
1752    }
1753
1754    view_roots = (node_pointer **) calloc(info->num_btrees,
1755                                          sizeof(node_pointer *));
1756    if (view_roots == NULL) {
1757        ret = COUCHSTORE_ERROR_ALLOC_FAIL;
1758        goto cleanup;
1759    }
1760
1761    assert(info->num_btrees == header->num_views);
1762
1763    ret = open_view_group_file(info->filepath,
1764                               COUCHSTORE_OPEN_FLAG_RDONLY,
1765                               &index_file);
1766    if (ret != COUCHSTORE_SUCCESS) {
1767        goto cleanup;
1768    }
1769
1770    /*
1771     * Open target file for compaction
1772     * Expects that caller created the target file
1773     */
1774    ret = open_view_group_file(target_file, 0, &compact_file);
1775    if (ret != COUCHSTORE_SUCCESS) {
1776        goto cleanup;
1777    }
1778
1779    compact_file.pos = compact_file.ops->goto_eof(&compact_file.lastError,
1780                                                  compact_file.handle);
1781    ret = compact_id_btree(&index_file, &compact_file,
1782                                        header->id_btree_state,
1783                                        filterbm,
1784                                        stats,
1785                                        &id_root);
1786    if (ret != COUCHSTORE_SUCCESS) {
1787        goto cleanup;
1788    }
1789
1790    free(header->id_btree_state);
1791    header->id_btree_state = id_root;
1792    id_root = NULL;
1793
1794    for (i = 0; i < info->num_btrees; ++i) {
1795        ret = compact_view_btree(&index_file,
1796                                 &compact_file,
1797                                 &info->view_infos.btree[i],
1798                                 header->view_states[i],
1799                                 filterbm,
1800                                 stats,
1801                                 &view_roots[i],
1802                                 error_info);
1803
1804        if (ret != COUCHSTORE_SUCCESS) {
1805            goto cleanup;
1806        }
1807
1808        free(header->view_states[i]);
1809        header->view_states[i] = view_roots[i];
1810        view_roots[i] = NULL;
1811    }
1812
1813    header->cleanup_bitmask = emptybm;
1814    ret = encode_index_header(header, &header_outbuf->buf, &header_outbuf->size);
1815    if (ret != COUCHSTORE_SUCCESS) {
1816        goto cleanup;
1817    }
1818
1819    ret = COUCHSTORE_SUCCESS;
1820
1821cleanup:
1822    free_index_header(header);
1823    close_view_group_file(info);
1824    tree_file_close(&index_file);
1825    tree_file_close(&compact_file);
1826    free(id_root);
1827    if (view_roots != NULL) {
1828        for (i = 0; i < info->num_btrees; ++i) {
1829            free(view_roots[i]);
1830        }
1831        free(view_roots);
1832    }
1833
1834    return ret;
1835}
1836