// xref: /6.0.3/couchstore/src/iobuffer.cc (revision eef37beb)
/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
/*
 *     Copyright 2016 Couchbase, Inc
 *
 *   Licensed under the Apache License, Version 2.0 (the "License");
 *   you may not use this file except in compliance with the License.
 *   You may obtain a copy of the License at
 *
 *       http://www.apache.org/licenses/LICENSE-2.0
 *
 *   Unless required by applicable law or agreed to in writing, software
 *   distributed under the License is distributed on an "AS IS" BASIS,
 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *   See the License for the specific language governing permissions and
 *   limitations under the License.
 */

#include "config.h"
#include "iobuffer.h"
#include "internal.h"

#include <algorithm>
#include <boost/intrusive/list.hpp>
#include <memory>
#include <new>
#include <unordered_map>
#include <vector>

#include <platform/cb_malloc.h>
#include <platform/make_unique.h>
#include <stdlib.h>
#include <string.h>

// Uncomment to enable debug logging of buffer operations.
// #define LOG_BUFFER 1
#if defined(LOG_BUFFER)
#include <stdio.h>
#endif

struct buffered_file_handle;

// An in-memory window onto a contiguous byte range of the underlying file.
// One instance serves as a handle's single write buffer; further instances
// act as read buffers owned and recycled by ReadBufferManager.
// NOTE(review): the list_base_hook<> base appears unused -- FileBufferList
// below links through the _lru_hook member instead; confirm before removing.
struct file_buffer : public boost::intrusive::list_base_hook<> {
    file_buffer(buffered_file_handle* _owner, size_t _capacity)
        : owner(_owner),
          capacity(_capacity),
          length(0),
          // Setting initial offset to 0 may cause problem
          // as there can be an actual buffer corresponding
          // to offset 0.
          offset(static_cast<cs_off_t>(-1)),
          dirty(0),
          bytes(new uint8_t[_capacity]) {
    }

    // Raw pointer to the start of the data array.
    uint8_t* getRawPtr() {
        return &bytes[0];
    }

    // Hook for intrusive list.
    boost::intrusive::list_member_hook<> _lru_hook;
    // File handle that owns this buffer instance.
    struct buffered_file_handle *owner;
    // Buffer capacity (bytes allocated in 'bytes').
    size_t capacity;
    // Length of valid data currently held (<= capacity).
    size_t length;
    // Starting file offset of the buffered range (-1 == not positioned yet).
    cs_off_t offset;
    // Flag indicating whether or not this buffer contains dirty data.
    uint8_t dirty;
    // Data array.
    std::unique_ptr<uint8_t[]> bytes;
};

// Owning pointer for buffers; the read map below holds ownership.
using UniqueFileBufferPtr = std::unique_ptr<file_buffer>;

// Links file_buffer instances into a list via file_buffer::_lru_hook.
using ListMember =
        boost::intrusive::member_hook< file_buffer,
                                       boost::intrusive::list_member_hook<>,
                                       &file_buffer::_lru_hook >;

// Non-owning intrusive LRU list of buffers.
using FileBufferList = boost::intrusive::list<file_buffer, ListMember>;
// Maps an aligned file offset to the (owned) buffer caching that block.
// NOTE(review): key type is size_t while offsets are cs_off_t; a fresh
// buffer's initial -1 offset wraps to a sentinel key -- see findBuffer()
// and relocateBuffer().
using FileBufferMap = std::unordered_map<size_t, UniqueFileBufferPtr>;

class ReadBufferManager;

// How I interpret a couch_file_handle:
struct buffered_file_handle {
    // Wrapped (real) file ops and their handle; all actual disk I/O goes here.
    FileOpsInterface* raw_ops;
    couch_file_handle raw_ops_handle;
    unsigned nbuffers;
    // Single buffer that batches up writes until flushed.
    UniqueFileBufferPtr write_buffer;
    // Owns the LRU-managed read buffers (heap-allocated, freed in destructor()).
    ReadBufferManager *read_buffer_mgr;
    // Buffering configuration (read-only flag, capacities, counts).
    buffered_file_ops_params params;
};

96/**
97 * Class for management of LRU list and hash index for read buffers.
98 * All buffer instances are tracked by using shared pointers.
99 */
100class ReadBufferManager {
101public:
102    ReadBufferManager() : nBuffers(0) {
103    }
104
105    ~ReadBufferManager() {
106        // Note: all elements in intrusive list MUST be unlinked
107        //       before they are freed (unless it will internally
108        //       invoke an assertion failure).
109        auto itr = readLRU.begin();
110        while (itr != readLRU.end()) {
111            itr = readLRU.erase(itr);
112        }
113    }
114
115    file_buffer* findBuffer(buffered_file_handle* h, cs_off_t offset) {
116        // Align offset.
117        offset = offset - offset % h->params.read_buffer_capacity;
118
119        // Find a buffer for this offset,
120        // OR use the last one in LRU list.
121        file_buffer* buffer = nullptr;
122        auto itr_map = readMap.find(offset);
123        if (itr_map != readMap.end()) {
124            // Matching buffer exists.
125            // Move it to the front of LRU, and return.
126            buffer = itr_map->second.get();
127            readLRU.splice(readLRU.begin(), readLRU, readLRU.iterator_to(*buffer));
128            return buffer;
129        }
130
131        // ==== Otherwise: not found.
132
133        if (nBuffers < h->params.max_read_buffers) {
134            // We can still create another buffer.
135            UniqueFileBufferPtr buffer_unique;
136            buffer_unique = std::make_unique<file_buffer>(
137                    h, h->params.read_buffer_capacity);
138            buffer = buffer_unique.get();
139            ++nBuffers;
140            readMap.insert( std::make_pair(buffer->offset,
141                                           std::move(buffer_unique)) );
142            // Locate it at the front of LRU, and return.
143            readLRU.push_front(*buffer);
144            return buffer;
145        }
146
147        // We cannot create a new one.
148        // Recycle the last buffer in the LRU list.
149        auto itr_list = readLRU.rbegin();
150        buffer = &(*itr_list);
151#if defined(LOG_BUFFER)
152        fprintf(stderr, "BUFFER: %p recycled, from %zd to %zd\n",
153                buffer, buffer->offset, offset);
154#endif
155        // Move the buffer to the front of LRU.
156        readLRU.splice(readLRU.begin(), readLRU, itr_list.base());
157        return buffer;
158    }
159
160    void relocateBuffer(cs_off_t old_offset, cs_off_t new_offset) {
161        auto itr = readMap.find(old_offset);
162        if (itr == readMap.end()) {
163            return;
164        }
165
166        UniqueFileBufferPtr tmp = std::move(itr->second);
167        readMap.erase(itr);
168        tmp->offset = new_offset;
169        tmp->length = 0;
170        readMap.insert( std::make_pair(new_offset, std::move(tmp)) );
171    }
172
173private:
174    // LRU list for buffers.
175    FileBufferList readLRU;
176    // Map from offset to buffer instance.
177    FileBufferMap readMap;
178    // Number of buffers allocated.
179    size_t nBuffers;
180};
181


//////// BUFFER WRITES:


186// Write as many bytes as possible into the buffer, returning the count
187static size_t write_to_buffer(file_buffer* buf,
188                              const void *bytes,
189                              size_t nbyte,
190                              cs_off_t offset)
191{
192    if (buf->length == 0) {
193        // If buffer is empty, align it to start at the current offset:
194        buf->offset = offset;
195    } else if (offset < buf->offset || offset > buf->offset + (cs_off_t)buf->length) {
196        // If it's out of range, don't write anything
197        return 0;
198    }
199    size_t offset_in_buffer = (size_t)(offset - buf->offset);
200    size_t buffer_nbyte = std::min(buf->capacity - offset_in_buffer, nbyte);
201
202    memcpy(buf->getRawPtr() + offset_in_buffer, bytes, buffer_nbyte);
203    buf->dirty = 1;
204    offset_in_buffer += buffer_nbyte;
205    if (offset_in_buffer > buf->length)
206        buf->length = offset_in_buffer;
207
208    return buffer_nbyte;
209}
210
211// Write the current buffer to disk and empty it.
212static couchstore_error_t flush_buffer(couchstore_error_info_t *errinfo,
213                                       file_buffer* buf) {
214    while (buf->length > 0 && buf->dirty) {
215        ssize_t raw_written;
216        raw_written = buf->owner->raw_ops->pwrite(errinfo,
217                                                  buf->owner->raw_ops_handle,
218                                                  buf->getRawPtr(),
219                                                  buf->length,
220                                                  buf->offset);
221#if defined(LOG_BUFFER)
222        fprintf(stderr, "BUFFER: %p flush %zd bytes at %zd --> %zd\n",
223                buf, buf->length, buf->offset, raw_written);
224#endif
225        if (raw_written < 0)
226            return (couchstore_error_t) raw_written;
227        buf->length -= raw_written;
228        buf->offset += raw_written;
229        memmove(buf->getRawPtr(), buf->getRawPtr() + raw_written, buf->length);
230    }
231    buf->dirty = 0;
232    return COUCHSTORE_SUCCESS;
233}
234
235
//////// BUFFER READS:


239static size_t read_from_buffer(file_buffer* buf,
240                               void *bytes,
241                               size_t nbyte,
242                               cs_off_t offset) {
243    if (offset < buf->offset || offset >= buf->offset + (cs_off_t)buf->length) {
244        return 0;
245    }
246    size_t offset_in_buffer = (size_t)(offset - buf->offset);
247    size_t buffer_nbyte = std::min(buf->length - offset_in_buffer, nbyte);
248
249    memcpy(bytes, buf->getRawPtr() + offset_in_buffer, buffer_nbyte);
250    return buffer_nbyte;
251}
252
253
// Position the buffer so it can cover [offset, offset+nbyte) and fill it
// from disk up to its capacity. Any dirty data is flushed out first.
static couchstore_error_t load_buffer_from(couchstore_error_info_t *errinfo,
                                           file_buffer* buf,
                                           cs_off_t offset,
                                           size_t nbyte) {
    if (buf->dirty) {
        // If buffer contains data to be written, flush it first:
        couchstore_error_t err = flush_buffer(errinfo, buf);
        if (err < 0) {
            return err;
        }
    }

    // NOTE(review): 'offset + nbyte' mixes cs_off_t with size_t, making the
    // comparison unsigned -- fine for the non-negative offsets callers pass,
    // but confirm if negative offsets could ever reach here.
    if (offset < buf->offset || offset + nbyte > buf->offset + buf->capacity) {
        // Reset the buffer to empty if it has to move:
        buf->offset = offset;
        buf->length = 0;
    }

    // Read data to extend the buffer to its capacity (if possible):
    ssize_t bytes_read = buf->owner->raw_ops->pread(errinfo,
                                                    buf->owner->raw_ops_handle,
                                                    buf->getRawPtr() + buf->length,
                                                    buf->capacity - buf->length,
                                                    buf->offset + buf->length);
#if defined(LOG_BUFFER)
    fprintf(stderr, "BUFFER: %p loaded %zd bytes from %zd\n",
            buf, bytes_read, offset + buf->length);
#endif
    if (bytes_read < 0) {
        // Propagate the raw layer's (negative) error code.
        return (couchstore_error_t) bytes_read;
    }
    buf->length += bytes_read;
    return COUCHSTORE_SUCCESS;
}


//////// PARAMS:

// Default parameters: writable handle with the compile-time default
// read-buffer capacity and maximum read-buffer count.
buffered_file_ops_params::buffered_file_ops_params() :
    readOnly(false),
    read_buffer_capacity(READ_BUFFER_CAPACITY),
    max_read_buffers(MAX_READ_BUFFERS)
{ }

// Copy constructor: memberwise copy (equivalent to the compiler default).
buffered_file_ops_params::buffered_file_ops_params(const buffered_file_ops_params& src) :
    readOnly(src.readOnly),
    read_buffer_capacity(src.read_buffer_capacity),
    max_read_buffers(src.max_read_buffers)
{ }

// Explicit parameters: read-only flag, per-buffer capacity (bytes) and
// maximum number of read buffers per handle.
buffered_file_ops_params::buffered_file_ops_params(const bool _read_only,
                                                   const uint32_t _read_buffer_capacity,
                                                   const uint32_t _max_read_buffers) :
    readOnly(_read_only),
    read_buffer_capacity(_read_buffer_capacity),
    max_read_buffers(_max_read_buffers)
{ }



//////// FILE API:

315void BufferedFileOps::destructor(couch_file_handle handle)
316{
317    buffered_file_handle *h = (buffered_file_handle*)handle;
318    if (!h) {
319        return;
320    }
321    h->raw_ops->destructor(h->raw_ops_handle);
322
323    delete h->read_buffer_mgr;
324    delete h;
325}
326
327couch_file_handle BufferedFileOps::constructor(couchstore_error_info_t* errinfo,
328                                               FileOpsInterface* raw_ops,
329                                               buffered_file_ops_params params)
330{
331    buffered_file_handle *h = new buffered_file_handle();
332    if (h) {
333        h->raw_ops = raw_ops;
334        h->raw_ops_handle = raw_ops->constructor(errinfo);
335        h->nbuffers = 1;
336        h->params = params;
337
338        try {
339            h->write_buffer = std::make_unique<file_buffer>(
340                    h, h->params.readOnly ? 0 : WRITE_BUFFER_CAPACITY);
341            h->read_buffer_mgr = new ReadBufferManager();
342        } catch (const std::bad_alloc&) {
343            destructor(reinterpret_cast<couch_file_handle>(h));
344            return NULL;
345        }
346    }
347    return (couch_file_handle) h;
348}
349
350couch_file_handle BufferedFileOps::constructor(couchstore_error_info_t* errinfo)
351{
352    return constructor(errinfo, couchstore_get_default_file_ops(),
353                       buffered_file_ops_params());
354}
355
356couchstore_error_t BufferedFileOps::open(couchstore_error_info_t* errinfo,
357                                         couch_file_handle* handle,
358                                         const char* path,
359                                         int oflag)
360{
361    buffered_file_handle *h = (buffered_file_handle*)*handle;
362    return h->raw_ops->open(errinfo, &h->raw_ops_handle, path, oflag);
363}
364
365couchstore_error_t BufferedFileOps::close(couchstore_error_info_t* errinfo,
366                            couch_file_handle handle)
367{
368    buffered_file_handle *h = (buffered_file_handle*)handle;
369    if (!h) {
370        return COUCHSTORE_ERROR_INVALID_ARGUMENTS;
371    }
372    flush_buffer(errinfo, h->write_buffer.get());
373    return h->raw_ops->close(errinfo, h->raw_ops_handle);
374}
375
376couchstore_error_t BufferedFileOps::set_periodic_sync(couch_file_handle handle,
377                                                      uint64_t period_bytes) {
378    // Delegate to underlying file ops, given they perform the real disk
379    // writes.
380    buffered_file_handle *h = (buffered_file_handle*)handle;
381    return h->raw_ops->set_periodic_sync(h->raw_ops_handle, period_bytes);
382}
383
// Buffered read: serves the request block by block out of the read-buffer
// cache, loading/recycling buffers via ReadBufferManager on misses.
// Returns the number of bytes read (may be short at EOF) or a negative
// couchstore error code.
ssize_t BufferedFileOps::pread(couchstore_error_info_t* errinfo,
                               couch_file_handle handle,
                               void *buf,
                               size_t nbyte,
                               cs_off_t offset)
{
#if defined(LOG_BUFFER)
    //fprintf(stderr, "r");
#endif
    buffered_file_handle *h = (buffered_file_handle*)handle;
    // Flush the write buffer before trying to read anything:
    couchstore_error_t err = flush_buffer(errinfo, h->write_buffer.get());
    if (err < 0) {
        return err;
    }

    ssize_t total_read = 0;
    while (nbyte > 0) {
        file_buffer* buffer = h->read_buffer_mgr->findBuffer(h, offset);

        // Read as much as we can from the current buffer:
        ssize_t nbyte_read = read_from_buffer(buffer, buf, nbyte, offset);
        if (nbyte_read == 0) {
            // 'nbyte_read==0' means that the returned buffer contains
            // data for other offset and needs to be recycled.

            // Move the buffer to cover the remainder of the data to be read.
            cs_off_t block_start = offset -
                                   (offset % h->params.read_buffer_capacity);
            // Re-key the buffer in the manager's map to its new offset
            // (for a fresh buffer this moves it off its initial -1 key).
            h->read_buffer_mgr->relocateBuffer(buffer->offset, block_start);
            err = load_buffer_from(errinfo, buffer, block_start,
                                   (size_t)(offset + nbyte - block_start));
            if (err < 0) {
                return err;
            }

            // Retry now that the buffer covers this offset.
            nbyte_read = read_from_buffer(buffer, buf, nbyte, offset);
            if (nbyte_read == 0)
                break;  // must be at EOF
        }
        buf = (char*)buf + nbyte_read;
        nbyte -= nbyte_read;
        offset += nbyte_read;
        total_read += nbyte_read;
    }
    return total_read;
}

// Buffered write: data accumulates in the handle's single write buffer and
// is flushed when the buffer fills or a non-contiguous write arrives; data
// too large for the buffer is written straight through to the raw ops.
// Returns the number of bytes written or a negative couchstore error code.
ssize_t BufferedFileOps::pwrite(couchstore_error_info_t* errinfo,
                                couch_file_handle handle,
                                const void* buf,
                                size_t nbyte,
                                cs_off_t offset)
{
#if defined(LOG_BUFFER)
    //fprintf(stderr, "w");
#endif
    if (nbyte == 0) {
        return 0;
    }

    buffered_file_handle *h = (buffered_file_handle*)handle;
    file_buffer* buffer = h->write_buffer.get();

    // Write data to the current buffer:
    size_t nbyte_written = write_to_buffer(buffer, buf, nbyte, offset);
    if (nbyte_written > 0) {
        buf = (char*)buf + nbyte_written;
        offset += nbyte_written;
        nbyte -= nbyte_written;
    }

    // Flush the buffer if it's full, or if it isn't aligned with the current write:
    if (buffer->length == buffer->capacity || nbyte_written == 0) {
        couchstore_error_t error = flush_buffer(errinfo, buffer);
        if (error < 0)
            return error;
    }

    if (nbyte > 0) {
        ssize_t written;
        // If the remaining data will fit into the buffer, write it; else write directly:
        if (nbyte <= (buffer->capacity - buffer->length)) {
            // (Remaining bytes only exist if the buffer was flushed above,
            // so this second buffered write is accepted in full.)
            written = write_to_buffer(buffer, buf, nbyte, offset);
        } else {
            // Too large to buffer: bypass it and write straight through.
            written = h->raw_ops->pwrite(errinfo, h->raw_ops_handle, buf,
                                         nbyte, offset);
#if defined(LOG_BUFFER)
            fprintf(stderr, "BUFFER: passthru %zd bytes at %zd --> %zd\n",
                    nbyte, offset, written);
#endif
            if (written < 0) {
                return written;
            }
        }
        nbyte_written += written;
    }

    return nbyte_written;
}

485cs_off_t BufferedFileOps::goto_eof(couchstore_error_info_t* errinfo,
486                                  couch_file_handle handle)
487{
488    buffered_file_handle *h = (buffered_file_handle*)handle;
489    return h->raw_ops->goto_eof(errinfo, h->raw_ops_handle);
490}
491
492couchstore_error_t BufferedFileOps::sync(couchstore_error_info_t* errinfo,
493                                         couch_file_handle handle)
494{
495    buffered_file_handle *h = (buffered_file_handle*)handle;
496    couchstore_error_t err = flush_buffer(errinfo, h->write_buffer.get());
497    if (err == COUCHSTORE_SUCCESS) {
498        err = h->raw_ops->sync(errinfo, h->raw_ops_handle);
499    }
500    return err;
501}
502
503couchstore_error_t BufferedFileOps::advise(couchstore_error_info_t* errinfo,
504                                           couch_file_handle handle,
505                                           cs_off_t offs,
506                                           cs_off_t len,
507                                           couchstore_file_advice_t adv)
508{
509    buffered_file_handle *h = (buffered_file_handle*)handle;
510    return h->raw_ops->advise(errinfo, h->raw_ops_handle, offs, len, adv);
511}
512
513FileOpsInterface::FHStats* BufferedFileOps::get_stats(
514        couch_file_handle handle) {
515    // Not implemeted ourselves, just forward to wrapped ops.
516    buffered_file_handle* h = (buffered_file_handle*)handle;
517    return h->raw_ops->get_stats(h->raw_ops_handle);
518}
519
// Shared, stateless ops instance handed out by couch_get_buffered_file_ops().
static BufferedFileOps ops;

522FileOpsInterface* couch_get_buffered_file_ops(couchstore_error_info_t* errinfo,
523                                              FileOpsInterface* raw_ops,
524                                              couch_file_handle* handle,
525                                              buffered_file_ops_params params)
526{
527    *handle = ops.constructor(errinfo, raw_ops, params);
528
529    if (*handle) {
530        return &ops;
531    } else {
532        return NULL;
533    }
534}
535