xref: /4.0.0/couchstore/src/iobuffer.cc (revision dcca3a2e)
1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2//
//  iobuffer.cc
4//  couchstore
5//
6//  Created by Jens Alfke on 4/12/12.
7//  Copyright (c) 2012 Couchbase, Inc. All rights reserved.
8//
9
10#include "config.h"
11#include "iobuffer.h"
12#include "internal.h"
13#include <stdlib.h>
14#include <string.h>
15
// LOG_BUFFER switches the stderr tracing of buffer activity on (1) or off (0).
// The decision is made directly in the #if below rather than inside the macro
// body: a `defined` operator that only appears after macro expansion makes
// `#if LOG_BUFFER` undefined behaviour. To trace DEBUG builds, change the
// leading 0 in the condition to 1.
#if 0 && defined(DEBUG)
#define LOG_BUFFER 1
#else
#define LOG_BUFFER 0
#endif
#if LOG_BUFFER
#include <stdio.h>
#endif
20
21
// Upper bound on the number of read buffers one handle may allocate.
#define MAX_READ_BUFFERS 8
// Size in bytes of the single write buffer (128 KiB).
#define WRITE_BUFFER_CAPACITY (128 * 1024)
// Size in bytes of each read buffer (8 KiB); also the alignment of read blocks.
#define READ_BUFFER_CAPACITY (8 * 1024)
25
// Drop any macro named min that an earlier header may have defined, so the
// function below is the one that gets called.
#if defined(min)
#undef min
#endif

// Returns the smaller of two signed sizes; when they are equal, b is returned.
static inline ssize_t min(ssize_t a, ssize_t b)
{
    if (a < b) {
        return a;
    }
    return b;
}
31
32
33typedef struct file_buffer {
34    struct file_buffer* prev;
35    struct file_buffer* next;
36    struct buffered_file_handle *owner;
37    size_t capacity;
38    size_t length;
39    cs_off_t offset;
40    uint8_t dirty;
41    uint8_t bytes[1];
42} file_buffer;
43
44
45// How I interpret a couch_file_handle:
46typedef struct buffered_file_handle {
47    const couch_file_ops* raw_ops;
48    couch_file_handle raw_ops_handle;
49    unsigned nbuffers;
50    file_buffer* write_buffer;
51    file_buffer* first_buffer;
52} buffered_file_handle;
53
54
55static file_buffer* new_buffer(buffered_file_handle* owner, size_t capacity) {
56    file_buffer *buf = static_cast<file_buffer*>(malloc(sizeof(file_buffer) + capacity));
57    if (buf) {
58        buf->prev = buf->next = NULL;
59        buf->owner = owner;
60        buf->capacity = capacity;
61        buf->length = 0;
62        buf->offset = 0;
63        buf->dirty = 0;
64    }
65#if LOG_BUFFER
66    fprintf(stderr, "BUFFER: %p <- new_buffer(%zu)\n", buf, capacity);
67#endif
68    return buf;
69}
70
71static void free_buffer(file_buffer* buf) {
72#if LOG_BUFFER
73    fprintf(stderr, "BUFFER: %p freed\n", buf);
74#endif
75    free(buf);
76}
77
78
79//////// BUFFER WRITES:
80
81
82// Write as many bytes as possible into the buffer, returning the count
83static size_t write_to_buffer(file_buffer* buf, const void *bytes, size_t nbyte, cs_off_t offset)
84{
85    if (buf->length == 0) {
86        // If buffer is empty, align it to start at the current offset:
87        buf->offset = offset;
88    } else if (offset < buf->offset || offset > buf->offset + (cs_off_t)buf->length) {
89        // If it's out of range, don't write anything
90        return 0;
91    }
92    size_t offset_in_buffer = (size_t)(offset - buf->offset);
93    size_t buffer_nbyte = min(buf->capacity - offset_in_buffer, nbyte);
94
95    memcpy(buf->bytes + offset_in_buffer, bytes, buffer_nbyte);
96    buf->dirty = 1;
97    offset_in_buffer += buffer_nbyte;
98    if (offset_in_buffer > buf->length)
99        buf->length = offset_in_buffer;
100
101    return buffer_nbyte;
102}
103
104// Write the current buffer to disk and empty it.
105static couchstore_error_t flush_buffer(couchstore_error_info_t *errinfo,
106                                       file_buffer* buf) {
107    while (buf->length > 0 && buf->dirty) {
108        ssize_t raw_written;
109        raw_written = buf->owner->raw_ops->pwrite(errinfo,
110                                                  buf->owner->raw_ops_handle,
111                                                  buf->bytes,
112                                                  buf->length,
113                                                  buf->offset);
114#if LOG_BUFFER
115        fprintf(stderr, "BUFFER: %p flush %zd bytes at %zd --> %zd\n",
116                buf, buf->length, buf->offset, raw_written);
117#endif
118        if (raw_written <= 0)
119            return (couchstore_error_t) raw_written;
120        buf->length -= raw_written;
121        buf->offset += raw_written;
122        memmove(buf->bytes, buf->bytes + raw_written, buf->length);
123    }
124    buf->dirty = 0;
125    return COUCHSTORE_SUCCESS;
126}
127
128
129//////// BUFFER READS:
130
131
132static size_t read_from_buffer(file_buffer* buf, void *bytes, size_t nbyte, cs_off_t offset) {
133    if (offset < buf->offset || offset >= buf->offset + (cs_off_t)buf->length) {
134        return 0;
135    }
136    size_t offset_in_buffer = (size_t)(offset - buf->offset);
137    size_t buffer_nbyte = min(buf->length - offset_in_buffer, nbyte);
138
139    memcpy(bytes, buf->bytes + offset_in_buffer, buffer_nbyte);
140    return buffer_nbyte;
141}
142
143
144static couchstore_error_t load_buffer_from(couchstore_error_info_t *errinfo,
145                                           file_buffer* buf,
146                                           cs_off_t offset,
147                                           size_t nbyte) {
148    if (buf->dirty) {
149        // If buffer contains data to be written, flush it first:
150        couchstore_error_t err = flush_buffer(errinfo, buf);
151        if (err < 0) {
152            return err;
153        }
154    }
155
156    if (offset < buf->offset || offset + nbyte > buf->offset + buf->capacity) {
157        // Reset the buffer to empty if it has to move:
158        buf->offset = offset;
159        buf->length = 0;
160    }
161
162    // Read data to extend the buffer to its capacity (if possible):
163    ssize_t bytes_read = buf->owner->raw_ops->pread(errinfo,
164                                                    buf->owner->raw_ops_handle,
165                                                    buf->bytes + buf->length,
166                                                    buf->capacity - buf->length,
167                                                    buf->offset + buf->length);
168#if LOG_BUFFER
169    fprintf(stderr, "BUFFER: %p loaded %zd bytes from %zd\n", buf, bytes_read, offset + buf->length);
170#endif
171    if (bytes_read < 0) {
172        return (couchstore_error_t) bytes_read;
173    }
174    buf->length += bytes_read;
175    return COUCHSTORE_SUCCESS;
176}
177
178
179//////// BUFFER MANAGEMENT:
180
181
182static file_buffer* find_buffer(buffered_file_handle* h, cs_off_t offset) {
183    offset = offset - offset % READ_BUFFER_CAPACITY;
184    // Find a buffer for this offset, or use the last one:
185    file_buffer* buffer = h->first_buffer;
186    while (buffer->offset != offset && buffer->next != NULL)
187        buffer = buffer->next;
188    if (buffer->offset != offset) {
189        if (h->nbuffers < MAX_READ_BUFFERS) {
190            // Didn't find a matching one, but we can still create another:
191            file_buffer* buffer2 = new_buffer(h, READ_BUFFER_CAPACITY);
192            if (buffer2) {
193                buffer = buffer2;
194                ++h->nbuffers;
195            }
196        } else {
197#if LOG_BUFFER
198            fprintf(stderr, "BUFFER: %p recycled, from %zd to %zd\n", buffer, buffer->offset, offset);
199#endif
200        }
201    }
202    if (buffer != h->first_buffer) {
203        // Move the buffer to the start of the list:
204        if (buffer->prev) buffer->prev->next = buffer->next;
205        if (buffer->next) buffer->next->prev = buffer->prev;
206        buffer->prev = NULL;
207        h->first_buffer->prev = buffer;
208        buffer->next = h->first_buffer;
209        h->first_buffer = buffer;
210    }
211    return buffer;
212}
213
214
215//////// FILE API:
216
217
218static void buffered_destructor(couchstore_error_info_t *errinfo,
219                                couch_file_handle handle)
220{
221    buffered_file_handle *h = (buffered_file_handle*)handle;
222    if (!h) {
223        return;
224    }
225    h->raw_ops->destructor(errinfo, h->raw_ops_handle);
226
227    free_buffer(h->write_buffer);
228    file_buffer* buffer, *next;
229    for (buffer = h->first_buffer; buffer; buffer = next) {
230        next = buffer->next;
231        free_buffer(buffer);
232    }
233    free(h);
234}
235
236static couch_file_handle buffered_constructor_with_raw_ops(couchstore_error_info_t *errinfo,
237                                                           const couch_file_ops* raw_ops,
238                                                           bool readOnly)
239{
240    buffered_file_handle *h = static_cast<buffered_file_handle*>(malloc(sizeof(buffered_file_handle)));
241    if (h) {
242        h->raw_ops = raw_ops;
243        h->raw_ops_handle = raw_ops->constructor(errinfo, raw_ops->cookie);
244        h->nbuffers = 1;
245        h->write_buffer = new_buffer(h, readOnly ? 0 : WRITE_BUFFER_CAPACITY);
246        h->first_buffer = new_buffer(h, READ_BUFFER_CAPACITY);
247
248        if (!h->write_buffer || !h->first_buffer) {
249            buffered_destructor(errinfo, (couch_file_handle)h);
250            h = NULL;
251        }
252    }
253    return (couch_file_handle) h;
254}
255
256static couch_file_handle buffered_constructor(couchstore_error_info_t *errinfo,
257                                              void* cookie)
258{
259    (void) cookie;
260    return buffered_constructor_with_raw_ops(errinfo, couchstore_get_default_file_ops(), false);
261}
262
263static couchstore_error_t buffered_open(couchstore_error_info_t *errinfo,
264                                        couch_file_handle* handle,
265                                        const char *path,
266                                        int oflag)
267{
268    buffered_file_handle *h = (buffered_file_handle*)*handle;
269    return h->raw_ops->open(errinfo, &h->raw_ops_handle, path, oflag);
270}
271
272static void buffered_close(couchstore_error_info_t *errinfo,
273                           couch_file_handle handle)
274{
275    buffered_file_handle *h = (buffered_file_handle*)handle;
276    if (!h) {
277        return;
278    }
279    flush_buffer(errinfo, h->write_buffer);
280    h->raw_ops->close(errinfo, h->raw_ops_handle);
281}
282
283static ssize_t buffered_pread(couchstore_error_info_t *errinfo,
284                              couch_file_handle handle,
285                              void *buf,
286                              size_t nbyte,
287                              cs_off_t offset)
288{
289#if LOG_BUFFER
290    //fprintf(stderr, "r");
291#endif
292    buffered_file_handle *h = (buffered_file_handle*)handle;
293    // Flush the write buffer before trying to read anything:
294    couchstore_error_t err = flush_buffer(errinfo, h->write_buffer);
295    if (err < 0) {
296        return err;
297    }
298
299    ssize_t total_read = 0;
300    while (nbyte > 0) {
301        file_buffer* buffer = find_buffer(h, offset);
302
303        // Read as much as we can from the current buffer:
304        ssize_t nbyte_read = read_from_buffer(buffer, buf, nbyte, offset);
305        if (nbyte_read == 0) {
306            /*if (nbyte > buffer->capacity) {
307                // Remainder won't fit in a single buffer, so just read it directly:
308                nbyte_read = h->raw_ops->pread(h->raw_ops_handle, buf, nbyte, offset);
309                if (nbyte_read < 0) {
310                    return nbyte_read;
311                }
312            } else*/ {
313                // Move the buffer to cover the remainder of the data to be read.
314                cs_off_t block_start = offset - (offset % READ_BUFFER_CAPACITY);
315                err = load_buffer_from(errinfo, buffer, block_start, (size_t)(offset + nbyte - block_start));
316                if (err < 0) {
317                    return err;
318                }
319                nbyte_read = read_from_buffer(buffer, buf, nbyte, offset);
320                if (nbyte_read == 0)
321                    break;  // must be at EOF
322            }
323        }
324        buf = (char*)buf + nbyte_read;
325        nbyte -= nbyte_read;
326        offset += nbyte_read;
327        total_read += nbyte_read;
328    }
329    return total_read;
330}
331
332static ssize_t buffered_pwrite(couchstore_error_info_t *errinfo,
333                               couch_file_handle handle,
334                               const void *buf,
335                               size_t nbyte,
336                               cs_off_t offset)
337{
338#if LOG_BUFFER
339    //fprintf(stderr, "w");
340#endif
341    if (nbyte == 0) {
342        return 0;
343    }
344
345    buffered_file_handle *h = (buffered_file_handle*)handle;
346    file_buffer* buffer = h->write_buffer;
347
348    // Write data to the current buffer:
349    size_t nbyte_written = write_to_buffer(buffer, buf, nbyte, offset);
350    if (nbyte_written > 0) {
351        buf = (char*)buf + nbyte_written;
352        offset += nbyte_written;
353        nbyte -= nbyte_written;
354    }
355
356    // Flush the buffer if it's full, or if it isn't aligned with the current write:
357    if (buffer->length == buffer->capacity || nbyte_written == 0) {
358        couchstore_error_t error = flush_buffer(errinfo, buffer);
359        if (error < 0)
360            return error;
361    }
362
363    if (nbyte > 0) {
364        ssize_t written;
365        // If the remaining data will fit into the buffer, write it; else write directly:
366        if (nbyte <= (buffer->capacity - buffer->length)) {
367            written = write_to_buffer(buffer, buf, nbyte, offset);
368        } else {
369            written = h->raw_ops->pwrite(errinfo, h->raw_ops_handle, buf,
370                                         nbyte, offset);
371#if LOG_BUFFER
372            fprintf(stderr, "BUFFER: passthru %zd bytes at %zd --> %zd\n",
373                    nbyte, offset, written);
374#endif
375            if (written < 0) {
376                return written;
377            }
378        }
379        nbyte_written += written;
380    }
381
382    return nbyte_written;
383}
384
385static cs_off_t buffered_goto_eof(couchstore_error_info_t *errinfo,
386                                  couch_file_handle handle)
387{
388    buffered_file_handle *h = (buffered_file_handle*)handle;
389    return h->raw_ops->goto_eof(errinfo, h->raw_ops_handle);
390}
391
392static couchstore_error_t buffered_sync(couchstore_error_info_t *errinfo,
393                                        couch_file_handle handle)
394{
395    buffered_file_handle *h = (buffered_file_handle*)handle;
396    couchstore_error_t err = flush_buffer(errinfo, h->write_buffer);
397    if (err == COUCHSTORE_SUCCESS) {
398        err = h->raw_ops->sync(errinfo, h->raw_ops_handle);
399    }
400    return err;
401}
402
403static couchstore_error_t buffered_advise(couchstore_error_info_t *errinfo,
404                                          couch_file_handle handle,
405                                          cs_off_t offs,
406                                          cs_off_t len,
407                                          couchstore_file_advice_t adv)
408{
409    buffered_file_handle *h = (buffered_file_handle*)handle;
410    return h->raw_ops->advise(errinfo, h->raw_ops_handle, offs, len, adv);
411}
412
413static const couch_file_ops ops = {
414    (uint64_t)5,
415    buffered_constructor,
416    buffered_open,
417    buffered_close,
418    buffered_pread,
419    buffered_pwrite,
420    buffered_goto_eof,
421    buffered_sync,
422    buffered_advise,
423    buffered_destructor,
424    NULL
425};
426
427const couch_file_ops *couch_get_buffered_file_ops(couchstore_error_info_t *errinfo,
428                                                  const couch_file_ops* raw_ops,
429                                                  couch_file_handle* handle,
430                                                  bool readOnly)
431{
432    *handle = buffered_constructor_with_raw_ops(errinfo, raw_ops, readOnly);
433
434    if (*handle) {
435        return &ops;
436    } else {
437        return NULL;
438    }
439}
440