xref: /3.0.2-MP2/couchstore/src/mergesort.cc (revision afa48171)
/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
/* Merge Sort
   by Philip J. Erdelsky
   pje@efgh.com
   http://www.efgh.com/software/mergesor.htm
*/
7#include "config.h"
8#include "internal.h"
9#include "mergesort.h"
10
11#include <stdlib.h>
12#include <string.h>
/*
 * One node of the singly linked list that holds a block of records while it
 * is being sorted in memory.
 *
 * NOTE(review): merge_sort() hands this list to sort_linked_list() with an
 * index argument of 0, which presumably identifies the link pointer's
 * position -- keep `next` as the first member unless that is confirmed.
 */
struct record_in_memory {
    /* Following node in the list, or NULL for the last node. */
    struct record_in_memory *next;

    /* The record carried by this node. */
    char *record;
};
17
18struct compare_info {
19    mergesort_compare_records_t compare;
20    void *pointer;
21};
22
23static void free_memory_blocks(struct record_in_memory *first,
24                               mergesort_record_free_t record_free)
25{
26    while (first != NULL) {
27        struct record_in_memory *next = first->next;
28        (*record_free)(first->record);
29        free(first);
30        first = next;
31    }
32}
33
34static int compare_records(void *p, void *q, void *pointer)
35{
36#define pp ((struct record_in_memory *) p)
37#define qq ((struct record_in_memory *) q)
38#define point ((struct compare_info *) pointer)
39    return (*point->compare)(pp->record, qq->record, point->pointer);
40}
41
/*
 * Advance `path` (in place) to the next temporary file name, create that
 * file and remove its name again right after it has been opened.
 *
 * The characters after the last '_' in `path` are read as a decimal counter
 * stored least-significant digit first: "4.couch.2.compact.btree-tmp_356"
 * holds the value 653. The counter is incremented (654) and written back in
 * the same reversed form ("..._456").
 *
 * NOTE: when the counter gains a digit ("_9" becomes "_01") the string grows
 * by one character. No buffer size is passed in, so the caller must provide
 * a buffer with room to spare.
 * NOTE(review): characters after the '_' are not checked to be digits.
 *
 * Returns the stream opened with mode "w+b", or NULL when `path` contains no
 * '_' (in which case `path` is left untouched) or when fopen() fails (in
 * which case `path` has already been advanced).
 */
FILE *openTmpFile(char *path) {
    /* Locate the counter first so that a path without '_' is rejected
     * instead of being scanned past the start of the buffer. */
    char *separator = strrchr(path, '_');
    if (separator == NULL) {
        return NULL;
    }

    /* Parse the counter, walking from the last character back to the one
     * just after the separator (reversed atoi). */
    int suffix = 0;
    for (char *digit = path + strlen(path); digit > separator + 1; ) {
        --digit;
        suffix = suffix * 10 + (*digit - '0');
    }

    suffix++;

    /* Write the new counter back, least-significant digit first. */
    char *out = separator;
    while (suffix) {
        *++out = (char)(suffix % 10 + '0');
        suffix /= 10;
    }
    *++out = '\0';

    FILE *tempFile = fopen(path, "w+b");
    if (tempFile) {
        /* Drop the name straight away, presumably so nothing is left behind
         * once the stream is closed; the result is deliberately ignored. */
#ifdef WIN32
        _unlink(path);
#else
        unlink(path);
#endif
    }
    return tempFile;
}
74
/* Short local aliases for the couchstore status codes that merge_sort()
 * returns. */
#define OK                  COUCHSTORE_SUCCESS
#define INSUFFICIENT_MEMORY COUCHSTORE_ERROR_ALLOC_FAIL
#define FILE_CREATION_ERROR COUCHSTORE_ERROR_OPEN_FILE
#define FILE_WRITE_ERROR    COUCHSTORE_ERROR_WRITE
#define FILE_READ_ERROR     COUCHSTORE_ERROR_READ
80
81int merge_sort(FILE *unsorted_file, FILE *sorted_file,
82               char *tmp_path,
83               mergesort_read_record_t read,
84               mergesort_write_record_t write,
85               mergesort_compare_records_t compare,
86               mergesort_record_alloc_t record_alloc,
87               mergesort_record_duplicate_t record_duplicate,
88               mergesort_record_free_t record_free,
89               void *pointer,
90               unsigned long block_size,
91               unsigned long *pcount)
92{
93    struct tape {
94        FILE *fp;
95        unsigned long count;
96    };
97    struct tape source_tape[2];
98    char *record[2];
99    /* allocate memory */
100    if ((record[0] = (*record_alloc)()) == NULL) {
101        return INSUFFICIENT_MEMORY;
102    }
103    if ((record[1] = (*record_alloc)()) == NULL) {
104        (*record_free)(record[0]);
105        return INSUFFICIENT_MEMORY;
106    }
107    /* create temporary files source_tape[0] and source_tape[1] */
108    source_tape[0].fp = openTmpFile(tmp_path);
109    source_tape[0].count = 0L;
110    if (source_tape[0].fp == NULL) {
111        (*record_free)(record[0]);
112        (*record_free)(record[1]);
113        return FILE_CREATION_ERROR;
114    }
115    source_tape[1].fp = openTmpFile(tmp_path);
116    source_tape[1].count = 0L;
117    if (source_tape[1].fp == NULL) {
118        fclose(source_tape[0].fp);
119        (*record_free)(record[0]);
120        (*record_free)(record[1]);
121        return FILE_CREATION_ERROR;
122    }
123    /* read blocks, sort them in memory, and write the alternately to */
124    /* tapes 0 and 1 */
125    {
126        struct record_in_memory *first = NULL;
127        unsigned long block_count = 0;
128        unsigned destination = 0;
129        struct compare_info comp;
130        comp.compare = compare;
131        comp.pointer = pointer;
132        while (1) {
133            int record_size = (*read)(unsorted_file, record[0], pointer);
134            if (record_size > 0) {
135                struct record_in_memory *p = (struct record_in_memory *) malloc(sizeof(*p));
136                if (p == NULL) {
137                    fclose(source_tape[0].fp);
138                    fclose(source_tape[1].fp);
139                    (*record_free)(record[0]);
140                    (*record_free)(record[1]);
141                    free_memory_blocks(first, record_free);
142                    return INSUFFICIENT_MEMORY;
143                }
144                p->record = (*record_duplicate)(record[0]);
145                if (p->record == NULL) {
146                    fclose(source_tape[0].fp);
147                    fclose(source_tape[1].fp);
148                    free(p);
149                    (*record_free)(record[0]);
150                    (*record_free)(record[1]);
151                    free_memory_blocks(first, record_free);
152                    return INSUFFICIENT_MEMORY;
153                }
154                p->next = first;
155                first = p;
156                block_count++;
157            } else if (record_size < 0) {
158                fclose(source_tape[0].fp);
159                fclose(source_tape[1].fp);
160                (*record_free)(record[0]);
161                (*record_free)(record[1]);
162                free_memory_blocks(first, record_free);
163                return FILE_READ_ERROR;
164            }
165            if (block_count == block_size || (record_size == 0 && block_count != 0)) {
166                first = static_cast<record_in_memory*>(sort_linked_list(first, 0, compare_records, &comp, NULL));
167                while (first != NULL) {
168                    struct record_in_memory *next = first->next;
169                    if ((*write)(source_tape[destination].fp, first->record,
170                                 pointer) == 0) {
171                        fclose(source_tape[0].fp);
172                        fclose(source_tape[1].fp);
173                        (*record_free)(record[0]);
174                        (*record_free)(record[1]);
175                        free_memory_blocks(first, record_free);
176                        return FILE_WRITE_ERROR;
177                    }
178                    source_tape[destination].count++;
179                    (*record_free)(first->record);
180                    free(first);
181                    first = next;
182                }
183                destination ^= 1;
184                block_count = 0;
185            }
186            if (record_size == 0) {
187                break;
188            }
189        }
190    }
191    if (sorted_file == unsorted_file) {
192        rewind(unsorted_file);
193    }
194    rewind(source_tape[0].fp);
195    rewind(source_tape[1].fp);
196    /* delete the unsorted file here, if required (see instructions) */
197    /* handle case where memory sort is all that is required */
198    if (source_tape[1].count == 0L) {
199        fclose(source_tape[1].fp);
200        source_tape[1] = source_tape[0];
201        source_tape[0].fp = sorted_file;
202        while (source_tape[1].count-- != 0L) {
203            if ((*read)(source_tape[1].fp, record[0], pointer) <= 0) {
204                fclose(source_tape[1].fp);
205                (*record_free)(record[0]);
206                (*record_free)(record[1]);
207                return FILE_READ_ERROR;
208            }
209            if ((*write)(source_tape[0].fp, record[0], pointer) == 0) {
210                fclose(source_tape[1].fp);
211                (*record_free)(record[0]);
212                (*record_free)(record[1]);
213                return FILE_WRITE_ERROR;
214            }
215        }
216    } else {
217        /* merge tapes, two by two, until every record is in source_tape[0] */
218        while (source_tape[1].count != 0L) {
219            unsigned destination = 0;
220            struct tape destination_tape[2];
221            int record1_size, record2_size;
222            destination_tape[0].fp = source_tape[0].count <= block_size ?
223                                     sorted_file : openTmpFile(tmp_path);
224            destination_tape[0].count = 0L;
225            if (destination_tape[0].fp == NULL) {
226                fclose(source_tape[0].fp);
227                fclose(source_tape[1].fp);
228                (*record_free)(record[0]);
229                (*record_free)(record[1]);
230                return FILE_CREATION_ERROR;
231            }
232            destination_tape[1].fp = tmpfile();
233            destination_tape[1].count = 0L;
234            if (destination_tape[1].fp == NULL) {
235                if (destination_tape[0].fp != sorted_file) {
236                    fclose(destination_tape[0].fp);
237                }
238                fclose(source_tape[0].fp);
239                fclose(source_tape[1].fp);
240                (*record_free)(record[0]);
241                (*record_free)(record[1]);
242                return FILE_CREATION_ERROR;
243            }
244            record1_size = (*read)(source_tape[0].fp, record[0], pointer);
245            record2_size = (*read)(source_tape[1].fp, record[1], pointer);
246            if (record1_size <= 0 || record2_size <= 0) {
247                if (destination_tape[0].fp != sorted_file) {
248                    fclose(destination_tape[0].fp);
249                }
250                fclose(source_tape[0].fp);
251                fclose(source_tape[1].fp);
252                (*record_free)(record[0]);
253                (*record_free)(record[1]);
254                return FILE_READ_ERROR;
255            }
256            while (source_tape[0].count != 0L) {
257                unsigned long count[2];
258                count[0] = source_tape[0].count;
259                if (count[0] > block_size) {
260                    count[0] = block_size;
261                }
262                count[1] = source_tape[1].count;
263                if (count[1] > block_size) {
264                    count[1] = block_size;
265                }
266                while (count[0] + count[1] != 0) {
267                    unsigned select = count[0] == 0 ? 1 : count[1] == 0 ? 0 :
268                                      compare(record[0], record[1], pointer) < 0 ? 0 : 1;
269                    if ((*write)(destination_tape[destination].fp, record[select],
270                                 pointer) == 0) {
271                        if (destination_tape[0].fp != sorted_file) {
272                            fclose(destination_tape[0].fp);
273                        }
274                        fclose(destination_tape[1].fp);
275                        fclose(source_tape[0].fp);
276                        fclose(source_tape[1].fp);
277                        (*record_free)(record[0]);
278                        (*record_free)(record[1]);
279                        return FILE_WRITE_ERROR;
280                    }
281                    if (source_tape[select].count > 1L) {
282                        if ((*read)(source_tape[select].fp, record[select], pointer) <= 0) {
283                            if (destination_tape[0].fp != sorted_file) {
284                                fclose(destination_tape[0].fp);
285                            }
286                            fclose(destination_tape[1].fp);
287                            fclose(source_tape[0].fp);
288                            fclose(source_tape[1].fp);
289                            (*record_free)(record[0]);
290                            (*record_free)(record[1]);
291                            return FILE_READ_ERROR;
292                        }
293                    }
294                    source_tape[select].count--;
295                    count[select]--;
296                    destination_tape[destination].count++;
297                }
298                destination ^= 1;
299            }
300            fclose(source_tape[0].fp);
301            fclose(source_tape[1].fp);
302            if (fflush(destination_tape[0].fp) == EOF ||
303                    fflush(destination_tape[1].fp) == EOF) {
304                if (destination_tape[0].fp != sorted_file) {
305                    fclose(destination_tape[0].fp);
306                }
307                fclose(destination_tape[1].fp);
308                (*record_free)(record[0]);
309                (*record_free)(record[1]);
310                return FILE_WRITE_ERROR;
311            }
312            rewind(destination_tape[0].fp);
313            rewind(destination_tape[1].fp);
314            memcpy(source_tape, destination_tape, sizeof(source_tape));
315            block_size <<= 1;
316        }
317    }
318    fclose(source_tape[1].fp);
319    if (pcount != NULL) {
320        *pcount = source_tape[0].count;
321    }
322    (*record_free)(record[0]);
323    (*record_free)(record[1]);
324    return OK;
325}
326
327