xref: /6.0.3/forestdb/src/compactor.cc (revision b495fd36)
1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2010 Couchbase, Inc
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
18#include <stdio.h>
19#include <stdlib.h>
20#include <string.h>
21#include <fcntl.h>
22#if !defined(WIN32) && !defined(_WIN32)
23#include <sys/time.h>
24#include <dirent.h>
25#include <unistd.h>
26#endif
27
28#include "libforestdb/forestdb.h"
29#include "fdb_internal.h"
30#include "filemgr.h"
31#include "avltree.h"
32#include "list.h"
33#include "common.h"
34#include "filemgr_ops.h"
35#include "configuration.h"
36#include "internal_types.h"
37#include "compactor.h"
38#include "wal.h"
39#include "memleak.h"
40#include "time_utils.h"
41
// In __DEBUG builds, silence this file's DBG/DBGCMD/DBGSW tracing macros
// unless compactor-specific tracing (__DEBUG_CPT) is also requested.
#ifdef __DEBUG
#ifndef __DEBUG_CPT
    #undef DBG
    #undef DBGCMD
    #undef DBGSW
    #define DBG(...)
    #define DBGCMD(...)
    #define DBGSW(n, ...)
#endif
#endif

// Version number written into compactor_meta.version by _compactor_store_metafile().
#define COMPACTOR_META_VERSION (1)
// Size of every fixed filename buffer used in this file.
#define MAX_FNAMELEN (FDB_MAX_FILENAME_LEN)
55
// variables for initialization
// Set to 1 by compactor_init() and back to 0 by compactor_shutdown().
// NOTE(review): 'volatile' gives no inter-thread ordering; compactor_init()
// relies on the caller (fdb_init) for synchronization per its comment.
static volatile uint8_t compactor_initialized = 0;
// Guards the 'openfiles' tree and the mutable fields of its entries.
// NOTE(review): not static, so it may be referenced from other translation units -- confirm.
mutex_t cpt_lock;

// Number of daemon threads and their handles (created in compactor_init(),
// joined in compactor_shutdown()).
static size_t num_compactor_threads = DEFAULT_NUM_COMPACTOR_THREADS;
static thread_t *compactor_tids = NULL;


// Seconds to wait before the first daemon scan; also the default per-file
// compaction interval assigned at registration.
static size_t sleep_duration = FDB_COMPACTOR_SLEEP_DURATION;

// Used to wake sleeping daemon threads early (file-removal registration, shutdown).
static mutex_t sync_mutex;
static thread_cond_t sync_cond;

// Set by compactor_shutdown(); daemon threads return once they observe it.
static atomic_uint8_t compactor_terminate_signal(0);

// All registered files, ordered by filename via _compactor_cmp().
static struct avl_tree openfiles;
72
// One tracked database file; lives in 'openfiles' keyed by 'filename'.
struct openfiles_elem {
    char filename[MAX_FNAMELEN];   // tree key, compared by _compactor_cmp()
    struct filemgr *file;          // NULL after the last deregistration during a daemon compaction
    fdb_config config;             // copy of the config supplied at registration
    uint32_t register_count;       // outstanding registrations (see compactor_deregister_file())
    bool compaction_flag; // set when the file is being compacted
    bool daemon_compact_in_progress; // set while compactor_thread() is compacting this entry
    bool removal_activated;        // set once a daemon thread has started removing the file
    err_log_callback *log_callback;
    struct avl_node avl;           // link into 'openfiles'
    struct timeval last_compaction_timestamp; // registration time or end of the last compaction
    size_t interval;               // minimum seconds between daemon compactions of this file
};
86
// Auxiliary data handed to _compactor_cmp() through avl_init() in compactor_init().
struct compactor_args_t {
    // void *aux; (reserved for future use)
    size_t strcmp_len; // Used to search for prefix match (max bytes compared; compactor_init sets MAX_FNAMELEN)
};
static struct compactor_args_t compactor_args;
92
// Record persisted in the ".meta" file. Field order and sizes ARE the on-disk
// format; 'crc' covers every byte that precedes it.
struct compactor_meta{
    uint32_t version;              // COMPACTOR_META_VERSION, endian-encoded
    char filename[MAX_FNAMELEN];   // base name (no directory) of the current database file
    uint32_t crc;                  // endian-encoded checksum of the preceding bytes
};
98
#if !defined(WIN32) && !defined(_WIN32)
// True when 'filename' names any existing filesystem entry.
static bool does_file_exist(const char *filename) {
    struct stat file_info;
    const bool found = (stat(filename, &file_info) == 0);
    return found;
}
#else
// True when 'filename' names any existing filesystem entry.
static bool does_file_exist(const char *filename) {
    return GetFileAttributes(filename) != INVALID_FILE_ATTRIBUTES;
}
#endif
110
111// compares file names
112int _compactor_cmp(struct avl_node *a, struct avl_node *b, void *aux)
113{
114    struct openfiles_elem *aa, *bb;
115    struct compactor_args_t *args = (struct compactor_args_t *)aux;
116    aa = _get_entry(a, struct openfiles_elem, avl);
117    bb = _get_entry(b, struct openfiles_elem, avl);
118    return strncmp(aa->filename, bb->filename, args->strcmp_len);
119}
120
121INLINE uint64_t _compactor_estimate_space(struct openfiles_elem *elem)
122{
123    uint64_t ret = 0;
124    uint64_t datasize;
125    uint64_t nlivenodes;
126
127    datasize = _kvs_stat_get_sum(elem->file, KVS_STAT_DATASIZE);
128    nlivenodes = _kvs_stat_get_sum(elem->file, KVS_STAT_NLIVENODES);
129
130    ret = datasize;
131    ret += nlivenodes * elem->config.blocksize;
132    ret += wal_get_datasize(elem->file);
133
134    return ret;
135}
136
137// check if the compaction threshold is satisfied
138INLINE bool _compactor_is_threshold_satisfied(struct openfiles_elem *elem)
139{
140    uint64_t filesize;
141    uint64_t active_data;
142    int threshold;
143
144    if (elem->compaction_flag || filemgr_is_rollback_on(elem->file)) {
145        // do not perform compaction if the file is already being compacted or
146        // in rollback.
147        return false;
148    }
149
150    struct timeval curr_time, gap;
151    gettimeofday(&curr_time, NULL);
152    gap = _utime_gap(elem->last_compaction_timestamp, curr_time);
153    uint64_t elapsed_us = (uint64_t)gap.tv_sec * 1000000 + gap.tv_usec;
154    if (elapsed_us < (elem->interval * 1000000)) {
155        return false;
156    }
157
158    threshold = elem->config.compaction_threshold;
159    if (elem->config.compaction_mode == FDB_COMPACTION_AUTO &&
160        threshold > 0)
161        {
162        filesize = filemgr_get_pos(elem->file);
163        active_data = _compactor_estimate_space(elem);
164        if (active_data == 0 || active_data >= filesize ||
165            filesize < elem->config.compaction_minimum_filesize) {
166            return false;
167        }
168
169        return ((filesize / 100.0 * threshold) < (filesize - active_data));
170    } else {
171        return false;
172    }
173}
174
175// check if the file is waiting for being removed
176INLINE bool _compactor_check_file_removal(struct openfiles_elem *elem)
177{
178    if (elem->file->fflags & FILEMGR_REMOVAL_IN_PROG &&
179        !elem->removal_activated) {
180        return true;
181    }
182    return false;
183}
184
185// check if background file deletion is done
186bool compactor_is_file_removed(const char *filename)
187{
188    struct avl_node *a;
189    struct openfiles_elem query;
190
191    strcpy(query.filename, filename);
192    mutex_lock(&cpt_lock);
193    a = avl_search(&openfiles, &query.avl, _compactor_cmp);
194    mutex_unlock(&cpt_lock);
195    if (a) {
196        // exist .. old file is not removed yet
197        return false;
198    }
199    return true;
200}
201
202// return the location of '.'
203INLINE int _compactor_prefix_len(char *filename)
204{
205    int i;
206    int file_len = strlen(filename);
207    int prefix_len = 0;
208    // find the first '.'
209    for (i=file_len-1; i>=0; --i){
210        if (filename[i] == '.') {
211            prefix_len = i+1;
212            break;
213        }
214    }
215    return prefix_len;
216}
217
218// return the the location of '/' or '\'
219INLINE int _compactor_dir_len(char *filename)
220{
221    int i;
222    int file_len = strlen(filename);
223    int dir_len = 0;
224    // find the first '/' or '\'
225    for (i=file_len-1; i>=0; --i){
226        if (filename[i] == '/' || filename[i] == '\\') {
227            dir_len = i+1;
228            break;
229        }
230    }
231    return dir_len;
232}
233
234// copy from 'foo/bar.baz' to 'bar.baz'
235static void _strcpy_fname(char *dst, char *src)
236{
237    int dir_len = _compactor_dir_len(src);
238    strcpy(dst, src + dir_len);
239}
240
241// copy from 'foo/bar.baz' to 'foo/' (including '/')
242static void _strcpy_dirname(char *dst, char *src)
243{
244    int dir_len = _compactor_dir_len(src);
245    if (dir_len) {
246        strncpy(dst, src, dir_len);
247    }
248    // set NULL char
249    dst[dir_len] = 0;
250}
251
252// <example>
253// fname: 'foo.bar'
254// path: 'tmp/dir/other.file'
255// returned dst: 'tmp/dir/foo.bar'
256static void _reconstruct_path(char *dst, char *path, char *fname)
257{
258    _strcpy_dirname(dst, path);
259    strcat(dst + strlen(dst), fname);
260}
261
// Derive the virtual filename by dropping everything from the LAST '.' onward
// ("db.3" -> "db"). When 'filename' has no '.', the whole name is copied, so
// 'vfilename' is always a valid string (previously it was left unwritten,
// and compactor_thread() would pass an uninitialized buffer to
// fdb_open_for_compactor()). 'vfilename' must hold at least strlen(filename)+1 bytes.
static void _compactor_get_vfilename(char *filename, char *vfilename)
{
    const char *dot = strrchr(filename, '.');

    if (dot) {
        size_t len = (size_t)(dot - filename);
        memcpy(vfilename, filename, len);
        vfilename[len] = '\0';
    } else {
        strcpy(vfilename, filename);
    }
}
271
// Map a database file name to its metafile name by replacing the suffix after
// the LAST '.' with "meta" ("db.3" -> "db.meta"). When 'dbfile' has no '.',
// ".meta" is appended to the whole name ("db" -> "db.meta"). That is the same
// "<virtual name>.meta" form compactor_get_actual_filename() reads, and it
// means 'metafile' is never left uninitialized for callers that then open it.
static void _compactor_convert_dbfile_to_metafile(char *dbfile, char *metafile)
{
    const char *dot = strrchr(dbfile, '.');

    if (dot) {
        size_t keep = (size_t)(dot - dbfile) + 1; // keep the '.'
        memcpy(metafile, dbfile, keep);
        metafile[keep] = '\0';
        strcat(metafile, "meta");
    } else {
        strcpy(metafile, dbfile);
        strcat(metafile, ".meta");
    }
}
282
// True when every character of 'str' is an ASCII decimal digit
// (vacuously true for the empty string).
static bool _allDigit(char *str) {
    for (const char *p = str; *p != '\0'; ++p) {
        if (!(*p >= '0' && *p <= '9')) {
            return false;
        }
    }
    return true;
}
292
293void compactor_get_next_filename(char *file, char *nextfile)
294{
295    int compaction_no = 0;
296    int prefix_len = _compactor_prefix_len(file);
297    char str_no[24];
298
299    if (prefix_len > 0 && _allDigit(file + prefix_len)) {
300        sscanf(file+prefix_len, "%d", &compaction_no);
301        strncpy(nextfile, file, prefix_len);
302        do {
303            nextfile[prefix_len] = 0;
304            sprintf(str_no, "%d", ++compaction_no);
305            strcat(nextfile, str_no);
306        } while (does_file_exist(nextfile));
307    } else {
308        do {
309            strcpy(nextfile, file);
310            sprintf(str_no, ".%d", ++compaction_no);
311            strcat(nextfile, str_no);
312        } while (does_file_exist(nextfile));
313    }
314}
315
316bool compactor_switch_compaction_flag(struct filemgr *file, bool flag)
317{
318    struct avl_node *a = NULL;
319    struct openfiles_elem query, *elem;
320
321    strcpy(query.filename, file->filename);
322    mutex_lock(&cpt_lock);
323    a = avl_search(&openfiles, &query.avl, _compactor_cmp);
324    if (a) {
325        // found
326        elem = _get_entry(a, struct openfiles_elem, avl);
327        if (elem->compaction_flag == flag) {
328            // already switched by other thread .. return false
329            mutex_unlock(&cpt_lock);
330            return false;
331        }
332        // switch
333        elem->compaction_flag = flag;
334        mutex_unlock(&cpt_lock);
335        return true;
336    }
337    // file doesn't exist .. already compacted or deregistered
338    mutex_unlock(&cpt_lock);
339    return false;
340}
341
// Entry point of each compaction daemon thread (started by compactor_init()
// via thread_create() with a NULL argument; 'voidargs' is unused).
//
// After an initial wait of up to 'sleep_duration' seconds on 'sync_cond'
// (thread_cond_timedwait's timeout is in milliseconds), the thread loops until
// 'compactor_terminate_signal' is observed:
//   1. Under 'cpt_lock', walk 'openfiles' and for each entry:
//      - free it if 'file' is NULL (compactor_deregister_file() leaves such
//        entries behind when the last handle closes during a daemon compaction);
//      - compact it if _compactor_is_threshold_satisfied(), releasing
//        'cpt_lock' while the compaction runs;
//      - otherwise, if flagged for removal (_compactor_check_file_removal()),
//        close its fd, drop its buffer blocks and free the filemgr and entry.
//   2. Wait up to 15 seconds on 'sync_cond' before rescanning.
// Always returns NULL.
void * compactor_thread(void *voidargs)
{
    char vfilename[MAX_FNAMELEN];
    char new_filename[MAX_FNAMELEN];
    fdb_file_handle *fhandle;
    fdb_status fs;
    struct avl_node *a;
    struct openfiles_elem *elem;
    struct openfiles_elem query;

    // Sleep for a configured period by default to allow applications to warm up their data.
    // TODO: Need to implement more flexible way of scheduling the compaction
    // daemon (e.g., public APIs to start / stop the compaction daemon).
    mutex_lock(&sync_mutex);
    if (compactor_terminate_signal) {
        mutex_unlock(&sync_mutex);
        return NULL;
    }
    thread_cond_timedwait(&sync_cond, &sync_mutex, sleep_duration * 1000);
    mutex_unlock(&sync_mutex);

    while (1) {

        mutex_lock(&cpt_lock);
        a = avl_first(&openfiles);
        while(a) {
            elem = _get_entry(a, struct openfiles_elem, avl);
            if (!elem->file) {
                // Stale entry from compactor_deregister_file(); advance 'a'
                // before removing so iteration stays valid.
                // NOTE(review): with several compactor threads, confirm no other
                // thread can still be using this entry (daemon_compact_in_progress)
                // when it is freed here.
                a = avl_next(a);
                avl_remove(&openfiles, &elem->avl);
                free(elem);
                continue;
            }

            if (_compactor_is_threshold_satisfied(elem)) {

                elem->daemon_compact_in_progress = true;
                // set compaction flag
                elem->compaction_flag = true;
                mutex_unlock(&cpt_lock);
                // Once 'daemon_compact_in_progress' is set to true, then it is safe to
                // read the variables of 'elem' until the compaction is completed.
                _compactor_get_vfilename(elem->filename, vfilename);

                // Get the list of custom compare functions.
                struct list cmp_func_list;
                list_init(&cmp_func_list);
                fdb_cmp_func_list_from_filemgr(elem->file, &cmp_func_list);
                fs = fdb_open_for_compactor(&fhandle, vfilename, &elem->config,
                                           &cmp_func_list);
                fdb_free_cmp_func_list(&cmp_func_list);

                if (fs == FDB_RESULT_SUCCESS) {
                    compactor_get_next_filename(elem->filename, new_filename);
                    fdb_compact_file(fhandle, new_filename, false, (bid_t) -1,
                                     false, NULL);
                    fdb_close(fhandle);

                    // The tree may have changed while the lock was released
                    // (the entry is presumably re-keyed to 'new_filename' by
                    // compactor_switch_file() at the end of compaction), so
                    // resume after 'new_filename' instead of reusing 'a'.
                    strcpy(query.filename, new_filename);
                    mutex_lock(&cpt_lock);
                    // Search the next file for compaction.
                    a = avl_search_greater(&openfiles, &query.avl, _compactor_cmp);
                } else {
                    // As a workaround for MB-17009, call fprintf instead of fdb_log
                    // until c->cgo->go callback trace issue is resolved.
                    fprintf(stderr,
                            "Error status code: %d, Failed to open the file "
                            "'%s' for auto daemon compaction.\n",
                            fs, vfilename);
                    // fail to open file
                    mutex_lock(&cpt_lock);
                    a = avl_next(&elem->avl);
                    elem->daemon_compact_in_progress = false;
                    // clear compaction flag
                    elem->compaction_flag = false;
                }

            } else if (_compactor_check_file_removal(elem)) {

                // remove file
                int ret;

                // set activation flag to prevent other compactor threads attempt
                // to remove the same file and double free the 'elem' structure,
                // during 'cpt_lock' is released.
                elem->removal_activated = true;

                mutex_unlock(&cpt_lock);
                // As the file is already unlinked, just close it.
                ret = elem->file->ops->close(elem->file->fd);
#if defined(WIN32) || defined(_WIN32)
                // For Windows, we need to manually remove the file.
                // ('ret' now reflects remove(), not close().)
                ret = remove(elem->file->filename);
#endif
                filemgr_remove_all_buffer_blocks(elem->file);
                mutex_lock(&cpt_lock);

                if (elem->log_callback && ret != 0) {
                    char errno_msg[512];
                    elem->file->ops->get_errno_str(errno_msg, 512);
                    // As a workaround for MB-17009, call fprintf instead of fdb_log
                    // until c->cgo->go callback trace issue is resolved.
                    fprintf(stderr,
                            "Error status code: %d, Error in REMOVE on a "
                            "database file '%s', %s",
                            ret, elem->file->filename, errno_msg);
                }

                // free filemgr structure
                filemgr_free_func(&elem->file->e);
                // remove & free elem
                a = avl_next(a);
                avl_remove(&openfiles, &elem->avl);
                free(elem);

            } else {

                // next
                a = avl_next(a);

            }
            if (compactor_terminate_signal) {
                mutex_unlock(&cpt_lock);
                return NULL;
            }
        }
        mutex_unlock(&cpt_lock);

        mutex_lock(&sync_mutex);
        if (compactor_terminate_signal) {
            mutex_unlock(&sync_mutex);
            break;
        }
        // As each database file can be opened at different times, we need to
        // wake up each compaction thread with a shorter interval to check if
        // the time since the last compaction of a given file is already passed
        // by a configured compaction interval and consequently the file should
        // be compacted or not.
        thread_cond_timedwait(&sync_cond, &sync_mutex, 15 * 1000); // Wait for 15 secs.
        if (compactor_terminate_signal) {
            mutex_unlock(&sync_mutex);
            break;
        }
        mutex_unlock(&sync_mutex);
    }
    return NULL;
}
489
490void compactor_init(struct compactor_config *config)
491{
492    if (!compactor_initialized) {
493        // Note that this function is synchronized by the spin lock in fdb_init API.
494        mutex_init(&cpt_lock);
495
496        mutex_lock(&cpt_lock);
497        if (!compactor_initialized) {
498            // initialize
499            compactor_args.strcmp_len = MAX_FNAMELEN;
500            avl_init(&openfiles, &compactor_args);
501
502            if (config) {
503                if (config->sleep_duration > 0) {
504                    sleep_duration = config->sleep_duration;
505                }
506            }
507
508            compactor_terminate_signal = 0;
509
510            mutex_init(&sync_mutex);
511            thread_cond_init(&sync_cond);
512
513            // create worker threads
514            num_compactor_threads = config->num_threads;
515            compactor_tids = (thread_t *) calloc(num_compactor_threads, sizeof(thread_t));
516            for (size_t i = 0; i < num_compactor_threads; ++i) {
517                thread_create(&compactor_tids[i], compactor_thread, NULL);
518            }
519
520            compactor_initialized = 1;
521        }
522        mutex_unlock(&cpt_lock);
523    }
524}
525
526void compactor_shutdown()
527{
528    void *ret;
529    struct avl_node *a = NULL;
530    struct openfiles_elem *elem;
531
532    // set terminate signal
533    mutex_lock(&sync_mutex);
534    compactor_terminate_signal = 1;
535    thread_cond_broadcast(&sync_cond);
536    mutex_unlock(&sync_mutex);
537
538    for (size_t i = 0; i < num_compactor_threads; ++i) {
539        thread_join(compactor_tids[i], &ret);
540    }
541    free(compactor_tids);
542
543    mutex_lock(&cpt_lock);
544    // free all elems in the tree
545    a = avl_first(&openfiles);
546    while (a) {
547        elem = _get_entry(a, struct openfiles_elem, avl);
548        a = avl_next(a);
549
550        if (_compactor_check_file_removal(elem)) {
551            // remove file if removal is pended.
552            remove(elem->file->filename);
553            filemgr_free_func(&elem->file->e);
554        }
555
556        avl_remove(&openfiles, &elem->avl);
557        free(elem);
558    }
559
560    sleep_duration = FDB_COMPACTOR_SLEEP_DURATION;
561    compactor_initialized = 0;
562    mutex_destroy(&sync_mutex);
563    thread_cond_destroy(&sync_cond);
564    mutex_unlock(&cpt_lock);
565
566    mutex_destroy(&cpt_lock);
567}
568
// Forward declaration: compactor_register_file() below writes the metafile
// before _compactor_store_metafile() is defined later in this file.
static fdb_status _compactor_store_metafile(char *metafile,
                                            struct compactor_meta *metadata,
                                            err_log_callback *log_callback);
572
573fdb_status compactor_register_file(struct filemgr *file,
574                                   fdb_config *config,
575                                   err_log_callback *log_callback)
576{
577    file_status_t fstatus;
578    fdb_status fs = FDB_RESULT_SUCCESS;
579    struct avl_node *a = NULL;
580    struct openfiles_elem query, *elem;
581
582    // Ignore files whose status is COMPACT_OLD or REMOVED_PENDING.
583    // Those files do not need to be compacted again.
584    fstatus = filemgr_get_file_status(file);
585    if (fstatus == FILE_COMPACT_OLD ||
586        fstatus == FILE_REMOVED_PENDING) {
587        return fs;
588    }
589
590    strcpy(query.filename, file->filename);
591    // first search the existing file
592    mutex_lock(&cpt_lock);
593    a = avl_search(&openfiles, &query.avl, _compactor_cmp);
594    if (a == NULL) {
595        // doesn't exist
596        // create elem and insert into tree
597        char path[MAX_FNAMELEN];
598        struct compactor_meta meta;
599
600        elem = (struct openfiles_elem *)calloc(1, sizeof(struct openfiles_elem));
601        strcpy(elem->filename, file->filename);
602        elem->file = file;
603        elem->config = *config;
604        elem->config.cleanup_cache_onclose = false; // prevent MB-16422
605        elem->register_count = 1;
606        elem->compaction_flag = false;
607        elem->daemon_compact_in_progress = false;
608        elem->removal_activated = false;
609        elem->log_callback = log_callback;
610        gettimeofday(&elem->last_compaction_timestamp, NULL);
611        // Init the compaction interval using the global param
612        elem->interval = sleep_duration;
613        avl_insert(&openfiles, &elem->avl, _compactor_cmp);
614        mutex_unlock(&cpt_lock); // Releasing the lock here should be OK as
615                                 // subsequent registration attempts for the same file
616                                 // will be simply processed by incrementing its
617                                 // counter below.
618
619        // store in metafile
620        _compactor_convert_dbfile_to_metafile(file->filename, path);
621        _strcpy_fname(meta.filename, file->filename);
622        fs = _compactor_store_metafile(path, &meta, log_callback);
623    } else {
624        // already exists
625        elem = _get_entry(a, struct openfiles_elem, avl);
626        if (!elem->file) {
627            elem->file = file;
628        }
629        elem->register_count++;
630        mutex_unlock(&cpt_lock);
631    }
632    return fs;
633}
634
635void compactor_deregister_file(struct filemgr *file)
636{
637    struct avl_node *a = NULL;
638    struct openfiles_elem query, *elem;
639
640    strcpy(query.filename, file->filename);
641    mutex_lock(&cpt_lock);
642    a = avl_search(&openfiles, &query.avl, _compactor_cmp);
643    if (a) {
644        elem = _get_entry(a, struct openfiles_elem, avl);
645        if ((--elem->register_count) == 0) {
646            // if no handle refers this file
647            if (elem->daemon_compact_in_progress) {
648                // This file is waiting for compaction by compactor (but not opened
649                // yet). Do not remove 'elem' for now. The 'elem' will be automatically
650                // replaced after the compaction is done by calling
651                // 'compactor_switch_file()'. However, elem->file should be set to NULL
652                // in order to be removed from the AVL tree in case of the compaction
653                // failure.
654                elem->file = NULL;
655            } else {
656                // remove from the tree
657                avl_remove(&openfiles, &elem->avl);
658                free(elem);
659            }
660        }
661    }
662    mutex_unlock(&cpt_lock);
663}
664
665fdb_status compactor_register_file_removing(struct filemgr *file,
666                                            err_log_callback *log_callback)
667{
668    fdb_status fs = FDB_RESULT_SUCCESS;
669    struct avl_node *a = NULL;
670    struct openfiles_elem query, *elem;
671
672    strcpy(query.filename, file->filename);
673    // first search the existing file
674    mutex_lock(&cpt_lock);
675    a = avl_search(&openfiles, &query.avl, _compactor_cmp);
676    if (a == NULL) {
677        // doesn't exist
678        // create a fake & temporary element for the file to be removed.
679        elem = (struct openfiles_elem *)calloc(1, sizeof(struct openfiles_elem));
680        strcpy(elem->filename, file->filename);
681
682        // set flag
683        file->fflags |= FILEMGR_REMOVAL_IN_PROG;
684
685        elem->file = file;
686        elem->register_count = 1;
687        // to prevent this element to be compacted, set all flags
688        elem->compaction_flag = true;
689        elem->daemon_compact_in_progress = true;
690        elem->removal_activated = false;
691        elem->log_callback = log_callback;
692        gettimeofday(&elem->last_compaction_timestamp, NULL);
693        // Init the compaction interval using the global param
694        elem->interval = sleep_duration;
695        avl_insert(&openfiles, &elem->avl, _compactor_cmp);
696        mutex_unlock(&cpt_lock); // Releasing the lock here should be OK as
697                                 // subsequent registration attempts for the same file
698                                 // will be simply processed by incrementing its
699                                 // counter below.
700
701        // wake up any sleeping thread
702        mutex_lock(&sync_mutex);
703        thread_cond_signal(&sync_cond);
704        mutex_unlock(&sync_mutex);
705
706    } else {
707        // already exists .. just ignore
708        mutex_unlock(&cpt_lock);
709    }
710    return fs;
711}
712
713void compactor_change_threshold(struct filemgr *file, size_t new_threshold)
714{
715    struct avl_node *a = NULL;
716    struct openfiles_elem query, *elem;
717
718    strcpy(query.filename, file->filename);
719    mutex_lock(&cpt_lock);
720    a = avl_search(&openfiles, &query.avl, _compactor_cmp);
721    if (a) {
722        elem = _get_entry(a, struct openfiles_elem, avl);
723        elem->config.compaction_threshold = new_threshold;
724    }
725    mutex_unlock(&cpt_lock);
726}
727
728fdb_status compactor_set_compaction_interval(struct filemgr *file,
729                                             size_t interval)
730{
731    struct avl_node *a = NULL;
732    struct openfiles_elem query, *elem;
733    fdb_status result = FDB_RESULT_SUCCESS;
734
735    strcpy(query.filename, file->filename);
736    mutex_lock(&cpt_lock);
737    a = avl_search(&openfiles, &query.avl, _compactor_cmp);
738    if (a) {
739        elem = _get_entry(a, struct openfiles_elem, avl);
740        elem->interval = interval;
741    } else {
742        result = FDB_RESULT_INVALID_ARGS;
743    }
744    mutex_unlock(&cpt_lock);
745    return result;
746}
747
748struct compactor_meta * _compactor_read_metafile(char *metafile,
749                                                 struct compactor_meta *metadata,
750                                                 err_log_callback *log_callback)
751{
752    int fd_meta, fd_db;
753    ssize_t ret;
754    uint8_t *buf = alca(uint8_t, sizeof(struct compactor_meta));
755    uint32_t crc;
756    char fullpath[MAX_FNAMELEN];
757    struct filemgr_ops *ops;
758    struct compactor_meta meta;
759
760    ops = get_filemgr_ops();
761    fd_meta = ops->open(metafile, O_RDONLY, 0644);
762
763    if (fd_meta >= 0) {
764        // metafile exists .. read metadata
765        ret = ops->pread(fd_meta, buf, sizeof(struct compactor_meta), 0);
766        if (ret < 0 || (size_t)ret < sizeof(struct compactor_meta)) {
767            char errno_msg[512];
768            ops->get_errno_str(errno_msg, 512);
769            // As a workaround for MB-17009, call fprintf instead of fdb_log
770            // until c->cgo->go callback trace issue is resolved.
771            fprintf(stderr,
772                    "Error status code: %d, Failed to read the meta file '%s', "
773                    "errno_message: %s\n",
774                    (int)ret, metafile, errno_msg);
775            ret = ops->close(fd_meta);
776            if (ret < 0) {
777                ops->get_errno_str(errno_msg, 512);
778                fprintf(stderr,
779                        "Error status code: %d, Failed to close the meta "
780                        "file '%s', errno_message: %s\n",
781                        (int)ret, metafile, errno_msg);
782            }
783            return NULL;
784        }
785        memcpy(&meta, buf, sizeof(struct compactor_meta));
786        meta.version = _endian_decode(meta.version);
787        meta.crc = _endian_decode(meta.crc);
788        ops->close(fd_meta);
789
790        // CRC check, mode UNKNOWN means all modes are checked.
791        if (!perform_integrity_check(buf,
792                                     sizeof(struct compactor_meta) - sizeof(crc),
793                                     meta.crc,
794                                     CRC_UNKNOWN)) {
795            fprintf(stderr,
796                    "Error status code: %d, Checksum mismatch in the meta file '%s'\n",
797                    FDB_RESULT_CHECKSUM_ERROR, metafile);
798            return NULL;
799        }
800        // check if the file exists
801        _reconstruct_path(fullpath, metafile, meta.filename);
802        fd_db = ops->open(fullpath, O_RDONLY, 0644);
803        if (fd_db < 0) {
804            // file doesn't exist
805            return NULL;
806        }
807        ops->close(fd_db);
808    } else {
809        // file doesn't exist
810        return NULL;
811    }
812
813    *metadata = meta;
814    return metadata;
815}
816
817static fdb_status _compactor_store_metafile(char *metafile,
818                                            struct compactor_meta *metadata,
819                                            err_log_callback *log_callback)
820{
821    int fd_meta;
822    ssize_t ret;
823    uint32_t crc;
824    struct filemgr_ops *ops;
825    struct compactor_meta meta;
826
827    ops = get_filemgr_ops();
828    fd_meta = ops->open(metafile, O_RDWR | O_CREAT, 0644);
829
830    if (fd_meta >= 0){
831        meta.version = _endian_encode(COMPACTOR_META_VERSION);
832        strcpy(meta.filename, metadata->filename);
833        crc = get_checksum(reinterpret_cast<const uint8_t*>(&meta),
834                           sizeof(struct compactor_meta) - sizeof(crc));
835        meta.crc = _endian_encode(crc);
836
837        char errno_msg[512];
838        ret = ops->pwrite(fd_meta, &meta, sizeof(struct compactor_meta), 0);
839        if (ret < 0 || (size_t)ret < sizeof(struct compactor_meta)) {
840            ops->get_errno_str(errno_msg, 512);
841            // As a workaround for MB-17009, call fprintf instead of fdb_log
842            // until c->cgo->go callback trace issue is resolved.
843            fprintf(stderr,
844                    "Error status code: %d, Failed to perform a write in the meta "
845                    "file '%s', errno_message: %s\n",
846                    (int)ret, metafile, errno_msg);
847            ops->close(fd_meta);
848            return (fdb_status) ret;
849        }
850        ret = ops->fsync(fd_meta);
851        if (ret < 0) {
852            ops->get_errno_str(errno_msg, 512);
853            fprintf(stderr,
854                    "Error status code: %d, Failed to perform a sync in the meta "
855                    "file '%s', errno_message: %s\n",
856                    (int)ret, metafile, errno_msg);
857            ops->close(fd_meta);
858            return (fdb_status) ret;
859        }
860        ops->close(fd_meta);
861    } else {
862        return FDB_RESULT_OPEN_FAIL;
863    }
864
865    return FDB_RESULT_SUCCESS;
866}
867
868void compactor_switch_file(struct filemgr *old_file, struct filemgr *new_file,
869                           err_log_callback *log_callback)
870{
871    struct avl_node *a = NULL;
872    struct openfiles_elem query, *elem;
873    struct compactor_meta meta;
874
875    strcpy(query.filename, old_file->filename);
876    mutex_lock(&cpt_lock);
877    a = avl_search(&openfiles, &query.avl, _compactor_cmp);
878    if (a) {
879        char metafile[MAX_FNAMELEN];
880        fdb_compaction_mode_t comp_mode;
881
882        elem = _get_entry(a, struct openfiles_elem, avl);
883        avl_remove(&openfiles, a);
884        strcpy(elem->filename, new_file->filename);
885        elem->file = new_file;
886        elem->register_count = 1;
887        elem->daemon_compact_in_progress = false;
888        // clear compaction flag
889        elem->compaction_flag = false;
890        // As this function is invoked at the end of compaction, set the compaction
891        // timestamp to the current time.
892        gettimeofday(&elem->last_compaction_timestamp, NULL);
893        avl_insert(&openfiles, &elem->avl, _compactor_cmp);
894        comp_mode = elem->config.compaction_mode;
895        mutex_unlock(&cpt_lock); // Releasing the lock here should be OK as we don't
896                                 // expect more than one compaction task completion for
897                                 // the same file.
898
899        if (comp_mode == FDB_COMPACTION_AUTO) {
900            _compactor_convert_dbfile_to_metafile(new_file->filename, metafile);
901            _strcpy_fname(meta.filename, new_file->filename);
902            _compactor_store_metafile(metafile, &meta, log_callback);
903        }
904    } else {
905        mutex_unlock(&cpt_lock);
906    }
907}
908
909void compactor_get_virtual_filename(const char *filename,
910                                    char *virtual_filename)
911{
912    int prefix_len = _compactor_prefix_len((char*)filename) - 1;
913    if (prefix_len > 0) {
914        strncpy(virtual_filename, filename, prefix_len);
915        virtual_filename[prefix_len] = 0;
916    } else {
917        strcpy(virtual_filename, filename);
918    }
919}
920
921fdb_status compactor_get_actual_filename(const char *filename,
922                                         char *actual_filename,
923                                         fdb_compaction_mode_t comp_mode,
924                                         err_log_callback *log_callback)
925{
926    int i;
927    int filename_len;
928    int dirname_len;
929    int compaction_no, max_compaction_no = -1;
930    char path[MAX_FNAMELEN];
931    char dirname[MAX_FNAMELEN], prefix[MAX_FNAMELEN];
932    char ret_name[MAX_FNAMELEN];
933    fdb_status fs = FDB_RESULT_SUCCESS;
934    struct compactor_meta meta, *meta_ptr;
935
936    // get actual filename from metafile
937    sprintf(path, "%s.meta", filename);
938    meta_ptr = _compactor_read_metafile(path, &meta, log_callback);
939
940    if (meta_ptr == NULL) {
941        if (comp_mode == FDB_COMPACTION_MANUAL && does_file_exist(filename)) {
942            strcpy(actual_filename, filename);
943            return FDB_RESULT_SUCCESS;
944        }
945
946        // error handling .. scan directory
947        // backward search until find the first '/' or '\' (Windows)
948        filename_len = strlen(filename);
949        dirname_len = 0;
950
951#if !defined(WIN32) && !defined(_WIN32)
952        DIR *dir_info;
953        struct dirent *dir_entry;
954
955        for (i=filename_len-1; i>=0; --i){
956            if (filename[i] == '/') {
957                dirname_len = i+1;
958                break;
959            }
960        }
961
962        if (dirname_len > 0) {
963            strncpy(dirname, filename, dirname_len);
964            dirname[dirname_len] = 0;
965        } else {
966            strcpy(dirname, ".");
967        }
968        strcpy(prefix, filename + dirname_len);
969        strcat(prefix, ".");
970
971        dir_info = opendir(dirname);
972        if (dir_info != NULL) {
973            while ((dir_entry = readdir(dir_info))) {
974                if (!strncmp(dir_entry->d_name, prefix, strlen(prefix))) {
975                    compaction_no = -1;
976                    sscanf(dir_entry->d_name + strlen(prefix), "%d", &compaction_no);
977                    if (compaction_no >= 0) {
978                        if (compaction_no > max_compaction_no) {
979                            max_compaction_no = compaction_no;
980                        }
981                    }
982                }
983            }
984            closedir(dir_info);
985        }
986#else
987        // Windows
988        for (i=filename_len-1; i>=0; --i){
989            if (filename[i] == '/' || filename[i] == '\\') {
990                dirname_len = i+1;
991                break;
992            }
993        }
994
995        if (dirname_len > 0) {
996            strncpy(dirname, filename, dirname_len);
997            dirname[dirname_len] = 0;
998        } else {
999            strcpy(dirname, ".");
1000        }
1001        strcpy(prefix, filename + dirname_len);
1002        strcat(prefix, ".");
1003
1004        WIN32_FIND_DATA filedata;
1005        HANDLE hfind;
1006        char query_str[MAX_FNAMELEN];
1007
1008        // find all files start with 'prefix'
1009        sprintf(query_str, "%s*", prefix);
1010        hfind = FindFirstFile(query_str, &filedata);
1011        while (hfind != INVALID_HANDLE_VALUE) {
1012            if (!strncmp(filedata.cFileName, prefix, strlen(prefix))) {
1013                compaction_no = -1;
1014                sscanf(filedata.cFileName + strlen(prefix), "%d", &compaction_no);
1015                if (compaction_no >= 0) {
1016                    if (compaction_no > max_compaction_no) {
1017                        max_compaction_no = compaction_no;
1018                    }
1019                }
1020            }
1021
1022            if (!FindNextFile(hfind, &filedata)) {
1023                FindClose(hfind);
1024                hfind = INVALID_HANDLE_VALUE;
1025            }
1026        }
1027
1028#endif
1029
1030        if (max_compaction_no < 0) {
1031            if (comp_mode == FDB_COMPACTION_AUTO) {
1032                // DB files with a revision number are not found.
1033                // initialize filename to '[filename].0'
1034                sprintf(ret_name, "%s.0", filename);
1035            } else { // Manual compaction mode.
1036                // Simply use the file name passed to this function.
1037                strcpy(actual_filename, filename);
1038                return FDB_RESULT_SUCCESS;
1039            }
1040        } else {
1041            // return the file that has the largest compaction number
1042            sprintf(ret_name, "%s.%d", filename, max_compaction_no);
1043            fs = FDB_RESULT_SUCCESS;
1044        }
1045        if (fs == FDB_RESULT_SUCCESS) {
1046            strcpy(actual_filename, ret_name);
1047        }
1048        return fs;
1049
1050    } else {
1051        // metadata is successfully read from the metafile .. just return the filename
1052        _reconstruct_path(ret_name, (char*)filename, meta.filename);
1053        strcpy(actual_filename, ret_name);
1054        return FDB_RESULT_SUCCESS;
1055    }
1056}
1057
1058bool compactor_is_valid_mode(const char *filename, fdb_config *config)
1059{
1060    int fd;
1061    char path[MAX_FNAMELEN];
1062    struct filemgr_ops *ops;
1063
1064    ops = get_filemgr_ops();
1065
1066    if (config->compaction_mode == FDB_COMPACTION_AUTO) {
1067        // auto compaction mode: invalid when
1068        // the file '[filename]' exists
1069        fd = ops->open(filename, O_RDONLY, 0644);
1070        if (fd != FDB_RESULT_NO_SUCH_FILE) {
1071            ops->close(fd);
1072            return false;
1073        }
1074
1075    } else if (config->compaction_mode == FDB_COMPACTION_MANUAL) {
1076        // manual compaction mode: invalid when
1077        // the file '[filename].meta' exists
1078        sprintf(path, "%s.meta", filename);
1079        fd = ops->open(path, O_RDONLY, 0644);
1080        if (fd != FDB_RESULT_NO_SUCH_FILE) {
1081            ops->close(fd);
1082            return false;
1083        }
1084
1085    } else {
1086        // unknown mode
1087        return false;
1088    }
1089
1090    return true;
1091}
1092
1093static fdb_status _compactor_search_n_destroy(const char *filename)
1094{
1095    int i;
1096    int filename_len;
1097    int dirname_len;
1098    char dirname[MAX_FNAMELEN], prefix[MAX_FNAMELEN];
1099    char full_fname[MAX_FNAMELEN];
1100    fdb_status fs = FDB_RESULT_SUCCESS;
1101
1102    // error handling .. scan directory
1103    // backward search until find the first '/' or '\' (Windows)
1104    filename_len = strlen(filename);
1105    dirname_len = 0;
1106
1107#if !defined(WIN32) && !defined(_WIN32)
1108    DIR *dir_info;
1109    struct dirent *dir_entry;
1110
1111    for (i=filename_len-1; i>=0; --i){
1112        if (filename[i] == '/') {
1113            dirname_len = i+1;
1114            break;
1115        }
1116    }
1117
1118    if (dirname_len > 0) {
1119        strncpy(dirname, filename, dirname_len);
1120        dirname[dirname_len] = 0;
1121    } else {
1122        strcpy(dirname, ".");
1123    }
1124    strcpy(prefix, filename + dirname_len);
1125    strcat(prefix, ".");
1126
1127    dir_info = opendir(dirname);
1128    if (dir_info != NULL) {
1129        while ((dir_entry = readdir(dir_info))) {
1130            if (!strncmp(dir_entry->d_name, prefix, strlen(prefix))) {
1131                // Need to check filemgr for possible open entry?
1132                // Reconstruct full path
1133                _reconstruct_path(full_fname, dirname, dir_entry->d_name);
1134                if (remove(full_fname)) {
1135                    fs = FDB_RESULT_FILE_REMOVE_FAIL;
1136                    closedir(dir_info);
1137                    return fs;
1138                }
1139            }
1140        }
1141        closedir(dir_info);
1142    }
1143#else
1144    // Windows
1145    for (i=filename_len-1; i>=0; --i){
1146        if (filename[i] == '/' || filename[i] == '\\') {
1147            dirname_len = i+1;
1148            break;
1149        }
1150    }
1151
1152    if (dirname_len > 0) {
1153        strncpy(dirname, filename, dirname_len);
1154        dirname[dirname_len] = 0;
1155    } else {
1156        strcpy(dirname, ".");
1157    }
1158    strcpy(prefix, filename + dirname_len);
1159    strcat(prefix, ".");
1160
1161    WIN32_FIND_DATA filedata;
1162    HANDLE hfind;
1163    char query_str[MAX_FNAMELEN];
1164
1165    // find all files start with 'prefix'
1166    sprintf(query_str, "%s*", prefix);
1167    hfind = FindFirstFile(query_str, &filedata);
1168    while (hfind != INVALID_HANDLE_VALUE) {
1169        if (!strncmp(filedata.cFileName, prefix, strlen(prefix))) {
1170            // Need to check filemgr for possible open entry?
1171            // Reconstruct full path
1172            _reconstruct_path(full_fname, dirname, filedata.cFileName);
1173            if (remove(full_fname)) {
1174                fs = FDB_RESULT_FILE_REMOVE_FAIL;
1175                FindClose(hfind);
1176                hfind = INVALID_HANDLE_VALUE;
1177                return fs;
1178            }
1179        }
1180
1181        if (!FindNextFile(hfind, &filedata)) {
1182            FindClose(hfind);
1183            hfind = INVALID_HANDLE_VALUE;
1184        }
1185    }
1186
1187#endif
1188    return fs;
1189}
1190
1191fdb_status compactor_destroy_file(char *filename,
1192                                  fdb_config *config)
1193{
1194    struct avl_node *a = NULL;
1195    struct openfiles_elem query, *elem;
1196    size_t strcmp_len;
1197    fdb_status status = FDB_RESULT_SUCCESS;
1198    compactor_config c_config;
1199
1200    strcmp_len = strlen(filename);
1201    filename[strcmp_len] = '.'; // add a . suffix in place
1202    strcmp_len++;
1203    filename[strcmp_len] = '\0';
1204    strcpy(query.filename, filename);
1205
1206    c_config.sleep_duration = config->compactor_sleep_duration;
1207    c_config.num_threads = config->num_compactor_threads;
1208    compactor_init(&c_config);
1209
1210    mutex_lock(&cpt_lock);
1211    compactor_args.strcmp_len = strcmp_len; // Do prefix match for all vers
1212    a = avl_search(&openfiles, &query.avl, _compactor_cmp);
1213    if (a) {
1214        elem = _get_entry(a, struct openfiles_elem, avl);
1215        // if no handle refers this file
1216        if (elem->daemon_compact_in_progress) {
1217            // This file is waiting for compaction by compactor
1218            // Return a temporary failure, user must retry after sometime
1219            status = FDB_RESULT_IN_USE_BY_COMPACTOR;
1220        } else { // File handle not closed, fail operation
1221            status = FDB_RESULT_FILE_IS_BUSY;
1222        }
1223    }
1224
1225    compactor_args.strcmp_len = MAX_FNAMELEN; // restore for normal compare
1226    mutex_unlock(&cpt_lock); // Releasing the lock here should be OK as file
1227                             // deletions doesn't require strict synchronization.
1228    filename[strcmp_len - 1] = '\0'; // restore the filename
1229    if (status == FDB_RESULT_SUCCESS) {
1230        status = _compactor_search_n_destroy(filename);
1231    }
1232
1233    return status;
1234}
1235