xref: /5.5.2/couchstore/src/file_sorter.cc (revision 13d4a251)
1 /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 
3 /**
4  * @copyright 2013 Couchbase, Inc.
5  *
6  * @author Filipe Manana  <filipe@couchbase.com>
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License"); you may not
9  * use this file except in compliance with the License. You may obtain a copy of
10  * the License at
11  *
12  *  http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16  * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
17  * License for the specific language governing permissions and limitations under
18  * the License.
19  **/
20 
21 #include <stdio.h>
22 #include <stdlib.h>
23 #include <string.h>
24 #include <platform/cb_malloc.h>
25 #include <platform/cbassert.h>
26 #include <strings.h>
27 #include "file_sorter.h"
28 #include "file_name_utils.h"
29 #include "quicksort.h"
30 
/* Initial capacity, in records, of the in-memory batch buffer used by
 * do_sort_file. */
#define NSORT_RECORDS_INIT 500000
/* Number of record slots added each time the batch buffer becomes full. */
#define NSORT_RECORD_INCR  100000
/* Number of worker threads that sort batches and write them to temp files. */
#define NSORT_THREADS 2
/* Suffix placeholder for temp file names. In this file it is only used via
 * sizeof(), to reserve room for the suffix written by sorter_random_name. */
#define SORTER_TMP_FILE_SUFFIX ".XXXXXX"
35 
/* One slot in the table of temporary sorted-run files. */
typedef struct {
    /* Path of the temp file, allocated by sorter_tmp_file_path and released
     * with cb_free; NULL when the slot is unused. */
    char     *name;
    /* 0 marks an unused slot. create_tmp_file sets 1 for a freshly written
     * batch; merge_tmp_files assigns a merged file the level computed by
     * pick_merge_files. */
    unsigned  level;
} tmp_file_t;
40 
/* State shared by all phases of one sort_file() call. */
typedef struct {
    /* Directory in which temporary run files are created. */
    const char                   *tmp_dir;
    /* File being sorted; replaced by the sorted output unless
     * skip_writeback is set. */
    const char                   *source_file;
    /* file_basename(source_file); used as the temp file name prefix and
     * freed by sort_file. */
    char                         *tmp_file_prefix;
    /* Capacity of tmp_files; filling every slot triggers an intermediate
     * merge in do_sort_file. */
    unsigned                      num_tmp_files;
    /* A batch is handed to the workers once the summed record sizes
     * returned by read_record reach this value. */
    unsigned                      max_buffer_size;
    /* Returns the record size, 0 at end of file, or a negative error. */
    file_merger_read_record_t     read_record;
    file_merger_write_record_t    write_record;
    /* Receives the final sorted records; temporarily NULL while
     * do_sort_file builds the runs. */
    file_merger_feed_record_t     feed_record;
    file_merger_compare_records_t compare_records;
    file_merger_record_free_t     free_record;
    /* Opaque pointer passed through to every callback. */
    void                         *user_ctx;
    /* Source file opened with "rb"; NULL once it has been closed. */
    FILE                         *f;
    /* num_tmp_files slots; used slots are kept at the front. */
    tmp_file_t                   *tmp_files;
    /* Number of slots of tmp_files currently in use. */
    unsigned                      active_tmp_files;
    /* Nonzero: do not rewrite source_file; output only goes to
     * feed_record, which sort_file then requires. */
    int                           skip_writeback;
} file_sort_ctx_t;
58 
// For parallel sorter: one batch of records handed to a worker thread.
typedef struct {
    /* Array of n records; owned by the job and released by free_sort_job. */
    void       **records;
    /* Temp file slot the sorted batch is written to. */
    tmp_file_t *tmp_file;
    /* Number of entries in records. */
    size_t     n;
} sort_job_t;
65 
/* Worker pool that sorts batches and writes them to temp files. All mutable
 * fields below are protected by mutex; cond is broadcast/signalled whenever
 * job, free_workers or finished change. */
typedef struct {
    /* Number of worker threads in threads[]. */
    size_t              nworkers;
    /* Workers not currently processing a job (decremented when a job is
     * picked up, incremented after it completes successfully). */
    size_t              free_workers;
    cb_cond_t           cond;
    cb_mutex_t          mutex;
    /* Nonzero tells workers to exit. Set by parallel_sorter_finish, by a
     * worker whose job failed, or when thread creation fails. */
    int                 finished;
    /* Error recorded by a worker whose write_record_list call failed. */
    file_sorter_error_t error;
    /* Single-slot hand-off from the producer to the workers; NULL when
     * empty. */
    sort_job_t          *job;
    file_sort_ctx_t     *ctx;
    cb_thread_t         *threads;
} parallel_sorter_t;
77 
78 
79 static file_sorter_error_t do_sort_file(file_sort_ctx_t *ctx);
80 
81 static void sort_records(void **records, size_t n,
82                                           file_sort_ctx_t *ctx);
83 
84 static tmp_file_t *create_tmp_file(file_sort_ctx_t *ctx);
85 
86 static file_sorter_error_t write_record_list(void **records,
87                                              size_t n,
88                                              tmp_file_t *tmp_file,
89                                              file_sort_ctx_t *ctx);
90 
91 static void pick_merge_files(file_sort_ctx_t *ctx,
92                              unsigned *start,
93                              unsigned *end,
94                              unsigned *next_level);
95 
96 static file_sorter_error_t merge_tmp_files(file_sort_ctx_t *ctx,
97                                            unsigned start,
98                                            unsigned end,
99                                            unsigned next_level);
100 
101 static file_sorter_error_t iterate_records_file(file_sort_ctx_t *ctx,
102                                                 const char *file);
103 
104 
105 static sort_job_t *create_sort_job(void **recs, size_t n, tmp_file_t *t);
106 
107 static void free_sort_job(sort_job_t *job, parallel_sorter_t *sorter);
108 
109 static parallel_sorter_t *create_parallel_sorter(size_t workers,
110                                                  file_sort_ctx_t *ctx);
111 
112 static void free_parallel_sorter(parallel_sorter_t *s);
113 
114 static void sort_worker(void *args);
115 
116 static file_sorter_error_t parallel_sorter_add_job(parallel_sorter_t *s,
117                                                   void **records,
118                                                   size_t n);
119 
120 static void free_n_records(parallel_sorter_t *s, void **records, size_t n);
121 
122 static file_sorter_error_t parallel_sorter_wait(parallel_sorter_t *s, size_t n);
123 
124 static file_sorter_error_t parallel_sorter_finish(parallel_sorter_t *s);
125 
126 static int sorter_random_name(char *tmpl, int totlen, int suffixlen);
127 
128 static char *sorter_tmp_file_path(const char *tmp_dir, const char *prefix);
129 
sort_file(const char * source_file,const char * tmp_dir,unsigned num_tmp_files,unsigned max_buffer_size,file_merger_read_record_t read_record,file_merger_write_record_t write_record,file_merger_feed_record_t feed_record,file_merger_compare_records_t compare_records,file_merger_record_free_t free_record,int skip_writeback,void * user_ctx)130 file_sorter_error_t sort_file(const char *source_file,
131                               const char *tmp_dir,
132                               unsigned num_tmp_files,
133                               unsigned max_buffer_size,
134                               file_merger_read_record_t read_record,
135                               file_merger_write_record_t write_record,
136                               file_merger_feed_record_t feed_record,
137                               file_merger_compare_records_t compare_records,
138                               file_merger_record_free_t free_record,
139                               int skip_writeback,
140                               void *user_ctx)
141 {
142     file_sort_ctx_t ctx;
143     unsigned i;
144     file_sorter_error_t ret;
145 
146     if (num_tmp_files <= 1) {
147         return FILE_SORTER_ERROR_BAD_ARG;
148     }
149 
150     ctx.tmp_file_prefix = file_basename(source_file);
151     if (ctx.tmp_file_prefix == NULL) {
152         return FILE_SORTER_ERROR_TMP_FILE_BASENAME;
153     }
154 
155     ctx.tmp_dir = tmp_dir;
156     ctx.source_file = source_file;
157     ctx.num_tmp_files = num_tmp_files;
158     ctx.max_buffer_size = max_buffer_size;
159     ctx.read_record = read_record;
160     ctx.write_record = write_record;
161     ctx.feed_record = feed_record;
162     ctx.compare_records = compare_records;
163     ctx.free_record = free_record;
164     ctx.user_ctx = user_ctx;
165     ctx.active_tmp_files = 0;
166     ctx.skip_writeback = skip_writeback;
167 
168     if (skip_writeback && !feed_record) {
169         return FILE_SORTER_ERROR_MISSING_CALLBACK;
170     }
171 
172     ctx.f = fopen(source_file, "rb");
173     if (ctx.f == NULL) {
174         cb_free(ctx.tmp_file_prefix);
175         return FILE_SORTER_ERROR_OPEN_FILE;
176     }
177 
178     ctx.tmp_files = (tmp_file_t *) cb_malloc(sizeof(tmp_file_t) * num_tmp_files);
179 
180     if (ctx.tmp_files == NULL) {
181         fclose(ctx.f);
182         cb_free(ctx.tmp_file_prefix);
183         return FILE_SORTER_ERROR_ALLOC;
184     }
185 
186     for (i = 0; i < num_tmp_files; ++i) {
187         ctx.tmp_files[i].name = NULL;
188         ctx.tmp_files[i].level = 0;
189     }
190 
191     ret = do_sort_file(&ctx);
192 
193     if (ctx.f != NULL) {
194         fclose(ctx.f);
195     }
196     for (i = 0; i < ctx.active_tmp_files; ++i) {
197         if (ctx.tmp_files[i].name != NULL) {
198             remove(ctx.tmp_files[i].name);
199             cb_free(ctx.tmp_files[i].name);
200         }
201     }
202     cb_free(ctx.tmp_files);
203     cb_free(ctx.tmp_file_prefix);
204 
205     return ret;
206 }
207 
208 
create_sort_job(void ** recs,size_t n,tmp_file_t * t)209 static sort_job_t *create_sort_job(void **recs, size_t n, tmp_file_t *t)
210 {
211     sort_job_t *job = (sort_job_t *) cb_calloc(1, sizeof(sort_job_t));
212     if (job) {
213         job->records = recs;
214         job->n = n;
215         job->tmp_file = t;
216     }
217 
218     return job;
219 }
220 
221 
free_sort_job(sort_job_t * job,parallel_sorter_t * sorter)222 static void free_sort_job(sort_job_t *job, parallel_sorter_t *sorter)
223 {
224     size_t i;
225 
226     if (job) {
227         for (i = 0; i < job->n; i++) {
228             (*sorter->ctx->free_record)(job->records[i], sorter->ctx->user_ctx);
229         }
230 
231         cb_free(job->records);
232         cb_free(job);
233     }
234 }
235 
236 
create_parallel_sorter(size_t workers,file_sort_ctx_t * ctx)237 static parallel_sorter_t *create_parallel_sorter(size_t workers,
238                                                  file_sort_ctx_t *ctx)
239 {
240     size_t i, j;
241     parallel_sorter_t *s = (parallel_sorter_t *) cb_calloc(1, sizeof(parallel_sorter_t));
242     if (!s) {
243         return NULL;
244     }
245 
246     s->nworkers = workers;
247     s->free_workers = workers;
248     cb_mutex_initialize(&s->mutex);
249     cb_cond_initialize(&s->cond);
250     s->finished = 0;
251     s->error = FILE_SORTER_SUCCESS;
252     s->job = NULL;
253     s->ctx = ctx;
254 
255     s->threads = (cb_thread_t *) cb_calloc(workers, sizeof(cb_thread_t));
256     if (!s->threads) {
257         goto failure;
258     }
259 
260     for (i = 0; i < workers; i++) {
261         if (cb_create_thread(&s->threads[i], &sort_worker, (void *) s, 0) < 0) {
262             cb_mutex_enter(&s->mutex);
263             s->finished = 1;
264             cb_mutex_exit(&s->mutex);
265             cb_cond_broadcast(&s->cond);
266 
267             for (j = 0; j < i; j++) {
268                 cb_join_thread(s->threads[j]);
269             }
270 
271             goto failure;
272         }
273     }
274 
275     return s;
276 
277 failure:
278     cb_mutex_destroy(&s->mutex);
279     cb_cond_destroy(&s->cond);
280     cb_free(s->threads);
281     cb_free(s);
282     return NULL;
283 }
284 
285 
free_parallel_sorter(parallel_sorter_t * s)286 static void free_parallel_sorter(parallel_sorter_t *s)
287 {
288     if (s) {
289         cb_mutex_destroy(&s->mutex);
290         cb_cond_destroy(&s->cond);
291         free_sort_job(s->job, s);
292         cb_free(s->threads);
293         cb_free(s);
294     }
295 }
296 
297 
sort_worker(void * args)298 static void sort_worker(void *args)
299 {
300     file_sorter_error_t ret;
301     sort_job_t *job;
302     parallel_sorter_t *s = (parallel_sorter_t *) args;
303 
304     /*
305      * If a job is available, pick it up and notify all waiters
306      * If job is not available, wait on condition variable
307      * Once a job is complete, notify all waiters
308      * Loop it over until finished flag becomes true
309      */
310     while (1) {
311         cb_mutex_enter(&s->mutex);
312         if (s->finished) {
313             cb_mutex_exit(&s->mutex);
314             return;
315         }
316 
317         if (s->job) {
318             job = s->job;
319             s->job = NULL;
320             s->free_workers -= 1;
321             cb_mutex_exit(&s->mutex);
322             cb_cond_broadcast(&s->cond);
323 
324             ret = write_record_list(job->records, job->n, job->tmp_file, s->ctx);
325             free_sort_job(job, s);
326             if (ret != FILE_SORTER_SUCCESS) {
327                 cb_mutex_enter(&s->mutex);
328                 s->finished = 1;
329                 s->error = ret;
330                 cb_mutex_exit(&s->mutex);
331                 cb_cond_broadcast(&s->cond);
332                 return;
333             }
334 
335             cb_mutex_enter(&s->mutex);
336             s->free_workers += 1;
337             cb_mutex_exit(&s->mutex);
338             cb_cond_broadcast(&s->cond);
339         } else {
340             cb_cond_wait(&s->cond, &s->mutex);
341             cb_mutex_exit(&s->mutex);
342         }
343     }
344 }
345 
346 
347 /* Adds the specified array of records to the given parallel sorter job,
348  * and block wait until a worker picks up the job.
349  * @param s The sorter to add the job to
350  * @param records an array of records to add. Ownership of this array
351  *                is transferred to the sorter, and it will be responsible
352  *                for later cb_free()ing it.
353  * @param n The number of elements in records.
354  * @return FILE_SORTER_SUCCESS if the job is successfully added,
355  *         otherwise the reason for the failure.
356  */
parallel_sorter_add_job(parallel_sorter_t * s,void ** records,size_t n)357 static file_sorter_error_t parallel_sorter_add_job(parallel_sorter_t *s,
358                                                   void **records,
359                                                   size_t n)
360 {
361     file_sorter_error_t ret;
362     sort_job_t *job;
363     tmp_file_t *tmp_file = create_tmp_file(s->ctx);
364 
365     if (tmp_file == NULL) {
366         free_n_records(s, records, n);
367         return FILE_SORTER_ERROR_MK_TMP_FILE;
368     }
369 
370     job = create_sort_job(records, n, tmp_file);
371     if (!job) {
372         free_n_records(s, records, n);
373         return FILE_SORTER_ERROR_ALLOC;
374     }
375 
376     cb_mutex_enter(&s->mutex);
377     if (s->finished) {
378         free_sort_job(job, s);
379         ret = s->error;
380         cb_mutex_exit(&s->mutex);
381         return ret;
382     }
383 
384     s->job = job;
385     cb_mutex_exit(&s->mutex);
386     cb_cond_signal(&s->cond);
387 
388     cb_mutex_enter(&s->mutex);
389     while (s->job) {
390         cb_cond_wait(&s->cond, &s->mutex);
391     }
392     cb_mutex_exit(&s->mutex);
393 
394     return FILE_SORTER_SUCCESS;
395 }
396 
397 
free_n_records(parallel_sorter_t * s,void ** records,size_t n)398 static void free_n_records(parallel_sorter_t *s, void **records, size_t n) {
399     for (size_t i = 0; i < n; i++) {
400         (*s->ctx->free_record)(records[i], s->ctx->user_ctx);
401     }
402     cb_free(records);
403 }
404 
405 
406 // Wait until n or more workers become idle after processing current jobs
parallel_sorter_wait(parallel_sorter_t * s,size_t n)407 static file_sorter_error_t parallel_sorter_wait(parallel_sorter_t *s, size_t n)
408 {
409     while (1) {
410         cb_mutex_enter(&s->mutex);
411         if (s->finished) {
412             cb_mutex_exit(&s->mutex);
413             return s->error;
414         }
415 
416         if (s->free_workers >= n) {
417             cb_mutex_exit(&s->mutex);
418             return FILE_SORTER_SUCCESS;
419         }
420 
421         cb_cond_wait(&s->cond, &s->mutex);
422         cb_mutex_exit(&s->mutex);
423     }
424 }
425 
426 
427 // Notify all workers that we have no more jobs and wait for them to join
parallel_sorter_finish(parallel_sorter_t * s)428 static file_sorter_error_t parallel_sorter_finish(parallel_sorter_t *s)
429 {
430     file_sorter_error_t ret;
431     size_t i;
432 
433     cb_mutex_enter(&s->mutex);
434     s->finished = 1;
435     cb_mutex_exit(&s->mutex);
436     cb_cond_broadcast(&s->cond);
437 
438     for (i = 0; i < s->nworkers; i++) {
439         ret = (file_sorter_error_t) cb_join_thread(s->threads[i]);
440         if (ret != 0) {
441             return ret;
442         }
443     }
444 
445     return FILE_SORTER_SUCCESS;
446 }
447 
448 
do_sort_file(file_sort_ctx_t * ctx)449 static file_sorter_error_t do_sort_file(file_sort_ctx_t *ctx)
450 {
451     unsigned buffer_size = 0;
452     size_t i = 0;
453     size_t record_count = NSORT_RECORDS_INIT;
454     void *record;
455     int record_size;
456     file_sorter_error_t ret;
457     file_merger_feed_record_t feed_record = ctx->feed_record;
458     parallel_sorter_t *sorter;
459     // This records array acts as a buffer for the parallel sorter. One
460     // batch of records will be passed on to sorter job which will then
461     // be responsible to free the memory in case of failures of success.
462     void **records = (void **) cb_calloc(record_count, sizeof(void *));
463 
464     if (records == NULL) {
465         cb_free(records);
466         return FILE_SORTER_ERROR_ALLOC;
467     }
468 
469     sorter = create_parallel_sorter(NSORT_THREADS, ctx);
470     if (sorter == NULL) {
471         cb_free(records);
472         return FILE_SORTER_ERROR_ALLOC;
473     }
474 
475     ctx->feed_record = NULL;
476 
477     i = 0;
478     while (1) {
479         record_size = (*ctx->read_record)(ctx->f, &record, ctx->user_ctx);
480         if (record_size < 0) {
481            ret = (file_sorter_error_t) record_size;
482            // If there's on error before a job got created, free the records
483            // directly
484            if (sorter->job == NULL) {
485                cb_free(records);
486            }
487            goto failure;
488         } else if (record_size == 0) {
489             break;
490         }
491 
492         if (records == NULL) {
493             records = (void **) cb_calloc(record_count, sizeof(void *));
494             if (records == NULL) {
495                 ret =  FILE_SORTER_ERROR_ALLOC;
496                 goto failure;
497             }
498         }
499 
500         records[i++] = record;
501         if (i == record_count) {
502             record_count += NSORT_RECORD_INCR;
503             records = (void **) cb_realloc(records, record_count * sizeof(void *));
504             if (records == NULL) {
505                 ret =  FILE_SORTER_ERROR_ALLOC;
506                 goto failure;
507             }
508         }
509 
510         buffer_size += (unsigned) record_size;
511 
512         if (buffer_size >= ctx->max_buffer_size) {
513             ret = parallel_sorter_add_job(sorter, records, i);
514             if (ret != FILE_SORTER_SUCCESS) {
515                 goto failure;
516             }
517 
518             ret = parallel_sorter_wait(sorter, 1);
519             if (ret != FILE_SORTER_SUCCESS) {
520                 goto failure;
521             }
522 
523             records = NULL;
524             record_count = NSORT_RECORDS_INIT;
525             buffer_size = 0;
526             i = 0;
527         }
528 
529         if (ctx->active_tmp_files >= ctx->num_tmp_files) {
530             unsigned start, end, next_level;
531 
532             ret = parallel_sorter_wait(sorter, NSORT_THREADS);
533             if (ret != FILE_SORTER_SUCCESS) {
534                 goto failure;
535             }
536 
537             pick_merge_files(ctx, &start, &end, &next_level);
538             cb_assert(next_level > 1);
539             ret = merge_tmp_files(ctx, start, end, next_level);
540             if (ret != FILE_SORTER_SUCCESS) {
541                 goto failure;
542             }
543         }
544     }
545 
546     fclose(ctx->f);
547     ctx->f = NULL;
548 
549     if (ctx->active_tmp_files == 0 && buffer_size == 0) {
550         /* empty source file */
551         return FILE_SORTER_SUCCESS;
552     }
553 
554     if (buffer_size > 0) {
555         ret = parallel_sorter_add_job(sorter, records, i);
556         if (ret != FILE_SORTER_SUCCESS) {
557             goto failure;
558         }
559     }
560 
561     ret = parallel_sorter_finish(sorter);
562     if (ret != FILE_SORTER_SUCCESS) {
563         goto failure;
564     }
565 
566     cb_assert(ctx->active_tmp_files > 0);
567 
568     if (!ctx->skip_writeback && remove(ctx->source_file) != 0) {
569         ret = FILE_SORTER_ERROR_DELETE_FILE;
570         goto failure;
571     }
572 
573     // Restore feed_record callback for final merge */
574     ctx->feed_record = feed_record;
575     if (ctx->active_tmp_files == 1) {
576         if (ctx->feed_record) {
577             ret = iterate_records_file(ctx, ctx->tmp_files[0].name);
578             if (ret != FILE_SORTER_SUCCESS) {
579                 goto failure;
580             }
581 
582             if (ctx->skip_writeback && remove(ctx->tmp_files[0].name) != 0) {
583                 ret = FILE_SORTER_ERROR_DELETE_FILE;
584                 goto failure;
585             }
586         }
587 
588         if (!ctx->skip_writeback &&
589                 rename(ctx->tmp_files[0].name, ctx->source_file) != 0) {
590             ret = FILE_SORTER_ERROR_RENAME_FILE;
591             goto failure;
592         }
593     } else if (ctx->active_tmp_files > 1) {
594         ret = merge_tmp_files(ctx, 0, ctx->active_tmp_files, 0);
595         if (ret != FILE_SORTER_SUCCESS) {
596             goto failure;
597         }
598     }
599 
600     ret = FILE_SORTER_SUCCESS;
601 
602  failure:
603     free_parallel_sorter(sorter);
604     return ret;
605 }
606 
607 
write_record_list(void ** records,size_t n,tmp_file_t * tmp_file,file_sort_ctx_t * ctx)608 static file_sorter_error_t write_record_list(void **records,
609                                              size_t n,
610                                              tmp_file_t *tmp_file,
611                                              file_sort_ctx_t *ctx)
612 {
613     size_t i;
614     FILE *f;
615 
616     sort_records(records, n, ctx);
617 
618     remove(tmp_file->name);
619     f = fopen(tmp_file->name, "ab");
620     if (f == NULL) {
621         return FILE_SORTER_ERROR_MK_TMP_FILE;
622     }
623 
624     if (ftell(f) != 0) {
625         /* File already existed. It's not supposed to exist, and if it
626          * exists it means a temporary file name collision happened or
627          * some previous sort left temporary files that were never
628          * deleted. */
629         return FILE_SORTER_ERROR_NOT_EMPTY_TMP_FILE;
630     }
631 
632 
633     for (i = 0; i < n; i++) {
634         file_sorter_error_t err;
635         err = static_cast<file_sorter_error_t>((*ctx->write_record)(f, records[i], ctx->user_ctx));
636         (*ctx->free_record)(records[i], ctx->user_ctx);
637         records[i] = NULL;
638 
639         if (err != FILE_SORTER_SUCCESS) {
640             fclose(f);
641             return err;
642         }
643     }
644 
645     fclose(f);
646 
647     return FILE_SORTER_SUCCESS;
648 }
649 
650 
create_tmp_file(file_sort_ctx_t * ctx)651 static tmp_file_t *create_tmp_file(file_sort_ctx_t *ctx)
652 {
653     unsigned i = ctx->active_tmp_files;
654 
655     cb_assert(ctx->active_tmp_files < ctx->num_tmp_files);
656     cb_assert(ctx->tmp_files[i].name == NULL);
657     cb_assert(ctx->tmp_files[i].level == 0);
658 
659     ctx->tmp_files[i].name = sorter_tmp_file_path(ctx->tmp_dir,
660         ctx->tmp_file_prefix);
661     if (ctx->tmp_files[i].name == NULL) {
662         return NULL;
663     }
664 
665     ctx->tmp_files[i].level = 1;
666     ctx->active_tmp_files += 1;
667 
668     return &ctx->tmp_files[i];
669 }
670 
671 
qsort_cmp(const void * a,const void * b,void * ctx)672 static int qsort_cmp(const void *a, const void *b, void *ctx)
673 {
674     file_sort_ctx_t *sort_ctx = (file_sort_ctx_t *) ctx;
675     const void **k1 = (const void **) a, **k2 = (const void **) b;
676     return (*sort_ctx->compare_records)(*k1, *k2, sort_ctx->user_ctx);
677 }
678 
679 
sort_records(void ** records,size_t n,file_sort_ctx_t * ctx)680 static void sort_records(void **records, size_t n,
681                                          file_sort_ctx_t *ctx)
682 {
683     quicksort(records, n, sizeof(void *), &qsort_cmp, ctx);
684 }
685 
686 
tmp_file_cmp(const void * a,const void * b)687 static int tmp_file_cmp(const void *a, const void *b)
688 {
689     unsigned x = ((const tmp_file_t *) a)->level;
690     unsigned y = ((const tmp_file_t *) b)->level;
691 
692     if (x == 0) {
693         return 1;
694     }
695     if (y == 0) {
696         return -1;
697     }
698 
699     return x - y;
700 }
701 
702 
pick_merge_files(file_sort_ctx_t * ctx,unsigned * start,unsigned * end,unsigned * next_level)703 static void pick_merge_files(file_sort_ctx_t *ctx,
704                              unsigned *start,
705                              unsigned *end,
706                              unsigned *next_level)
707 {
708     unsigned i, j, level;
709 
710     qsort(ctx->tmp_files, ctx->active_tmp_files, sizeof(tmp_file_t), tmp_file_cmp);
711 
712     for (i = 0; i < ctx->active_tmp_files; i = j) {
713         level = ctx->tmp_files[i].level;
714         cb_assert(level > 0);
715         j = i + 1;
716 
717         while (j < ctx->active_tmp_files) {
718             cb_assert(ctx->tmp_files[j].level > 0);
719             if (ctx->tmp_files[j].level != level) {
720                 break;
721             }
722             j += 1;
723         }
724 
725         if ((j - i) > 1) {
726             *start = i;
727             *end = j;
728             *next_level = (j - i) * level;
729             return;
730         }
731     }
732 
733     /* All files have a different level. */
734     cb_assert(ctx->active_tmp_files == ctx->num_tmp_files);
735     cb_assert(ctx->active_tmp_files >= 2);
736     *start = 0;
737     *end = 2;
738     *next_level = ctx->tmp_files[0].level + ctx->tmp_files[1].level;
739 }
740 
741 
merge_tmp_files(file_sort_ctx_t * ctx,unsigned start,unsigned end,unsigned next_level)742 static file_sorter_error_t merge_tmp_files(file_sort_ctx_t *ctx,
743                                            unsigned start,
744                                            unsigned end,
745                                            unsigned next_level)
746 {
747     char *dest_tmp_file;
748     const char **files;
749     unsigned nfiles, i;
750     file_sorter_error_t ret;
751     file_merger_feed_record_t feed_record = NULL;
752 
753     nfiles = end - start;
754     files = (const char **) cb_malloc(sizeof(char *) * nfiles);
755     if (files == NULL) {
756         return FILE_SORTER_ERROR_ALLOC;
757     }
758     for (i = start; i < end; ++i) {
759         files[i - start] = ctx->tmp_files[i].name;
760         cb_assert(files[i - start] != NULL);
761     }
762 
763     if (next_level == 0) {
764         /* Final merge iteration. */
765         if (ctx->skip_writeback) {
766             dest_tmp_file = NULL;
767         } else {
768             dest_tmp_file = (char *) ctx->source_file;
769         }
770 
771         feed_record = ctx->feed_record;
772     } else {
773         dest_tmp_file = sorter_tmp_file_path(ctx->tmp_dir,
774             ctx->tmp_file_prefix);
775         if (dest_tmp_file == NULL) {
776             cb_free(files);
777             return FILE_SORTER_ERROR_MK_TMP_FILE;
778         }
779     }
780 
781     ret = (file_sorter_error_t) merge_files(files,
782                                             nfiles,
783                                             dest_tmp_file,
784                                             ctx->read_record,
785                                             ctx->write_record,
786                                             feed_record,
787                                             ctx->compare_records,
788                                             NULL,
789                                             ctx->free_record,
790                                             ctx->skip_writeback,
791                                             ctx->user_ctx);
792 
793     cb_free(files);
794 
795     if (ret != FILE_SORTER_SUCCESS) {
796         if (dest_tmp_file != NULL && dest_tmp_file != ctx->source_file) {
797             remove(dest_tmp_file);
798             cb_free(dest_tmp_file);
799         }
800         return ret;
801     }
802 
803     for (i = start; i < end; ++i) {
804         if (remove(ctx->tmp_files[i].name) != 0) {
805             if (dest_tmp_file != ctx->source_file) {
806                 cb_free(dest_tmp_file);
807             }
808             return FILE_SORTER_ERROR_DELETE_FILE;
809         }
810         cb_free(ctx->tmp_files[i].name);
811         ctx->tmp_files[i].name = NULL;
812         ctx->tmp_files[i].level = 0;
813     }
814 
815     qsort(ctx->tmp_files + start, ctx->num_tmp_files - start,
816           sizeof(tmp_file_t), tmp_file_cmp);
817     ctx->active_tmp_files -= nfiles;
818 
819     if (dest_tmp_file != ctx->source_file) {
820         i = ctx->active_tmp_files;
821         ctx->tmp_files[i].name = dest_tmp_file;
822         ctx->tmp_files[i].level = next_level;
823         ctx->active_tmp_files += 1;
824     }
825 
826     return FILE_SORTER_SUCCESS;
827 }
828 
iterate_records_file(file_sort_ctx_t * ctx,const char * file)829 static file_sorter_error_t iterate_records_file(file_sort_ctx_t *ctx,
830                                                const char *file)
831 {
832     void *record_data = NULL;
833     int record_len;
834     FILE *f = fopen(file, "rb");
835     int ret = FILE_SORTER_SUCCESS;
836 
837     if (f == NULL) {
838         return FILE_SORTER_ERROR_OPEN_FILE;
839     }
840 
841     while (1) {
842         record_len = (*ctx->read_record)(f, &record_data, ctx->user_ctx);
843         if (record_len == 0) {
844             record_data = NULL;
845             break;
846         } else if (record_len < 0) {
847             ret = record_len;
848             goto cleanup;
849         } else {
850             ret = (*ctx->feed_record)(record_data, ctx->user_ctx);
851             if (ret != FILE_SORTER_SUCCESS) {
852                 goto cleanup;
853             }
854 
855             (*ctx->free_record)(record_data, ctx->user_ctx);
856         }
857     }
858 
859 cleanup:
860     (*ctx->free_record)(record_data, ctx->user_ctx);
861     if (f != NULL) {
862         fclose(f);
863     }
864 
865     return (file_sorter_error_t) ret;
866 }
867 
/*
 * Writes a "." followed by a decimal counter into the last suffixlen
 * characters of tmpl, whose string length is totlen. The caller must own
 * totlen + 1 bytes, so the terminating NUL can land at tmpl[totlen];
 * sorter_tmp_file_path allocates exactly that.
 *
 * Despite the name, the value is not random: it is a process-wide counter
 * that wraps to 0 after 2 << 18 (524288), so it fits in "." plus 6 digits.
 * The counter is advanced before formatting, so a failure cannot leave it
 * stuck. NOTE(review): the counter is not synchronized, and names repeat
 * across processes/restarts; confirm sort_file is never run concurrently
 * against the same tmp_dir and prefix.
 *
 * @return 0 on success, -1 if the suffix does not fit in suffixlen chars.
 */
static int sorter_random_name(char *tmpl, int totlen, int suffixlen) {
    static unsigned int value = 0;
    unsigned int current = value;
    char *suffix = tmpl + totlen - suffixlen;
    int nw;

    if (++value > 2 << 18) {
        value = 0;
    }

    /* suffixlen characters plus the NUL terminator at tmpl[totlen]. */
    nw = snprintf(suffix, (size_t) suffixlen + 1, ".%u", current);
    if (nw < 0 || nw > suffixlen) {
        return -1;
    }

    return 0;
}
882 
sorter_tmp_file_path(const char * tmp_dir,const char * prefix)883 char *sorter_tmp_file_path(const char *tmp_dir, const char *prefix) {
884     char *file_path;
885     size_t tmp_dir_len, prefix_len, total_len;
886     tmp_dir_len = strlen(tmp_dir);
887     prefix_len = strlen(prefix);
888     total_len = tmp_dir_len + 1 + prefix_len + sizeof(SORTER_TMP_FILE_SUFFIX);
889     file_path = (char *) cb_malloc(total_len);
890 
891     if (file_path == NULL) {
892         return NULL;
893     }
894 
895     memcpy(file_path, tmp_dir, tmp_dir_len);
896     /* Windows specific file API functions and stdio file functions on Windows
897      * convert forward slashes to back slashes. */
898     file_path[tmp_dir_len] = '/';
899     memcpy(file_path + tmp_dir_len + 1, prefix, prefix_len);
900     if (sorter_random_name(file_path, total_len - 1,
901                            sizeof(SORTER_TMP_FILE_SUFFIX) - 1) < 0) {
902         cb_free(file_path);
903         return NULL;
904     }
905 
906     return file_path;
907 }
908