1 /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2
3 /**
4 * @copyright 2013 Couchbase, Inc.
5 *
6 * @author Filipe Manana <filipe@couchbase.com>
7 *
8 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
9 * use this file except in compliance with the License. You may obtain a copy of
10 * the License at
11 *
12 * http://www.apache.org/licenses/LICENSE-2.0
13 *
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
17 * License for the specific language governing permissions and limitations under
18 * the License.
19 **/
20
21 #include <stdio.h>
22 #include <stdlib.h>
23 #include <assert.h>
24 #include <string.h>
25 #include "file_sorter.h"
26 #include "file_name_utils.h"
27 #include "quicksort.h"
28
/* Initial capacity (number of record pointers) of each in-memory batch. */
#define NSORT_RECORDS_INIT 500000
/* Number of record pointers added each time a batch array is grown. */
#define NSORT_RECORD_INCR 100000
/* Number of worker threads that sort batches and write them to tmp files. */
#define NSORT_THREADS 2
/* Temporary file name suffix template. Only its length is used (via sizeof);
 * the actual suffix characters are generated by sorter_random_name(). */
#define SORTER_TMP_FILE_SUFFIX ".XXXXXX"
33
/* One slot of the temporary-file table used while sorting. */
typedef struct {
    /* Heap-allocated path of the file; NULL when the slot is unused. */
    char *name;
    /* Merge level: 1 for a file written from a single in-memory batch, larger
     * for files produced by merging (see pick_merge_files); 0 when unused. */
    unsigned level;
} tmp_file_t;
38
/* State shared by all stages of one sort_file() call. */
typedef struct {
    /* Directory in which temporary files are created. */
    const char *tmp_dir;
    /* File being sorted; rewritten in place unless skip_writeback is set. */
    const char *source_file;
    /* Basename of source_file, used as the prefix of tmp file names. */
    char *tmp_file_prefix;
    /* Capacity of tmp_files; reaching it triggers an intermediate merge. */
    unsigned num_tmp_files;
    /* A batch is flushed once the sum of the record sizes returned by
     * read_record reaches this value. */
    unsigned max_buffer_size;
    file_merger_read_record_t read_record;
    file_merger_write_record_t write_record;
    /* Optional; only used for the final pass over the sorted records. */
    file_merger_feed_record_t feed_record;
    file_merger_compare_records_t compare_records;
    file_merger_record_free_t free_record;
    /* Opaque pointer handed to every callback. */
    void *user_ctx;
    /* Handle on source_file while it is being read; NULL once closed. */
    FILE *f;
    /* Array of num_tmp_files slots; used slots are kept at the front. */
    tmp_file_t *tmp_files;
    /* Number of used slots in tmp_files. */
    unsigned active_tmp_files;
    /* When non-zero, source_file is neither deleted nor rewritten. */
    int skip_writeback;
} file_sort_ctx_t;
56
// Types used by the parallel sorter (worker pool).

/* One batch of records to be sorted and written to a temporary file. */
typedef struct {
    /* Heap array of n records; released together with the job. */
    void **records;
    /* Destination slot in file_sort_ctx_t.tmp_files. */
    tmp_file_t *tmp_file;
    /* Number of records in the batch. */
    size_t n;
} sort_job_t;
63
/* A fixed pool of worker threads that sort batches and write them out.
 * free_workers, finished, error and job are updated while holding mutex, and
 * cond is signalled/broadcast whenever one of them changes. */
typedef struct {
    /* Number of threads in threads[]. */
    size_t nworkers;
    /* Workers that are not currently processing a job. */
    size_t free_workers;
    cb_cond_t cond;
    cb_mutex_t mutex;
    /* Set on shutdown (parallel_sorter_finish) or after a worker error. */
    int finished;
    /* Error reported by a worker whose job failed. */
    file_sorter_error_t error;
    /* Job waiting to be picked up by an idle worker, or NULL. */
    sort_job_t *job;
    file_sort_ctx_t *ctx;
    cb_thread_t *threads;
} parallel_sorter_t;
75
76
77 static file_sorter_error_t do_sort_file(file_sort_ctx_t *ctx);
78
79 static void sort_records(void **records, size_t n,
80 file_sort_ctx_t *ctx);
81
82 static tmp_file_t *create_tmp_file(file_sort_ctx_t *ctx);
83
84 static file_sorter_error_t write_record_list(void **records,
85 size_t n,
86 tmp_file_t *tmp_file,
87 file_sort_ctx_t *ctx);
88
89 static void pick_merge_files(file_sort_ctx_t *ctx,
90 unsigned *start,
91 unsigned *end,
92 unsigned *next_level);
93
94 static file_sorter_error_t merge_tmp_files(file_sort_ctx_t *ctx,
95 unsigned start,
96 unsigned end,
97 unsigned next_level);
98
99 static file_sorter_error_t iterate_records_file(file_sort_ctx_t *ctx,
100 const char *file);
101
102
103 static sort_job_t *create_sort_job(void **recs, size_t n, tmp_file_t *t);
104
105 static void free_sort_job(sort_job_t *job, parallel_sorter_t *sorter);
106
107 static parallel_sorter_t *create_parallel_sorter(size_t workers,
108 file_sort_ctx_t *ctx);
109
110 static void free_parallel_sorter(parallel_sorter_t *s);
111
112 static void sort_worker(void *args);
113
114 static file_sorter_error_t parallel_sorter_add_job(parallel_sorter_t *s,
115 void **records,
116 size_t n);
117
118 static file_sorter_error_t parallel_sorter_wait(parallel_sorter_t *s, size_t n);
119
120 static file_sorter_error_t parallel_sorter_finish(parallel_sorter_t *s);
121
122 static int sorter_random_name(char *tmpl, int totlen, int suffixlen);
123
124 static char *sorter_tmp_file_path(const char *tmp_dir, const char *prefix);
125
sort_file(const char *source_file, const char *tmp_dir, unsigned num_tmp_files, unsigned max_buffer_size, file_merger_read_record_t read_record, file_merger_write_record_t write_record, file_merger_feed_record_t feed_record, file_merger_compare_records_t compare_records, file_merger_record_free_t free_record, int skip_writeback, void *user_ctx)126 file_sorter_error_t sort_file(const char *source_file,
127 const char *tmp_dir,
128 unsigned num_tmp_files,
129 unsigned max_buffer_size,
130 file_merger_read_record_t read_record,
131 file_merger_write_record_t write_record,
132 file_merger_feed_record_t feed_record,
133 file_merger_compare_records_t compare_records,
134 file_merger_record_free_t free_record,
135 int skip_writeback,
136 void *user_ctx)
137 {
138 file_sort_ctx_t ctx;
139 unsigned i;
140 file_sorter_error_t ret;
141
142 if (num_tmp_files <= 1) {
143 return FILE_SORTER_ERROR_BAD_ARG;
144 }
145
146 ctx.tmp_file_prefix = file_basename(source_file);
147 if (ctx.tmp_file_prefix == NULL) {
148 return FILE_SORTER_ERROR_TMP_FILE_BASENAME;
149 }
150
151 ctx.tmp_dir = tmp_dir;
152 ctx.source_file = source_file;
153 ctx.num_tmp_files = num_tmp_files;
154 ctx.max_buffer_size = max_buffer_size;
155 ctx.read_record = read_record;
156 ctx.write_record = write_record;
157 ctx.feed_record = feed_record;
158 ctx.compare_records = compare_records;
159 ctx.free_record = free_record;
160 ctx.user_ctx = user_ctx;
161 ctx.active_tmp_files = 0;
162 ctx.skip_writeback = skip_writeback;
163
164 if (skip_writeback && !feed_record) {
165 return FILE_SORTER_ERROR_MISSING_CALLBACK;
166 }
167
168 ctx.f = fopen(source_file, "rb");
169 if (ctx.f == NULL) {
170 free(ctx.tmp_file_prefix);
171 return FILE_SORTER_ERROR_OPEN_FILE;
172 }
173
174 ctx.tmp_files = (tmp_file_t *) malloc(sizeof(tmp_file_t) * num_tmp_files);
175
176 if (ctx.tmp_files == NULL) {
177 fclose(ctx.f);
178 free(ctx.tmp_file_prefix);
179 return FILE_SORTER_ERROR_ALLOC;
180 }
181
182 for (i = 0; i < num_tmp_files; ++i) {
183 ctx.tmp_files[i].name = NULL;
184 ctx.tmp_files[i].level = 0;
185 }
186
187 ret = do_sort_file(&ctx);
188
189 if (ctx.f != NULL) {
190 fclose(ctx.f);
191 }
192 for (i = 0; i < ctx.active_tmp_files; ++i) {
193 if (ctx.tmp_files[i].name != NULL) {
194 remove(ctx.tmp_files[i].name);
195 free(ctx.tmp_files[i].name);
196 }
197 }
198 free(ctx.tmp_files);
199 free(ctx.tmp_file_prefix);
200
201 return ret;
202 }
203
204
create_sort_job(void **recs, size_t n, tmp_file_t *t)205 static sort_job_t *create_sort_job(void **recs, size_t n, tmp_file_t *t)
206 {
207 sort_job_t *job = (sort_job_t *) calloc(1, sizeof(sort_job_t));
208 if (job) {
209 job->records = recs;
210 job->n = n;
211 job->tmp_file = t;
212 }
213
214 return job;
215 }
216
217
free_sort_job(sort_job_t *job, parallel_sorter_t *sorter)218 static void free_sort_job(sort_job_t *job, parallel_sorter_t *sorter)
219 {
220 size_t i;
221
222 if (job) {
223 for (i = 0; i < job->n; i++) {
224 (*sorter->ctx->free_record)(job->records[i], sorter->ctx->user_ctx);
225 }
226
227 free(job->records);
228 free(job);
229 }
230 }
231
232
create_parallel_sorter(size_t workers, file_sort_ctx_t *ctx)233 static parallel_sorter_t *create_parallel_sorter(size_t workers,
234 file_sort_ctx_t *ctx)
235 {
236 size_t i, j;
237 parallel_sorter_t *s = (parallel_sorter_t *) calloc(1, sizeof(parallel_sorter_t));
238 if (!s) {
239 return NULL;
240 }
241
242 s->nworkers = workers;
243 s->free_workers = workers;
244 cb_mutex_initialize(&s->mutex);
245 cb_cond_initialize(&s->cond);
246 s->finished = 0;
247 s->error = FILE_SORTER_SUCCESS;
248 s->job = NULL;
249 s->ctx = ctx;
250
251 s->threads = (cb_thread_t *) calloc(workers, sizeof(cb_thread_t));
252 if (!s->threads) {
253 goto failure;
254 }
255
256 for (i = 0; i < workers; i++) {
257 if (cb_create_thread(&s->threads[i], &sort_worker, (void *) s, 0) < 0) {
258 cb_mutex_enter(&s->mutex);
259 s->finished = 1;
260 cb_mutex_exit(&s->mutex);
261 cb_cond_broadcast(&s->cond);
262
263 for (j = 0; j < i; j++) {
264 cb_join_thread(s->threads[j]);
265 }
266
267 goto failure;
268 }
269 }
270
271 return s;
272
273 failure:
274 cb_mutex_destroy(&s->mutex);
275 cb_cond_destroy(&s->cond);
276 free(s->threads);
277 free(s);
278 return NULL;
279 }
280
281
free_parallel_sorter(parallel_sorter_t *s)282 static void free_parallel_sorter(parallel_sorter_t *s)
283 {
284 if (s) {
285 cb_mutex_destroy(&s->mutex);
286 cb_cond_destroy(&s->cond);
287 free_sort_job(s->job, s);
288 free(s->threads);
289 free(s);
290 }
291 }
292
293
sort_worker(void *args)294 static void sort_worker(void *args)
295 {
296 file_sorter_error_t ret;
297 sort_job_t *job;
298 parallel_sorter_t *s = (parallel_sorter_t *) args;
299
300 /*
301 * If a job is available, pick it up and notify all waiters
302 * If job is not available, wait on condition variable
303 * Once a job is complete, notify all waiters
304 * Loop it over until finished flag becomes true
305 */
306 while (1) {
307 cb_mutex_enter(&s->mutex);
308 if (s->finished) {
309 cb_mutex_exit(&s->mutex);
310 return;
311 }
312
313 if (s->job) {
314 job = s->job;
315 s->job = NULL;
316 s->free_workers -= 1;
317 cb_mutex_exit(&s->mutex);
318 cb_cond_broadcast(&s->cond);
319
320 ret = write_record_list(job->records, job->n, job->tmp_file, s->ctx);
321 free_sort_job(job, s);
322 if (ret != FILE_SORTER_SUCCESS) {
323 cb_mutex_enter(&s->mutex);
324 s->finished = 1;
325 s->error = ret;
326 cb_mutex_exit(&s->mutex);
327 cb_cond_broadcast(&s->cond);
328 return;
329 }
330
331 cb_mutex_enter(&s->mutex);
332 s->free_workers += 1;
333 cb_mutex_exit(&s->mutex);
334 cb_cond_broadcast(&s->cond);
335 } else {
336 cb_cond_wait(&s->cond, &s->mutex);
337 cb_mutex_exit(&s->mutex);
338 }
339 }
340 }
341
342
343 // Add a job and block wait until a worker picks up the job
parallel_sorter_add_job(parallel_sorter_t *s, void **records, size_t n)344 static file_sorter_error_t parallel_sorter_add_job(parallel_sorter_t *s,
345 void **records,
346 size_t n)
347 {
348 file_sorter_error_t ret;
349 sort_job_t *job;
350 tmp_file_t *tmp_file = create_tmp_file(s->ctx);
351
352 if (tmp_file == NULL) {
353 return FILE_SORTER_ERROR_MK_TMP_FILE;
354 }
355
356 job = create_sort_job(records, n, tmp_file);
357 if (!job) {
358 return FILE_SORTER_ERROR_ALLOC;
359 }
360
361 cb_mutex_enter(&s->mutex);
362 if (s->finished) {
363 ret = s->error;
364 cb_mutex_exit(&s->mutex);
365 return ret;
366 }
367
368 s->job = job;
369 cb_mutex_exit(&s->mutex);
370 cb_cond_signal(&s->cond);
371
372 cb_mutex_enter(&s->mutex);
373 while (s->job) {
374 cb_cond_wait(&s->cond, &s->mutex);
375 }
376 cb_mutex_exit(&s->mutex);
377
378 return FILE_SORTER_SUCCESS;
379 }
380
381
382 // Wait until n or more workers become idle after processing current jobs
parallel_sorter_wait(parallel_sorter_t *s, size_t n)383 static file_sorter_error_t parallel_sorter_wait(parallel_sorter_t *s, size_t n)
384 {
385 while (1) {
386 cb_mutex_enter(&s->mutex);
387 if (s->finished) {
388 cb_mutex_exit(&s->mutex);
389 return s->error;
390 }
391
392 if (s->free_workers >= n) {
393 cb_mutex_exit(&s->mutex);
394 return FILE_SORTER_SUCCESS;
395 }
396
397 cb_cond_wait(&s->cond, &s->mutex);
398 cb_mutex_exit(&s->mutex);
399 }
400 }
401
402
403 // Notify all workers that we have no more jobs and wait for them to join
parallel_sorter_finish(parallel_sorter_t *s)404 static file_sorter_error_t parallel_sorter_finish(parallel_sorter_t *s)
405 {
406 file_sorter_error_t ret;
407 size_t i;
408
409 cb_mutex_enter(&s->mutex);
410 s->finished = 1;
411 cb_mutex_exit(&s->mutex);
412 cb_cond_broadcast(&s->cond);
413
414 for (i = 0; i < s->nworkers; i++) {
415 ret = (file_sorter_error_t) cb_join_thread(s->threads[i]);
416 if (ret != 0) {
417 return ret;
418 }
419 }
420
421 return FILE_SORTER_SUCCESS;
422 }
423
424
do_sort_file(file_sort_ctx_t *ctx)425 static file_sorter_error_t do_sort_file(file_sort_ctx_t *ctx)
426 {
427 unsigned buffer_size = 0;
428 size_t i = 0;
429 size_t record_count = NSORT_RECORDS_INIT;
430 void *record;
431 int record_size;
432 file_sorter_error_t ret;
433 file_merger_feed_record_t feed_record = ctx->feed_record;
434 parallel_sorter_t *sorter;
435 void **records = (void **) calloc(record_count, sizeof(void *));
436
437 if (records == NULL) {
438 return FILE_SORTER_ERROR_ALLOC;
439 }
440
441 sorter = create_parallel_sorter(NSORT_THREADS, ctx);
442 if (sorter == NULL) {
443 return FILE_SORTER_ERROR_ALLOC;
444 }
445
446 ctx->feed_record = NULL;
447
448 i = 0;
449 while (1) {
450 record_size = (*ctx->read_record)(ctx->f, &record, ctx->user_ctx);
451 if (record_size < 0) {
452 ret = (file_sorter_error_t) record_size;
453 goto failure;
454 } else if (record_size == 0) {
455 break;
456 }
457
458 if (records == NULL) {
459 records = (void **) calloc(record_count, sizeof(void *));
460 if (records == NULL) {
461 ret = FILE_SORTER_ERROR_ALLOC;
462 goto failure;
463 }
464 }
465
466 records[i++] = record;
467 if (i == record_count) {
468 record_count += NSORT_RECORD_INCR;
469 records = (void **) realloc(records, record_count * sizeof(void *));
470 if (records == NULL) {
471 ret = FILE_SORTER_ERROR_ALLOC;
472 goto failure;
473 }
474 }
475
476 buffer_size += (unsigned) record_size;
477
478 if (buffer_size >= ctx->max_buffer_size) {
479 ret = parallel_sorter_add_job(sorter, records, i);
480 if (ret != FILE_SORTER_SUCCESS) {
481 goto failure;
482 }
483
484 ret = parallel_sorter_wait(sorter, 1);
485 if (ret != FILE_SORTER_SUCCESS) {
486 goto failure;
487 }
488
489 records = NULL;
490 record_count = NSORT_RECORDS_INIT;
491 buffer_size = 0;
492 i = 0;
493 }
494
495 if (ctx->active_tmp_files >= ctx->num_tmp_files) {
496 unsigned start, end, next_level;
497
498 ret = parallel_sorter_wait(sorter, NSORT_THREADS);
499 if (ret != FILE_SORTER_SUCCESS) {
500 goto failure;
501 }
502
503 pick_merge_files(ctx, &start, &end, &next_level);
504 assert(next_level > 1);
505 ret = merge_tmp_files(ctx, start, end, next_level);
506 if (ret != FILE_SORTER_SUCCESS) {
507 goto failure;
508 }
509 }
510 }
511
512 fclose(ctx->f);
513 ctx->f = NULL;
514
515 if (ctx->active_tmp_files == 0 && buffer_size == 0) {
516 /* empty source file */
517 return FILE_SORTER_SUCCESS;
518 }
519
520 if (buffer_size > 0) {
521 ret = parallel_sorter_add_job(sorter, records, i);
522 if (ret != FILE_SORTER_SUCCESS) {
523 goto failure;
524 }
525 }
526
527 ret = parallel_sorter_finish(sorter);
528 if (ret != FILE_SORTER_SUCCESS) {
529 goto failure;
530 }
531
532 assert(ctx->active_tmp_files > 0);
533
534 if (!ctx->skip_writeback && remove(ctx->source_file) != 0) {
535 ret = FILE_SORTER_ERROR_DELETE_FILE;
536 goto failure;
537 }
538
539 // Restore feed_record callback for final merge */
540 ctx->feed_record = feed_record;
541 if (ctx->active_tmp_files == 1) {
542 if (ctx->feed_record) {
543 ret = iterate_records_file(ctx, ctx->tmp_files[0].name);
544 if (ret != FILE_SORTER_SUCCESS) {
545 goto failure;
546 }
547
548 if (ctx->skip_writeback && remove(ctx->tmp_files[0].name) != 0) {
549 ret = FILE_SORTER_ERROR_DELETE_FILE;
550 goto failure;
551 }
552 }
553
554 if (!ctx->skip_writeback &&
555 rename(ctx->tmp_files[0].name, ctx->source_file) != 0) {
556 ret = FILE_SORTER_ERROR_RENAME_FILE;
557 goto failure;
558 }
559 } else if (ctx->active_tmp_files > 1) {
560 ret = merge_tmp_files(ctx, 0, ctx->active_tmp_files, 0);
561 if (ret != FILE_SORTER_SUCCESS) {
562 goto failure;
563 }
564 }
565
566 ret = FILE_SORTER_SUCCESS;
567
568 failure:
569 free_parallel_sorter(sorter);
570 return ret;
571 }
572
573
write_record_list(void **records, size_t n, tmp_file_t *tmp_file, file_sort_ctx_t *ctx)574 static file_sorter_error_t write_record_list(void **records,
575 size_t n,
576 tmp_file_t *tmp_file,
577 file_sort_ctx_t *ctx)
578 {
579 size_t i;
580 FILE *f;
581
582 sort_records(records, n, ctx);
583
584 remove(tmp_file->name);
585 f = fopen(tmp_file->name, "ab");
586 if (f == NULL) {
587 return FILE_SORTER_ERROR_MK_TMP_FILE;
588 }
589
590 if (ftell(f) != 0) {
591 /* File already existed. It's not supposed to exist, and if it
592 * exists it means a temporary file name collision happened or
593 * some previous sort left temporary files that were never
594 * deleted. */
595 return FILE_SORTER_ERROR_NOT_EMPTY_TMP_FILE;
596 }
597
598
599 for (i = 0; i < n; i++) {
600 file_sorter_error_t err;
601 err = static_cast<file_sorter_error_t>((*ctx->write_record)(f, records[i], ctx->user_ctx));
602 (*ctx->free_record)(records[i], ctx->user_ctx);
603 records[i] = NULL;
604
605 if (err != FILE_SORTER_SUCCESS) {
606 fclose(f);
607 return err;
608 }
609 }
610
611 fclose(f);
612
613 return FILE_SORTER_SUCCESS;
614 }
615
616
create_tmp_file(file_sort_ctx_t *ctx)617 static tmp_file_t *create_tmp_file(file_sort_ctx_t *ctx)
618 {
619 unsigned i = ctx->active_tmp_files;
620
621 assert(ctx->active_tmp_files < ctx->num_tmp_files);
622 assert(ctx->tmp_files[i].name == NULL);
623 assert(ctx->tmp_files[i].level == 0);
624
625 ctx->tmp_files[i].name = sorter_tmp_file_path(ctx->tmp_dir,
626 ctx->tmp_file_prefix);
627 if (ctx->tmp_files[i].name == NULL) {
628 return NULL;
629 }
630
631 ctx->tmp_files[i].level = 1;
632 ctx->active_tmp_files += 1;
633
634 return &ctx->tmp_files[i];
635 }
636
637
qsort_cmp(const void *a, const void *b, void *ctx)638 static int qsort_cmp(const void *a, const void *b, void *ctx)
639 {
640 file_sort_ctx_t *sort_ctx = (file_sort_ctx_t *) ctx;
641 const void **k1 = (const void **) a, **k2 = (const void **) b;
642 return (*sort_ctx->compare_records)(*k1, *k2, sort_ctx->user_ctx);
643 }
644
645
sort_records(void **records, size_t n, file_sort_ctx_t *ctx)646 static void sort_records(void **records, size_t n,
647 file_sort_ctx_t *ctx)
648 {
649 quicksort(records, n, sizeof(void *), &qsort_cmp, ctx);
650 }
651
652
tmp_file_cmp(const void *a, const void *b)653 static int tmp_file_cmp(const void *a, const void *b)
654 {
655 unsigned x = ((const tmp_file_t *) a)->level;
656 unsigned y = ((const tmp_file_t *) b)->level;
657
658 if (x == 0) {
659 return 1;
660 }
661 if (y == 0) {
662 return -1;
663 }
664
665 return x - y;
666 }
667
668
pick_merge_files(file_sort_ctx_t *ctx, unsigned *start, unsigned *end, unsigned *next_level)669 static void pick_merge_files(file_sort_ctx_t *ctx,
670 unsigned *start,
671 unsigned *end,
672 unsigned *next_level)
673 {
674 unsigned i, j, level;
675
676 qsort(ctx->tmp_files, ctx->active_tmp_files, sizeof(tmp_file_t), tmp_file_cmp);
677
678 for (i = 0; i < ctx->active_tmp_files; i = j) {
679 level = ctx->tmp_files[i].level;
680 assert(level > 0);
681 j = i + 1;
682
683 while (j < ctx->active_tmp_files) {
684 assert(ctx->tmp_files[j].level > 0);
685 if (ctx->tmp_files[j].level != level) {
686 break;
687 }
688 j += 1;
689 }
690
691 if ((j - i) > 1) {
692 *start = i;
693 *end = j;
694 *next_level = (j - i) * level;
695 return;
696 }
697 }
698
699 /* All files have a different level. */
700 assert(ctx->active_tmp_files == ctx->num_tmp_files);
701 assert(ctx->active_tmp_files >= 2);
702 *start = 0;
703 *end = 2;
704 *next_level = ctx->tmp_files[0].level + ctx->tmp_files[1].level;
705 }
706
707
merge_tmp_files(file_sort_ctx_t *ctx, unsigned start, unsigned end, unsigned next_level)708 static file_sorter_error_t merge_tmp_files(file_sort_ctx_t *ctx,
709 unsigned start,
710 unsigned end,
711 unsigned next_level)
712 {
713 char *dest_tmp_file;
714 const char **files;
715 unsigned nfiles, i;
716 file_sorter_error_t ret;
717 file_merger_feed_record_t feed_record = NULL;
718
719 nfiles = end - start;
720 files = (const char **) malloc(sizeof(char *) * nfiles);
721 if (files == NULL) {
722 return FILE_SORTER_ERROR_ALLOC;
723 }
724 for (i = start; i < end; ++i) {
725 files[i - start] = ctx->tmp_files[i].name;
726 assert(files[i - start] != NULL);
727 }
728
729 if (next_level == 0) {
730 /* Final merge iteration. */
731 if (ctx->skip_writeback) {
732 dest_tmp_file = NULL;
733 } else {
734 dest_tmp_file = (char *) ctx->source_file;
735 }
736
737 feed_record = ctx->feed_record;
738 } else {
739 dest_tmp_file = sorter_tmp_file_path(ctx->tmp_dir,
740 ctx->tmp_file_prefix);
741 if (dest_tmp_file == NULL) {
742 free(files);
743 return FILE_SORTER_ERROR_MK_TMP_FILE;
744 }
745 }
746
747 ret = (file_sorter_error_t) merge_files(files,
748 nfiles,
749 dest_tmp_file,
750 ctx->read_record,
751 ctx->write_record,
752 feed_record,
753 ctx->compare_records,
754 NULL,
755 ctx->free_record,
756 ctx->skip_writeback,
757 ctx->user_ctx);
758
759 free(files);
760
761 if (ret != FILE_SORTER_SUCCESS) {
762 if (dest_tmp_file != NULL && dest_tmp_file != ctx->source_file) {
763 remove(dest_tmp_file);
764 free(dest_tmp_file);
765 }
766 return ret;
767 }
768
769 for (i = start; i < end; ++i) {
770 if (remove(ctx->tmp_files[i].name) != 0) {
771 if (dest_tmp_file != ctx->source_file) {
772 free(dest_tmp_file);
773 }
774 return FILE_SORTER_ERROR_DELETE_FILE;
775 }
776 free(ctx->tmp_files[i].name);
777 ctx->tmp_files[i].name = NULL;
778 ctx->tmp_files[i].level = 0;
779 }
780
781 qsort(ctx->tmp_files + start, ctx->num_tmp_files - start,
782 sizeof(tmp_file_t), tmp_file_cmp);
783 ctx->active_tmp_files -= nfiles;
784
785 if (dest_tmp_file != ctx->source_file) {
786 i = ctx->active_tmp_files;
787 ctx->tmp_files[i].name = dest_tmp_file;
788 ctx->tmp_files[i].level = next_level;
789 ctx->active_tmp_files += 1;
790 }
791
792 return FILE_SORTER_SUCCESS;
793 }
794
iterate_records_file(file_sort_ctx_t *ctx, const char *file)795 static file_sorter_error_t iterate_records_file(file_sort_ctx_t *ctx,
796 const char *file)
797 {
798 void *record_data = NULL;
799 int record_len;
800 FILE *f = fopen(file, "rb");
801 int ret = FILE_SORTER_SUCCESS;
802
803 if (f == NULL) {
804 return FILE_SORTER_ERROR_OPEN_FILE;
805 }
806
807 while (1) {
808 record_len = (*ctx->read_record)(f, &record_data, ctx->user_ctx);
809 if (record_len == 0) {
810 record_data = NULL;
811 break;
812 } else if (record_len < 0) {
813 ret = record_len;
814 goto cleanup;
815 } else {
816 ret = (*ctx->feed_record)(record_data, ctx->user_ctx);
817 if (ret != FILE_SORTER_SUCCESS) {
818 goto cleanup;
819 }
820
821 (*ctx->free_record)(record_data, ctx->user_ctx);
822 }
823 }
824
825 cleanup:
826 (*ctx->free_record)(record_data, ctx->user_ctx);
827 if (f != NULL) {
828 fclose(f);
829 }
830
831 return (file_sorter_error_t) ret;
832 }
833
/*
 * Overwrites the last suffixlen characters of the totlen-character string
 * tmpl with "." followed by a process-wide counter, NUL-terminating the
 * result (tmpl must have room for totlen + 1 bytes). The counter wraps after
 * 2 << 18, so it never needs more than 6 digits. That fits the 7-character
 * ".XXXXXX" suffix template. Always returns 0.
 *
 * NOTE(review): the counter is a plain static, not protected against
 * concurrent callers, and names are only unique within this process.
 */
static int sorter_random_name(char *tmpl, int totlen, int suffixlen) {
    static unsigned int value = 0;
    tmpl = tmpl + totlen - suffixlen;
    /* The suffix spans suffixlen characters plus the terminating NUL, so the
     * buffer size is suffixlen + 1. Passing suffixlen truncated 6-digit
     * counters (123456 -> ".12345"), making names collide with other tmp
     * files. */
#ifdef WINDOWS
    _snprintf(tmpl, suffixlen + 1, ".%u", value);
#else
    snprintf(tmpl, suffixlen + 1, ".%u", value);
#endif
    if (++value > 2 << 18) {
        value = 0;
    }
    return 0;
}
847
sorter_tmp_file_path(const char *tmp_dir, const char *prefix)848 char *sorter_tmp_file_path(const char *tmp_dir, const char *prefix) {
849 char *file_path;
850 size_t tmp_dir_len, prefix_len, total_len;
851 tmp_dir_len = strlen(tmp_dir);
852 prefix_len = strlen(prefix);
853 total_len = tmp_dir_len + 1 + prefix_len + sizeof(SORTER_TMP_FILE_SUFFIX);
854 file_path = (char *) malloc(total_len);
855
856 if (file_path == NULL) {
857 return NULL;
858 }
859
860 memcpy(file_path, tmp_dir, tmp_dir_len);
861 /* Windows specific file API functions and stdio file functions on Windows
862 * convert forward slashes to back slashes. */
863 file_path[tmp_dir_len] = '/';
864 memcpy(file_path + tmp_dir_len + 1, prefix, prefix_len);
865 sorter_random_name(file_path, total_len - 1, sizeof(SORTER_TMP_FILE_SUFFIX)
866 - 1);
867 return file_path;
868 }
869