1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2010 Couchbase, Inc
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17 
18 #include <stdio.h>
19 #include <stdlib.h>
20 #include <string.h>
21 #include <fcntl.h>
22 #if !defined(WIN32) && !defined(_WIN32)
23 #include <sys/time.h>
24 #include <dirent.h>
25 #include <unistd.h>
26 #endif
27 
28 #include "libforestdb/forestdb.h"
29 #include "fdb_internal.h"
30 #include "filemgr.h"
31 #include "avltree.h"
32 #include "list.h"
33 #include "common.h"
34 #include "filemgr_ops.h"
35 #include "configuration.h"
36 #include "internal_types.h"
37 #include "compactor.h"
38 #include "wal.h"
39 #include "memleak.h"
40 
// In __DEBUG builds, this file's DBG / DBGCMD / DBGSW helpers are compiled out
// unless __DEBUG_CPT is also defined.
#ifdef __DEBUG
#ifndef __DEBUG_CPT
    #undef DBG
    #undef DBGCMD
    #undef DBGSW
    #define DBG(...)
    #define DBGCMD(...)
    #define DBGSW(n, ...)
#endif
#endif

// Format version stored in the 'version' field of a compactor metafile.
#define COMPACTOR_META_VERSION (1)
// Capacity of every filename buffer used by the compactor.
#define MAX_FNAMELEN (FDB_MAX_FILENAME_LEN)

// variables for initialization
static volatile uint8_t compactor_initialized = 0;
// Guards the 'openfiles' tree and the fields of the elements stored in it.
mutex_t cpt_lock;

// Number of compaction daemon threads and their thread ids
// (both set up by compactor_init()).
static size_t num_compactor_threads = DEFAULT_NUM_COMPACTOR_THREADS;
static thread_t *compactor_tids = NULL;


// Daemon wait interval; multiplied by 1000 when passed to
// thread_cond_timedwait() (presumably seconds -> ms; confirm the unit).
static size_t sleep_duration = FDB_COMPACTOR_SLEEP_DURATION;

// Used to wake sleeping daemon threads early (shutdown or new removal work).
static mutex_t sync_mutex;
static thread_cond_t sync_cond;

// Set by compactor_shutdown() to make daemon threads exit.
static volatile uint8_t compactor_terminate_signal = 0;

// Every file tracked by the compactor, ordered by filename (_compactor_cmp).
static struct avl_tree openfiles;

// One tracked file (an element of 'openfiles').
struct openfiles_elem {
    char filename[MAX_FNAMELEN];
    // NULL once the last registration is dropped while a daemon compaction
    // is in progress (see compactor_deregister_file()).
    struct filemgr *file;
    fdb_config config;
    // number of registrations not yet matched by a deregistration
    uint32_t register_count;
    bool compaction_flag; // set when the file is being compacted
    // set while a daemon thread is compacting (or removing) this file
    bool daemon_compact_in_progress;
    // set once a daemon thread has claimed this file's deferred removal
    bool removal_activated;
    // only set for entries created by compactor_register_file_removing()
    err_log_callback *log_callback;
    struct list *cmp_func_list; // pointer to fhandle's list
    struct avl_node avl;
};

// Auxiliary argument handed to the 'openfiles' tree (see _compactor_cmp).
struct compactor_args_t {
    // void *aux; (reserved for future use)
    size_t strcmp_len; // Used to search for prefix match
};
static struct compactor_args_t compactor_args;

// Record stored in a compactor metafile. 'version' and 'crc' are stored via
// _endian_encode(); 'crc' covers the first sizeof(struct) - sizeof(crc) bytes.
struct compactor_meta{
    uint32_t version;
    char filename[MAX_FNAMELEN];
    uint32_t crc;
};
96 
#if !defined(WIN32) && !defined(_WIN32)
/*
 * Convert a relative timeout of 'ms' milliseconds into an absolute wall-clock
 * deadline, because pthread_cond_timedwait() only accepts absolute times.
 * The current time comes from gettimeofday(); a leftover sub-millisecond part
 * of 500us or more rounds the deadline up by one millisecond.
 */
struct timespec convert_reltime_to_abstime(unsigned int ms) {
    struct timeval now;
    struct timespec deadline;
    uint64_t deadline_ms;

    memset(&deadline, 0, sizeof(deadline));
    gettimeofday(&now, NULL);

    deadline_ms = (uint64_t)now.tv_sec * 1000 + now.tv_usec / 1000 + ms;
    if ((now.tv_usec % 1000) >= 500) {
        deadline_ms += 1;
    }

    deadline.tv_sec = deadline_ms / 1000;
    deadline.tv_nsec = (deadline_ms % 1000) * 1000000;
    return deadline;
}
#endif
122 
#if !defined(WIN32) && !defined(_WIN32)
// Return true when stat() succeeds on 'filename', i.e. the path exists.
static bool does_file_exist(const char *filename) {
    struct stat info;
    return stat(filename, &info) == 0;
}
#else
// Return true when Windows reports attributes for 'filename', i.e. it exists.
static bool does_file_exist(const char *filename) {
    return !(GetFileAttributes(filename) == INVALID_FILE_ATTRIBUTES);
}
#endif
134 
135 // compares file names
_compactor_cmp(struct avl_node *a, struct avl_node *b, void *aux)136 int _compactor_cmp(struct avl_node *a, struct avl_node *b, void *aux)
137 {
138     struct openfiles_elem *aa, *bb;
139     struct compactor_args_t *args = (struct compactor_args_t *)aux;
140     aa = _get_entry(a, struct openfiles_elem, avl);
141     bb = _get_entry(b, struct openfiles_elem, avl);
142     return strncmp(aa->filename, bb->filename, args->strcmp_len);
143 }
144 
_compactor_estimate_space(struct openfiles_elem *elem)145 INLINE uint64_t _compactor_estimate_space(struct openfiles_elem *elem)
146 {
147     uint64_t ret = 0;
148     uint64_t datasize;
149     uint64_t nlivenodes;
150 
151     datasize = _kvs_stat_get_sum(elem->file, KVS_STAT_DATASIZE);
152     nlivenodes = _kvs_stat_get_sum(elem->file, KVS_STAT_NLIVENODES);
153 
154     ret = datasize;
155     ret += nlivenodes * elem->config.blocksize;
156     ret += wal_get_datasize(elem->file);
157 
158     return ret;
159 }
160 
161 // check if the compaction threshold is satisfied
_compactor_is_threshold_satisfied(struct openfiles_elem *elem)162 INLINE int _compactor_is_threshold_satisfied(struct openfiles_elem *elem)
163 {
164     uint64_t filesize;
165     uint64_t active_data;
166     int threshold;
167 
168     if (elem->compaction_flag || filemgr_is_rollback_on(elem->file)) {
169         // do not perform compaction if the file is already being compacted or
170         // in rollback.
171         return 0;
172     }
173 
174     threshold = elem->config.compaction_threshold;
175     if (elem->config.compaction_mode == FDB_COMPACTION_AUTO &&
176         threshold > 0)
177         {
178         filesize = filemgr_get_pos(elem->file);
179         active_data = _compactor_estimate_space(elem);
180         if (active_data == 0 || active_data >= filesize ||
181             filesize < elem->config.compaction_minimum_filesize) {
182             return 0;
183         }
184 
185         return ((filesize / 100.0 * threshold) < (filesize - active_data));
186     } else {
187         return 0;
188     }
189 }
190 
191 // check if the file is waiting for being removed
_compactor_check_file_removal(struct openfiles_elem *elem)192 INLINE bool _compactor_check_file_removal(struct openfiles_elem *elem)
193 {
194     if (elem->file->fflags & FILEMGR_REMOVAL_IN_PROG &&
195         !elem->removal_activated) {
196         return true;
197     }
198     return false;
199 }
200 
201 // check if background file deletion is done
compactor_is_file_removed(const char *filename)202 bool compactor_is_file_removed(const char *filename)
203 {
204     struct avl_node *a;
205     struct openfiles_elem query;
206 
207     strcpy(query.filename, filename);
208     mutex_lock(&cpt_lock);
209     a = avl_search(&openfiles, &query.avl, _compactor_cmp);
210     mutex_unlock(&cpt_lock);
211     if (a) {
212         // exist .. old file is not removed yet
213         return false;
214     }
215     return true;
216 }
217 
218 // return the location of '.'
_compactor_prefix_len(char *filename)219 INLINE int _compactor_prefix_len(char *filename)
220 {
221     int i;
222     int file_len = strlen(filename);
223     int prefix_len = 0;
224     // find the first '.'
225     for (i=file_len-1; i>=0; --i){
226         if (filename[i] == '.') {
227             prefix_len = i+1;
228             break;
229         }
230     }
231     return prefix_len;
232 }
233 
234 // return the the location of '/' or '\'
_compactor_dir_len(char *filename)235 INLINE int _compactor_dir_len(char *filename)
236 {
237     int i;
238     int file_len = strlen(filename);
239     int dir_len = 0;
240     // find the first '/' or '\'
241     for (i=file_len-1; i>=0; --i){
242         if (filename[i] == '/' || filename[i] == '\\') {
243             dir_len = i+1;
244             break;
245         }
246     }
247     return dir_len;
248 }
249 
// Copy the basename of 'src' (everything after its last '/' or '\\') into
// 'dst', e.g. 'foo/bar.baz' -> 'bar.baz'.
static void _strcpy_fname(char *dst, char *src)
{
    const char *basename = src + _compactor_dir_len(src);
    strcpy(dst, basename);
}
256 
// Copy the directory part of 'src', including its trailing separator, into
// 'dst' ('foo/bar.baz' -> 'foo/'); 'dst' becomes "" when 'src' has none.
static void _strcpy_dirname(char *dst, char *src)
{
    size_t dir_len = (size_t)_compactor_dir_len(src);

    memcpy(dst, src, dir_len);
    dst[dir_len] = '\0';
}
267 
// Place 'fname' in the same directory as 'path', writing the result to 'dst'.
// <example>
// fname: 'foo.bar'
// path: 'tmp/dir/other.file'
// returned dst: 'tmp/dir/foo.bar'
static void _reconstruct_path(char *dst, char *path, char *fname)
{
    _strcpy_dirname(dst, path);
    strcat(dst, fname);
}
277 
_compactor_get_vfilename(char *filename, char *vfilename)278 static void _compactor_get_vfilename(char *filename, char *vfilename)
279 {
280     int prefix_len = _compactor_prefix_len(filename);
281 
282     if (prefix_len > 0) {
283         strncpy(vfilename, filename, prefix_len-1);
284         vfilename[prefix_len-1] = 0;
285     }
286 }
287 
_compactor_convert_dbfile_to_metafile(char *dbfile, char *metafile)288 static void _compactor_convert_dbfile_to_metafile(char *dbfile, char *metafile)
289 {
290     int prefix_len = _compactor_prefix_len(dbfile);
291 
292     if (prefix_len > 0) {
293         strncpy(metafile, dbfile, prefix_len);
294         metafile[prefix_len] = 0;
295         strcat(metafile, "meta");
296     }
297 }
298 
_allDigit(char *str)299 static bool _allDigit(char *str) {
300     int numchar = strlen(str);
301     for(int i = 0; i < numchar; ++i) {
302         if (str[i] < '0' || str[i] > '9') {
303             return false;
304         }
305     }
306     return true;
307 }
308 
compactor_get_next_filename(char *file, char *nextfile)309 void compactor_get_next_filename(char *file, char *nextfile)
310 {
311     int compaction_no = 0;
312     int prefix_len = _compactor_prefix_len(file);
313     char str_no[24];
314 
315     if (prefix_len > 0 && _allDigit(file + prefix_len)) {
316         sscanf(file+prefix_len, "%d", &compaction_no);
317         strncpy(nextfile, file, prefix_len);
318         do {
319             nextfile[prefix_len] = 0;
320             sprintf(str_no, "%d", ++compaction_no);
321             strcat(nextfile, str_no);
322         } while (does_file_exist(nextfile));
323     } else {
324         do {
325             strcpy(nextfile, file);
326             sprintf(str_no, ".%d", ++compaction_no);
327             strcat(nextfile, str_no);
328         } while (does_file_exist(nextfile));
329     }
330 }
331 
compactor_switch_compaction_flag(struct filemgr *file, bool flag)332 bool compactor_switch_compaction_flag(struct filemgr *file, bool flag)
333 {
334     struct avl_node *a = NULL;
335     struct openfiles_elem query, *elem;
336 
337     strcpy(query.filename, file->filename);
338     mutex_lock(&cpt_lock);
339     a = avl_search(&openfiles, &query.avl, _compactor_cmp);
340     if (a) {
341         // found
342         elem = _get_entry(a, struct openfiles_elem, avl);
343         if (elem->compaction_flag == flag) {
344             // already switched by other thread .. return false
345             mutex_unlock(&cpt_lock);
346             return false;
347         }
348         // switch
349         elem->compaction_flag = flag;
350         mutex_unlock(&cpt_lock);
351         return true;
352     }
353     // file doesn't exist .. already compacted or deregistered
354     mutex_unlock(&cpt_lock);
355     return false;
356 }
357 
compactor_thread(void *voidargs)358 void * compactor_thread(void *voidargs)
359 {
360     char vfilename[MAX_FNAMELEN];
361     char new_filename[MAX_FNAMELEN];
362     fdb_file_handle *fhandle;
363     fdb_status fs;
364     struct avl_node *a;
365     struct openfiles_elem *elem;
366     struct openfiles_elem query;
367 
368     // Sleep for 10 secs by default to allow applications to warm up their data.
369     // TODO: Need to implement more flexible way of scheduling the compaction
370     // daemon (e.g., public APIs to start / stop the compaction daemon).
371     mutex_lock(&sync_mutex);
372     thread_cond_timedwait(&sync_cond, &sync_mutex, sleep_duration * 1000);
373     mutex_unlock(&sync_mutex);
374 
375     while (1) {
376 
377         mutex_lock(&cpt_lock);
378         a = avl_first(&openfiles);
379         while(a) {
380             elem = _get_entry(a, struct openfiles_elem, avl);
381             if (!elem->file) {
382                 a = avl_next(a);
383                 avl_remove(&openfiles, &elem->avl);
384                 free(elem);
385                 continue;
386             }
387 
388             if (_compactor_is_threshold_satisfied(elem)) {
389 
390                 elem->daemon_compact_in_progress = true;
391                 // set compaction flag
392                 elem->compaction_flag = true;
393                 mutex_unlock(&cpt_lock);
394                 // Once 'daemon_compact_in_progress' is set to true, then it is safe to
395                 // read the variables of 'elem' until the compaction is completed.
396                 _compactor_get_vfilename(elem->filename, vfilename);
397 
398                 fs = fdb_open_for_compactor(&fhandle, vfilename, &elem->config,
399                                             elem->cmp_func_list);
400                 if (fs == FDB_RESULT_SUCCESS) {
401                     compactor_get_next_filename(elem->filename, new_filename);
402                     fdb_compact_file(fhandle, new_filename, false, (bid_t) -1);
403                     fdb_close(fhandle);
404 
405                     strcpy(query.filename, new_filename);
406                     mutex_lock(&cpt_lock);
407                     // Search the next file for compaction.
408                     a = avl_search_greater(&openfiles, &query.avl, _compactor_cmp);
409                 } else {
410                     fdb_log(&fhandle->root->log_callback, fs,
411                             "Failed to open the file '%s' for auto daemon "
412                             "compaction.\n", vfilename);
413                     // fail to open file
414                     mutex_lock(&cpt_lock);
415                     a = avl_next(&elem->avl);
416                     elem->daemon_compact_in_progress = false;
417                     // clear compaction flag
418                     elem->compaction_flag = false;
419                 }
420 
421             } else if (_compactor_check_file_removal(elem)) {
422 
423                 // remove file
424                 int ret;
425 
426                 // set activation flag to prevent other compactor threads attempt
427                 // to remove the same file and double free the 'elem' structure,
428                 // during 'cpt_lock' is released.
429                 elem->removal_activated = true;
430 
431                 mutex_unlock(&cpt_lock);
432                 ret = remove(elem->file->filename);
433                 filemgr_remove_all_buffer_blocks(elem->file);
434                 mutex_lock(&cpt_lock);
435 
436                 if (elem->log_callback && ret != 0) {
437                     char errno_msg[512];
438                     elem->file->ops->get_errno_str(errno_msg, 512);
439                     fdb_log(elem->log_callback, (fdb_status)ret,
440                             "Error in REMOVE on a database file '%s', %s",
441                             elem->file->filename, errno_msg);
442                 }
443 
444                 // free filemgr structure
445                 filemgr_free_func(&elem->file->e);
446                 // remove & free elem
447                 a = avl_next(a);
448                 avl_remove(&openfiles, &elem->avl);
449                 free(elem);
450 
451             } else {
452 
453                 // next
454                 a = avl_next(a);
455 
456             }
457             if (compactor_terminate_signal) {
458                 mutex_unlock(&cpt_lock);
459                 return NULL;
460             }
461         }
462         mutex_unlock(&cpt_lock);
463 
464         mutex_lock(&sync_mutex);
465         if (compactor_terminate_signal) {
466             mutex_unlock(&sync_mutex);
467             break;
468         }
469         thread_cond_timedwait(&sync_cond, &sync_mutex, sleep_duration * 1000);
470         if (compactor_terminate_signal) {
471             mutex_unlock(&sync_mutex);
472             break;
473         }
474         mutex_unlock(&sync_mutex);
475     }
476     return NULL;
477 }
478 
compactor_init(struct compactor_config *config)479 void compactor_init(struct compactor_config *config)
480 {
481     if (!compactor_initialized) {
482         // Note that this function is synchronized by the spin lock in fdb_init API.
483         mutex_init(&cpt_lock);
484 
485         mutex_lock(&cpt_lock);
486         if (!compactor_initialized) {
487             // initialize
488             compactor_args.strcmp_len = MAX_FNAMELEN;
489             avl_init(&openfiles, &compactor_args);
490 
491             if (config) {
492                 if (config->sleep_duration > 0) {
493                     sleep_duration = config->sleep_duration;
494                 }
495             }
496 
497             compactor_terminate_signal = 0;
498 
499             mutex_init(&sync_mutex);
500             thread_cond_init(&sync_cond);
501 
502             // create worker threads
503             num_compactor_threads = config->num_threads;
504             compactor_tids = (thread_t *) calloc(num_compactor_threads, sizeof(thread_t));
505             for (size_t i = 0; i < num_compactor_threads; ++i) {
506                 thread_create(&compactor_tids[i], compactor_thread, NULL);
507             }
508 
509             compactor_initialized = 1;
510         }
511         mutex_unlock(&cpt_lock);
512     }
513 }
514 
compactor_shutdown()515 void compactor_shutdown()
516 {
517     void *ret;
518     struct avl_node *a = NULL;
519     struct openfiles_elem *elem;
520 
521     // set terminate signal
522     mutex_lock(&sync_mutex);
523     compactor_terminate_signal = 1;
524     thread_cond_broadcast(&sync_cond);
525     mutex_unlock(&sync_mutex);
526 
527     for (size_t i = 0; i < num_compactor_threads; ++i) {
528         thread_join(compactor_tids[i], &ret);
529     }
530     free(compactor_tids);
531 
532     mutex_lock(&cpt_lock);
533     // free all elems in the tree
534     a = avl_first(&openfiles);
535     while (a) {
536         elem = _get_entry(a, struct openfiles_elem, avl);
537         a = avl_next(a);
538 
539         if (_compactor_check_file_removal(elem)) {
540             // remove file if removal is pended.
541             remove(elem->file->filename);
542             filemgr_free_func(&elem->file->e);
543         }
544 
545         avl_remove(&openfiles, &elem->avl);
546         free(elem);
547     }
548 
549     sleep_duration = FDB_COMPACTOR_SLEEP_DURATION;
550     compactor_initialized = 0;
551     mutex_destroy(&sync_mutex);
552     thread_cond_destroy(&sync_cond);
553     mutex_unlock(&cpt_lock);
554 
555     mutex_destroy(&cpt_lock);
556 }
557 
// Forward declaration: compactor_register_file() below writes the metafile,
// but the helper is defined further down in this file.
static fdb_status _compactor_store_metafile(char *metafile,
                                            struct compactor_meta *metadata,
                                            err_log_callback *log_callback);
561 
compactor_register_file(struct filemgr *file, fdb_config *config, struct list *cmp_func_list, err_log_callback *log_callback)562 fdb_status compactor_register_file(struct filemgr *file,
563                                    fdb_config *config,
564                                    struct list *cmp_func_list,
565                                    err_log_callback *log_callback)
566 {
567     file_status_t fstatus;
568     fdb_status fs = FDB_RESULT_SUCCESS;
569     struct avl_node *a = NULL;
570     struct openfiles_elem query, *elem;
571 
572     // Ignore files whose status is COMPACT_OLD or REMOVED_PENDING.
573     // Those files do not need to be compacted again.
574     fstatus = filemgr_get_file_status(file);
575     if (fstatus == FILE_COMPACT_OLD ||
576         fstatus == FILE_REMOVED_PENDING) {
577         return fs;
578     }
579 
580     strcpy(query.filename, file->filename);
581     // first search the existing file
582     mutex_lock(&cpt_lock);
583     a = avl_search(&openfiles, &query.avl, _compactor_cmp);
584     if (a == NULL) {
585         // doesn't exist
586         // create elem and insert into tree
587         char path[MAX_FNAMELEN];
588         struct compactor_meta meta;
589 
590         elem = (struct openfiles_elem *)calloc(1, sizeof(struct openfiles_elem));
591         strcpy(elem->filename, file->filename);
592         elem->file = file;
593         elem->config = *config;
594         elem->register_count = 1;
595         elem->compaction_flag = false;
596         elem->daemon_compact_in_progress = false;
597         elem->removal_activated = false;
598         elem->cmp_func_list = cmp_func_list;
599         elem->log_callback = NULL;
600         avl_insert(&openfiles, &elem->avl, _compactor_cmp);
601         mutex_unlock(&cpt_lock); // Releasing the lock here should be OK as
602                                  // subsequent registration attempts for the same file
603                                  // will be simply processed by incrementing its
604                                  // counter below.
605 
606         // store in metafile
607         _compactor_convert_dbfile_to_metafile(file->filename, path);
608         _strcpy_fname(meta.filename, file->filename);
609         fs = _compactor_store_metafile(path, &meta, log_callback);
610     } else {
611         // already exists
612         elem = _get_entry(a, struct openfiles_elem, avl);
613         if (!elem->file) {
614             elem->file = file;
615         }
616         elem->register_count++;
617         mutex_unlock(&cpt_lock);
618     }
619     return fs;
620 }
621 
compactor_deregister_file(struct filemgr *file)622 void compactor_deregister_file(struct filemgr *file)
623 {
624     struct avl_node *a = NULL;
625     struct openfiles_elem query, *elem;
626 
627     strcpy(query.filename, file->filename);
628     mutex_lock(&cpt_lock);
629     a = avl_search(&openfiles, &query.avl, _compactor_cmp);
630     if (a) {
631         elem = _get_entry(a, struct openfiles_elem, avl);
632         if ((--elem->register_count) == 0) {
633             // if no handle refers this file
634             if (elem->daemon_compact_in_progress) {
635                 // This file is waiting for compaction by compactor (but not opened
636                 // yet). Do not remove 'elem' for now. The 'elem' will be automatically
637                 // replaced after the compaction is done by calling
638                 // 'compactor_switch_file()'. However, elem->file should be set to NULL
639                 // in order to be removed from the AVL tree in case of the compaction
640                 // failure.
641                 elem->file = NULL;
642             } else {
643                 // remove from the tree
644                 avl_remove(&openfiles, &elem->avl);
645                 free(elem);
646             }
647         }
648     }
649     mutex_unlock(&cpt_lock);
650 }
651 
compactor_register_file_removing(struct filemgr *file, err_log_callback *log_callback)652 fdb_status compactor_register_file_removing(struct filemgr *file,
653                                             err_log_callback *log_callback)
654 {
655     fdb_status fs = FDB_RESULT_SUCCESS;
656     struct avl_node *a = NULL;
657     struct openfiles_elem query, *elem;
658 
659     strcpy(query.filename, file->filename);
660     // first search the existing file
661     mutex_lock(&cpt_lock);
662     a = avl_search(&openfiles, &query.avl, _compactor_cmp);
663     if (a == NULL) {
664         // doesn't exist
665         // create a fake & temporary element for the file to be removed.
666         elem = (struct openfiles_elem *)calloc(1, sizeof(struct openfiles_elem));
667         strcpy(elem->filename, file->filename);
668 
669         // set flag
670         file->fflags |= FILEMGR_REMOVAL_IN_PROG;
671 
672         elem->file = file;
673         elem->register_count = 1;
674         // to prevent this element to be compacted, set all flags
675         elem->compaction_flag = true;
676         elem->daemon_compact_in_progress = true;
677         elem->removal_activated = false;
678         elem->cmp_func_list = NULL;
679         elem->log_callback = log_callback;
680         avl_insert(&openfiles, &elem->avl, _compactor_cmp);
681         mutex_unlock(&cpt_lock); // Releasing the lock here should be OK as
682                                  // subsequent registration attempts for the same file
683                                  // will be simply processed by incrementing its
684                                  // counter below.
685 
686         // wake up any sleeping thread
687         mutex_lock(&sync_mutex);
688         thread_cond_signal(&sync_cond);
689         mutex_unlock(&sync_mutex);
690 
691     } else {
692         // already exists .. just ignore
693         mutex_unlock(&cpt_lock);
694     }
695     return fs;
696 }
697 
compactor_change_threshold(struct filemgr *file, size_t new_threshold)698 void compactor_change_threshold(struct filemgr *file, size_t new_threshold)
699 {
700     struct avl_node *a = NULL;
701     struct openfiles_elem query, *elem;
702 
703     strcpy(query.filename, file->filename);
704     mutex_lock(&cpt_lock);
705     a = avl_search(&openfiles, &query.avl, _compactor_cmp);
706     if (a) {
707         elem = _get_entry(a, struct openfiles_elem, avl);
708         elem->config.compaction_threshold = new_threshold;
709     }
710     mutex_unlock(&cpt_lock);
711 }
712 
_compactor_read_metafile(char *metafile, struct compactor_meta *metadata, err_log_callback *log_callback)713 struct compactor_meta * _compactor_read_metafile(char *metafile,
714                                                  struct compactor_meta *metadata,
715                                                  err_log_callback *log_callback)
716 {
717     int fd_meta, fd_db;
718     ssize_t ret;
719     uint8_t *buf = alca(uint8_t, sizeof(struct compactor_meta));
720     uint32_t crc;
721     char fullpath[MAX_FNAMELEN];
722     struct filemgr_ops *ops;
723     struct compactor_meta meta;
724 
725     ops = get_filemgr_ops();
726     fd_meta = ops->open(metafile, O_RDONLY, 0644);
727 
728     if (fd_meta >= 0) {
729         // metafile exists .. read metadata
730         ret = ops->pread(fd_meta, buf, sizeof(struct compactor_meta), 0);
731         if (ret < 0 || (size_t)ret < sizeof(struct compactor_meta)) {
732             char errno_msg[512];
733             ops->get_errno_str(errno_msg, 512);
734             fdb_log(log_callback, (fdb_status) ret,
735                     "Failed to read the meta file '%s', errno_message: %s\n",
736                     metafile, errno_msg);
737             ret = ops->close(fd_meta);
738             if (ret < 0) {
739                 ops->get_errno_str(errno_msg, 512);
740                 fdb_log(log_callback, (fdb_status) ret,
741                         "Failed to close the meta file '%s', errno_message: %s\n",
742                         metafile, errno_msg);
743             }
744             return NULL;
745         }
746         memcpy(&meta, buf, sizeof(struct compactor_meta));
747         meta.version = _endian_decode(meta.version);
748         meta.crc = _endian_decode(meta.crc);
749         ops->close(fd_meta);
750 
751         // CRC check
752         crc = chksum(buf, sizeof(struct compactor_meta) - sizeof(crc));
753         if (crc != meta.crc) {
754             fdb_log(log_callback, FDB_RESULT_CHECKSUM_ERROR,
755                     "Checksum mismatch in the meta file '%s'\n", metafile);
756             return NULL;
757         }
758         // check if the file exists
759         _reconstruct_path(fullpath, metafile, meta.filename);
760         fd_db = ops->open(fullpath, O_RDONLY, 0644);
761         if (fd_db < 0) {
762             // file doesn't exist
763             return NULL;
764         }
765         ops->close(fd_db);
766     } else {
767         // file doesn't exist
768         return NULL;
769     }
770 
771     *metadata = meta;
772     return metadata;
773 }
774 
_compactor_store_metafile(char *metafile, struct compactor_meta *metadata, err_log_callback *log_callback)775 static fdb_status _compactor_store_metafile(char *metafile,
776                                             struct compactor_meta *metadata,
777                                             err_log_callback *log_callback)
778 {
779     int fd_meta;
780     ssize_t ret;
781     uint32_t crc;
782     struct filemgr_ops *ops;
783     struct compactor_meta meta;
784 
785     ops = get_filemgr_ops();
786     fd_meta = ops->open(metafile, O_RDWR | O_CREAT, 0644);
787 
788     if (fd_meta >= 0){
789         meta.version = _endian_encode(COMPACTOR_META_VERSION);
790         strcpy(meta.filename, metadata->filename);
791         crc = chksum((void*)&meta, sizeof(struct compactor_meta) - sizeof(crc));
792         meta.crc = _endian_encode(crc);
793 
794         char errno_msg[512];
795         ret = ops->pwrite(fd_meta, &meta, sizeof(struct compactor_meta), 0);
796         if (ret < 0 || (size_t)ret < sizeof(struct compactor_meta)) {
797             ops->get_errno_str(errno_msg, 512);
798             fdb_log(log_callback, (fdb_status) ret,
799                     "Failed to perform a write in the meta file '%s', "
800                     "errno_message: %s\n", metafile, errno_msg);
801             ops->close(fd_meta);
802             return FDB_RESULT_WRITE_FAIL;
803         }
804         ret = ops->fsync(fd_meta);
805         if (ret < 0) {
806             ops->get_errno_str(errno_msg, 512);
807             fdb_log(log_callback, (fdb_status) ret,
808                     "Failed to perform a sync in the meta file '%s', "
809                     "errno_message: %s\n", metafile, errno_msg);
810             ops->close(fd_meta);
811             return FDB_RESULT_FSYNC_FAIL;
812         }
813         ops->close(fd_meta);
814     } else {
815         return FDB_RESULT_OPEN_FAIL;
816     }
817 
818     return FDB_RESULT_SUCCESS;
819 }
820 
compactor_switch_file(struct filemgr *old_file, struct filemgr *new_file, err_log_callback *log_callback)821 void compactor_switch_file(struct filemgr *old_file, struct filemgr *new_file,
822                            err_log_callback *log_callback)
823 {
824     struct avl_node *a = NULL;
825     struct openfiles_elem query, *elem;
826     struct compactor_meta meta;
827 
828     strcpy(query.filename, old_file->filename);
829     mutex_lock(&cpt_lock);
830     a = avl_search(&openfiles, &query.avl, _compactor_cmp);
831     if (a) {
832         char metafile[MAX_FNAMELEN];
833         fdb_compaction_mode_t comp_mode;
834 
835         elem = _get_entry(a, struct openfiles_elem, avl);
836         avl_remove(&openfiles, a);
837         strcpy(elem->filename, new_file->filename);
838         elem->file = new_file;
839         elem->register_count = 1;
840         elem->daemon_compact_in_progress = false;
841         // clear compaction flag
842         elem->compaction_flag = false;
843         avl_insert(&openfiles, &elem->avl, _compactor_cmp);
844         comp_mode = elem->config.compaction_mode;
845         mutex_unlock(&cpt_lock); // Releasing the lock here should be OK as we don't
846                                  // expect more than one compaction task completion for
847                                  // the same file.
848 
849         if (comp_mode == FDB_COMPACTION_AUTO) {
850             _compactor_convert_dbfile_to_metafile(new_file->filename, metafile);
851             _strcpy_fname(meta.filename, new_file->filename);
852             _compactor_store_metafile(metafile, &meta, log_callback);
853         }
854     } else {
855         mutex_unlock(&cpt_lock);
856     }
857 }
858 
compactor_get_virtual_filename(const char *filename, char *virtual_filename)859 void compactor_get_virtual_filename(const char *filename,
860                                     char *virtual_filename)
861 {
862     int prefix_len = _compactor_prefix_len((char*)filename) - 1;
863     if (prefix_len > 0) {
864         strncpy(virtual_filename, filename, prefix_len);
865         virtual_filename[prefix_len] = 0;
866     } else {
867         strcpy(virtual_filename, filename);
868     }
869 }
870 
compactor_get_actual_filename(const char *filename, char *actual_filename, fdb_compaction_mode_t comp_mode, err_log_callback *log_callback)871 fdb_status compactor_get_actual_filename(const char *filename,
872                                          char *actual_filename,
873                                          fdb_compaction_mode_t comp_mode,
874                                          err_log_callback *log_callback)
875 {
876     int i;
877     int filename_len;
878     int dirname_len;
879     int compaction_no, max_compaction_no = -1;
880     char path[MAX_FNAMELEN];
881     char dirname[MAX_FNAMELEN], prefix[MAX_FNAMELEN];
882     char ret_name[MAX_FNAMELEN];
883     fdb_status fs = FDB_RESULT_SUCCESS;
884     struct compactor_meta meta, *meta_ptr;
885 
886     // get actual filename from metafile
887     sprintf(path, "%s.meta", filename);
888     meta_ptr = _compactor_read_metafile(path, &meta, log_callback);
889 
890     if (meta_ptr == NULL) {
891         if (comp_mode == FDB_COMPACTION_MANUAL && does_file_exist(filename)) {
892             strcpy(actual_filename, filename);
893             return FDB_RESULT_SUCCESS;
894         }
895 
896         // error handling .. scan directory
897         // backward search until find the first '/' or '\' (Windows)
898         filename_len = strlen(filename);
899         dirname_len = 0;
900 
901 #if !defined(WIN32) && !defined(_WIN32)
902         DIR *dir_info;
903         struct dirent *dir_entry;
904 
905         for (i=filename_len-1; i>=0; --i){
906             if (filename[i] == '/') {
907                 dirname_len = i+1;
908                 break;
909             }
910         }
911 
912         if (dirname_len > 0) {
913             strncpy(dirname, filename, dirname_len);
914             dirname[dirname_len] = 0;
915         } else {
916             strcpy(dirname, ".");
917         }
918         strcpy(prefix, filename + dirname_len);
919         strcat(prefix, ".");
920 
921         dir_info = opendir(dirname);
922         if (dir_info != NULL) {
923             while ((dir_entry = readdir(dir_info))) {
924                 if (!strncmp(dir_entry->d_name, prefix, strlen(prefix))) {
925                     compaction_no = -1;
926                     sscanf(dir_entry->d_name + strlen(prefix), "%d", &compaction_no);
927                     if (compaction_no >= 0) {
928                         if (compaction_no > max_compaction_no) {
929                             max_compaction_no = compaction_no;
930                         }
931                     }
932                 }
933             }
934             closedir(dir_info);
935         }
936 #else
937         // Windows
938         for (i=filename_len-1; i>=0; --i){
939             if (filename[i] == '/' || filename[i] == '\\') {
940                 dirname_len = i+1;
941                 break;
942             }
943         }
944 
945         if (dirname_len > 0) {
946             strncpy(dirname, filename, dirname_len);
947             dirname[dirname_len] = 0;
948         } else {
949             strcpy(dirname, ".");
950         }
951         strcpy(prefix, filename + dirname_len);
952         strcat(prefix, ".");
953 
954         WIN32_FIND_DATA filedata;
955         HANDLE hfind;
956         char query_str[MAX_FNAMELEN];
957 
958         // find all files start with 'prefix'
959         sprintf(query_str, "%s*", prefix);
960         hfind = FindFirstFile(query_str, &filedata);
961         while (hfind != INVALID_HANDLE_VALUE) {
962             if (!strncmp(filedata.cFileName, prefix, strlen(prefix))) {
963                 compaction_no = -1;
964                 sscanf(filedata.cFileName + strlen(prefix), "%d", &compaction_no);
965                 if (compaction_no >= 0) {
966                     if (compaction_no > max_compaction_no) {
967                         max_compaction_no = compaction_no;
968                     }
969                 }
970             }
971 
972             if (!FindNextFile(hfind, &filedata)) {
973                 FindClose(hfind);
974                 hfind = INVALID_HANDLE_VALUE;
975             }
976         }
977 
978 #endif
979 
980         if (max_compaction_no < 0) {
981             if (comp_mode == FDB_COMPACTION_AUTO) {
982                 // DB files with a revision number are not found.
983                 // initialize filename to '[filename].0'
984                 sprintf(ret_name, "%s.0", filename);
985             } else { // Manual compaction mode.
986                 // Simply use the file name passed to this function.
987                 strcpy(actual_filename, filename);
988                 return FDB_RESULT_SUCCESS;
989             }
990         } else {
991             // return the file that has the largest compaction number
992             sprintf(ret_name, "%s.%d", filename, max_compaction_no);
993             fs = FDB_RESULT_SUCCESS;
994         }
995         if (fs == FDB_RESULT_SUCCESS) {
996             strcpy(actual_filename, ret_name);
997         }
998         return fs;
999 
1000     } else {
1001         // metadata is successfully read from the metafile .. just return the filename
1002         _reconstruct_path(ret_name, (char*)filename, meta.filename);
1003         strcpy(actual_filename, ret_name);
1004         return FDB_RESULT_SUCCESS;
1005     }
1006 }
1007 
compactor_is_valid_mode(const char *filename, fdb_config *config)1008 bool compactor_is_valid_mode(const char *filename, fdb_config *config)
1009 {
1010     int fd;
1011     char path[MAX_FNAMELEN];
1012     struct filemgr_ops *ops;
1013 
1014     ops = get_filemgr_ops();
1015 
1016     if (config->compaction_mode == FDB_COMPACTION_AUTO) {
1017         // auto compaction mode: invalid when
1018         // the file '[filename]' exists
1019         fd = ops->open(filename, O_RDONLY, 0644);
1020         if (fd != FDB_RESULT_NO_SUCH_FILE) {
1021             ops->close(fd);
1022             return false;
1023         }
1024 
1025     } else if (config->compaction_mode == FDB_COMPACTION_MANUAL) {
1026         // manual compaction mode: invalid when
1027         // the file '[filename].meta' exists
1028         sprintf(path, "%s.meta", filename);
1029         fd = ops->open(path, O_RDONLY, 0644);
1030         if (fd != FDB_RESULT_NO_SUCH_FILE) {
1031             ops->close(fd);
1032             return false;
1033         }
1034 
1035     } else {
1036         // unknown mode
1037         return false;
1038     }
1039 
1040     return true;
1041 }
1042 
_compactor_search_n_destroy(const char *filename)1043 static fdb_status _compactor_search_n_destroy(const char *filename)
1044 {
1045     int i;
1046     int filename_len;
1047     int dirname_len;
1048     char dirname[MAX_FNAMELEN], prefix[MAX_FNAMELEN];
1049     fdb_status fs = FDB_RESULT_SUCCESS;
1050 
1051     // error handling .. scan directory
1052     // backward search until find the first '/' or '\' (Windows)
1053     filename_len = strlen(filename);
1054     dirname_len = 0;
1055 
1056 #if !defined(WIN32) && !defined(_WIN32)
1057     DIR *dir_info;
1058     struct dirent *dir_entry;
1059 
1060     for (i=filename_len-1; i>=0; --i){
1061         if (filename[i] == '/') {
1062             dirname_len = i+1;
1063             break;
1064         }
1065     }
1066 
1067     if (dirname_len > 0) {
1068         strncpy(dirname, filename, dirname_len);
1069         dirname[dirname_len] = 0;
1070     } else {
1071         strcpy(dirname, ".");
1072     }
1073     strcpy(prefix, filename + dirname_len);
1074     strcat(prefix, ".");
1075 
1076     dir_info = opendir(dirname);
1077     if (dir_info != NULL) {
1078         while ((dir_entry = readdir(dir_info))) {
1079             if (!strncmp(dir_entry->d_name, prefix, strlen(prefix))) {
1080                 // Need to check filemgr for possible open entry?
1081                 if (remove(dir_entry->d_name)) {
1082                     fs = FDB_RESULT_FILE_REMOVE_FAIL;
1083                     closedir(dir_info);
1084                     return fs;
1085                 }
1086             }
1087         }
1088         closedir(dir_info);
1089     }
1090 #else
1091     // Windows
1092     for (i=filename_len-1; i>=0; --i){
1093         if (filename[i] == '/' || filename[i] == '\\') {
1094             dirname_len = i+1;
1095             break;
1096         }
1097     }
1098 
1099     if (dirname_len > 0) {
1100         strncpy(dirname, filename, dirname_len);
1101         dirname[dirname_len] = 0;
1102     } else {
1103         strcpy(dirname, ".");
1104     }
1105     strcpy(prefix, filename + dirname_len);
1106     strcat(prefix, ".");
1107 
1108     WIN32_FIND_DATA filedata;
1109     HANDLE hfind;
1110     char query_str[MAX_FNAMELEN];
1111 
1112     // find all files start with 'prefix'
1113     sprintf(query_str, "%s*", prefix);
1114     hfind = FindFirstFile(query_str, &filedata);
1115     while (hfind != INVALID_HANDLE_VALUE) {
1116         if (!strncmp(filedata.cFileName, prefix, strlen(prefix))) {
1117             // Need to check filemgr for possible open entry?
1118             if (remove(filedata.cFileName)) {
1119                 fs = FDB_RESULT_FILE_REMOVE_FAIL;
1120                 FindClose(hfind);
1121                 hfind = INVALID_HANDLE_VALUE;
1122                 return fs;
1123             }
1124         }
1125 
1126         if (!FindNextFile(hfind, &filedata)) {
1127             FindClose(hfind);
1128             hfind = INVALID_HANDLE_VALUE;
1129         }
1130     }
1131 
1132 #endif
1133     return fs;
1134 }
1135 
compactor_destroy_file(char *filename, fdb_config *config)1136 fdb_status compactor_destroy_file(char *filename,
1137                                   fdb_config *config)
1138 {
1139     struct avl_node *a = NULL;
1140     struct openfiles_elem query, *elem;
1141     size_t strcmp_len;
1142     fdb_status status = FDB_RESULT_SUCCESS;
1143     compactor_config c_config;
1144 
1145     strcmp_len = strlen(filename);
1146     filename[strcmp_len] = '.'; // add a . suffix in place
1147     strcmp_len++;
1148     filename[strcmp_len] = '\0';
1149     strcpy(query.filename, filename);
1150 
1151     c_config.sleep_duration = config->compactor_sleep_duration;
1152     c_config.num_threads = config->num_compactor_threads;
1153     compactor_init(&c_config);
1154 
1155     mutex_lock(&cpt_lock);
1156     compactor_args.strcmp_len = strcmp_len; // Do prefix match for all vers
1157     a = avl_search(&openfiles, &query.avl, _compactor_cmp);
1158     if (a) {
1159         elem = _get_entry(a, struct openfiles_elem, avl);
1160         // if no handle refers this file
1161         if (elem->daemon_compact_in_progress) {
1162             // This file is waiting for compaction by compactor
1163             // Return a temporary failure, user must retry after sometime
1164             status = FDB_RESULT_IN_USE_BY_COMPACTOR;
1165         } else { // File handle not closed, fail operation
1166             status = FDB_RESULT_FILE_IS_BUSY;
1167         }
1168     }
1169 
1170     compactor_args.strcmp_len = MAX_FNAMELEN; // restore for normal compare
1171     mutex_unlock(&cpt_lock); // Releasing the lock here should be OK as file
1172                              // deletions doesn't require strict synchronization.
1173     filename[strcmp_len - 1] = '\0'; // restore the filename
1174     if (status == FDB_RESULT_SUCCESS) {
1175         status = _compactor_search_n_destroy(filename);
1176     }
1177 
1178     return status;
1179 }
1180