xref: /3.0.2-MP2/couchstore/src/tree_writer.cc (revision afa48171)
1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2#include "config.h"
3
4#include "arena.h"
5#include "bitfield.h"
6#include "couch_btree.h"
7#include "internal.h"
8#include "mergesort.h"
9#include "reduces.h"
10#include "tree_writer.h"
11#include "util.h"
12
13#include <stdlib.h>
14
15
// Chunk size (100MB) handed to merge_sort() by TreeWriterSort().
// TODO: make tuneable?
static const size_t ID_SORT_CHUNK_SIZE = 100 * 1024 * 1024;
// Capacity in bytes of every buffer returned by alloc_record(); an
// extsort_record header plus its inline key and value bytes must fit in it.
// NOTE(review): 4196 may have been intended as 4096 -- value kept unchanged.
static const size_t ID_SORT_MAX_RECORD_SIZE = 4196;
18
19
// merge_sort() callbacks used by TreeWriterSort(). Their definitions follow the
// "MERGE-SORT CALLBACKS" marker near the end of this file.
static int read_id_record(FILE *in, void *buf, void *ctx);
static int write_id_record(FILE *out, void *ptr, void *ctx);
static int compare_id_record(const void *r1, const void *r2, void *ctx);
static char *alloc_record(void);
static char *duplicate_record(char *rec);
static void free_record(char *rec);
26
27
28struct TreeWriter {
29    FILE* file;
30    char *tmp_path; // a buffer used to build unique temporary filenames
31    compare_callback key_compare;
32    reduce_fn reduce;
33    reduce_fn rereduce;
34    void *user_reduce_ctx;
35};
36
37
38couchstore_error_t TreeWriterOpen(char* unsortedFilePath,
39                                  compare_callback key_compare,
40                                  reduce_fn reduce,
41                                  reduce_fn rereduce,
42                                  void *user_reduce_ctx,
43                                  TreeWriter** out_writer)
44{
45    couchstore_error_t errcode = COUCHSTORE_SUCCESS;
46    TreeWriter* writer = static_cast<TreeWriter*>(calloc(1, sizeof(TreeWriter)));
47    error_unless(writer, COUCHSTORE_ERROR_ALLOC_FAIL);
48    if (unsortedFilePath) {
49        // stash the temp file path into context for uniq tempfile construction
50        writer->tmp_path = unsortedFilePath;
51        writer->file = openTmpFile(writer->tmp_path);
52    }
53
54    if (!writer->file) {
55        TreeWriterFree(writer);
56        error_pass(COUCHSTORE_ERROR_NO_SUCH_FILE);
57    }
58    if (unsortedFilePath) {
59        fseek(writer->file, 0, SEEK_END);  // in case more items will be added
60    }
61    writer->key_compare = (key_compare ? key_compare : ebin_cmp);
62    writer->reduce = reduce;
63    writer->rereduce = rereduce;
64    writer->user_reduce_ctx = user_reduce_ctx;
65    *out_writer = writer;
66cleanup:
67    return errcode;
68}
69
70
71void TreeWriterFree(TreeWriter* writer)
72{
73    if (writer && writer->file) {
74        fclose(writer->file);
75    }
76    free(writer);
77}
78
79
80couchstore_error_t TreeWriterAddItem(TreeWriter* writer, sized_buf key, sized_buf value)
81{
82    couchstore_error_t errcode = COUCHSTORE_SUCCESS;
83
84    uint16_t klen = htons((uint16_t) key.size);
85    uint32_t vlen = htonl((uint32_t) value.size);
86    error_unless(fwrite(&klen, sizeof(klen), 1, writer->file) == 1, COUCHSTORE_ERROR_WRITE);
87    error_unless(fwrite(&vlen, sizeof(vlen), 1, writer->file) == 1, COUCHSTORE_ERROR_WRITE);
88    error_unless(fwrite(key.buf, key.size, 1, writer->file) == 1, COUCHSTORE_ERROR_WRITE);
89    error_unless(fwrite(value.buf, value.size, 1, writer->file) == 1, COUCHSTORE_ERROR_WRITE);
90
91cleanup:
92    return errcode;
93}
94
95
96couchstore_error_t TreeWriterSort(TreeWriter* writer)
97{
98    rewind(writer->file);
99    return static_cast<couchstore_error_t>(merge_sort(writer->file,
100                                                      writer->file,
101                                                      writer->tmp_path,
102                                                      read_id_record,
103                                                      write_id_record,
104                                                      compare_id_record,
105                                                      alloc_record,
106                                                      duplicate_record,
107                                                      free_record,
108                                                      writer,  // 'context' parameter to the above callbacks
109                                                      ID_SORT_CHUNK_SIZE,
110                                                      NULL));
111}
112
113
114couchstore_error_t TreeWriterWrite(TreeWriter* writer,
115                                   tree_file* treefile,
116                                   node_pointer** out_root)
117{
118    couchstore_error_t errcode = COUCHSTORE_SUCCESS;
119    arena* transient_arena = new_arena(0);
120    arena* persistent_arena = new_arena(0);
121    compare_info idcmp;
122    uint16_t klen;
123    uint32_t vlen;
124    sized_buf k, v;
125    int readerr;
126    couchfile_modify_result* target_mr;
127
128    error_unless(transient_arena && persistent_arena, COUCHSTORE_ERROR_ALLOC_FAIL);
129
130    rewind(writer->file);
131
132    // Create the structure to write the tree to the db:
133    idcmp.compare = writer->key_compare;
134
135    target_mr = new_btree_modres(persistent_arena,
136                                 transient_arena,
137                                 treefile, &idcmp,
138                                 writer->reduce,
139                                 writer->rereduce,
140                                 writer->user_reduce_ctx,
141                                 DB_CHUNK_THRESHOLD,
142                                 DB_CHUNK_THRESHOLD);
143    if (target_mr == NULL) {
144        error_pass(COUCHSTORE_ERROR_ALLOC_FAIL);
145    }
146
147    // Read all the key/value pairs from the file and add them to the tree:
148    while (1) {
149        if (fread(&klen, sizeof(klen), 1, writer->file) != 1) {
150            break;
151        }
152        if (fread(&vlen, sizeof(vlen), 1, writer->file) != 1) {
153            break;
154        }
155        k.size = ntohs(klen);
156        k.buf = static_cast<char*>(arena_alloc(transient_arena, k.size));
157        v.size = ntohl(vlen);
158        v.buf = static_cast<char*>(arena_alloc(transient_arena, v.size));
159        if (fread(k.buf, k.size, 1, writer->file) != 1) {
160            error_pass(COUCHSTORE_ERROR_READ);
161        }
162        if (fread(v.buf, v.size, 1, writer->file) != 1) {
163            error_pass(COUCHSTORE_ERROR_READ);
164        }
165        //printf("K: '%.*s'\n", k.size, k.buf);
166        mr_push_item(&k, &v, target_mr);
167        if (target_mr->count == 0) {
168            /* No items queued, we must have just flushed. We can safely rewind the transient arena. */
169            arena_free_all(transient_arena);
170        }
171    }
172
173    // Check for file error:
174    readerr = ferror(writer->file);
175    if (readerr != 0 && readerr != EOF) {
176        error_pass(COUCHSTORE_ERROR_READ);
177    }
178
179    // Finish up the tree:
180    *out_root = complete_new_btree(target_mr, &errcode);
181
182cleanup:
183    delete_arena(transient_arena);
184    delete_arena(persistent_arena);
185    return errcode;
186}
187
188
189//////// MERGE-SORT CALLBACKS:
190
191
192typedef struct extsort_record {
193    sized_buf k;
194    sized_buf v;
195    char buf[1];
196} extsort_record;
197
198static int read_id_record(FILE *in, void *buf, void *ctx)
199{
200    (void) ctx;
201    uint16_t klen;
202    uint32_t vlen;
203    extsort_record *rec = (extsort_record *) buf;
204    if (fread(&klen, 2, 1, in) != 1) {
205        if (feof(in)) {
206            return 0;
207        } else {
208            return -1;
209        }
210    }
211    if (fread(&vlen, 4, 1, in) != 1) {
212        return -1;
213    }
214    klen = ntohs(klen);
215    vlen = ntohl(vlen);
216    rec->k.size = klen;
217    rec->k.buf = rec->buf;
218    rec->v.size = vlen;
219    rec->v.buf = rec->buf + klen;
220    if (fread(rec->k.buf, klen, 1, in) != 1) {
221        return -1;
222    }
223    if (fread(rec->v.buf, vlen, 1, in) != 1) {
224        return -1;
225    }
226    return sizeof(extsort_record) + klen + vlen;
227}
228
229static int write_id_record(FILE *out, void *ptr, void *ctx)
230{
231    (void) ctx;
232    extsort_record *rec = (extsort_record *) ptr;
233    uint16_t klen = htons((uint16_t) rec->k.size);
234    uint32_t vlen = htonl((uint32_t) rec->v.size);
235    if (fwrite(&klen, 2, 1, out) != 1) {
236        return 0;
237    }
238    if (fwrite(&vlen, 4, 1, out) != 1) {
239        return 0;
240    }
241    if (fwrite(rec->buf, rec->k.size + rec->v.size, 1, out) != 1) {
242        return 0;
243    }
244    return 1;
245}
246
247static int compare_id_record(const void *r1, const void *r2, void *ctx)
248{
249    TreeWriter* writer = static_cast<TreeWriter*>(ctx);
250    extsort_record *e1 = (extsort_record *) r1, *e2 = (extsort_record *) r2;
251    e1->k.buf = e1->buf;
252    e2->k.buf = e2->buf;
253    return writer->key_compare(&e1->k, &e2->k);
254}
255
256static char *alloc_record(void)
257{
258    return static_cast<char*>(malloc(ID_SORT_MAX_RECORD_SIZE));
259}
260
261static char *duplicate_record(char *rec)
262{
263    extsort_record *record = (extsort_record *) rec;
264    size_t record_size = sizeof(extsort_record) + record->k.size + record->v.size;
265    extsort_record *new_record = (extsort_record *) malloc(record_size);
266
267    if (new_record != NULL) {
268        memcpy(new_record, record, record_size);
269    }
270
271    return (char *) new_record;
272}
273
/*
 * merge_sort() release callback: frees a buffer obtained from alloc_record()
 * or duplicate_record(). NULL is accepted.
 */
static void free_record(char *record)
{
    void *block = record;
    free(block);
}
278