xref: /4.0.0/forestdb/src/compactor.cc (revision 5f7821ba)
1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2010 Couchbase, Inc
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
18#include <stdio.h>
19#include <stdlib.h>
20#include <string.h>
21#include <fcntl.h>
22#if !defined(WIN32) && !defined(_WIN32)
23#include <sys/time.h>
24#include <dirent.h>
25#include <unistd.h>
26#endif
27
28#include "libforestdb/forestdb.h"
29#include "fdb_internal.h"
30#include "filemgr.h"
31#include "avltree.h"
32#include "list.h"
33#include "common.h"
34#include "filemgr_ops.h"
35#include "configuration.h"
36#include "internal_types.h"
37#include "compactor.h"
38#include "wal.h"
39#include "memleak.h"
40
// In a __DEBUG build, compactor debug output is compiled out unless
// __DEBUG_CPT is also defined: DBG/DBGCMD/DBGSW then expand to nothing.
#if defined(__DEBUG) && !defined(__DEBUG_CPT)
    #undef DBG
    #undef DBGCMD
    #undef DBGSW
    #define DBG(...)
    #define DBGCMD(...)
    #define DBGSW(n, ...)
#endif

// Version number written into every compactor meta file.
#define COMPACTOR_META_VERSION (1)
// Capacity of every filename buffer used by the compactor.
#define MAX_FNAMELEN (FDB_MAX_FILENAME_LEN)
54
55// variables for initialization
56static volatile uint8_t compactor_initialized = 0;
57mutex_t cpt_lock;
58
59static size_t num_compactor_threads = DEFAULT_NUM_COMPACTOR_THREADS;
60static thread_t *compactor_tids = NULL;
61
62
63static size_t sleep_duration = FDB_COMPACTOR_SLEEP_DURATION;
64
65static mutex_t sync_mutex;
66static thread_cond_t sync_cond;
67
68static volatile uint8_t compactor_terminate_signal = 0;
69
70static struct avl_tree openfiles;
71
72struct openfiles_elem {
73    char filename[MAX_FNAMELEN];
74    struct filemgr *file;
75    fdb_config config;
76    uint32_t register_count;
77    bool compaction_flag; // set when the file is being compacted
78    bool daemon_compact_in_progress;
79    bool removal_activated;
80    err_log_callback *log_callback;
81    struct list *cmp_func_list; // pointer to fhandle's list
82    struct avl_node avl;
83};
84
85struct compactor_args_t {
86    // void *aux; (reserved for future use)
87    size_t strcmp_len; // Used to search for prefix match
88};
89static struct compactor_args_t compactor_args;
90
91struct compactor_meta{
92    uint32_t version;
93    char filename[MAX_FNAMELEN];
94    uint32_t crc;
95};
96
#if !defined(WIN32) && !defined(_WIN32)
// Turns a relative timeout of 'ms' milliseconds into an absolute wall-clock
// deadline (based on gettimeofday), because pthread_cond_timedwait only
// accepts absolute times. If the current time's sub-millisecond part is
// 500us or more, the deadline is rounded up by one millisecond.
struct timespec convert_reltime_to_abstime(unsigned int ms) {
    struct timeval now;
    struct timespec deadline;
    uint64_t deadline_ms;

    memset(&deadline, 0, sizeof(deadline));
    gettimeofday(&now, NULL);

    deadline_ms = (uint64_t)(now.tv_sec) * 1000 + (now.tv_usec / 1000) + ms;
    if ((now.tv_usec % 1000) >= 500) {
        deadline_ms += 1;
    }

    deadline.tv_sec = deadline_ms / 1000;
    deadline.tv_nsec = (deadline_ms % 1000) * 1000000;
    return deadline;
}
#endif
122
// Returns true if 'filename' names something that exists on disk (a file or
// a directory).
#if defined(WIN32) || defined(_WIN32)
static bool does_file_exist(const char *filename) {
    return GetFileAttributes(filename) != INVALID_FILE_ATTRIBUTES;
}
#else
static bool does_file_exist(const char *filename) {
    struct stat info;
    if (stat(filename, &info) != 0) {
        return false;
    }
    return true;
}
#endif
134
135// compares file names
136int _compactor_cmp(struct avl_node *a, struct avl_node *b, void *aux)
137{
138    struct openfiles_elem *aa, *bb;
139    struct compactor_args_t *args = (struct compactor_args_t *)aux;
140    aa = _get_entry(a, struct openfiles_elem, avl);
141    bb = _get_entry(b, struct openfiles_elem, avl);
142    return strncmp(aa->filename, bb->filename, args->strcmp_len);
143}
144
145INLINE uint64_t _compactor_estimate_space(struct openfiles_elem *elem)
146{
147    uint64_t ret = 0;
148    uint64_t datasize;
149    uint64_t nlivenodes;
150
151    datasize = _kvs_stat_get_sum(elem->file, KVS_STAT_DATASIZE);
152    nlivenodes = _kvs_stat_get_sum(elem->file, KVS_STAT_NLIVENODES);
153
154    ret = datasize;
155    ret += nlivenodes * elem->config.blocksize;
156    ret += wal_get_datasize(elem->file);
157
158    return ret;
159}
160
161// check if the compaction threshold is satisfied
162INLINE int _compactor_is_threshold_satisfied(struct openfiles_elem *elem)
163{
164    uint64_t filesize;
165    uint64_t active_data;
166    int threshold;
167
168    if (elem->compaction_flag || filemgr_is_rollback_on(elem->file)) {
169        // do not perform compaction if the file is already being compacted or
170        // in rollback.
171        return 0;
172    }
173
174    threshold = elem->config.compaction_threshold;
175    if (elem->config.compaction_mode == FDB_COMPACTION_AUTO &&
176        threshold > 0)
177        {
178        filesize = filemgr_get_pos(elem->file);
179        active_data = _compactor_estimate_space(elem);
180        if (active_data == 0 || active_data >= filesize ||
181            filesize < elem->config.compaction_minimum_filesize) {
182            return 0;
183        }
184
185        return ((filesize / 100.0 * threshold) < (filesize - active_data));
186    } else {
187        return 0;
188    }
189}
190
191// check if the file is waiting for being removed
192INLINE bool _compactor_check_file_removal(struct openfiles_elem *elem)
193{
194    if (elem->file->fflags & FILEMGR_REMOVAL_IN_PROG &&
195        !elem->removal_activated) {
196        return true;
197    }
198    return false;
199}
200
201// check if background file deletion is done
202bool compactor_is_file_removed(const char *filename)
203{
204    struct avl_node *a;
205    struct openfiles_elem query;
206
207    strcpy(query.filename, filename);
208    mutex_lock(&cpt_lock);
209    a = avl_search(&openfiles, &query.avl, _compactor_cmp);
210    mutex_unlock(&cpt_lock);
211    if (a) {
212        // exist .. old file is not removed yet
213        return false;
214    }
215    return true;
216}
217
218// return the location of '.'
219INLINE int _compactor_prefix_len(char *filename)
220{
221    int i;
222    int file_len = strlen(filename);
223    int prefix_len = 0;
224    // find the first '.'
225    for (i=file_len-1; i>=0; --i){
226        if (filename[i] == '.') {
227            prefix_len = i+1;
228            break;
229        }
230    }
231    return prefix_len;
232}
233
234// return the the location of '/' or '\'
235INLINE int _compactor_dir_len(char *filename)
236{
237    int i;
238    int file_len = strlen(filename);
239    int dir_len = 0;
240    // find the first '/' or '\'
241    for (i=file_len-1; i>=0; --i){
242        if (filename[i] == '/' || filename[i] == '\\') {
243            dir_len = i+1;
244            break;
245        }
246    }
247    return dir_len;
248}
249
250// copy from 'foo/bar.baz' to 'bar.baz'
251static void _strcpy_fname(char *dst, char *src)
252{
253    int dir_len = _compactor_dir_len(src);
254    strcpy(dst, src + dir_len);
255}
256
257// copy from 'foo/bar.baz' to 'foo/' (including '/')
258static void _strcpy_dirname(char *dst, char *src)
259{
260    int dir_len = _compactor_dir_len(src);
261    if (dir_len) {
262        strncpy(dst, src, dir_len);
263    }
264    // set NULL char
265    dst[dir_len] = 0;
266}
267
268// <example>
269// fname: 'foo.bar'
270// path: 'tmp/dir/other.file'
271// returned dst: 'tmp/dir/foo.bar'
272static void _reconstruct_path(char *dst, char *path, char *fname)
273{
274    _strcpy_dirname(dst, path);
275    strcat(dst + strlen(dst), fname);
276}
277
278static void _compactor_get_vfilename(char *filename, char *vfilename)
279{
280    int prefix_len = _compactor_prefix_len(filename);
281
282    if (prefix_len > 0) {
283        strncpy(vfilename, filename, prefix_len-1);
284        vfilename[prefix_len-1] = 0;
285    }
286}
287
288static void _compactor_convert_dbfile_to_metafile(char *dbfile, char *metafile)
289{
290    int prefix_len = _compactor_prefix_len(dbfile);
291
292    if (prefix_len > 0) {
293        strncpy(metafile, dbfile, prefix_len);
294        metafile[prefix_len] = 0;
295        strcat(metafile, "meta");
296    }
297}
298
// Returns true if every character of 'str' is an ASCII decimal digit.
// The empty string is (vacuously) all digits.
static bool _allDigit(char *str) {
    for (const char *p = str; *p != '\0'; ++p) {
        if (*p < '0' || *p > '9') {
            return false;
        }
    }
    return true;
}
308
309void compactor_get_next_filename(char *file, char *nextfile)
310{
311    int compaction_no = 0;
312    int prefix_len = _compactor_prefix_len(file);
313    char str_no[24];
314
315    if (prefix_len > 0 && _allDigit(file + prefix_len)) {
316        sscanf(file+prefix_len, "%d", &compaction_no);
317        strncpy(nextfile, file, prefix_len);
318        do {
319            nextfile[prefix_len] = 0;
320            sprintf(str_no, "%d", ++compaction_no);
321            strcat(nextfile, str_no);
322        } while (does_file_exist(nextfile));
323    } else {
324        do {
325            strcpy(nextfile, file);
326            sprintf(str_no, ".%d", ++compaction_no);
327            strcat(nextfile, str_no);
328        } while (does_file_exist(nextfile));
329    }
330}
331
332bool compactor_switch_compaction_flag(struct filemgr *file, bool flag)
333{
334    struct avl_node *a = NULL;
335    struct openfiles_elem query, *elem;
336
337    strcpy(query.filename, file->filename);
338    mutex_lock(&cpt_lock);
339    a = avl_search(&openfiles, &query.avl, _compactor_cmp);
340    if (a) {
341        // found
342        elem = _get_entry(a, struct openfiles_elem, avl);
343        if (elem->compaction_flag == flag) {
344            // already switched by other thread .. return false
345            mutex_unlock(&cpt_lock);
346            return false;
347        }
348        // switch
349        elem->compaction_flag = flag;
350        mutex_unlock(&cpt_lock);
351        return true;
352    }
353    // file doesn't exist .. already compacted or deregistered
354    mutex_unlock(&cpt_lock);
355    return false;
356}
357
358void * compactor_thread(void *voidargs)
359{
360    char vfilename[MAX_FNAMELEN];
361    char new_filename[MAX_FNAMELEN];
362    fdb_file_handle *fhandle;
363    fdb_status fs;
364    struct avl_node *a;
365    struct openfiles_elem *elem;
366    struct openfiles_elem query;
367
368    // Sleep for 10 secs by default to allow applications to warm up their data.
369    // TODO: Need to implement more flexible way of scheduling the compaction
370    // daemon (e.g., public APIs to start / stop the compaction daemon).
371    mutex_lock(&sync_mutex);
372    thread_cond_timedwait(&sync_cond, &sync_mutex, sleep_duration * 1000);
373    mutex_unlock(&sync_mutex);
374
375    while (1) {
376
377        mutex_lock(&cpt_lock);
378        a = avl_first(&openfiles);
379        while(a) {
380            elem = _get_entry(a, struct openfiles_elem, avl);
381            if (!elem->file) {
382                a = avl_next(a);
383                avl_remove(&openfiles, &elem->avl);
384                free(elem);
385                continue;
386            }
387
388            if (_compactor_is_threshold_satisfied(elem)) {
389
390                elem->daemon_compact_in_progress = true;
391                // set compaction flag
392                elem->compaction_flag = true;
393                mutex_unlock(&cpt_lock);
394                // Once 'daemon_compact_in_progress' is set to true, then it is safe to
395                // read the variables of 'elem' until the compaction is completed.
396                _compactor_get_vfilename(elem->filename, vfilename);
397
398                fs = fdb_open_for_compactor(&fhandle, vfilename, &elem->config,
399                                            elem->cmp_func_list);
400                if (fs == FDB_RESULT_SUCCESS) {
401                    compactor_get_next_filename(elem->filename, new_filename);
402                    fdb_compact_file(fhandle, new_filename, false, (bid_t) -1);
403                    fdb_close(fhandle);
404
405                    strcpy(query.filename, new_filename);
406                    mutex_lock(&cpt_lock);
407                    // Search the next file for compaction.
408                    a = avl_search_greater(&openfiles, &query.avl, _compactor_cmp);
409                } else {
410                    fdb_log(&fhandle->root->log_callback, fs,
411                            "Failed to open the file '%s' for auto daemon "
412                            "compaction.\n", vfilename);
413                    // fail to open file
414                    mutex_lock(&cpt_lock);
415                    a = avl_next(&elem->avl);
416                    elem->daemon_compact_in_progress = false;
417                    // clear compaction flag
418                    elem->compaction_flag = false;
419                }
420
421            } else if (_compactor_check_file_removal(elem)) {
422
423                // remove file
424                int ret;
425
426                // set activation flag to prevent other compactor threads attempt
427                // to remove the same file and double free the 'elem' structure,
428                // during 'cpt_lock' is released.
429                elem->removal_activated = true;
430
431                mutex_unlock(&cpt_lock);
432                ret = remove(elem->file->filename);
433                filemgr_remove_all_buffer_blocks(elem->file);
434                mutex_lock(&cpt_lock);
435
436                if (elem->log_callback && ret != 0) {
437                    char errno_msg[512];
438                    elem->file->ops->get_errno_str(errno_msg, 512);
439                    fdb_log(elem->log_callback, (fdb_status)ret,
440                            "Error in REMOVE on a database file '%s', %s",
441                            elem->file->filename, errno_msg);
442                }
443
444                // free filemgr structure
445                filemgr_free_func(&elem->file->e);
446                // remove & free elem
447                a = avl_next(a);
448                avl_remove(&openfiles, &elem->avl);
449                free(elem);
450
451            } else {
452
453                // next
454                a = avl_next(a);
455
456            }
457            if (compactor_terminate_signal) {
458                mutex_unlock(&cpt_lock);
459                return NULL;
460            }
461        }
462        mutex_unlock(&cpt_lock);
463
464        mutex_lock(&sync_mutex);
465        if (compactor_terminate_signal) {
466            mutex_unlock(&sync_mutex);
467            break;
468        }
469        thread_cond_timedwait(&sync_cond, &sync_mutex, sleep_duration * 1000);
470        if (compactor_terminate_signal) {
471            mutex_unlock(&sync_mutex);
472            break;
473        }
474        mutex_unlock(&sync_mutex);
475    }
476    return NULL;
477}
478
479void compactor_init(struct compactor_config *config)
480{
481    if (!compactor_initialized) {
482        // Note that this function is synchronized by the spin lock in fdb_init API.
483        mutex_init(&cpt_lock);
484
485        mutex_lock(&cpt_lock);
486        if (!compactor_initialized) {
487            // initialize
488            compactor_args.strcmp_len = MAX_FNAMELEN;
489            avl_init(&openfiles, &compactor_args);
490
491            if (config) {
492                if (config->sleep_duration > 0) {
493                    sleep_duration = config->sleep_duration;
494                }
495            }
496
497            compactor_terminate_signal = 0;
498
499            mutex_init(&sync_mutex);
500            thread_cond_init(&sync_cond);
501
502            // create worker threads
503            num_compactor_threads = config->num_threads;
504            compactor_tids = (thread_t *) calloc(num_compactor_threads, sizeof(thread_t));
505            for (size_t i = 0; i < num_compactor_threads; ++i) {
506                thread_create(&compactor_tids[i], compactor_thread, NULL);
507            }
508
509            compactor_initialized = 1;
510        }
511        mutex_unlock(&cpt_lock);
512    }
513}
514
515void compactor_shutdown()
516{
517    void *ret;
518    struct avl_node *a = NULL;
519    struct openfiles_elem *elem;
520
521    // set terminate signal
522    mutex_lock(&sync_mutex);
523    compactor_terminate_signal = 1;
524    thread_cond_broadcast(&sync_cond);
525    mutex_unlock(&sync_mutex);
526
527    for (size_t i = 0; i < num_compactor_threads; ++i) {
528        thread_join(compactor_tids[i], &ret);
529    }
530    free(compactor_tids);
531
532    mutex_lock(&cpt_lock);
533    // free all elems in the tree
534    a = avl_first(&openfiles);
535    while (a) {
536        elem = _get_entry(a, struct openfiles_elem, avl);
537        a = avl_next(a);
538
539        if (_compactor_check_file_removal(elem)) {
540            // remove file if removal is pended.
541            remove(elem->file->filename);
542            filemgr_free_func(&elem->file->e);
543        }
544
545        avl_remove(&openfiles, &elem->avl);
546        free(elem);
547    }
548
549    sleep_duration = FDB_COMPACTOR_SLEEP_DURATION;
550    compactor_initialized = 0;
551    mutex_destroy(&sync_mutex);
552    thread_cond_destroy(&sync_cond);
553    mutex_unlock(&cpt_lock);
554
555    mutex_destroy(&cpt_lock);
556}
557
558static fdb_status _compactor_store_metafile(char *metafile,
559                                            struct compactor_meta *metadata,
560                                            err_log_callback *log_callback);
561
562fdb_status compactor_register_file(struct filemgr *file,
563                                   fdb_config *config,
564                                   struct list *cmp_func_list,
565                                   err_log_callback *log_callback)
566{
567    file_status_t fstatus;
568    fdb_status fs = FDB_RESULT_SUCCESS;
569    struct avl_node *a = NULL;
570    struct openfiles_elem query, *elem;
571
572    // Ignore files whose status is COMPACT_OLD or REMOVED_PENDING.
573    // Those files do not need to be compacted again.
574    fstatus = filemgr_get_file_status(file);
575    if (fstatus == FILE_COMPACT_OLD ||
576        fstatus == FILE_REMOVED_PENDING) {
577        return fs;
578    }
579
580    strcpy(query.filename, file->filename);
581    // first search the existing file
582    mutex_lock(&cpt_lock);
583    a = avl_search(&openfiles, &query.avl, _compactor_cmp);
584    if (a == NULL) {
585        // doesn't exist
586        // create elem and insert into tree
587        char path[MAX_FNAMELEN];
588        struct compactor_meta meta;
589
590        elem = (struct openfiles_elem *)calloc(1, sizeof(struct openfiles_elem));
591        strcpy(elem->filename, file->filename);
592        elem->file = file;
593        elem->config = *config;
594        elem->register_count = 1;
595        elem->compaction_flag = false;
596        elem->daemon_compact_in_progress = false;
597        elem->removal_activated = false;
598        elem->cmp_func_list = cmp_func_list;
599        elem->log_callback = NULL;
600        avl_insert(&openfiles, &elem->avl, _compactor_cmp);
601        mutex_unlock(&cpt_lock); // Releasing the lock here should be OK as
602                                 // subsequent registration attempts for the same file
603                                 // will be simply processed by incrementing its
604                                 // counter below.
605
606        // store in metafile
607        _compactor_convert_dbfile_to_metafile(file->filename, path);
608        _strcpy_fname(meta.filename, file->filename);
609        fs = _compactor_store_metafile(path, &meta, log_callback);
610    } else {
611        // already exists
612        elem = _get_entry(a, struct openfiles_elem, avl);
613        if (!elem->file) {
614            elem->file = file;
615        }
616        elem->register_count++;
617        mutex_unlock(&cpt_lock);
618    }
619    return fs;
620}
621
622void compactor_deregister_file(struct filemgr *file)
623{
624    struct avl_node *a = NULL;
625    struct openfiles_elem query, *elem;
626
627    strcpy(query.filename, file->filename);
628    mutex_lock(&cpt_lock);
629    a = avl_search(&openfiles, &query.avl, _compactor_cmp);
630    if (a) {
631        elem = _get_entry(a, struct openfiles_elem, avl);
632        if ((--elem->register_count) == 0) {
633            // if no handle refers this file
634            if (elem->daemon_compact_in_progress) {
635                // This file is waiting for compaction by compactor (but not opened
636                // yet). Do not remove 'elem' for now. The 'elem' will be automatically
637                // replaced after the compaction is done by calling
638                // 'compactor_switch_file()'. However, elem->file should be set to NULL
639                // in order to be removed from the AVL tree in case of the compaction
640                // failure.
641                elem->file = NULL;
642            } else {
643                // remove from the tree
644                avl_remove(&openfiles, &elem->avl);
645                free(elem);
646            }
647        }
648    }
649    mutex_unlock(&cpt_lock);
650}
651
652fdb_status compactor_register_file_removing(struct filemgr *file,
653                                            err_log_callback *log_callback)
654{
655    fdb_status fs = FDB_RESULT_SUCCESS;
656    struct avl_node *a = NULL;
657    struct openfiles_elem query, *elem;
658
659    strcpy(query.filename, file->filename);
660    // first search the existing file
661    mutex_lock(&cpt_lock);
662    a = avl_search(&openfiles, &query.avl, _compactor_cmp);
663    if (a == NULL) {
664        // doesn't exist
665        // create a fake & temporary element for the file to be removed.
666        elem = (struct openfiles_elem *)calloc(1, sizeof(struct openfiles_elem));
667        strcpy(elem->filename, file->filename);
668
669        // set flag
670        file->fflags |= FILEMGR_REMOVAL_IN_PROG;
671
672        elem->file = file;
673        elem->register_count = 1;
674        // to prevent this element to be compacted, set all flags
675        elem->compaction_flag = true;
676        elem->daemon_compact_in_progress = true;
677        elem->removal_activated = false;
678        elem->cmp_func_list = NULL;
679        elem->log_callback = log_callback;
680        avl_insert(&openfiles, &elem->avl, _compactor_cmp);
681        mutex_unlock(&cpt_lock); // Releasing the lock here should be OK as
682                                 // subsequent registration attempts for the same file
683                                 // will be simply processed by incrementing its
684                                 // counter below.
685
686        // wake up any sleeping thread
687        mutex_lock(&sync_mutex);
688        thread_cond_signal(&sync_cond);
689        mutex_unlock(&sync_mutex);
690
691    } else {
692        // already exists .. just ignore
693        mutex_unlock(&cpt_lock);
694    }
695    return fs;
696}
697
698void compactor_change_threshold(struct filemgr *file, size_t new_threshold)
699{
700    struct avl_node *a = NULL;
701    struct openfiles_elem query, *elem;
702
703    strcpy(query.filename, file->filename);
704    mutex_lock(&cpt_lock);
705    a = avl_search(&openfiles, &query.avl, _compactor_cmp);
706    if (a) {
707        elem = _get_entry(a, struct openfiles_elem, avl);
708        elem->config.compaction_threshold = new_threshold;
709    }
710    mutex_unlock(&cpt_lock);
711}
712
713struct compactor_meta * _compactor_read_metafile(char *metafile,
714                                                 struct compactor_meta *metadata,
715                                                 err_log_callback *log_callback)
716{
717    int fd_meta, fd_db;
718    ssize_t ret;
719    uint8_t *buf = alca(uint8_t, sizeof(struct compactor_meta));
720    uint32_t crc;
721    char fullpath[MAX_FNAMELEN];
722    struct filemgr_ops *ops;
723    struct compactor_meta meta;
724
725    ops = get_filemgr_ops();
726    fd_meta = ops->open(metafile, O_RDONLY, 0644);
727
728    if (fd_meta >= 0) {
729        // metafile exists .. read metadata
730        ret = ops->pread(fd_meta, buf, sizeof(struct compactor_meta), 0);
731        if (ret < 0 || (size_t)ret < sizeof(struct compactor_meta)) {
732            char errno_msg[512];
733            ops->get_errno_str(errno_msg, 512);
734            fdb_log(log_callback, (fdb_status) ret,
735                    "Failed to read the meta file '%s', errno_message: %s\n",
736                    metafile, errno_msg);
737            ret = ops->close(fd_meta);
738            if (ret < 0) {
739                ops->get_errno_str(errno_msg, 512);
740                fdb_log(log_callback, (fdb_status) ret,
741                        "Failed to close the meta file '%s', errno_message: %s\n",
742                        metafile, errno_msg);
743            }
744            return NULL;
745        }
746        memcpy(&meta, buf, sizeof(struct compactor_meta));
747        meta.version = _endian_decode(meta.version);
748        meta.crc = _endian_decode(meta.crc);
749        ops->close(fd_meta);
750
751        // CRC check
752        crc = chksum(buf, sizeof(struct compactor_meta) - sizeof(crc));
753        if (crc != meta.crc) {
754            fdb_log(log_callback, FDB_RESULT_CHECKSUM_ERROR,
755                    "Checksum mismatch in the meta file '%s'\n", metafile);
756            return NULL;
757        }
758        // check if the file exists
759        _reconstruct_path(fullpath, metafile, meta.filename);
760        fd_db = ops->open(fullpath, O_RDONLY, 0644);
761        if (fd_db < 0) {
762            // file doesn't exist
763            return NULL;
764        }
765        ops->close(fd_db);
766    } else {
767        // file doesn't exist
768        return NULL;
769    }
770
771    *metadata = meta;
772    return metadata;
773}
774
775static fdb_status _compactor_store_metafile(char *metafile,
776                                            struct compactor_meta *metadata,
777                                            err_log_callback *log_callback)
778{
779    int fd_meta;
780    ssize_t ret;
781    uint32_t crc;
782    struct filemgr_ops *ops;
783    struct compactor_meta meta;
784
785    ops = get_filemgr_ops();
786    fd_meta = ops->open(metafile, O_RDWR | O_CREAT, 0644);
787
788    if (fd_meta >= 0){
789        meta.version = _endian_encode(COMPACTOR_META_VERSION);
790        strcpy(meta.filename, metadata->filename);
791        crc = chksum((void*)&meta, sizeof(struct compactor_meta) - sizeof(crc));
792        meta.crc = _endian_encode(crc);
793
794        char errno_msg[512];
795        ret = ops->pwrite(fd_meta, &meta, sizeof(struct compactor_meta), 0);
796        if (ret < 0 || (size_t)ret < sizeof(struct compactor_meta)) {
797            ops->get_errno_str(errno_msg, 512);
798            fdb_log(log_callback, (fdb_status) ret,
799                    "Failed to perform a write in the meta file '%s', "
800                    "errno_message: %s\n", metafile, errno_msg);
801            ops->close(fd_meta);
802            return FDB_RESULT_WRITE_FAIL;
803        }
804        ret = ops->fsync(fd_meta);
805        if (ret < 0) {
806            ops->get_errno_str(errno_msg, 512);
807            fdb_log(log_callback, (fdb_status) ret,
808                    "Failed to perform a sync in the meta file '%s', "
809                    "errno_message: %s\n", metafile, errno_msg);
810            ops->close(fd_meta);
811            return FDB_RESULT_FSYNC_FAIL;
812        }
813        ops->close(fd_meta);
814    } else {
815        return FDB_RESULT_OPEN_FAIL;
816    }
817
818    return FDB_RESULT_SUCCESS;
819}
820
821void compactor_switch_file(struct filemgr *old_file, struct filemgr *new_file,
822                           err_log_callback *log_callback)
823{
824    struct avl_node *a = NULL;
825    struct openfiles_elem query, *elem;
826    struct compactor_meta meta;
827
828    strcpy(query.filename, old_file->filename);
829    mutex_lock(&cpt_lock);
830    a = avl_search(&openfiles, &query.avl, _compactor_cmp);
831    if (a) {
832        char metafile[MAX_FNAMELEN];
833        fdb_compaction_mode_t comp_mode;
834
835        elem = _get_entry(a, struct openfiles_elem, avl);
836        avl_remove(&openfiles, a);
837        strcpy(elem->filename, new_file->filename);
838        elem->file = new_file;
839        elem->register_count = 1;
840        elem->daemon_compact_in_progress = false;
841        // clear compaction flag
842        elem->compaction_flag = false;
843        avl_insert(&openfiles, &elem->avl, _compactor_cmp);
844        comp_mode = elem->config.compaction_mode;
845        mutex_unlock(&cpt_lock); // Releasing the lock here should be OK as we don't
846                                 // expect more than one compaction task completion for
847                                 // the same file.
848
849        if (comp_mode == FDB_COMPACTION_AUTO) {
850            _compactor_convert_dbfile_to_metafile(new_file->filename, metafile);
851            _strcpy_fname(meta.filename, new_file->filename);
852            _compactor_store_metafile(metafile, &meta, log_callback);
853        }
854    } else {
855        mutex_unlock(&cpt_lock);
856    }
857}
858
859void compactor_get_virtual_filename(const char *filename,
860                                    char *virtual_filename)
861{
862    int prefix_len = _compactor_prefix_len((char*)filename) - 1;
863    if (prefix_len > 0) {
864        strncpy(virtual_filename, filename, prefix_len);
865        virtual_filename[prefix_len] = 0;
866    } else {
867        strcpy(virtual_filename, filename);
868    }
869}
870
871fdb_status compactor_get_actual_filename(const char *filename,
872                                         char *actual_filename,
873                                         fdb_compaction_mode_t comp_mode,
874                                         err_log_callback *log_callback)
875{
876    int i;
877    int filename_len;
878    int dirname_len;
879    int compaction_no, max_compaction_no = -1;
880    char path[MAX_FNAMELEN];
881    char dirname[MAX_FNAMELEN], prefix[MAX_FNAMELEN];
882    char ret_name[MAX_FNAMELEN];
883    fdb_status fs = FDB_RESULT_SUCCESS;
884    struct compactor_meta meta, *meta_ptr;
885
886    // get actual filename from metafile
887    sprintf(path, "%s.meta", filename);
888    meta_ptr = _compactor_read_metafile(path, &meta, log_callback);
889
890    if (meta_ptr == NULL) {
891        if (comp_mode == FDB_COMPACTION_MANUAL && does_file_exist(filename)) {
892            strcpy(actual_filename, filename);
893            return FDB_RESULT_SUCCESS;
894        }
895
896        // error handling .. scan directory
897        // backward search until find the first '/' or '\' (Windows)
898        filename_len = strlen(filename);
899        dirname_len = 0;
900
901#if !defined(WIN32) && !defined(_WIN32)
902        DIR *dir_info;
903        struct dirent *dir_entry;
904
905        for (i=filename_len-1; i>=0; --i){
906            if (filename[i] == '/') {
907                dirname_len = i+1;
908                break;
909            }
910        }
911
912        if (dirname_len > 0) {
913            strncpy(dirname, filename, dirname_len);
914            dirname[dirname_len] = 0;
915        } else {
916            strcpy(dirname, ".");
917        }
918        strcpy(prefix, filename + dirname_len);
919        strcat(prefix, ".");
920
921        dir_info = opendir(dirname);
922        if (dir_info != NULL) {
923            while ((dir_entry = readdir(dir_info))) {
924                if (!strncmp(dir_entry->d_name, prefix, strlen(prefix))) {
925                    compaction_no = -1;
926                    sscanf(dir_entry->d_name + strlen(prefix), "%d", &compaction_no);
927                    if (compaction_no >= 0) {
928                        if (compaction_no > max_compaction_no) {
929                            max_compaction_no = compaction_no;
930                        }
931                    }
932                }
933            }
934            closedir(dir_info);
935        }
936#else
937        // Windows
938        for (i=filename_len-1; i>=0; --i){
939            if (filename[i] == '/' || filename[i] == '\\') {
940                dirname_len = i+1;
941                break;
942            }
943        }
944
945        if (dirname_len > 0) {
946            strncpy(dirname, filename, dirname_len);
947            dirname[dirname_len] = 0;
948        } else {
949            strcpy(dirname, ".");
950        }
951        strcpy(prefix, filename + dirname_len);
952        strcat(prefix, ".");
953
954        WIN32_FIND_DATA filedata;
955        HANDLE hfind;
956        char query_str[MAX_FNAMELEN];
957
958        // find all files start with 'prefix'
959        sprintf(query_str, "%s*", prefix);
960        hfind = FindFirstFile(query_str, &filedata);
961        while (hfind != INVALID_HANDLE_VALUE) {
962            if (!strncmp(filedata.cFileName, prefix, strlen(prefix))) {
963                compaction_no = -1;
964                sscanf(filedata.cFileName + strlen(prefix), "%d", &compaction_no);
965                if (compaction_no >= 0) {
966                    if (compaction_no > max_compaction_no) {
967                        max_compaction_no = compaction_no;
968                    }
969                }
970            }
971
972            if (!FindNextFile(hfind, &filedata)) {
973                FindClose(hfind);
974                hfind = INVALID_HANDLE_VALUE;
975            }
976        }
977
978#endif
979
980        if (max_compaction_no < 0) {
981            if (comp_mode == FDB_COMPACTION_AUTO) {
982                // DB files with a revision number are not found.
983                // initialize filename to '[filename].0'
984                sprintf(ret_name, "%s.0", filename);
985            } else { // Manual compaction mode.
986                // Simply use the file name passed to this function.
987                strcpy(actual_filename, filename);
988                return FDB_RESULT_SUCCESS;
989            }
990        } else {
991            // return the file that has the largest compaction number
992            sprintf(ret_name, "%s.%d", filename, max_compaction_no);
993            fs = FDB_RESULT_SUCCESS;
994        }
995        if (fs == FDB_RESULT_SUCCESS) {
996            strcpy(actual_filename, ret_name);
997        }
998        return fs;
999
1000    } else {
1001        // metadata is successfully read from the metafile .. just return the filename
1002        _reconstruct_path(ret_name, (char*)filename, meta.filename);
1003        strcpy(actual_filename, ret_name);
1004        return FDB_RESULT_SUCCESS;
1005    }
1006}
1007
1008bool compactor_is_valid_mode(const char *filename, fdb_config *config)
1009{
1010    int fd;
1011    char path[MAX_FNAMELEN];
1012    struct filemgr_ops *ops;
1013
1014    ops = get_filemgr_ops();
1015
1016    if (config->compaction_mode == FDB_COMPACTION_AUTO) {
1017        // auto compaction mode: invalid when
1018        // the file '[filename]' exists
1019        fd = ops->open(filename, O_RDONLY, 0644);
1020        if (fd != FDB_RESULT_NO_SUCH_FILE) {
1021            ops->close(fd);
1022            return false;
1023        }
1024
1025    } else if (config->compaction_mode == FDB_COMPACTION_MANUAL) {
1026        // manual compaction mode: invalid when
1027        // the file '[filename].meta' exists
1028        sprintf(path, "%s.meta", filename);
1029        fd = ops->open(path, O_RDONLY, 0644);
1030        if (fd != FDB_RESULT_NO_SUCH_FILE) {
1031            ops->close(fd);
1032            return false;
1033        }
1034
1035    } else {
1036        // unknown mode
1037        return false;
1038    }
1039
1040    return true;
1041}
1042
1043static fdb_status _compactor_search_n_destroy(const char *filename)
1044{
1045    int i;
1046    int filename_len;
1047    int dirname_len;
1048    char dirname[MAX_FNAMELEN], prefix[MAX_FNAMELEN];
1049    fdb_status fs = FDB_RESULT_SUCCESS;
1050
1051    // error handling .. scan directory
1052    // backward search until find the first '/' or '\' (Windows)
1053    filename_len = strlen(filename);
1054    dirname_len = 0;
1055
1056#if !defined(WIN32) && !defined(_WIN32)
1057    DIR *dir_info;
1058    struct dirent *dir_entry;
1059
1060    for (i=filename_len-1; i>=0; --i){
1061        if (filename[i] == '/') {
1062            dirname_len = i+1;
1063            break;
1064        }
1065    }
1066
1067    if (dirname_len > 0) {
1068        strncpy(dirname, filename, dirname_len);
1069        dirname[dirname_len] = 0;
1070    } else {
1071        strcpy(dirname, ".");
1072    }
1073    strcpy(prefix, filename + dirname_len);
1074    strcat(prefix, ".");
1075
1076    dir_info = opendir(dirname);
1077    if (dir_info != NULL) {
1078        while ((dir_entry = readdir(dir_info))) {
1079            if (!strncmp(dir_entry->d_name, prefix, strlen(prefix))) {
1080                // Need to check filemgr for possible open entry?
1081                if (remove(dir_entry->d_name)) {
1082                    fs = FDB_RESULT_FILE_REMOVE_FAIL;
1083                    closedir(dir_info);
1084                    return fs;
1085                }
1086            }
1087        }
1088        closedir(dir_info);
1089    }
1090#else
1091    // Windows
1092    for (i=filename_len-1; i>=0; --i){
1093        if (filename[i] == '/' || filename[i] == '\\') {
1094            dirname_len = i+1;
1095            break;
1096        }
1097    }
1098
1099    if (dirname_len > 0) {
1100        strncpy(dirname, filename, dirname_len);
1101        dirname[dirname_len] = 0;
1102    } else {
1103        strcpy(dirname, ".");
1104    }
1105    strcpy(prefix, filename + dirname_len);
1106    strcat(prefix, ".");
1107
1108    WIN32_FIND_DATA filedata;
1109    HANDLE hfind;
1110    char query_str[MAX_FNAMELEN];
1111
1112    // find all files start with 'prefix'
1113    sprintf(query_str, "%s*", prefix);
1114    hfind = FindFirstFile(query_str, &filedata);
1115    while (hfind != INVALID_HANDLE_VALUE) {
1116        if (!strncmp(filedata.cFileName, prefix, strlen(prefix))) {
1117            // Need to check filemgr for possible open entry?
1118            if (remove(filedata.cFileName)) {
1119                fs = FDB_RESULT_FILE_REMOVE_FAIL;
1120                FindClose(hfind);
1121                hfind = INVALID_HANDLE_VALUE;
1122                return fs;
1123            }
1124        }
1125
1126        if (!FindNextFile(hfind, &filedata)) {
1127            FindClose(hfind);
1128            hfind = INVALID_HANDLE_VALUE;
1129        }
1130    }
1131
1132#endif
1133    return fs;
1134}
1135
1136fdb_status compactor_destroy_file(char *filename,
1137                                  fdb_config *config)
1138{
1139    struct avl_node *a = NULL;
1140    struct openfiles_elem query, *elem;
1141    size_t strcmp_len;
1142    fdb_status status = FDB_RESULT_SUCCESS;
1143    compactor_config c_config;
1144
1145    strcmp_len = strlen(filename);
1146    filename[strcmp_len] = '.'; // add a . suffix in place
1147    strcmp_len++;
1148    filename[strcmp_len] = '\0';
1149    strcpy(query.filename, filename);
1150
1151    c_config.sleep_duration = config->compactor_sleep_duration;
1152    c_config.num_threads = config->num_compactor_threads;
1153    compactor_init(&c_config);
1154
1155    mutex_lock(&cpt_lock);
1156    compactor_args.strcmp_len = strcmp_len; // Do prefix match for all vers
1157    a = avl_search(&openfiles, &query.avl, _compactor_cmp);
1158    if (a) {
1159        elem = _get_entry(a, struct openfiles_elem, avl);
1160        // if no handle refers this file
1161        if (elem->daemon_compact_in_progress) {
1162            // This file is waiting for compaction by compactor
1163            // Return a temporary failure, user must retry after sometime
1164            status = FDB_RESULT_IN_USE_BY_COMPACTOR;
1165        } else { // File handle not closed, fail operation
1166            status = FDB_RESULT_FILE_IS_BUSY;
1167        }
1168    }
1169
1170    compactor_args.strcmp_len = MAX_FNAMELEN; // restore for normal compare
1171    mutex_unlock(&cpt_lock); // Releasing the lock here should be OK as file
1172                             // deletions doesn't require strict synchronization.
1173    filename[strcmp_len - 1] = '\0'; // restore the filename
1174    if (status == FDB_RESULT_SUCCESS) {
1175        status = _compactor_search_n_destroy(filename);
1176    }
1177
1178    return status;
1179}
1180