1/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2
3/**
4 * @copyright 2013 Couchbase, Inc.
5 *
6 * @author Filipe Manana  <filipe@couchbase.com>
7 *
8 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
9 * use this file except in compliance with the License. You may obtain a copy of
10 * the License at
11 *
12 *  http://www.apache.org/licenses/LICENSE-2.0
13 *
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
17 * License for the specific language governing permissions and limitations under
18 * the License.
19 **/
20
21#include "config.h"
22
23#include <platform/cb_malloc.h>
24#include <stdio.h>
25#include <stdlib.h>
26#include <string.h>
27#include <errno.h>
28#include <libcouchstore/couch_db.h>
29#include "../file_merger.h"
30#include "../util.h"
31#include "util.h"
32
33#define LINE_BUF_SIZE (8 * 1024)
34#define MERGE_ERROR_CODE(Err) (100 + (Err))
35
36typedef enum {
37    MERGE_FILE_TYPE_ID_BTREE = 'i',
38    MERGE_FILE_TYPE_MAPREDUCE_VIEW = 'v',
39    MERGE_FILE_TYPE_SPATIAL = 's'
40} merge_file_type_t;
41
42
43int main(int argc, char *argv[])
44{
45    merge_file_type_t view_file_type;
46    int num_files;
47    int i, j;
48    char **view_files;
49    char tmp_dir[LINE_BUF_SIZE];
50    char dest_file[LINE_BUF_SIZE];
51    file_merger_error_t error;
52    cb_thread_t exit_thread;
53    int status = 0;
54
55    (void) argc;
56    (void) argv;
57
58    /*
59     * Disable buffering for stdout/stderr since file merger messages needs
60     * to be immediately available at erlang side
61     */
62    setvbuf(stdout, (char *) NULL, _IONBF, 0);
63    setvbuf(stderr, (char *) NULL, _IONBF, 0);
64
65    if (set_binary_mode() < 0) {
66        fprintf(stderr, "Error setting binary mode\n");
67        exit(EXIT_FAILURE);
68    }
69
70    if (fscanf(stdin, "%c\n", (char *)&view_file_type) != 1) {
71        fprintf(stderr, "Error reading view file type.\n");
72        exit(EXIT_FAILURE);
73    }
74    switch(view_file_type) {
75        case MERGE_FILE_TYPE_ID_BTREE:
76        case MERGE_FILE_TYPE_MAPREDUCE_VIEW:
77        case MERGE_FILE_TYPE_SPATIAL:
78            break;
79        default:
80            fprintf(stderr, "View file type must be 'i', 'v' or 's'.\n");
81            exit(EXIT_FAILURE);
82    }
83
84    if (view_file_type == MERGE_FILE_TYPE_SPATIAL) {
85        if (couchstore_read_line(stdin, tmp_dir, LINE_BUF_SIZE) != tmp_dir) {
86            fprintf(stderr, "Error reading temporary directory path.\n");
87            exit(EXIT_FAILURE);
88        }
89    }
90    if (fscanf(stdin, "%d\n", &num_files) != 1) {
91        fprintf(stderr, "Error reading number of files to merge.\n");
92        exit(EXIT_FAILURE);
93    }
94    if (num_files <= 0) {
95        fprintf(stderr, "Number of files to merge is negative or zero.\n");
96        exit(EXIT_FAILURE);
97    }
98
99    view_files = (char **) cb_malloc(sizeof(char *) * num_files);
100    if (view_files == NULL) {
101        fprintf(stderr, "Memory allocation failure.\n");
102        exit(EXIT_FAILURE);
103    }
104
105    for (i = 0; i < num_files; ++i) {
106        view_files[i] = (char *) cb_malloc(LINE_BUF_SIZE);
107        if (view_files[i] == NULL) {
108            for (j = 0; j < i; ++j) {
109                cb_free(view_files[j]);
110            }
111            cb_free(view_files);
112            fprintf(stderr, "Memory allocation failure.\n");
113            exit(EXIT_FAILURE);
114        }
115
116        if (couchstore_read_line(stdin, view_files[i], LINE_BUF_SIZE) != view_files[i]) {
117            for (j = 0; j <= i; ++j) {
118                cb_free(view_files[j]);
119            }
120            cb_free(view_files);
121            fprintf(stderr, "Error reading view file number %d.\n", (i + 1));
122            exit(EXIT_FAILURE);
123        }
124    }
125
126    if (couchstore_read_line(stdin, dest_file, LINE_BUF_SIZE) != dest_file) {
127        fprintf(stderr, "Error reading destination file name.\n");
128        status = 1;
129        goto finished;
130    }
131
132    status = start_exit_listener(&exit_thread, 0 /*uses_v8*/);
133    if (status) {
134        fprintf(stderr, "Error starting stdin exit listener thread\n");
135        goto finished;
136    }
137
138    if (num_files > 1) {
139        const char **src_files = (const char **) view_files;
140        switch (view_file_type) {
141        case MERGE_FILE_TYPE_ID_BTREE:
142            error = merge_view_ids_ops_files(src_files, num_files, dest_file);
143            break;
144        case MERGE_FILE_TYPE_MAPREDUCE_VIEW:
145            error = merge_view_kvs_ops_files(src_files, num_files, dest_file);
146            break;
147        case MERGE_FILE_TYPE_SPATIAL:
148            error = merge_spatial_kvs_ops_files(src_files, num_files,
149                                                dest_file, tmp_dir);
150            break;
151        default:
152            fprintf(stderr, "Unknown view file type: %c\n", view_file_type);
153            status = 1;
154            goto finished;
155        }
156
157        if (error == FILE_MERGER_SUCCESS) {
158            for (i = 0; i < num_files; ++i) {
159                /* Ignore failures.
160                   Erlang side will eventually delete the files later. */
161                remove(view_files[i]);
162            }
163        } else {
164            status = MERGE_ERROR_CODE(error);
165            goto finished;
166        }
167    } else {
168        if (rename(view_files[0], dest_file) != 0) {
169            fprintf(stderr, "Error renaming file %s to %s: %s\n",
170                    view_files[0], dest_file, strerror(errno));
171            status = 1;
172            goto finished;
173        }
174    }
175
176 finished:
177    for (i = 0; i < num_files; ++i) {
178        cb_free(view_files[i]);
179    }
180    cb_free(view_files);
181
182    _exit(status);
183}
184