xref: /6.6.0/couchstore/src/file_merger.cc (revision b8bc47e9)
1/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2
3/**
4 * @copyright 2013 Couchbase, Inc.
5 *
6 * @author Filipe Manana  <filipe@couchbase.com>
7 * @author Aliaksey Kandratsenka <alk@tut.by> (small optimization)
8 *
9 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
10 * use this file except in compliance with the License. You may obtain a copy of
11 * the License at
12 *
13 *  http://www.apache.org/licenses/LICENSE-2.0
14 *
15 * Unless required by applicable law or agreed to in writing, software
16 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
17 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
18 * License for the specific language governing permissions and limitations under
19 * the License.
20 **/
21
22#include "file_merger.h"
23
24#include <errno.h>
25#include <stdlib.h>
26#include <string.h>
27#include <platform/cb_malloc.h>
28#include <platform/cbassert.h>
29
30
/* One in-flight record of the merge, tagged with the file it came from. */
typedef struct {
    /* Opaque payload handed back by the read_record callback; it is
     * released through the free_record callback. */
    void *data;
    /* Index into the context's files[] array of the source file this
     * record was read from. */
    unsigned file;
} record_t;
35
/* Release a record_t: the payload goes through the user supplied
 * free_record callback (together with user_ctx), then the wrapper itself is
 * returned to the allocator. Both macro arguments are evaluated more than
 * once, so pass expressions without side effects. */
#define FREE_RECORD(ctx, rec)                                 \
    do {                                                      \
        (ctx)->free_record((rec)->data, (ctx)->user_ctx);     \
        cb_free(rec);                                         \
    } while (0)
41
42struct file_merger_ctx_t;
43
44typedef struct {
45    struct file_merger_ctx_t  *ctx;
46    record_t                  **data;
47    unsigned                  count;
48} sorted_vector_t;
49
50typedef struct file_merger_ctx_t {
51    unsigned                            num_files;
52    FILE                                **files;
53    FILE                                *dest_file;
54    file_merger_read_record_t           read_record;
55    file_merger_write_record_t          write_record;
56    file_merger_record_free_t           free_record;
57    file_merger_compare_records_t       compare_records;
58    file_merger_deduplicate_records_t   dedup_records;
59    file_merger_feed_record_t           feed_record;
60    void                                *user_ctx;
61    sorted_vector_t                     sorted_vector;
62} file_merger_ctx_t;
63
64
65static int  init_sorted_vector(sorted_vector_t *sorted_vector, unsigned max_elements, file_merger_ctx_t *ctx);
66static void sorted_vector_destroy(sorted_vector_t *sorted_vector);
67static void sorted_vector_pop(sorted_vector_t *sorted_vector,
68                              record_t ***records,
69                              size_t *n);
70static int  sorted_vector_add(sorted_vector_t *sorted_vector, record_t *record);
71
72static file_merger_error_t do_merge_files(file_merger_ctx_t *ctx);
73static void free_all_records(file_merger_ctx_t *ctx, record_t **records,
74                             int offset, int num);
75
76
77file_merger_error_t merge_files(const char *source_files[],
78                                unsigned num_files,
79                                const char *dest_file,
80                                file_merger_read_record_t read_record,
81                                file_merger_write_record_t write_record,
82                                file_merger_feed_record_t feed_record,
83                                file_merger_compare_records_t compare_records,
84                                file_merger_deduplicate_records_t dedup_records,
85                                file_merger_record_free_t free_record,
86                                int skip_writeback,
87                                void *user_ctx)
88{
89    file_merger_ctx_t ctx;
90    file_merger_error_t ret;
91    unsigned i, j;
92
93    if (num_files == 0) {
94        return FILE_MERGER_ERROR_BAD_ARG;
95    }
96
97    ctx.num_files = num_files;
98    ctx.read_record = read_record;
99    ctx.write_record = write_record;
100    ctx.free_record = free_record;
101    ctx.compare_records = compare_records;
102    ctx.user_ctx = user_ctx;
103    ctx.feed_record = feed_record;
104    ctx.dedup_records = dedup_records;
105
106    if (feed_record && skip_writeback) {
107        ctx.dest_file = NULL;
108    } else {
109        ctx.dest_file = fopen(dest_file, "ab");
110    }
111
112    if (!init_sorted_vector(&ctx.sorted_vector, num_files, &ctx)) {
113        return FILE_MERGER_ERROR_ALLOC;
114    }
115
116    if (feed_record == NULL && ctx.dest_file == NULL) {
117        sorted_vector_destroy(&ctx.sorted_vector);
118        return FILE_MERGER_ERROR_OPEN_FILE;
119    }
120
121    ctx.files = (FILE **) cb_malloc(sizeof(FILE *) * num_files);
122
123    if (ctx.files == NULL) {
124        sorted_vector_destroy(&ctx.sorted_vector);
125        fclose(ctx.dest_file);
126        return FILE_MERGER_ERROR_ALLOC;
127    }
128
129    for (i = 0; i < num_files; ++i) {
130        ctx.files[i] = fopen(source_files[i], "rb");
131
132        if (ctx.files[i] == NULL) {
133            fprintf(stderr, "Error while opening file %s errcode %s\n",
134                    source_files[i], strerror(errno));
135            for (j = 0; j < i; ++j) {
136                fclose(ctx.files[j]);
137            }
138            cb_free(ctx.files);
139            fclose(ctx.dest_file);
140            sorted_vector_destroy(&ctx.sorted_vector);
141
142            return FILE_MERGER_ERROR_OPEN_FILE;
143        }
144    }
145
146    ret = do_merge_files(&ctx);
147
148    for (i = 0; i < ctx.num_files; ++i) {
149        if (ctx.files[i] != NULL) {
150            fclose(ctx.files[i]);
151        }
152    }
153    cb_free(ctx.files);
154    sorted_vector_destroy(&ctx.sorted_vector);
155    if (ctx.dest_file) {
156        fclose(ctx.dest_file);
157    }
158
159    return ret;
160}
161
162
163static file_merger_error_t do_merge_files(file_merger_ctx_t *ctx)
164{
165    unsigned i;
166
167    for (i = 0; i < ctx->num_files; ++i) {
168        FILE *f = ctx->files[i];
169        int record_len;
170        void *record_data;
171        record_t *record;
172
173        record_len = (*ctx->read_record)(f, &record_data, ctx->user_ctx);
174
175        if (record_len == 0) {
176            fclose(f);
177            ctx->files[i] = NULL;
178        } else if (record_len < 0) {
179            return (file_merger_error_t) record_len;
180        } else {
181            int rv;
182            record = (record_t *) cb_malloc(sizeof(*record));
183            if (record == NULL) {
184                return FILE_MERGER_ERROR_ALLOC;
185            }
186            record->data = record_data;
187            record->file = i;
188            rv = sorted_vector_add(&ctx->sorted_vector, record);
189            if (!rv) {
190                FREE_RECORD(ctx, record);
191                return FILE_MERGER_SORT_ERROR;
192            }
193        }
194    }
195
196    while (ctx->sorted_vector.count != 0) {
197        record_t **records;
198        size_t n;
199        size_t i;
200        void *record_data;
201        int record_len;
202        file_merger_error_t ret;
203
204        /* The head of the list is the required item which needs to be written
205         * to the output destination records file.
206         * For each duplicated item that is eliminated (elements of linked
207         * list), we need to read one record from the same file as the
208         * duplicated record came from and add them to the sort vector.
209         */
210        sorted_vector_pop(&ctx->sorted_vector, &records, &n);
211        cb_assert(records != NULL);
212        cb_assert(n != 0);
213
214        if (ctx->feed_record) {
215            ret = (*ctx->feed_record)(records[0]->data, ctx->user_ctx);
216            if (ret != FILE_MERGER_SUCCESS) {
217                free_all_records(ctx, records, 0, n);
218                return ret;
219            }
220        } else {
221            cb_assert(ctx->dest_file != NULL);
222        }
223
224        if (ctx->dest_file) {
225            ret = (*ctx->write_record)(ctx->dest_file, records[0]->data, ctx->user_ctx);
226            if (ret != FILE_MERGER_SUCCESS) {
227                free_all_records(ctx, records, 0, n);
228                return ret;
229            }
230        }
231
232        for (i = 0; i < n; i++) {
233            record_len = (*ctx->read_record)(ctx->files[records[i]->file],
234                                             &record_data,
235                                             ctx->user_ctx);
236            if (record_len == 0) {
237                fclose(ctx->files[records[i]->file]);
238                ctx->files[records[i]->file] = NULL;
239                FREE_RECORD(ctx, records[i]);
240
241            } else if (record_len < 0) {
242                free_all_records(ctx, records, i, n);
243                return (file_merger_error_t) record_len;
244            } else {
245                int rv;
246                (*ctx->free_record)(records[i]->data, ctx->user_ctx);
247                records[i]->data = record_data;
248                rv = sorted_vector_add(&ctx->sorted_vector, records[i]);
249                if (!rv) {
250                    free_all_records(ctx, records, i, n);
251                    return FILE_MERGER_SORT_ERROR;
252                }
253            }
254        }
255
256        cb_free(records);
257    }
258
259    return FILE_MERGER_SUCCESS;
260}
261
262
263static void free_all_records(file_merger_ctx_t *ctx, record_t **records,
264                             int offset, int num) {
265    for (; offset < num; offset++) {
266        FREE_RECORD(ctx, records[offset]);
267    }
268    cb_free(records);
269}
270
271
272static int init_sorted_vector(sorted_vector_t *sorted_vector,
273                              unsigned max_elements,
274                              file_merger_ctx_t *ctx)
275{
276    sorted_vector->data = (record_t **) cb_malloc(sizeof(record_t *) * max_elements);
277    if (sorted_vector->data == NULL) {
278        return 0;
279    }
280
281    sorted_vector->count = 0;
282    sorted_vector->ctx = ctx;
283
284    return 1;
285}
286
287
288static void sorted_vector_destroy(sorted_vector_t *sorted_vector)
289{
290    unsigned i;
291
292    for (i = 0; i < sorted_vector->count; ++i) {
293        FREE_RECORD(sorted_vector->ctx, sorted_vector->data[i]);
294    }
295
296    cb_free(sorted_vector->data);
297}
298
299
/* Compare the payloads of records a and b with the compare_records callback
 * of the context that owns sorted vector h. */
#define SORTED_VECTOR_CMP(h, a, b) \
    ((h)->ctx->compare_records((a)->data, (b)->data, (h)->ctx->user_ctx))
302
303static void sorted_vector_pop(sorted_vector_t *sorted_vector,
304                              record_t ***records,
305                              size_t *n)
306{
307    record_t *head;
308    file_merger_record_t **duplicates;
309    size_t i, j;
310
311    if (sorted_vector->count == 0) {
312        *records = NULL;
313        *n = 0;
314        return;
315    }
316
317    /* For deduplication, return the list of records whose keys are equal.
318     * Hence they can be eliminated from the sort vector and least element can
319     * be picked for writing out to the output file.
320     * */
321    head = sorted_vector->data[0];
322
323    for (i = 1; sorted_vector->ctx->dedup_records != NULL && i < sorted_vector->count; i++) {
324        if (SORTED_VECTOR_CMP(sorted_vector, head, sorted_vector->data[i]) != 0) {
325            break;
326        }
327    }
328
329    *records = (record_t **) cb_malloc(sizeof(record_t *) * i);
330    memcpy(*records, sorted_vector->data, sizeof(record_t *) * i);
331    *n = i;
332
333
334    if (i > 1) {
335        record_t *tmp;
336        duplicates = (file_merger_record_t **) *records;
337        j = sorted_vector->ctx->dedup_records(duplicates, i, sorted_vector->ctx->user_ctx);
338        tmp = (*records)[0];
339        (*records)[0] = (*records)[j];
340        (*records)[j] = tmp;
341    }
342
343    sorted_vector->count -= i;
344    memmove(sorted_vector->data + 0, sorted_vector->data + i,
345            sizeof(sorted_vector->data[0]) * (sorted_vector->count));
346}
347
348
349static int sorted_vector_add(sorted_vector_t *sorted_vector, record_t *record)
350{
351    unsigned l, r;
352
353    if (sorted_vector->count == sorted_vector->ctx->num_files) {
354        /* sorted_vector full */
355        return 0;
356    }
357
358    l = 0;
359    r = sorted_vector->count;
360    while (r - l > 1) {
361        unsigned pos = (l + r) / 2;
362
363        if (SORTED_VECTOR_CMP(sorted_vector, record, sorted_vector->data[pos]) < 0) {
364            r = pos;
365        } else {
366            l = pos;
367        }
368    }
369
370    if (l == 0 && r != 0) {
371        if (SORTED_VECTOR_CMP(sorted_vector, record, sorted_vector->data[0]) < 0) {
372            r = 0;
373        }
374    }
375
376    if (r < sorted_vector->count) {
377        memmove(sorted_vector->data + r + 1,
378                sorted_vector->data + r,
379                sizeof(sorted_vector->data[0]) * (sorted_vector->count - r));
380    }
381
382    sorted_vector->count += 1;
383
384    sorted_vector->data[r] = record;
385
386    return 1;
387}
388