xref: /4.0.0/forestdb/src/compactor.cc (revision 0d423664)
1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2010 Couchbase, Inc
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
18#include <stdio.h>
19#include <stdlib.h>
20#include <string.h>
21#include <fcntl.h>
22#if !defined(WIN32) && !defined(_WIN32)
23#include <sys/time.h>
24#include <dirent.h>
25#include <unistd.h>
26#endif
27
28#include "libforestdb/forestdb.h"
29#include "fdb_internal.h"
30#include "filemgr.h"
31#include "avltree.h"
32#include "list.h"
33#include "common.h"
34#include "filemgr_ops.h"
35#include "configuration.h"
36#include "internal_types.h"
37#include "compactor.h"
38#include "wal.h"
39#include "memleak.h"
40
41#ifdef __DEBUG
42#ifndef __DEBUG_CPT
43    #undef DBG
44    #undef DBGCMD
45    #undef DBGSW
46    #define DBG(...)
47    #define DBGCMD(...)
48    #define DBGSW(n, ...)
49#endif
50#endif
51
52#define COMPACTOR_META_VERSION (1)
53#define MAX_FNAMELEN (FDB_MAX_FILENAME_LEN)
54
// variables for initialization
// Non-zero once compactor_init() has finished; reset to 0 by compactor_shutdown().
static volatile uint8_t compactor_initialized = 0;
// Held while the 'openfiles' tree or one of its elements is searched or modified.
mutex_t cpt_lock;

// Number of daemon threads; taken from the config in compactor_init().
static size_t num_compactor_threads = DEFAULT_NUM_COMPACTOR_THREADS;
// Thread ids of the daemon threads; allocated in compactor_init() and
// released in compactor_shutdown().
static thread_t *compactor_tids = NULL;


// Multiplied by 1000 and passed to thread_cond_timedwait() by the daemon
// threads between two scans of 'openfiles'.
static size_t sleep_duration = FDB_COMPACTOR_SLEEP_DURATION;

// Mutex / condition variable pair the daemon threads wait on;
// compactor_shutdown() broadcasts on 'sync_cond' to wake them up.
static mutex_t sync_mutex;
static thread_cond_t sync_cond;

// Set to 1 by compactor_shutdown() to make the daemon threads return.
static volatile uint8_t compactor_terminate_signal = 0;

// Registered files, ordered by _compactor_cmp() (i.e. by file name).
static struct avl_tree openfiles;
71
// One entry of the 'openfiles' tree: a file registered with the compactor.
struct openfiles_elem {
    // Name of the registered file; this is the key compared by _compactor_cmp().
    char filename[MAX_FNAMELEN];
    // File manager instance of the file. Set to NULL when the last handle is
    // deregistered while a daemon compaction is in progress.
    struct filemgr *file;
    // Copy of the configuration passed to compactor_register_file().
    fdb_config config;
    // Incremented by each registration, decremented by each deregistration.
    uint32_t register_count;
    bool compaction_flag; // set when the file is being compacted
    // Set by the daemon thread for as long as it works on this entry.
    bool daemon_compact_in_progress;
    struct list *cmp_func_list; // pointer to fhandle's list
    // Linkage into the 'openfiles' AVL tree.
    struct avl_node avl;
};
82
// Auxiliary data attached to the 'openfiles' tree in compactor_init() and
// handed to _compactor_cmp() as its 'aux' argument.
struct compactor_args_t {
    // void *aux; (reserved for future use)
    // Maximum number of characters _compactor_cmp() compares via strncmp().
    size_t strcmp_len; // Used to search for prefix match
};
static struct compactor_args_t compactor_args;
88
// Contents of a '.meta' file as read and written by _compactor_read_metafile()
// and _compactor_store_metafile().
struct compactor_meta{
    // COMPACTOR_META_VERSION, stored endian-encoded.
    uint32_t version;
    // File name (without directory) of the current DB file.
    char filename[MAX_FNAMELEN];
    // Checksum over all bytes of this struct that precede 'crc',
    // stored endian-encoded.
    uint32_t crc;
};
94
#if !defined(WIN32) && !defined(_WIN32)
/*
 * Turns a relative timeout of 'ms' milliseconds into an absolute point in
 * time (current wall-clock time + ms) with millisecond granularity.
 */
struct timespec convert_reltime_to_abstime(unsigned int ms) {
    struct timeval now;
    struct timespec deadline;
    uint64_t deadline_ms;

    memset(&deadline, 0, sizeof(deadline));

    /*
     * pthread_cond_timedwait only accepts absolute times, hence the
     * conversion from "now + ms" to an absolute timestamp.
     */
    gettimeofday(&now, NULL);
    deadline_ms = ((uint64_t)(now.tv_sec) * 1000) + (now.tv_usec / 1000) + ms;
    /* Round the sub-millisecond part of 'now' to the nearest millisecond */
    deadline_ms += ((now.tv_usec % 1000) > 499) ? 1 : 0;

    deadline.tv_sec = deadline_ms / 1000;
    deadline.tv_nsec = (deadline_ms % 1000) * 1000000;
    return deadline;
}
#endif
120
#if !defined(WIN32) && !defined(_WIN32)
// Returns true if stat() succeeds on 'filename'.
static bool does_file_exist(const char *filename) {
    struct stat info;
    return stat(filename, &info) == 0;
}
#else
// Returns true if the attributes of 'filename' can be queried.
static bool does_file_exist(const char *filename) {
    DWORD attributes = GetFileAttributes(filename);
    return attributes != INVALID_FILE_ATTRIBUTES;
}
#endif
132
133// compares file names
134int _compactor_cmp(struct avl_node *a, struct avl_node *b, void *aux)
135{
136    struct openfiles_elem *aa, *bb;
137    struct compactor_args_t *args = (struct compactor_args_t *)aux;
138    aa = _get_entry(a, struct openfiles_elem, avl);
139    bb = _get_entry(b, struct openfiles_elem, avl);
140    return strncmp(aa->filename, bb->filename, args->strcmp_len);
141}
142
143INLINE uint64_t _compactor_estimate_space(struct openfiles_elem *elem)
144{
145    uint64_t ret = 0;
146    uint64_t datasize;
147    uint64_t nlivenodes;
148
149    datasize = _kvs_stat_get_sum(elem->file, KVS_STAT_DATASIZE);
150    nlivenodes = _kvs_stat_get_sum(elem->file, KVS_STAT_NLIVENODES);
151
152    ret = datasize;
153    ret += nlivenodes * elem->config.blocksize;
154    ret += wal_get_datasize(elem->file);
155
156    return ret;
157}
158
159// check if the compaction threshold is satisfied
160INLINE int _compactor_is_threshold_satisfied(struct openfiles_elem *elem)
161{
162    uint64_t filesize;
163    uint64_t active_data;
164    int threshold;
165
166    if (elem->compaction_flag || filemgr_is_rollback_on(elem->file)) {
167        // do not perform compaction if the file is already being compacted or
168        // in rollback.
169        return 0;
170    }
171
172    threshold = elem->config.compaction_threshold;
173    if (elem->config.compaction_mode == FDB_COMPACTION_AUTO &&
174        threshold > 0)
175        {
176        filesize = filemgr_get_pos(elem->file);
177        active_data = _compactor_estimate_space(elem);
178        if (active_data == 0 || active_data >= filesize ||
179            filesize < elem->config.compaction_minimum_filesize) {
180            return 0;
181        }
182
183        return ((filesize / 100.0 * threshold) < (filesize - active_data));
184    } else {
185        return 0;
186    }
187}
188
189// return the location of '.'
190INLINE int _compactor_prefix_len(char *filename)
191{
192    int i;
193    int file_len = strlen(filename);
194    int prefix_len = 0;
195    // find the first '.'
196    for (i=file_len-1; i>=0; --i){
197        if (filename[i] == '.') {
198            prefix_len = i+1;
199            break;
200        }
201    }
202    return prefix_len;
203}
204
205// return the the location of '/' or '\'
206INLINE int _compactor_dir_len(char *filename)
207{
208    int i;
209    int file_len = strlen(filename);
210    int dir_len = 0;
211    // find the first '/' or '\'
212    for (i=file_len-1; i>=0; --i){
213        if (filename[i] == '/' || filename[i] == '\\') {
214            dir_len = i+1;
215            break;
216        }
217    }
218    return dir_len;
219}
220
221// copy from 'foo/bar.baz' to 'bar.baz'
222static void _strcpy_fname(char *dst, char *src)
223{
224    int dir_len = _compactor_dir_len(src);
225    strcpy(dst, src + dir_len);
226}
227
228// copy from 'foo/bar.baz' to 'foo/' (including '/')
229static void _strcpy_dirname(char *dst, char *src)
230{
231    int dir_len = _compactor_dir_len(src);
232    if (dir_len) {
233        strncpy(dst, src, dir_len);
234    }
235    // set NULL char
236    dst[dir_len] = 0;
237}
238
239// <example>
240// fname: 'foo.bar'
241// path: 'tmp/dir/other.file'
242// returned dst: 'tmp/dir/foo.bar'
243static void _reconstruct_path(char *dst, char *path, char *fname)
244{
245    _strcpy_dirname(dst, path);
246    strcat(dst + strlen(dst), fname);
247}
248
249static void _compactor_get_vfilename(char *filename, char *vfilename)
250{
251    int prefix_len = _compactor_prefix_len(filename);
252
253    if (prefix_len > 0) {
254        strncpy(vfilename, filename, prefix_len-1);
255        vfilename[prefix_len-1] = 0;
256    }
257}
258
259static void _compactor_convert_dbfile_to_metafile(char *dbfile, char *metafile)
260{
261    int prefix_len = _compactor_prefix_len(dbfile);
262
263    if (prefix_len > 0) {
264        strncpy(metafile, dbfile, prefix_len);
265        metafile[prefix_len] = 0;
266        strcat(metafile, "meta");
267    }
268}
269
// Returns true if every character of 'str' is a decimal digit ('0'..'9').
// An empty string yields true.
static bool _allDigit(char *str) {
    for (const char *cursor = str; *cursor != '\0'; ++cursor) {
        bool is_digit = (*cursor >= '0' && *cursor <= '9');
        if (!is_digit) {
            return false;
        }
    }
    return true;
}
279
280void compactor_get_next_filename(char *file, char *nextfile)
281{
282    int compaction_no = 0;
283    int prefix_len = _compactor_prefix_len(file);
284    char str_no[24];
285
286    if (prefix_len > 0 && _allDigit(file + prefix_len)) {
287        sscanf(file+prefix_len, "%d", &compaction_no);
288        strncpy(nextfile, file, prefix_len);
289        do {
290            nextfile[prefix_len] = 0;
291            sprintf(str_no, "%d", ++compaction_no);
292            strcat(nextfile, str_no);
293        } while (does_file_exist(nextfile));
294    } else {
295        do {
296            strcpy(nextfile, file);
297            sprintf(str_no, ".%d", ++compaction_no);
298            strcat(nextfile, str_no);
299        } while (does_file_exist(nextfile));
300    }
301}
302
303bool compactor_switch_compaction_flag(struct filemgr *file, bool flag)
304{
305    struct avl_node *a = NULL;
306    struct openfiles_elem query, *elem;
307
308    strcpy(query.filename, file->filename);
309    mutex_lock(&cpt_lock);
310    a = avl_search(&openfiles, &query.avl, _compactor_cmp);
311    if (a) {
312        // found
313        elem = _get_entry(a, struct openfiles_elem, avl);
314        if (elem->compaction_flag == flag) {
315            // already switched by other thread .. return false
316            mutex_unlock(&cpt_lock);
317            return false;
318        }
319        // switch
320        elem->compaction_flag = flag;
321        mutex_unlock(&cpt_lock);
322        return true;
323    }
324    // file doesn't exist .. already compacted or deregistered
325    mutex_unlock(&cpt_lock);
326    return false;
327}
328
329void * compactor_thread(void *voidargs)
330{
331    char vfilename[MAX_FNAMELEN];
332    char new_filename[MAX_FNAMELEN];
333    fdb_file_handle *fhandle;
334    fdb_status fs;
335    struct avl_node *a;
336    struct openfiles_elem *elem;
337    struct openfiles_elem query;
338
339    // Sleep for 10 secs by default to allow applications to warm up their data.
340    // TODO: Need to implement more flexible way of scheduling the compaction
341    // daemon (e.g., public APIs to start / stop the compaction daemon).
342    mutex_lock(&sync_mutex);
343    thread_cond_timedwait(&sync_cond, &sync_mutex, sleep_duration * 1000);
344    mutex_unlock(&sync_mutex);
345
346    while (1) {
347
348        mutex_lock(&cpt_lock);
349        a = avl_first(&openfiles);
350        while(a) {
351            elem = _get_entry(a, struct openfiles_elem, avl);
352            if (!elem->file) {
353                a = avl_next(a);
354                avl_remove(&openfiles, &elem->avl);
355                free(elem);
356                continue;
357            }
358
359            if (_compactor_is_threshold_satisfied(elem)) {
360                elem->daemon_compact_in_progress = true;
361                // set compaction flag
362                elem->compaction_flag = true;
363                mutex_unlock(&cpt_lock);
364                // Once 'daemon_compact_in_progress' is set to true, then it is safe to
365                // read the variables of 'elem' until the compaction is completed.
366                _compactor_get_vfilename(elem->filename, vfilename);
367
368                fs = fdb_open_for_compactor(&fhandle, vfilename, &elem->config,
369                                            elem->cmp_func_list);
370                if (fs == FDB_RESULT_SUCCESS) {
371                    compactor_get_next_filename(elem->filename, new_filename);
372                    fdb_compact_file(fhandle, new_filename, false, (bid_t) -1);
373                    fdb_close(fhandle);
374
375                    strcpy(query.filename, new_filename);
376                    mutex_lock(&cpt_lock);
377                    // Search the next file for compaction.
378                    a = avl_search_greater(&openfiles, &query.avl, _compactor_cmp);
379                } else {
380                    fdb_log(&fhandle->root->log_callback, fs,
381                            "Failed to open the file '%s' for auto daemon "
382                            "compaction.\n", vfilename);
383                    // fail to open file
384                    mutex_lock(&cpt_lock);
385                    a = avl_next(&elem->avl);
386                    elem->daemon_compact_in_progress = false;
387                    // clear compaction flag
388                    elem->compaction_flag = false;
389                }
390            } else {
391                a = avl_next(a);
392            }
393            if (compactor_terminate_signal) {
394                mutex_unlock(&cpt_lock);
395                return NULL;
396            }
397        }
398        mutex_unlock(&cpt_lock);
399
400        mutex_lock(&sync_mutex);
401        if (compactor_terminate_signal) {
402            mutex_unlock(&sync_mutex);
403            break;
404        }
405        thread_cond_timedwait(&sync_cond, &sync_mutex, sleep_duration * 1000);
406        if (compactor_terminate_signal) {
407            mutex_unlock(&sync_mutex);
408            break;
409        }
410        mutex_unlock(&sync_mutex);
411    }
412    return NULL;
413}
414
415void compactor_init(struct compactor_config *config)
416{
417    if (!compactor_initialized) {
418        // Note that this function is synchronized by the spin lock in fdb_init API.
419        mutex_init(&cpt_lock);
420
421        mutex_lock(&cpt_lock);
422        if (!compactor_initialized) {
423            // initialize
424            compactor_args.strcmp_len = MAX_FNAMELEN;
425            avl_init(&openfiles, &compactor_args);
426
427            if (config) {
428                if (config->sleep_duration > 0) {
429                    sleep_duration = config->sleep_duration;
430                }
431            }
432
433            compactor_terminate_signal = 0;
434
435            mutex_init(&sync_mutex);
436            thread_cond_init(&sync_cond);
437
438            // create worker threads
439            num_compactor_threads = config->num_threads;
440            compactor_tids = (thread_t *) calloc(num_compactor_threads, sizeof(thread_t));
441            for (size_t i = 0; i < num_compactor_threads; ++i) {
442                thread_create(&compactor_tids[i], compactor_thread, NULL);
443            }
444
445            compactor_initialized = 1;
446        }
447        mutex_unlock(&cpt_lock);
448    }
449}
450
451void compactor_shutdown()
452{
453    void *ret;
454    struct avl_node *a = NULL;
455    struct openfiles_elem *elem;
456
457    // set terminate signal
458    mutex_lock(&sync_mutex);
459    compactor_terminate_signal = 1;
460    thread_cond_broadcast(&sync_cond);
461    mutex_unlock(&sync_mutex);
462
463    for (size_t i = 0; i < num_compactor_threads; ++i) {
464        thread_join(compactor_tids[i], &ret);
465    }
466    free(compactor_tids);
467
468    mutex_lock(&cpt_lock);
469    // free all elems in the tree
470    a = avl_first(&openfiles);
471    while (a) {
472        elem = _get_entry(a, struct openfiles_elem, avl);
473        a = avl_next(a);
474
475        avl_remove(&openfiles, &elem->avl);
476        free(elem);
477    }
478
479    sleep_duration = FDB_COMPACTOR_SLEEP_DURATION;
480    compactor_initialized = 0;
481    mutex_destroy(&sync_mutex);
482    thread_cond_destroy(&sync_cond);
483    mutex_unlock(&cpt_lock);
484
485    mutex_destroy(&cpt_lock);
486}
487
488static fdb_status _compactor_store_metafile(char *metafile,
489                                            struct compactor_meta *metadata,
490                                            err_log_callback *log_callback);
491
492fdb_status compactor_register_file(struct filemgr *file,
493                                   fdb_config *config,
494                                   struct list *cmp_func_list,
495                                   err_log_callback *log_callback)
496{
497    file_status_t fstatus;
498    fdb_status fs = FDB_RESULT_SUCCESS;
499    struct avl_node *a = NULL;
500    struct openfiles_elem query, *elem;
501
502    // Ignore files whose status is COMPACT_OLD or REMOVED_PENDING.
503    // Those files do not need to be compacted again.
504    fstatus = filemgr_get_file_status(file);
505    if (fstatus == FILE_COMPACT_OLD ||
506        fstatus == FILE_REMOVED_PENDING) {
507        return fs;
508    }
509
510    strcpy(query.filename, file->filename);
511    // first search the existing file
512    mutex_lock(&cpt_lock);
513    a = avl_search(&openfiles, &query.avl, _compactor_cmp);
514    if (a == NULL) {
515        // doesn't exist
516        // create elem and insert into tree
517        char path[MAX_FNAMELEN];
518        struct compactor_meta meta;
519
520        elem = (struct openfiles_elem *)malloc(sizeof(struct openfiles_elem));
521        strcpy(elem->filename, file->filename);
522        elem->file = file;
523        elem->config = *config;
524        elem->register_count = 1;
525        elem->compaction_flag = false;
526        elem->daemon_compact_in_progress = false;
527        elem->cmp_func_list = cmp_func_list;
528        avl_insert(&openfiles, &elem->avl, _compactor_cmp);
529        mutex_unlock(&cpt_lock); // Releasing the lock here should be OK as
530                                 // subsequent registration attempts for the same file
531                                 // will be simply processed by incrementing its
532                                 // counter below.
533
534        // store in metafile
535        _compactor_convert_dbfile_to_metafile(file->filename, path);
536        _strcpy_fname(meta.filename, file->filename);
537        fs = _compactor_store_metafile(path, &meta, log_callback);
538    } else {
539        // already exists
540        elem = _get_entry(a, struct openfiles_elem, avl);
541        if (!elem->file) {
542            elem->file = file;
543        }
544        elem->register_count++;
545        mutex_unlock(&cpt_lock);
546    }
547    return fs;
548}
549
550void compactor_deregister_file(struct filemgr *file)
551{
552    struct avl_node *a = NULL;
553    struct openfiles_elem query, *elem;
554
555    strcpy(query.filename, file->filename);
556    mutex_lock(&cpt_lock);
557    a = avl_search(&openfiles, &query.avl, _compactor_cmp);
558    if (a) {
559        elem = _get_entry(a, struct openfiles_elem, avl);
560        if ((--elem->register_count) == 0) {
561            // if no handle refers this file
562            if (elem->daemon_compact_in_progress) {
563                // This file is waiting for compaction by compactor (but not opened
564                // yet). Do not remove 'elem' for now. The 'elem' will be automatically
565                // replaced after the compaction is done by calling
566                // 'compactor_switch_file()'. However, elem->file should be set to NULL
567                // in order to be removed from the AVL tree in case of the compaction
568                // failure.
569                elem->file = NULL;
570            } else {
571                // remove from the tree
572                avl_remove(&openfiles, &elem->avl);
573                free(elem);
574            }
575        }
576    }
577    mutex_unlock(&cpt_lock);
578}
579
580void compactor_change_threshold(struct filemgr *file, size_t new_threshold)
581{
582    struct avl_node *a = NULL;
583    struct openfiles_elem query, *elem;
584
585    strcpy(query.filename, file->filename);
586    mutex_lock(&cpt_lock);
587    a = avl_search(&openfiles, &query.avl, _compactor_cmp);
588    if (a) {
589        elem = _get_entry(a, struct openfiles_elem, avl);
590        elem->config.compaction_threshold = new_threshold;
591    }
592    mutex_unlock(&cpt_lock);
593}
594
595struct compactor_meta * _compactor_read_metafile(char *metafile,
596                                                 struct compactor_meta *metadata,
597                                                 err_log_callback *log_callback)
598{
599    int fd_meta, fd_db;
600    ssize_t ret;
601    uint8_t *buf = alca(uint8_t, sizeof(struct compactor_meta));
602    uint32_t crc;
603    char fullpath[MAX_FNAMELEN];
604    struct filemgr_ops *ops;
605    struct compactor_meta meta;
606
607    ops = get_filemgr_ops();
608    fd_meta = ops->open(metafile, O_RDONLY, 0644);
609
610    if (fd_meta >= 0) {
611        // metafile exists .. read metadata
612        ret = ops->pread(fd_meta, buf, sizeof(struct compactor_meta), 0);
613        if (ret < 0 || (size_t)ret < sizeof(struct compactor_meta)) {
614            char errno_msg[512];
615            ops->get_errno_str(errno_msg, 512);
616            fdb_log(log_callback, (fdb_status) ret,
617                    "Failed to read the meta file '%s', errno_message: %s\n",
618                    metafile, errno_msg);
619            ret = ops->close(fd_meta);
620            if (ret < 0) {
621                ops->get_errno_str(errno_msg, 512);
622                fdb_log(log_callback, (fdb_status) ret,
623                        "Failed to close the meta file '%s', errno_message: %s\n",
624                        metafile, errno_msg);
625            }
626            return NULL;
627        }
628        memcpy(&meta, buf, sizeof(struct compactor_meta));
629        meta.version = _endian_decode(meta.version);
630        meta.crc = _endian_decode(meta.crc);
631        ops->close(fd_meta);
632
633        // CRC check
634        crc = chksum(buf, sizeof(struct compactor_meta) - sizeof(crc));
635        if (crc != meta.crc) {
636            fdb_log(log_callback, FDB_RESULT_CHECKSUM_ERROR,
637                    "Checksum mismatch in the meta file '%s'\n", metafile);
638            return NULL;
639        }
640        // check if the file exists
641        _reconstruct_path(fullpath, metafile, meta.filename);
642        fd_db = ops->open(fullpath, O_RDONLY, 0644);
643        if (fd_db < 0) {
644            // file doesn't exist
645            return NULL;
646        }
647        ops->close(fd_db);
648    } else {
649        // file doesn't exist
650        return NULL;
651    }
652
653    *metadata = meta;
654    return metadata;
655}
656
657static fdb_status _compactor_store_metafile(char *metafile,
658                                            struct compactor_meta *metadata,
659                                            err_log_callback *log_callback)
660{
661    int fd_meta;
662    ssize_t ret;
663    uint32_t crc;
664    struct filemgr_ops *ops;
665    struct compactor_meta meta;
666
667    ops = get_filemgr_ops();
668    fd_meta = ops->open(metafile, O_RDWR | O_CREAT, 0644);
669
670    if (fd_meta >= 0){
671        meta.version = _endian_encode(COMPACTOR_META_VERSION);
672        strcpy(meta.filename, metadata->filename);
673        crc = chksum((void*)&meta, sizeof(struct compactor_meta) - sizeof(crc));
674        meta.crc = _endian_encode(crc);
675
676        char errno_msg[512];
677        ret = ops->pwrite(fd_meta, &meta, sizeof(struct compactor_meta), 0);
678        if (ret < 0 || (size_t)ret < sizeof(struct compactor_meta)) {
679            ops->get_errno_str(errno_msg, 512);
680            fdb_log(log_callback, (fdb_status) ret,
681                    "Failed to perform a write in the meta file '%s', "
682                    "errno_message: %s\n", metafile, errno_msg);
683            ops->close(fd_meta);
684            return FDB_RESULT_WRITE_FAIL;
685        }
686        ret = ops->fsync(fd_meta);
687        if (ret < 0) {
688            ops->get_errno_str(errno_msg, 512);
689            fdb_log(log_callback, (fdb_status) ret,
690                    "Failed to perform a sync in the meta file '%s', "
691                    "errno_message: %s\n", metafile, errno_msg);
692            ops->close(fd_meta);
693            return FDB_RESULT_FSYNC_FAIL;
694        }
695        ops->close(fd_meta);
696    } else {
697        return FDB_RESULT_OPEN_FAIL;
698    }
699
700    return FDB_RESULT_SUCCESS;
701}
702
703void compactor_switch_file(struct filemgr *old_file, struct filemgr *new_file,
704                           err_log_callback *log_callback)
705{
706    struct avl_node *a = NULL;
707    struct openfiles_elem query, *elem;
708    struct compactor_meta meta;
709
710    strcpy(query.filename, old_file->filename);
711    mutex_lock(&cpt_lock);
712    a = avl_search(&openfiles, &query.avl, _compactor_cmp);
713    if (a) {
714        char metafile[MAX_FNAMELEN];
715        fdb_compaction_mode_t comp_mode;
716
717        elem = _get_entry(a, struct openfiles_elem, avl);
718        avl_remove(&openfiles, a);
719        strcpy(elem->filename, new_file->filename);
720        elem->file = new_file;
721        elem->register_count = 1;
722        elem->daemon_compact_in_progress = false;
723        // clear compaction flag
724        elem->compaction_flag = false;
725        avl_insert(&openfiles, &elem->avl, _compactor_cmp);
726        comp_mode = elem->config.compaction_mode;
727        mutex_unlock(&cpt_lock); // Releasing the lock here should be OK as we don't
728                                 // expect more than one compaction task completion for
729                                 // the same file.
730
731        if (comp_mode == FDB_COMPACTION_AUTO) {
732            _compactor_convert_dbfile_to_metafile(new_file->filename, metafile);
733            _strcpy_fname(meta.filename, new_file->filename);
734            _compactor_store_metafile(metafile, &meta, log_callback);
735        }
736    } else {
737        mutex_unlock(&cpt_lock);
738    }
739}
740
741void compactor_get_virtual_filename(const char *filename,
742                                    char *virtual_filename)
743{
744    int prefix_len = _compactor_prefix_len((char*)filename) - 1;
745    if (prefix_len > 0) {
746        strncpy(virtual_filename, filename, prefix_len);
747        virtual_filename[prefix_len] = 0;
748    } else {
749        strcpy(virtual_filename, filename);
750    }
751}
752
// Resolves the (virtual) file name 'filename' to the name of the actual DB
// file and copies the result into 'actual_filename'.
//
//  1. '<filename>.meta' is read. If it is valid, the result is the file name
//     stored in it, placed in the directory of 'filename'.
//  2. Otherwise, in manual compaction mode, 'filename' itself is the result
//     if such a file exists.
//  3. Otherwise the directory is scanned for entries whose name starts with
//     '<name>.' and the largest number that follows this prefix is used:
//     the result is '<filename>.<max>'. If no such entry is found the result
//     is '<filename>.0' in auto compaction mode and 'filename' otherwise.
//
// Every path through this function returns FDB_RESULT_SUCCESS.
//
// NOTE(review): none of the sprintf/strcpy/strcat calls below is bounded;
// this presumably relies on callers passing names well below MAX_FNAMELEN
// and an 'actual_filename' buffer of at least that size -- confirm.
fdb_status compactor_get_actual_filename(const char *filename,
                                         char *actual_filename,
                                         fdb_compaction_mode_t comp_mode,
                                         err_log_callback *log_callback)
{
    int i;
    int filename_len;
    int dirname_len;
    int compaction_no, max_compaction_no = -1;
    char path[MAX_FNAMELEN];
    char dirname[MAX_FNAMELEN], prefix[MAX_FNAMELEN];
    char ret_name[MAX_FNAMELEN];
    fdb_status fs = FDB_RESULT_SUCCESS;
    struct compactor_meta meta, *meta_ptr;

    // get actual filename from metafile
    sprintf(path, "%s.meta", filename);
    meta_ptr = _compactor_read_metafile(path, &meta, log_callback);

    if (meta_ptr == NULL) {
        if (comp_mode == FDB_COMPACTION_MANUAL && does_file_exist(filename)) {
            strcpy(actual_filename, filename);
            return FDB_RESULT_SUCCESS;
        }

        // error handling .. scan directory
        // backward search until find the first '/' or '\' (Windows)
        filename_len = strlen(filename);
        dirname_len = 0;

#if !defined(WIN32) && !defined(_WIN32)
        DIR *dir_info;
        struct dirent *dir_entry;

        for (i=filename_len-1; i>=0; --i){
            if (filename[i] == '/') {
                dirname_len = i+1;
                break;
            }
        }

        // split 'filename' into the directory to scan and the name prefix
        // ('<name>.') the directory entries are matched against
        if (dirname_len > 0) {
            strncpy(dirname, filename, dirname_len);
            dirname[dirname_len] = 0;
        } else {
            strcpy(dirname, ".");
        }
        strcpy(prefix, filename + dirname_len);
        strcat(prefix, ".");

        dir_info = opendir(dirname);
        if (dir_info != NULL) {
            while ((dir_entry = readdir(dir_info))) {
                if (!strncmp(dir_entry->d_name, prefix, strlen(prefix))) {
                    // compaction_no stays -1 if no number follows the prefix
                    // (e.g. for the '.meta' file), so such entries are skipped
                    compaction_no = -1;
                    sscanf(dir_entry->d_name + strlen(prefix), "%d", &compaction_no);
                    if (compaction_no >= 0) {
                        if (compaction_no > max_compaction_no) {
                            max_compaction_no = compaction_no;
                        }
                    }
                }
            }
            closedir(dir_info);
        }
#else
        // Windows
        for (i=filename_len-1; i>=0; --i){
            if (filename[i] == '/' || filename[i] == '\\') {
                dirname_len = i+1;
                break;
            }
        }

        if (dirname_len > 0) {
            strncpy(dirname, filename, dirname_len);
            dirname[dirname_len] = 0;
        } else {
            strcpy(dirname, ".");
        }
        strcpy(prefix, filename + dirname_len);
        strcat(prefix, ".");

        WIN32_FIND_DATA filedata;
        HANDLE hfind;
        char query_str[MAX_FNAMELEN];

        // find all files start with 'prefix'
        // NOTE(review): the search pattern is built from 'prefix' only;
        // 'dirname' is computed above but not used in this branch, so the
        // search looks relative to the current directory -- confirm whether
        // that is intended.
        sprintf(query_str, "%s*", prefix);
        hfind = FindFirstFile(query_str, &filedata);
        while (hfind != INVALID_HANDLE_VALUE) {
            if (!strncmp(filedata.cFileName, prefix, strlen(prefix))) {
                compaction_no = -1;
                sscanf(filedata.cFileName + strlen(prefix), "%d", &compaction_no);
                if (compaction_no >= 0) {
                    if (compaction_no > max_compaction_no) {
                        max_compaction_no = compaction_no;
                    }
                }
            }

            // the loop ends once the handle has been closed and invalidated
            if (!FindNextFile(hfind, &filedata)) {
                FindClose(hfind);
                hfind = INVALID_HANDLE_VALUE;
            }
        }

#endif

        if (max_compaction_no < 0) {
            if (comp_mode == FDB_COMPACTION_AUTO) {
                // DB files with a revision number are not found.
                // initialize filename to '[filename].0'
                sprintf(ret_name, "%s.0", filename);
            } else { // Manual compaction mode.
                // Simply use the file name passed to this function.
                strcpy(actual_filename, filename);
                return FDB_RESULT_SUCCESS;
            }
        } else {
            // return the file that has the largest compaction number
            sprintf(ret_name, "%s.%d", filename, max_compaction_no);
            fs = FDB_RESULT_SUCCESS;
        }
        // 'fs' is never set to anything but FDB_RESULT_SUCCESS above
        if (fs == FDB_RESULT_SUCCESS) {
            strcpy(actual_filename, ret_name);
        }
        return fs;

    } else {
        // metadata is successfully read from the metafile .. just return the filename
        _reconstruct_path(ret_name, (char*)filename, meta.filename);
        strcpy(actual_filename, ret_name);
        return FDB_RESULT_SUCCESS;
    }
}
889
890bool compactor_is_valid_mode(const char *filename, fdb_config *config)
891{
892    int fd;
893    char path[MAX_FNAMELEN];
894    struct filemgr_ops *ops;
895
896    ops = get_filemgr_ops();
897
898    if (config->compaction_mode == FDB_COMPACTION_AUTO) {
899        // auto compaction mode: invalid when
900        // the file '[filename]' exists
901        fd = ops->open(filename, O_RDONLY, 0644);
902        if (fd != FDB_RESULT_NO_SUCH_FILE) {
903            ops->close(fd);
904            return false;
905        }
906
907    } else if (config->compaction_mode == FDB_COMPACTION_MANUAL) {
908        // manual compaction mode: invalid when
909        // the file '[filename].meta' exists
910        sprintf(path, "%s.meta", filename);
911        fd = ops->open(path, O_RDONLY, 0644);
912        if (fd != FDB_RESULT_NO_SUCH_FILE) {
913            ops->close(fd);
914            return false;
915        }
916
917    } else {
918        // unknown mode
919        return false;
920    }
921
922    return true;
923}
924
925static fdb_status _compactor_search_n_destroy(const char *filename)
926{
927    int i;
928    int filename_len;
929    int dirname_len;
930    char dirname[MAX_FNAMELEN], prefix[MAX_FNAMELEN];
931    fdb_status fs = FDB_RESULT_SUCCESS;
932
933    // error handling .. scan directory
934    // backward search until find the first '/' or '\' (Windows)
935    filename_len = strlen(filename);
936    dirname_len = 0;
937
938#if !defined(WIN32) && !defined(_WIN32)
939    DIR *dir_info;
940    struct dirent *dir_entry;
941
942    for (i=filename_len-1; i>=0; --i){
943        if (filename[i] == '/') {
944            dirname_len = i+1;
945            break;
946        }
947    }
948
949    if (dirname_len > 0) {
950        strncpy(dirname, filename, dirname_len);
951        dirname[dirname_len] = 0;
952    } else {
953        strcpy(dirname, ".");
954    }
955    strcpy(prefix, filename + dirname_len);
956    strcat(prefix, ".");
957
958    dir_info = opendir(dirname);
959    if (dir_info != NULL) {
960        while ((dir_entry = readdir(dir_info))) {
961            if (!strncmp(dir_entry->d_name, prefix, strlen(prefix))) {
962                // Need to check filemgr for possible open entry?
963                if (remove(dir_entry->d_name)) {
964                    fs = FDB_RESULT_FILE_REMOVE_FAIL;
965                    closedir(dir_info);
966                    return fs;
967                }
968            }
969        }
970        closedir(dir_info);
971    }
972#else
973    // Windows
974    for (i=filename_len-1; i>=0; --i){
975        if (filename[i] == '/' || filename[i] == '\\') {
976            dirname_len = i+1;
977            break;
978        }
979    }
980
981    if (dirname_len > 0) {
982        strncpy(dirname, filename, dirname_len);
983        dirname[dirname_len] = 0;
984    } else {
985        strcpy(dirname, ".");
986    }
987    strcpy(prefix, filename + dirname_len);
988    strcat(prefix, ".");
989
990    WIN32_FIND_DATA filedata;
991    HANDLE hfind;
992    char query_str[MAX_FNAMELEN];
993
994    // find all files start with 'prefix'
995    sprintf(query_str, "%s*", prefix);
996    hfind = FindFirstFile(query_str, &filedata);
997    while (hfind != INVALID_HANDLE_VALUE) {
998        if (!strncmp(filedata.cFileName, prefix, strlen(prefix))) {
999            // Need to check filemgr for possible open entry?
1000            if (remove(filedata.cFileName)) {
1001                fs = FDB_RESULT_FILE_REMOVE_FAIL;
1002                FindClose(hfind);
1003                hfind = INVALID_HANDLE_VALUE;
1004                return fs;
1005            }
1006        }
1007
1008        if (!FindNextFile(hfind, &filedata)) {
1009            FindClose(hfind);
1010            hfind = INVALID_HANDLE_VALUE;
1011        }
1012    }
1013
1014#endif
1015    return fs;
1016}
1017
1018fdb_status compactor_destroy_file(char *filename,
1019                                  fdb_config *config)
1020{
1021    struct avl_node *a = NULL;
1022    struct openfiles_elem query, *elem;
1023    size_t strcmp_len;
1024    fdb_status status = FDB_RESULT_SUCCESS;
1025    compactor_config c_config;
1026
1027    strcmp_len = strlen(filename);
1028    filename[strcmp_len] = '.'; // add a . suffix in place
1029    strcmp_len++;
1030    filename[strcmp_len] = '\0';
1031    strcpy(query.filename, filename);
1032
1033    c_config.sleep_duration = config->compactor_sleep_duration;
1034    c_config.num_threads = config->num_compactor_threads;
1035    compactor_init(&c_config);
1036
1037    mutex_lock(&cpt_lock);
1038    compactor_args.strcmp_len = strcmp_len; // Do prefix match for all vers
1039    a = avl_search(&openfiles, &query.avl, _compactor_cmp);
1040    if (a) {
1041        elem = _get_entry(a, struct openfiles_elem, avl);
1042        // if no handle refers this file
1043        if (elem->daemon_compact_in_progress) {
1044            // This file is waiting for compaction by compactor
1045            // Return a temporary failure, user must retry after sometime
1046            status = FDB_RESULT_IN_USE_BY_COMPACTOR;
1047        } else { // File handle not closed, fail operation
1048            status = FDB_RESULT_FILE_IS_BUSY;
1049        }
1050    }
1051
1052    compactor_args.strcmp_len = MAX_FNAMELEN; // restore for normal compare
1053    mutex_unlock(&cpt_lock); // Releasing the lock here should be OK as file
1054                             // deletions doesn't require strict synchronization.
1055    filename[strcmp_len - 1] = '\0'; // restore the filename
1056    if (status == FDB_RESULT_SUCCESS) {
1057        status = _compactor_search_n_destroy(filename);
1058    }
1059
1060    return status;
1061}
1062