xref: /4.0.0/forestdb/src/filemgr.h (revision 76143cfa)
1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2010 Couchbase, Inc
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
18#ifndef _JSAHN_FILEMGR_H
19#define _JSAHN_FILEMGR_H
20
21#include <sys/types.h>
22#include <sys/stat.h>
23#include <stdint.h>
24
25#ifdef _ASYNC_IO
26#if !defined(WIN32) && !defined(_WIN32)
27#include <libaio.h>
28#include <sys/time.h>
29#endif
30#endif
31
32#include "libforestdb/fdb_errors.h"
33
34#include "internal_types.h"
35#include "common.h"
36#include "hash.h"
37#include "partiallock.h"
38#include "atomic.h"
39
40#ifdef __cplusplus
41extern "C" {
42#endif
43
44struct filemgr_config {
45    int blocksize;
46    int ncacheblock;
47    int flag;
48    int chunksize;
49    uint8_t options;
50#define FILEMGR_SYNC 0x01
51#define FILEMGR_READONLY 0x02
52#define FILEMGR_ROLLBACK_IN_PROG 0x04
53#define FILEMGR_CREATE 0x08
54    uint64_t prefetch_duration;
55    uint16_t num_wal_shards;
56    uint16_t num_bcache_shards;
57};
58
59struct async_io_handle {
60#ifdef _ASYNC_IO
61#if !defined(WIN32) && !defined(_WIN32)
62    struct iocb **ioq;
63    struct io_event *events;
64    io_context_t ioctx;
65#endif
66#endif
67    uint8_t *aio_buf;
68    uint64_t *offset_array;
69    size_t queue_depth;
70    size_t block_size;
71    int fd;
72};
73
74struct filemgr_ops {
75    int (*open)(const char *pathname, int flags, mode_t mode);
76    ssize_t (*pwrite)(int fd, void *buf, size_t count, cs_off_t offset);
77    ssize_t (*pread)(int fd, void *buf, size_t count, cs_off_t offset);
78    int (*close)(int fd);
79    cs_off_t (*goto_eof)(int fd);
80    cs_off_t (*file_size)(const char *filename);
81    int (*fdatasync)(int fd);
82    int (*fsync)(int fd);
83    void (*get_errno_str)(char *buf, size_t size);
84
85    // Async I/O operations
86    int (*aio_init)(struct async_io_handle *aio_handle);
87    int (*aio_prep_read)(struct async_io_handle *aio_handle, size_t aio_idx,
88                         size_t read_size, uint64_t offset);
89    int (*aio_submit)(struct async_io_handle *aio_handle, int num_subs);
90    int (*aio_getevents)(struct async_io_handle *aio_handle, int min,
91                         int max, unsigned int timeout);
92    int (*aio_destroy)(struct async_io_handle *aio_handle);
93};
94
95struct filemgr_buffer{
96    void *block;
97    bid_t lastbid;
98};
99
100typedef uint16_t filemgr_header_len_t;
101typedef uint64_t filemgr_magic_t;
102typedef uint64_t filemgr_header_revnum_t;
103
104struct filemgr_header{
105    filemgr_header_len_t size;
106    filemgr_header_revnum_t revnum;
107    volatile fdb_seqnum_t seqnum;
108    atomic_uint64_t bid;
109    atomic_uint64_t dirty_idtree_root; // for wal_flush_before_commit option
110    atomic_uint64_t dirty_seqtree_root; // for wal_flush_before_commit option
111    struct kvs_ops_stat op_stat; // op stats for default KVS
112    struct kvs_stat stat; // stats for the default KVS
113    void *data;
114};
115
116typedef uint8_t filemgr_prefetch_status_t;
117enum {
118    FILEMGR_PREFETCH_IDLE = 0,
119    FILEMGR_PREFETCH_RUNNING = 1,
120    FILEMGR_PREFETCH_ABORT = 2
121};
122
123#define DLOCK_MAX (41) /* a prime number */
124struct wal;
125struct fnamedic_item;
126struct kvs_header;
127
128typedef struct {
129    mutex_t mutex;
130    bool locked;
131} mutex_lock_t;
132
133struct filemgr {
134    char *filename; // Current file name.
135    uint32_t ref_count;
136    uint8_t fflags;
137    uint16_t filename_len;
138    uint32_t blocksize;
139    int fd;
140    atomic_uint64_t pos;
141    atomic_uint64_t last_commit;
142    struct wal *wal;
143    struct filemgr_header header;
144    struct filemgr_ops *ops;
145    struct hash_elem e;
146    atomic_uint8_t status;
147    struct filemgr_config *config;
148    struct filemgr *new_file;
149    char *old_filename; // Old file name before compaction.
150    struct fnamedic_item *bcache;
151    fdb_txn global_txn;
152    bool in_place_compaction;
153    struct kvs_header *kv_header;
154    void (*free_kv_header)(struct filemgr *file); // callback function
155
156    // variables related to prefetching
157    volatile filemgr_prefetch_status_t prefetch_status;
158    thread_t prefetch_tid;
159
160    // spin lock for small region
161    spin_t lock;
162
163    // lock for data consistency
164#ifdef __FILEMGR_DATA_PARTIAL_LOCK
165    struct plock plock;
166#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
167    mutex_t data_mutex[DLOCK_MAX];
168#else
169    spin_t data_spinlock[DLOCK_MAX];
170#endif //__FILEMGR_DATA_PARTIAL_LOCK
171
172    // mutex for synchronization among multiple writers.
173    mutex_lock_t writer_lock;
174};
175
176typedef struct {
177    struct filemgr *file;
178    int rv;
179} filemgr_open_result;
180
181void filemgr_init(struct filemgr_config *config);
182
183uint64_t filemgr_get_bcache_used_space(void);
184
185size_t filemgr_get_ref_count(struct filemgr *file);
186
187INLINE void filemgr_incr_ref_count(struct filemgr *file) {
188    spin_lock(&file->lock);
189    ++file->ref_count;
190    spin_unlock(&file->lock);
191}
192
193filemgr_open_result filemgr_open(char *filename,
194                                 struct filemgr_ops *ops,
195                                 struct filemgr_config *config,
196                                 err_log_callback *log_callback);
197
198uint64_t filemgr_update_header(struct filemgr *file, void *buf, size_t len);
199filemgr_header_revnum_t filemgr_get_header_revnum(struct filemgr *file);
200
201fdb_seqnum_t filemgr_get_seqnum(struct filemgr *file);
202void filemgr_set_seqnum(struct filemgr *file, fdb_seqnum_t seqnum);
203
204INLINE bid_t filemgr_get_header_bid(struct filemgr *file)
205{
206    return ((file->header.size > 0) ?
207            atomic_get_uint64_t(&file->header.bid) : BLK_NOT_FOUND);
208}
209bid_t _filemgr_get_header_bid(struct filemgr *file);
210void* filemgr_get_header(struct filemgr *file, void *buf, size_t *len);
211fdb_status filemgr_fetch_header(struct filemgr *file, uint64_t bid,
212                                void *buf, size_t *len, fdb_seqnum_t *seqnum,
213                                err_log_callback *log_callback);
214uint64_t filemgr_fetch_prev_header(struct filemgr *file, uint64_t bid,
215                                   void *buf, size_t *len, fdb_seqnum_t *seqnum,
216                                   err_log_callback *log_callback);
217fdb_status filemgr_close(struct filemgr *file,
218                         bool cleanup_cache_onclose,
219                         const char *orig_file_name,
220                         err_log_callback *log_callback);
221
222INLINE bid_t filemgr_get_next_alloc_block(struct filemgr *file)
223{
224    return atomic_get_uint64_t(&file->pos) / file->blocksize;
225}
226bid_t filemgr_alloc(struct filemgr *file, err_log_callback *log_callback);
227void filemgr_alloc_multiple(struct filemgr *file, int nblock, bid_t *begin,
228                            bid_t *end, err_log_callback *log_callback);
229bid_t filemgr_alloc_multiple_cond(struct filemgr *file, bid_t nextbid, int nblock,
230                                  bid_t *begin, bid_t *end,
231                                  err_log_callback *log_callback);
232
233void filemgr_invalidate_block(struct filemgr *file, bid_t bid);
234
235fdb_status filemgr_read(struct filemgr *file,
236                        bid_t bid, void *buf,
237                        err_log_callback *log_callback,
238                        bool read_on_cache_miss);
239
240fdb_status filemgr_write_offset(struct filemgr *file, bid_t bid, uint64_t offset,
241                          uint64_t len, void *buf, err_log_callback *log_callback);
242fdb_status filemgr_write(struct filemgr *file, bid_t bid, void *buf,
243                   err_log_callback *log_callback);
244INLINE int filemgr_is_writable(struct filemgr *file, bid_t bid)
245{
246    uint64_t pos = bid * file->blocksize;
247    // Note that we don't need to grab file->lock here because
248    // 1) both file->pos and file->last_commit are only incremented.
249    // 2) file->last_commit is updated using the value of file->pos,
250    //    and always equal to or smaller than file->pos.
251    return (pos <  atomic_get_uint64_t(&file->pos) &&
252            pos >= atomic_get_uint64_t(&file->last_commit));
253}
254void filemgr_remove_file(struct filemgr *file);
255
256fdb_status filemgr_commit(struct filemgr *file,
257                          err_log_callback *log_callback);
258fdb_status filemgr_sync(struct filemgr *file,
259                        err_log_callback *log_callback);
260
261fdb_status filemgr_shutdown();
262int filemgr_update_file_status(struct filemgr *file, file_status_t status,
263                                char *old_filename);
264void filemgr_set_compaction_state(struct filemgr *old_file,
265                                  struct filemgr *new_file,
266                                  file_status_t status);
267void filemgr_remove_pending(struct filemgr *old_file, struct filemgr *new_file);
268
269struct kvs_ops_stat *filemgr_migrate_op_stats(struct filemgr *old_file,
270                                              struct filemgr *new_file,
271                                              struct kvs_info *kvs);
272fdb_status filemgr_destroy_file(char *filename,
273                                struct filemgr_config *config,
274                                struct hash *destroy_set);
275
276struct filemgr *filemgr_search_stale_links(struct filemgr *cur_file);
277typedef char *filemgr_redirect_hdr_func(uint8_t *buf, char *new_filename,
278                                       uint16_t new_filename_len);
279char *filemgr_redirect_old_file(struct filemgr *very_old_file,
280                                struct filemgr *new_file,
281                                filemgr_redirect_hdr_func redirect_func);
282INLINE file_status_t filemgr_get_file_status(struct filemgr *file)
283{
284    return atomic_get_uint8_t(&file->status);
285}
286INLINE uint64_t filemgr_get_pos(struct filemgr *file)
287{
288    return atomic_get_uint64_t(&file->pos);
289}
290
291bool filemgr_is_rollback_on(struct filemgr *file);
292void filemgr_set_rollback(struct filemgr *file, uint8_t new_val);
293
294void filemgr_set_in_place_compaction(struct filemgr *file,
295                                     bool in_place_compaction);
296bool filemgr_is_in_place_compaction_set(struct filemgr *file);
297
298void filemgr_mutex_openlock(struct filemgr_config *config);
299void filemgr_mutex_openunlock(void);
300
301void filemgr_mutex_lock(struct filemgr *file);
302bool filemgr_mutex_trylock(struct filemgr *file);
303void filemgr_mutex_unlock(struct filemgr *file);
304
305void filemgr_set_dirty_root(struct filemgr *file,
306                            bid_t dirty_idtree_root,
307                            bid_t dirty_seqtree_root);
308INLINE void filemgr_get_dirty_root(struct filemgr *file,
309                                   bid_t *dirty_idtree_root,
310                                   bid_t *dirty_seqtree_root)
311{
312    *dirty_idtree_root = atomic_get_uint64_t(&file->header.dirty_idtree_root);
313    *dirty_seqtree_root = atomic_get_uint64_t(&file->header.dirty_seqtree_root);
314}
315
316INLINE bool filemgr_dirty_root_exist(struct filemgr *file)
317{
318    return (atomic_get_uint64_t(&file->header.dirty_idtree_root)  != BLK_NOT_FOUND ||
319            atomic_get_uint64_t(&file->header.dirty_seqtree_root) != BLK_NOT_FOUND);
320}
321
322bool filemgr_is_commit_header(void *head_buffer, size_t blocksize);
323
324void _kvs_stat_set(struct filemgr *file,
325                   fdb_kvs_id_t kv_id,
326                   struct kvs_stat stat);
327void _kvs_stat_update_attr(struct filemgr *file,
328                           fdb_kvs_id_t kv_id,
329                           kvs_stat_attr_t attr,
330                           int delta);
331int _kvs_stat_get_kv_header(struct kvs_header *kv_header,
332                            fdb_kvs_id_t kv_id,
333                            struct kvs_stat *stat);
334int _kvs_stat_get(struct filemgr *file,
335                  fdb_kvs_id_t kv_id,
336                  struct kvs_stat *stat);
337uint64_t _kvs_stat_get_sum(struct filemgr *file,
338                           kvs_stat_attr_t attr);
339int _kvs_ops_stat_get_kv_header(struct kvs_header *kv_header,
340                                fdb_kvs_id_t kv_id,
341                                struct kvs_ops_stat *stat);
342int _kvs_ops_stat_get(struct filemgr *file,
343                      fdb_kvs_id_t kv_id,
344                      struct kvs_ops_stat *stat);
345
346void _init_op_stats(struct kvs_ops_stat *stat);
347struct kvs_ops_stat *filemgr_get_ops_stats(struct filemgr *file,
348                                          struct kvs_info *info);
349#ifdef __cplusplus
350}
351#endif
352
353#endif
354