1 /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 
3 /**
4  * @copyright 2013 Couchbase, Inc.
5  *
6  * @author Filipe Manana  <filipe@couchbase.com>
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License"); you may not
9  * use this file except in compliance with the License. You may obtain a copy of
10  * the License at
11  *
12  *  http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16  * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
17  * License for the specific language governing permissions and limitations under
18  * the License.
19  **/
20 
21 #include <stdio.h>
22 #include <stdlib.h>
23 #include <assert.h>
24 #include <string.h>
25 #include "file_sorter.h"
26 #include "file_name_utils.h"
27 #include "quicksort.h"
28 
/* Initial capacity, in record pointers, of the in-memory batch array. */
#define NSORT_RECORDS_INIT 500000
/* Number of slots added to the batch array each time it fills up. */
#define NSORT_RECORD_INCR  100000
/* Number of worker threads created to sort batches and write them out. */
#define NSORT_THREADS 2
/* Placeholder suffix; its size reserves the room used for the generated tail
 * of temporary file names. */
#define SORTER_TMP_FILE_SUFFIX ".XXXXXX"
33 
/* One slot of the temporary file table. */
typedef struct {
    /* Heap-allocated path of the temporary file; NULL while the slot is unused. */
    char     *name;
    /* 0 for an unused slot, 1 for a freshly written batch, and the value
     * chosen by pick_merge_files() for a file produced by a merge. */
    unsigned  level;
} tmp_file_t;
38 
/* State shared by every stage of a single sort_file() call. */
typedef struct {
    /* Directory in which temporary files are created. */
    const char                   *tmp_dir;
    /* Path of the file being sorted. */
    const char                   *source_file;
    /* Result of file_basename(source_file); used to name temporary files. */
    char                         *tmp_file_prefix;
    /* Capacity of the tmp_files table. */
    unsigned                      num_tmp_files;
    /* Accumulated record size at which a batch is handed to a worker. */
    unsigned                      max_buffer_size;
    /* Record callbacks supplied by the caller of sort_file(). */
    file_merger_read_record_t     read_record;
    file_merger_write_record_t    write_record;
    file_merger_feed_record_t     feed_record;
    file_merger_compare_records_t compare_records;
    file_merger_record_free_t     free_record;
    /* Opaque pointer passed to every callback. */
    void                         *user_ctx;
    /* Source file stream, opened for reading; NULL once it has been closed. */
    FILE                         *f;
    /* Table of num_tmp_files temporary file slots. */
    tmp_file_t                   *tmp_files;
    /* Number of slots of tmp_files currently in use. */
    unsigned                      active_tmp_files;
    /* When non-zero the sorted output is not written back to source_file. */
    int                           skip_writeback;
} file_sort_ctx_t;
56 
// For parallel sorter
/* A batch of records to be sorted and written to one temporary file. */
typedef struct {
    /* Array of n record pointers; released by free_sort_job(). */
    void       **records;
    /* Temporary file slot the sorted batch is written to. */
    tmp_file_t *tmp_file;
    /* Number of entries in records. */
    size_t     n;
} sort_job_t;
63 
/* Pool of worker threads fed through a single-slot job hand-off. */
typedef struct {
    /* Number of worker threads in threads. */
    size_t              nworkers;
    /* Number of workers that are not currently processing a job. */
    size_t              free_workers;
    /* Signalled whenever job, free_workers or finished changes. */
    cb_cond_t           cond;
    /* Protects job, free_workers, finished and error. */
    cb_mutex_t          mutex;
    /* Non-zero once workers must exit: set on shutdown or by a failing worker. */
    int                 finished;
    /* Error code stored by a worker whose job failed. */
    file_sorter_error_t error;
    /* Job posted by the producer and not yet claimed by a worker, or NULL. */
    sort_job_t          *job;
    /* Sort context shared with the workers. */
    file_sort_ctx_t     *ctx;
    /* Array of nworkers thread handles. */
    cb_thread_t         *threads;
} parallel_sorter_t;
75 
76 
/* Forward declarations of the file-local helpers defined below. */
static file_sorter_error_t do_sort_file(file_sort_ctx_t *ctx);

static void sort_records(void **records, size_t n,
                                          file_sort_ctx_t *ctx);

static tmp_file_t *create_tmp_file(file_sort_ctx_t *ctx);

static file_sorter_error_t write_record_list(void **records,
                                             size_t n,
                                             tmp_file_t *tmp_file,
                                             file_sort_ctx_t *ctx);

static void pick_merge_files(file_sort_ctx_t *ctx,
                             unsigned *start,
                             unsigned *end,
                             unsigned *next_level);

static file_sorter_error_t merge_tmp_files(file_sort_ctx_t *ctx,
                                           unsigned start,
                                           unsigned end,
                                           unsigned next_level);

static file_sorter_error_t iterate_records_file(file_sort_ctx_t *ctx,
                                                const char *file);


/* Parallel sorter: job objects, worker pool lifecycle and job hand-off. */
static sort_job_t *create_sort_job(void **recs, size_t n, tmp_file_t *t);

static void free_sort_job(sort_job_t *job, parallel_sorter_t *sorter);

static parallel_sorter_t *create_parallel_sorter(size_t workers,
                                                 file_sort_ctx_t *ctx);

static void free_parallel_sorter(parallel_sorter_t *s);

static void sort_worker(void *args);

static file_sorter_error_t parallel_sorter_add_job(parallel_sorter_t *s,
                                                  void **records,
                                                  size_t n);

static file_sorter_error_t parallel_sorter_wait(parallel_sorter_t *s, size_t n);

static file_sorter_error_t parallel_sorter_finish(parallel_sorter_t *s);

/* Temporary file path generation. */
static int sorter_random_name(char *tmpl, int totlen, int suffixlen);

static char *sorter_tmp_file_path(const char *tmp_dir, const char *prefix);
125 
sort_file(const char *source_file, const char *tmp_dir, unsigned num_tmp_files, unsigned max_buffer_size, file_merger_read_record_t read_record, file_merger_write_record_t write_record, file_merger_feed_record_t feed_record, file_merger_compare_records_t compare_records, file_merger_record_free_t free_record, int skip_writeback, void *user_ctx)126 file_sorter_error_t sort_file(const char *source_file,
127                               const char *tmp_dir,
128                               unsigned num_tmp_files,
129                               unsigned max_buffer_size,
130                               file_merger_read_record_t read_record,
131                               file_merger_write_record_t write_record,
132                               file_merger_feed_record_t feed_record,
133                               file_merger_compare_records_t compare_records,
134                               file_merger_record_free_t free_record,
135                               int skip_writeback,
136                               void *user_ctx)
137 {
138     file_sort_ctx_t ctx;
139     unsigned i;
140     file_sorter_error_t ret;
141 
142     if (num_tmp_files <= 1) {
143         return FILE_SORTER_ERROR_BAD_ARG;
144     }
145 
146     ctx.tmp_file_prefix = file_basename(source_file);
147     if (ctx.tmp_file_prefix == NULL) {
148         return FILE_SORTER_ERROR_TMP_FILE_BASENAME;
149     }
150 
151     ctx.tmp_dir = tmp_dir;
152     ctx.source_file = source_file;
153     ctx.num_tmp_files = num_tmp_files;
154     ctx.max_buffer_size = max_buffer_size;
155     ctx.read_record = read_record;
156     ctx.write_record = write_record;
157     ctx.feed_record = feed_record;
158     ctx.compare_records = compare_records;
159     ctx.free_record = free_record;
160     ctx.user_ctx = user_ctx;
161     ctx.active_tmp_files = 0;
162     ctx.skip_writeback = skip_writeback;
163 
164     if (skip_writeback && !feed_record) {
165         return FILE_SORTER_ERROR_MISSING_CALLBACK;
166     }
167 
168     ctx.f = fopen(source_file, "rb");
169     if (ctx.f == NULL) {
170         free(ctx.tmp_file_prefix);
171         return FILE_SORTER_ERROR_OPEN_FILE;
172     }
173 
174     ctx.tmp_files = (tmp_file_t *) malloc(sizeof(tmp_file_t) * num_tmp_files);
175 
176     if (ctx.tmp_files == NULL) {
177         fclose(ctx.f);
178         free(ctx.tmp_file_prefix);
179         return FILE_SORTER_ERROR_ALLOC;
180     }
181 
182     for (i = 0; i < num_tmp_files; ++i) {
183         ctx.tmp_files[i].name = NULL;
184         ctx.tmp_files[i].level = 0;
185     }
186 
187     ret = do_sort_file(&ctx);
188 
189     if (ctx.f != NULL) {
190         fclose(ctx.f);
191     }
192     for (i = 0; i < ctx.active_tmp_files; ++i) {
193         if (ctx.tmp_files[i].name != NULL) {
194             remove(ctx.tmp_files[i].name);
195             free(ctx.tmp_files[i].name);
196         }
197     }
198     free(ctx.tmp_files);
199     free(ctx.tmp_file_prefix);
200 
201     return ret;
202 }
203 
204 
create_sort_job(void **recs, size_t n, tmp_file_t *t)205 static sort_job_t *create_sort_job(void **recs, size_t n, tmp_file_t *t)
206 {
207     sort_job_t *job = (sort_job_t *) calloc(1, sizeof(sort_job_t));
208     if (job) {
209         job->records = recs;
210         job->n = n;
211         job->tmp_file = t;
212     }
213 
214     return job;
215 }
216 
217 
free_sort_job(sort_job_t *job, parallel_sorter_t *sorter)218 static void free_sort_job(sort_job_t *job, parallel_sorter_t *sorter)
219 {
220     size_t i;
221 
222     if (job) {
223         for (i = 0; i < job->n; i++) {
224             (*sorter->ctx->free_record)(job->records[i], sorter->ctx->user_ctx);
225         }
226 
227         free(job->records);
228         free(job);
229     }
230 }
231 
232 
create_parallel_sorter(size_t workers, file_sort_ctx_t *ctx)233 static parallel_sorter_t *create_parallel_sorter(size_t workers,
234                                                  file_sort_ctx_t *ctx)
235 {
236     size_t i, j;
237     parallel_sorter_t *s = (parallel_sorter_t *) calloc(1, sizeof(parallel_sorter_t));
238     if (!s) {
239         return NULL;
240     }
241 
242     s->nworkers = workers;
243     s->free_workers = workers;
244     cb_mutex_initialize(&s->mutex);
245     cb_cond_initialize(&s->cond);
246     s->finished = 0;
247     s->error = FILE_SORTER_SUCCESS;
248     s->job = NULL;
249     s->ctx = ctx;
250 
251     s->threads = (cb_thread_t *) calloc(workers, sizeof(cb_thread_t));
252     if (!s->threads) {
253         goto failure;
254     }
255 
256     for (i = 0; i < workers; i++) {
257         if (cb_create_thread(&s->threads[i], &sort_worker, (void *) s, 0) < 0) {
258             cb_mutex_enter(&s->mutex);
259             s->finished = 1;
260             cb_mutex_exit(&s->mutex);
261             cb_cond_broadcast(&s->cond);
262 
263             for (j = 0; j < i; j++) {
264                 cb_join_thread(s->threads[j]);
265             }
266 
267             goto failure;
268         }
269     }
270 
271     return s;
272 
273 failure:
274     cb_mutex_destroy(&s->mutex);
275     cb_cond_destroy(&s->cond);
276     free(s->threads);
277     free(s);
278     return NULL;
279 }
280 
281 
free_parallel_sorter(parallel_sorter_t *s)282 static void free_parallel_sorter(parallel_sorter_t *s)
283 {
284     if (s) {
285         cb_mutex_destroy(&s->mutex);
286         cb_cond_destroy(&s->cond);
287         free_sort_job(s->job, s);
288         free(s->threads);
289         free(s);
290     }
291 }
292 
293 
sort_worker(void *args)294 static void sort_worker(void *args)
295 {
296     file_sorter_error_t ret;
297     sort_job_t *job;
298     parallel_sorter_t *s = (parallel_sorter_t *) args;
299 
300     /*
301      * If a job is available, pick it up and notify all waiters
302      * If job is not available, wait on condition variable
303      * Once a job is complete, notify all waiters
304      * Loop it over until finished flag becomes true
305      */
306     while (1) {
307         cb_mutex_enter(&s->mutex);
308         if (s->finished) {
309             cb_mutex_exit(&s->mutex);
310             return;
311         }
312 
313         if (s->job) {
314             job = s->job;
315             s->job = NULL;
316             s->free_workers -= 1;
317             cb_mutex_exit(&s->mutex);
318             cb_cond_broadcast(&s->cond);
319 
320             ret = write_record_list(job->records, job->n, job->tmp_file, s->ctx);
321             free_sort_job(job, s);
322             if (ret != FILE_SORTER_SUCCESS) {
323                 cb_mutex_enter(&s->mutex);
324                 s->finished = 1;
325                 s->error = ret;
326                 cb_mutex_exit(&s->mutex);
327                 cb_cond_broadcast(&s->cond);
328                 return;
329             }
330 
331             cb_mutex_enter(&s->mutex);
332             s->free_workers += 1;
333             cb_mutex_exit(&s->mutex);
334             cb_cond_broadcast(&s->cond);
335         } else {
336             cb_cond_wait(&s->cond, &s->mutex);
337             cb_mutex_exit(&s->mutex);
338         }
339     }
340 }
341 
342 
343  // Add a job and block wait until a worker picks up the job
parallel_sorter_add_job(parallel_sorter_t *s, void **records, size_t n)344 static file_sorter_error_t parallel_sorter_add_job(parallel_sorter_t *s,
345                                                   void **records,
346                                                   size_t n)
347 {
348     file_sorter_error_t ret;
349     sort_job_t *job;
350     tmp_file_t *tmp_file = create_tmp_file(s->ctx);
351 
352     if (tmp_file == NULL) {
353         return FILE_SORTER_ERROR_MK_TMP_FILE;
354     }
355 
356     job = create_sort_job(records, n, tmp_file);
357     if (!job) {
358         return FILE_SORTER_ERROR_ALLOC;
359     }
360 
361     cb_mutex_enter(&s->mutex);
362     if (s->finished) {
363         ret = s->error;
364         cb_mutex_exit(&s->mutex);
365         return ret;
366     }
367 
368     s->job = job;
369     cb_mutex_exit(&s->mutex);
370     cb_cond_signal(&s->cond);
371 
372     cb_mutex_enter(&s->mutex);
373     while (s->job) {
374         cb_cond_wait(&s->cond, &s->mutex);
375     }
376     cb_mutex_exit(&s->mutex);
377 
378     return FILE_SORTER_SUCCESS;
379 }
380 
381 
382 // Wait until n or more workers become idle after processing current jobs
parallel_sorter_wait(parallel_sorter_t *s, size_t n)383 static file_sorter_error_t parallel_sorter_wait(parallel_sorter_t *s, size_t n)
384 {
385     while (1) {
386         cb_mutex_enter(&s->mutex);
387         if (s->finished) {
388             cb_mutex_exit(&s->mutex);
389             return s->error;
390         }
391 
392         if (s->free_workers >= n) {
393             cb_mutex_exit(&s->mutex);
394             return FILE_SORTER_SUCCESS;
395         }
396 
397         cb_cond_wait(&s->cond, &s->mutex);
398         cb_mutex_exit(&s->mutex);
399     }
400 }
401 
402 
403 // Notify all workers that we have no more jobs and wait for them to join
parallel_sorter_finish(parallel_sorter_t *s)404 static file_sorter_error_t parallel_sorter_finish(parallel_sorter_t *s)
405 {
406     file_sorter_error_t ret;
407     size_t i;
408 
409     cb_mutex_enter(&s->mutex);
410     s->finished = 1;
411     cb_mutex_exit(&s->mutex);
412     cb_cond_broadcast(&s->cond);
413 
414     for (i = 0; i < s->nworkers; i++) {
415         ret = (file_sorter_error_t) cb_join_thread(s->threads[i]);
416         if (ret != 0) {
417             return ret;
418         }
419     }
420 
421     return FILE_SORTER_SUCCESS;
422 }
423 
424 
do_sort_file(file_sort_ctx_t *ctx)425 static file_sorter_error_t do_sort_file(file_sort_ctx_t *ctx)
426 {
427     unsigned buffer_size = 0;
428     size_t i = 0;
429     size_t record_count = NSORT_RECORDS_INIT;
430     void *record;
431     int record_size;
432     file_sorter_error_t ret;
433     file_merger_feed_record_t feed_record = ctx->feed_record;
434     parallel_sorter_t *sorter;
435     void **records = (void **) calloc(record_count, sizeof(void *));
436 
437     if (records == NULL) {
438         return FILE_SORTER_ERROR_ALLOC;
439     }
440 
441     sorter = create_parallel_sorter(NSORT_THREADS, ctx);
442     if (sorter == NULL) {
443         return FILE_SORTER_ERROR_ALLOC;
444     }
445 
446     ctx->feed_record = NULL;
447 
448     i = 0;
449     while (1) {
450         record_size = (*ctx->read_record)(ctx->f, &record, ctx->user_ctx);
451         if (record_size < 0) {
452            ret = (file_sorter_error_t) record_size;
453            goto failure;
454         } else if (record_size == 0) {
455             break;
456         }
457 
458         if (records == NULL) {
459             records = (void **) calloc(record_count, sizeof(void *));
460             if (records == NULL) {
461                 ret =  FILE_SORTER_ERROR_ALLOC;
462                 goto failure;
463             }
464         }
465 
466         records[i++] = record;
467         if (i == record_count) {
468             record_count += NSORT_RECORD_INCR;
469             records = (void **) realloc(records, record_count * sizeof(void *));
470             if (records == NULL) {
471                 ret =  FILE_SORTER_ERROR_ALLOC;
472                 goto failure;
473             }
474         }
475 
476         buffer_size += (unsigned) record_size;
477 
478         if (buffer_size >= ctx->max_buffer_size) {
479             ret = parallel_sorter_add_job(sorter, records, i);
480             if (ret != FILE_SORTER_SUCCESS) {
481                 goto failure;
482             }
483 
484             ret = parallel_sorter_wait(sorter, 1);
485             if (ret != FILE_SORTER_SUCCESS) {
486                 goto failure;
487             }
488 
489             records = NULL;
490             record_count = NSORT_RECORDS_INIT;
491             buffer_size = 0;
492             i = 0;
493         }
494 
495         if (ctx->active_tmp_files >= ctx->num_tmp_files) {
496             unsigned start, end, next_level;
497 
498             ret = parallel_sorter_wait(sorter, NSORT_THREADS);
499             if (ret != FILE_SORTER_SUCCESS) {
500                 goto failure;
501             }
502 
503             pick_merge_files(ctx, &start, &end, &next_level);
504             assert(next_level > 1);
505             ret = merge_tmp_files(ctx, start, end, next_level);
506             if (ret != FILE_SORTER_SUCCESS) {
507                 goto failure;
508             }
509         }
510     }
511 
512     fclose(ctx->f);
513     ctx->f = NULL;
514 
515     if (ctx->active_tmp_files == 0 && buffer_size == 0) {
516         /* empty source file */
517         return FILE_SORTER_SUCCESS;
518     }
519 
520     if (buffer_size > 0) {
521         ret = parallel_sorter_add_job(sorter, records, i);
522         if (ret != FILE_SORTER_SUCCESS) {
523             goto failure;
524         }
525     }
526 
527     ret = parallel_sorter_finish(sorter);
528     if (ret != FILE_SORTER_SUCCESS) {
529         goto failure;
530     }
531 
532     assert(ctx->active_tmp_files > 0);
533 
534     if (!ctx->skip_writeback && remove(ctx->source_file) != 0) {
535         ret = FILE_SORTER_ERROR_DELETE_FILE;
536         goto failure;
537     }
538 
539     // Restore feed_record callback for final merge */
540     ctx->feed_record = feed_record;
541     if (ctx->active_tmp_files == 1) {
542         if (ctx->feed_record) {
543             ret = iterate_records_file(ctx, ctx->tmp_files[0].name);
544             if (ret != FILE_SORTER_SUCCESS) {
545                 goto failure;
546             }
547 
548             if (ctx->skip_writeback && remove(ctx->tmp_files[0].name) != 0) {
549                 ret = FILE_SORTER_ERROR_DELETE_FILE;
550                 goto failure;
551             }
552         }
553 
554         if (!ctx->skip_writeback &&
555                 rename(ctx->tmp_files[0].name, ctx->source_file) != 0) {
556             ret = FILE_SORTER_ERROR_RENAME_FILE;
557             goto failure;
558         }
559     } else if (ctx->active_tmp_files > 1) {
560         ret = merge_tmp_files(ctx, 0, ctx->active_tmp_files, 0);
561         if (ret != FILE_SORTER_SUCCESS) {
562             goto failure;
563         }
564     }
565 
566     ret = FILE_SORTER_SUCCESS;
567 
568  failure:
569     free_parallel_sorter(sorter);
570     return ret;
571 }
572 
573 
write_record_list(void **records, size_t n, tmp_file_t *tmp_file, file_sort_ctx_t *ctx)574 static file_sorter_error_t write_record_list(void **records,
575                                              size_t n,
576                                              tmp_file_t *tmp_file,
577                                              file_sort_ctx_t *ctx)
578 {
579     size_t i;
580     FILE *f;
581 
582     sort_records(records, n, ctx);
583 
584     remove(tmp_file->name);
585     f = fopen(tmp_file->name, "ab");
586     if (f == NULL) {
587         return FILE_SORTER_ERROR_MK_TMP_FILE;
588     }
589 
590     if (ftell(f) != 0) {
591         /* File already existed. It's not supposed to exist, and if it
592          * exists it means a temporary file name collision happened or
593          * some previous sort left temporary files that were never
594          * deleted. */
595         return FILE_SORTER_ERROR_NOT_EMPTY_TMP_FILE;
596     }
597 
598 
599     for (i = 0; i < n; i++) {
600         file_sorter_error_t err;
601         err = static_cast<file_sorter_error_t>((*ctx->write_record)(f, records[i], ctx->user_ctx));
602         (*ctx->free_record)(records[i], ctx->user_ctx);
603         records[i] = NULL;
604 
605         if (err != FILE_SORTER_SUCCESS) {
606             fclose(f);
607             return err;
608         }
609     }
610 
611     fclose(f);
612 
613     return FILE_SORTER_SUCCESS;
614 }
615 
616 
create_tmp_file(file_sort_ctx_t *ctx)617 static tmp_file_t *create_tmp_file(file_sort_ctx_t *ctx)
618 {
619     unsigned i = ctx->active_tmp_files;
620 
621     assert(ctx->active_tmp_files < ctx->num_tmp_files);
622     assert(ctx->tmp_files[i].name == NULL);
623     assert(ctx->tmp_files[i].level == 0);
624 
625     ctx->tmp_files[i].name = sorter_tmp_file_path(ctx->tmp_dir,
626         ctx->tmp_file_prefix);
627     if (ctx->tmp_files[i].name == NULL) {
628         return NULL;
629     }
630 
631     ctx->tmp_files[i].level = 1;
632     ctx->active_tmp_files += 1;
633 
634     return &ctx->tmp_files[i];
635 }
636 
637 
qsort_cmp(const void *a, const void *b, void *ctx)638 static int qsort_cmp(const void *a, const void *b, void *ctx)
639 {
640     file_sort_ctx_t *sort_ctx = (file_sort_ctx_t *) ctx;
641     const void **k1 = (const void **) a, **k2 = (const void **) b;
642     return (*sort_ctx->compare_records)(*k1, *k2, sort_ctx->user_ctx);
643 }
644 
645 
sort_records(void **records, size_t n, file_sort_ctx_t *ctx)646 static void sort_records(void **records, size_t n,
647                                          file_sort_ctx_t *ctx)
648 {
649     quicksort(records, n, sizeof(void *), &qsort_cmp, ctx);
650 }
651 
652 
tmp_file_cmp(const void *a, const void *b)653 static int tmp_file_cmp(const void *a, const void *b)
654 {
655     unsigned x = ((const tmp_file_t *) a)->level;
656     unsigned y = ((const tmp_file_t *) b)->level;
657 
658     if (x == 0) {
659         return 1;
660     }
661     if (y == 0) {
662         return -1;
663     }
664 
665     return x - y;
666 }
667 
668 
pick_merge_files(file_sort_ctx_t *ctx, unsigned *start, unsigned *end, unsigned *next_level)669 static void pick_merge_files(file_sort_ctx_t *ctx,
670                              unsigned *start,
671                              unsigned *end,
672                              unsigned *next_level)
673 {
674     unsigned i, j, level;
675 
676     qsort(ctx->tmp_files, ctx->active_tmp_files, sizeof(tmp_file_t), tmp_file_cmp);
677 
678     for (i = 0; i < ctx->active_tmp_files; i = j) {
679         level = ctx->tmp_files[i].level;
680         assert(level > 0);
681         j = i + 1;
682 
683         while (j < ctx->active_tmp_files) {
684             assert(ctx->tmp_files[j].level > 0);
685             if (ctx->tmp_files[j].level != level) {
686                 break;
687             }
688             j += 1;
689         }
690 
691         if ((j - i) > 1) {
692             *start = i;
693             *end = j;
694             *next_level = (j - i) * level;
695             return;
696         }
697     }
698 
699     /* All files have a different level. */
700     assert(ctx->active_tmp_files == ctx->num_tmp_files);
701     assert(ctx->active_tmp_files >= 2);
702     *start = 0;
703     *end = 2;
704     *next_level = ctx->tmp_files[0].level + ctx->tmp_files[1].level;
705 }
706 
707 
merge_tmp_files(file_sort_ctx_t *ctx, unsigned start, unsigned end, unsigned next_level)708 static file_sorter_error_t merge_tmp_files(file_sort_ctx_t *ctx,
709                                            unsigned start,
710                                            unsigned end,
711                                            unsigned next_level)
712 {
713     char *dest_tmp_file;
714     const char **files;
715     unsigned nfiles, i;
716     file_sorter_error_t ret;
717     file_merger_feed_record_t feed_record = NULL;
718 
719     nfiles = end - start;
720     files = (const char **) malloc(sizeof(char *) * nfiles);
721     if (files == NULL) {
722         return FILE_SORTER_ERROR_ALLOC;
723     }
724     for (i = start; i < end; ++i) {
725         files[i - start] = ctx->tmp_files[i].name;
726         assert(files[i - start] != NULL);
727     }
728 
729     if (next_level == 0) {
730         /* Final merge iteration. */
731         if (ctx->skip_writeback) {
732             dest_tmp_file = NULL;
733         } else {
734             dest_tmp_file = (char *) ctx->source_file;
735         }
736 
737         feed_record = ctx->feed_record;
738     } else {
739         dest_tmp_file = sorter_tmp_file_path(ctx->tmp_dir,
740             ctx->tmp_file_prefix);
741         if (dest_tmp_file == NULL) {
742             free(files);
743             return FILE_SORTER_ERROR_MK_TMP_FILE;
744         }
745     }
746 
747     ret = (file_sorter_error_t) merge_files(files,
748                                             nfiles,
749                                             dest_tmp_file,
750                                             ctx->read_record,
751                                             ctx->write_record,
752                                             feed_record,
753                                             ctx->compare_records,
754                                             NULL,
755                                             ctx->free_record,
756                                             ctx->skip_writeback,
757                                             ctx->user_ctx);
758 
759     free(files);
760 
761     if (ret != FILE_SORTER_SUCCESS) {
762         if (dest_tmp_file != NULL && dest_tmp_file != ctx->source_file) {
763             remove(dest_tmp_file);
764             free(dest_tmp_file);
765         }
766         return ret;
767     }
768 
769     for (i = start; i < end; ++i) {
770         if (remove(ctx->tmp_files[i].name) != 0) {
771             if (dest_tmp_file != ctx->source_file) {
772                 free(dest_tmp_file);
773             }
774             return FILE_SORTER_ERROR_DELETE_FILE;
775         }
776         free(ctx->tmp_files[i].name);
777         ctx->tmp_files[i].name = NULL;
778         ctx->tmp_files[i].level = 0;
779     }
780 
781     qsort(ctx->tmp_files + start, ctx->num_tmp_files - start,
782           sizeof(tmp_file_t), tmp_file_cmp);
783     ctx->active_tmp_files -= nfiles;
784 
785     if (dest_tmp_file != ctx->source_file) {
786         i = ctx->active_tmp_files;
787         ctx->tmp_files[i].name = dest_tmp_file;
788         ctx->tmp_files[i].level = next_level;
789         ctx->active_tmp_files += 1;
790     }
791 
792     return FILE_SORTER_SUCCESS;
793 }
794 
iterate_records_file(file_sort_ctx_t *ctx, const char *file)795 static file_sorter_error_t iterate_records_file(file_sort_ctx_t *ctx,
796                                                const char *file)
797 {
798     void *record_data = NULL;
799     int record_len;
800     FILE *f = fopen(file, "rb");
801     int ret = FILE_SORTER_SUCCESS;
802 
803     if (f == NULL) {
804         return FILE_SORTER_ERROR_OPEN_FILE;
805     }
806 
807     while (1) {
808         record_len = (*ctx->read_record)(f, &record_data, ctx->user_ctx);
809         if (record_len == 0) {
810             record_data = NULL;
811             break;
812         } else if (record_len < 0) {
813             ret = record_len;
814             goto cleanup;
815         } else {
816             ret = (*ctx->feed_record)(record_data, ctx->user_ctx);
817             if (ret != FILE_SORTER_SUCCESS) {
818                 goto cleanup;
819             }
820 
821             (*ctx->free_record)(record_data, ctx->user_ctx);
822         }
823     }
824 
825 cleanup:
826     (*ctx->free_record)(record_data, ctx->user_ctx);
827     if (f != NULL) {
828         fclose(f);
829     }
830 
831     return (file_sorter_error_t) ret;
832 }
833 
/*
 * Overwrite the last suffixlen characters of tmpl with ".<counter>".
 *
 * tmpl must hold totlen characters followed by a terminating NUL, i.e. the
 * suffix area has room for suffixlen characters plus the NUL. The counter is
 * a function-local static that starts at 0, is incremented on every call
 * and wraps back to 0 once it exceeds 2 << 18.
 *
 * Always returns 0.
 */
static int sorter_random_name(char *tmpl, int totlen, int suffixlen) {
    static unsigned int value = 0;
    /* The size passed to snprintf counts the terminating NUL. Passing only
     * suffixlen would truncate 6-digit counters and make their names collide
     * with those of smaller counters. */
    size_t room = (size_t) suffixlen + 1;

    tmpl = tmpl + totlen - suffixlen;
#ifdef WINDOWS
    _snprintf(tmpl, room, ".%u", value);
#else
    snprintf(tmpl, room, ".%u", value);
#endif
    if (++value > (2 << 18)) {
        value = 0;
    }
    return 0;
}
847 
sorter_tmp_file_path(const char *tmp_dir, const char *prefix)848 char *sorter_tmp_file_path(const char *tmp_dir, const char *prefix) {
849     char *file_path;
850     size_t tmp_dir_len, prefix_len, total_len;
851     tmp_dir_len = strlen(tmp_dir);
852     prefix_len = strlen(prefix);
853     total_len = tmp_dir_len + 1 + prefix_len + sizeof(SORTER_TMP_FILE_SUFFIX);
854     file_path = (char *) malloc(total_len);
855 
856     if (file_path == NULL) {
857         return NULL;
858     }
859 
860     memcpy(file_path, tmp_dir, tmp_dir_len);
861     /* Windows specific file API functions and stdio file functions on Windows
862      * convert forward slashes to back slashes. */
863     file_path[tmp_dir_len] = '/';
864     memcpy(file_path + tmp_dir_len + 1, prefix, prefix_len);
865     sorter_random_name(file_path, total_len - 1, sizeof(SORTER_TMP_FILE_SUFFIX)
866         - 1);
867     return file_path;
868 }
869