xref: /6.0.3/couchstore/src/iobuffer.cc (revision c2c458ff)
1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2016 Couchbase, Inc
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
18#include "config.h"
19#include "iobuffer.h"
20#include "internal.h"
21
22#include <algorithm>
23#include <boost/intrusive/list.hpp>
24#include <memory>
25#include <new>
26#include <unordered_map>
27#include <vector>
28
29#include <phosphor/phosphor.h>
30#include <platform/cb_malloc.h>
31#include <platform/make_unique.h>
32#include <stdlib.h>
33#include <string.h>
34#ifndef WIN32
35#include <sys/mman.h>
36#endif
37#include "crc32.h"
38
39// Uncomment to enable debug logging of buffer operations.
40// #define LOG_BUFFER 1
41#if defined(LOG_BUFFER)
42#include <stdio.h>
43#endif
44
// Compute the number of bytes to allocate for a buffer whose usable
// capacity is 'capacity'. When mprotect is requested (non-Windows only)
// the allocation is padded by one page minus one byte so that a
// page-aligned data pointer can be carved out of it.
size_t getCapacity(size_t capacity, bool mprotect) {
#ifndef WIN32
    if (mprotect) {
        return capacity + sysconf(_SC_PAGE_SIZE) - 1;
    }
#endif
    return capacity;
}
55struct buffered_file_handle;
56struct file_buffer : public boost::intrusive::list_base_hook<> {
57    file_buffer(buffered_file_handle* _owner,
58                size_t _capacity,
59                bool _tracing_enabled,
60                bool _write_validation_enabled,
61                bool _mprotect_enabled)
62        : owner(_owner),
63          capacity(_capacity),
64          length(0),
65          // Setting initial offset to 0 may cause problem
66          // as there can be an actual buffer corresponding
67          // to offset 0.
68          offset(static_cast<cs_off_t>(-1)),
69          dirty(0),
70          tracing_enabled(_tracing_enabled),
71          write_validation_enabled(_write_validation_enabled),
72#ifndef WIN32
73          mprotect_enabled(_mprotect_enabled),
74#else
75          /* For WIN32 mprotect is not supported and so
76           * mprotect_enabled will be always false
77           */
78          mprotect_enabled(false),
79#endif
80          bytes(new uint8_t[getCapacity(_capacity, _mprotect_enabled)])
81    {
82#ifndef WIN32
83        if (mprotect_enabled) {
84            /* Need to page align the ptr to data for mprotect,
85             * allocate more and align based on page size
86             */
87            int pagesize = sysconf(_SC_PAGE_SIZE);
88            raw_aligned_ptr_to_data =
89                    (uint8_t*)(((intptr_t)(&bytes[0]) + (pagesize - 1)) &
90                               (~(pagesize - 1)));
91        }
92#endif
93    }
94
95    ~file_buffer() {
96#ifndef WIN32
97        if (mprotect_enabled) {
98            mprotect(getRawPtr(), capacity, PROT_READ | PROT_WRITE);
99        }
100#endif
101    }
102    uint8_t* getRawPtr() {
103        if (mprotect_enabled) {
104            /* mprotect needs page aligned address */
105            return raw_aligned_ptr_to_data;
106        } else {
107            return &bytes[0];
108        }
109    }
110
111    // Hook for intrusive list.
112    boost::intrusive::list_member_hook<> _lru_hook;
113    // File handle that owns this buffer instance.
114    struct buffered_file_handle *owner;
115    // Buffer capacity.
116    size_t capacity;
117    // Length of data written.
118    size_t length;
119    // Starting offset of buffer.
120    cs_off_t offset;
121    // Flag indicating whether or not this buffer contains dirty data.
122    uint8_t dirty;
123    // Trace and verify flags
124    bool tracing_enabled;
125    bool write_validation_enabled;
126    bool mprotect_enabled;
127    // Aligned ptr to data for mprotect enabled case
128    uint8_t* raw_aligned_ptr_to_data;
129    // Data array.
130    std::unique_ptr<uint8_t[]> bytes;
131};
132
133using UniqueFileBufferPtr = std::unique_ptr<file_buffer>;
134
135using ListMember =
136        boost::intrusive::member_hook< file_buffer,
137                                       boost::intrusive::list_member_hook<>,
138                                       &file_buffer::_lru_hook >;
139
140using FileBufferList = boost::intrusive::list<file_buffer, ListMember>;
141using FileBufferMap = std::unordered_map<size_t, UniqueFileBufferPtr>;
142
class ReadBufferManager;

// Internal state behind a couch_file_handle handed out by BufferedFileOps:
// wraps a raw FileOpsInterface handle with one write buffer plus a set of
// LRU-managed read buffers.
struct buffered_file_handle {
    // Underlying (unbuffered) ops implementation and its handle.
    FileOpsInterface* raw_ops;
    couch_file_handle raw_ops_handle;
    // Set to 1 at construction; not read anywhere visible in this file --
    // TODO(review) confirm it is still needed.
    unsigned nbuffers;
    // Single buffer staging writes until they are flushed to raw_ops.
    UniqueFileBufferPtr write_buffer;
    // Owned; allocated in constructor(), freed in destructor().
    ReadBufferManager *read_buffer_mgr;
    // Configuration captured at construction time.
    buffered_file_ops_params params;
};
154
/**
 * Class for management of the LRU list and hash index for read buffers.
 * Buffers are owned by the map (unique pointers); the intrusive LRU list
 * holds non-owning links into the same buffer objects.
 */
class ReadBufferManager {
public:
    ReadBufferManager() : nBuffers(0) {
    }

    ~ReadBufferManager() {
        // Note: all elements in intrusive list MUST be unlinked
        //       before they are freed (unless it will internally
        //       invoke an assertion failure).
        auto itr = readLRU.begin();
        while (itr != readLRU.end()) {
            itr = readLRU.erase(itr);
        }
    }

    // Return a buffer to serve a read at 'offset'. This is either:
    //  - an existing buffer indexed at the aligned offset, or
    //  - a brand-new buffer (while fewer than max_read_buffers exist), or
    //  - the least-recently-used buffer, recycled.
    // In the last two cases the returned buffer does NOT yet contain data
    // for 'offset'; the caller (BufferedFileOps::pread) is expected to
    // call relocateBuffer() and reload it.
    file_buffer* findBuffer(buffered_file_handle* h, cs_off_t offset) {
        // Align offset to the start of its buffer-sized block.
        offset = offset - offset % h->params.read_buffer_capacity;

        // Find a buffer for this offset,
        // OR use the last one in LRU list.
        file_buffer* buffer = nullptr;
        auto itr_map = readMap.find(offset);
        if (itr_map != readMap.end()) {
            // Matching buffer exists.
            // Move it to the front of LRU, and return.
            buffer = itr_map->second.get();
            readLRU.splice(readLRU.begin(), readLRU, readLRU.iterator_to(*buffer));
            return buffer;
        }

        // ==== Otherwise: not found.

        if (nBuffers < h->params.max_read_buffers) {
            // We can still create another buffer.
            UniqueFileBufferPtr buffer_unique;
            buffer_unique = std::make_unique<file_buffer>(
                    h,
                    h->params.read_buffer_capacity,
                    h->params.tracing_enabled,
                    h->params.write_validation_enabled,
                    h->params.mprotect_enabled);
            buffer = buffer_unique.get();
            ++nBuffers;
            // NOTE(review): the new buffer is inserted under its initial
            // offset of -1; the caller's relocateBuffer() call re-keys it
            // to the real block offset before it is loaded.
            readMap.insert( std::make_pair(buffer->offset,
                                           std::move(buffer_unique)) );
            // Locate it at the front of LRU, and return.
            readLRU.push_front(*buffer);
            return buffer;
        }

        // We cannot create a new one.
        // Recycle the last buffer in the LRU list.
        auto itr_list = readLRU.rbegin();
        buffer = &(*itr_list);
#if defined(LOG_BUFFER)
        fprintf(stderr, "BUFFER: %p recycled, from %zd to %zd\n",
                buffer, buffer->offset, offset);
#endif
        // Move the buffer to the front of LRU.
        // NOTE(review): itr_list.base() equals readLRU.end(), not an
        // iterator to the last element -- confirm this splice relocates
        // the intended (least-recently-used) node under boost::intrusive
        // semantics.
        readLRU.splice(readLRU.begin(), readLRU, itr_list.base());
        return buffer;
    }

    // Re-key the buffer indexed at 'old_offset' to 'new_offset' and mark
    // it empty (length = 0) so it will be reloaded. No-op if nothing is
    // indexed at 'old_offset'.
    void relocateBuffer(cs_off_t old_offset, cs_off_t new_offset) {
        auto itr = readMap.find(old_offset);
        if (itr == readMap.end()) {
            return;
        }

        UniqueFileBufferPtr tmp = std::move(itr->second);
        readMap.erase(itr);
        tmp->offset = new_offset;
        tmp->length = 0;
        readMap.insert( std::make_pair(new_offset, std::move(tmp)) );
    }

private:
    // LRU list for buffers.
    FileBufferList readLRU;
    // Map from offset to buffer instance.
    FileBufferMap readMap;
    // Number of buffers allocated.
    size_t nBuffers;
};
244
245
246//////// BUFFER WRITES:
247
248
249// Write as many bytes as possible into the buffer, returning the count
250static size_t write_to_buffer(file_buffer* buf,
251                              const void *bytes,
252                              size_t nbyte,
253                              cs_off_t offset)
254{
255    if (buf->length == 0) {
256        // If buffer is empty, align it to start at the current offset:
257        buf->offset = offset;
258    } else if (offset < buf->offset || offset > buf->offset + (cs_off_t)buf->length) {
259        // If it's out of range, don't write anything
260        return 0;
261    }
262    size_t offset_in_buffer = (size_t)(offset - buf->offset);
263    size_t buffer_nbyte = std::min(buf->capacity - offset_in_buffer, nbyte);
264
265    if (buf->tracing_enabled) {
266        uint32_t crc32 = get_checksum(
267                reinterpret_cast<uint8_t*>(const_cast<void*>(bytes)),
268                buffer_nbyte,
269                CRC32C);
270        TRACE_INSTANT2("couchstore_write",
271                       "write_to_buffer",
272                       "offset",
273                       offset,
274                       "nbytes&CRC",
275                       buffer_nbyte << 32 | crc32);
276    }
277
278    if (buf->mprotect_enabled) {
279#ifndef WIN32
280        mprotect(buf->getRawPtr(),
281                 WRITE_BUFFER_CAPACITY,
282                 PROT_READ | PROT_WRITE);
283        memcpy(buf->getRawPtr() + offset_in_buffer, bytes, buffer_nbyte);
284        mprotect(buf->getRawPtr(), WRITE_BUFFER_CAPACITY, PROT_READ);
285#endif
286    } else {
287        memcpy(buf->getRawPtr() + offset_in_buffer, bytes, buffer_nbyte);
288    }
289
290    buf->dirty = 1;
291    offset_in_buffer += buffer_nbyte;
292    if (offset_in_buffer > buf->length)
293        buf->length = offset_in_buffer;
294
295    return buffer_nbyte;
296}
297
// Write the current buffer to disk and empty it.
// Loops to cope with partial writes: after each pwrite the written prefix
// is dropped by shifting the remaining bytes to the front of the buffer
// and advancing the buffer's file offset. On failure, returns the
// (negative) couchstore error code produced by the underlying pwrite.
static couchstore_error_t flush_buffer(couchstore_error_info_t *errinfo,
                                       file_buffer* buf) {
    while (buf->length > 0 && buf->dirty) {
        ssize_t raw_written;
        raw_written = buf->owner->raw_ops->pwrite(errinfo,
                                                  buf->owner->raw_ops_handle,
                                                  buf->getRawPtr(),
                                                  buf->length,
                                                  buf->offset);
#if defined(LOG_BUFFER)
        fprintf(stderr, "BUFFER: %p flush %zd bytes at %zd --> %zd\n",
                buf, buf->length, buf->offset, raw_written);
#endif
        if (buf->tracing_enabled) {
            // NOTE(review): this trace uses CRC32 whereas write_to_buffer
            // uses CRC32C -- confirm the difference is intentional.
            uint32_t crc32 =
                    get_checksum(reinterpret_cast<uint8_t*>(buf->getRawPtr()),
                                 buf->length,
                                 CRC32);
            TRACE_INSTANT2("couchstore_write",
                           "flush_buffer",
                           "offset",
                           buf->offset,
                           "nbytes&CRC",
                           raw_written << 32 | crc32);
        }

        if (raw_written < 0) {
            if (buf->tracing_enabled) {
                TRACE_INSTANT1("couchstore_write",
                               "flush_buffer",
                               "raw_written",
                               raw_written);
            }
            // Negative pwrite results are couchstore error codes.
            return (couchstore_error_t) raw_written;
        }
        // Drop the written prefix; keep any unwritten tail for the next
        // iteration.
        buf->length -= raw_written;
        buf->offset += raw_written;
        memmove(buf->getRawPtr(), buf->getRawPtr() + raw_written, buf->length);
    }
    buf->dirty = 0;
    return COUCHSTORE_SUCCESS;
}
341
342
343//////// BUFFER READS:
344
345
346static size_t read_from_buffer(file_buffer* buf,
347                               void *bytes,
348                               size_t nbyte,
349                               cs_off_t offset) {
350    if (offset < buf->offset || offset >= buf->offset + (cs_off_t)buf->length) {
351        return 0;
352    }
353    size_t offset_in_buffer = (size_t)(offset - buf->offset);
354    size_t buffer_nbyte = std::min(buf->length - offset_in_buffer, nbyte);
355
356    memcpy(bytes, buf->getRawPtr() + offset_in_buffer, buffer_nbyte);
357    return buffer_nbyte;
358}
359
360
// Position 'buf' so that it can serve a read of 'nbyte' bytes at 'offset',
// then fill it from disk up to its capacity. Any dirty contents are
// flushed first. If the requested range does not fit inside the buffer's
// current window the buffer is reset to start at 'offset'; otherwise the
// existing contents are kept and extended. Returns a (negative)
// couchstore error code on I/O failure.
static couchstore_error_t load_buffer_from(couchstore_error_info_t *errinfo,
                                           file_buffer* buf,
                                           cs_off_t offset,
                                           size_t nbyte) {
    if (buf->dirty) {
        // If buffer contains data to be written, flush it first:
        couchstore_error_t err = flush_buffer(errinfo, buf);
        if (err < 0) {
            return err;
        }
    }

    if (offset < buf->offset || offset + nbyte > buf->offset + buf->capacity) {
        // Reset the buffer to empty if it has to move:
        buf->offset = offset;
        buf->length = 0;
    }

    // Read data to extend the buffer to its capacity (if possible):
    ssize_t bytes_read = buf->owner->raw_ops->pread(errinfo,
                                                    buf->owner->raw_ops_handle,
                                                    buf->getRawPtr() + buf->length,
                                                    buf->capacity - buf->length,
                                                    buf->offset + buf->length);
#if defined(LOG_BUFFER)
    fprintf(stderr, "BUFFER: %p loaded %zd bytes from %zd\n",
            buf, bytes_read, offset + buf->length);
#endif
    if (bytes_read < 0) {
        return (couchstore_error_t) bytes_read;
    }
    buf->length += bytes_read;
    return COUCHSTORE_SUCCESS;
}
395
396
397//////// PARAMS:
398
// Default parameter set: writable handle, all debug features (tracing,
// write validation, mprotect) disabled, and the compile-time read-buffer
// geometry (READ_BUFFER_CAPACITY / MAX_READ_BUFFERS).
buffered_file_ops_params::buffered_file_ops_params()
    : readOnly(false),
      tracing_enabled(false),
      write_validation_enabled(false),
      mprotect_enabled(false),
      read_buffer_capacity(READ_BUFFER_CAPACITY),
      max_read_buffers(MAX_READ_BUFFERS) {
}
407
408buffered_file_ops_params::buffered_file_ops_params(
409        const buffered_file_ops_params& src)
410    : readOnly(src.readOnly),
411      tracing_enabled(src.tracing_enabled),
412      write_validation_enabled(src.write_validation_enabled),
413      mprotect_enabled(src.mprotect_enabled),
414      read_buffer_capacity(src.read_buffer_capacity),
415      max_read_buffers(src.max_read_buffers) {
416}
417
// Fully-specified parameter set; each member is copied verbatim from the
// corresponding argument.
buffered_file_ops_params::buffered_file_ops_params(
        const bool _read_only,
        bool _tracing_enabled,
        bool _write_validation_enabled,
        bool _mprotect_enabled,
        const uint32_t _read_buffer_capacity,
        const uint32_t _max_read_buffers)
    : readOnly(_read_only),
      tracing_enabled(_tracing_enabled),
      write_validation_enabled(_write_validation_enabled),
      mprotect_enabled(_mprotect_enabled),
      read_buffer_capacity(_read_buffer_capacity),
      max_read_buffers(_max_read_buffers) {
}
432
433//////// FILE API:
434
435void BufferedFileOps::destructor(couch_file_handle handle)
436{
437    buffered_file_handle *h = (buffered_file_handle*)handle;
438    if (!h) {
439        return;
440    }
441    h->raw_ops->destructor(h->raw_ops_handle);
442
443    delete h->read_buffer_mgr;
444    delete h;
445}
446
447couch_file_handle BufferedFileOps::constructor(couchstore_error_info_t* errinfo,
448                                               FileOpsInterface* raw_ops,
449                                               buffered_file_ops_params params)
450{
451    buffered_file_handle *h = new buffered_file_handle();
452    if (h) {
453        h->raw_ops = raw_ops;
454        h->raw_ops_handle = raw_ops->constructor(errinfo);
455        h->nbuffers = 1;
456        h->params = params;
457
458        try {
459            h->write_buffer = std::make_unique<file_buffer>(
460                    h,
461                    h->params.readOnly ? 0 : WRITE_BUFFER_CAPACITY,
462                    h->params.tracing_enabled,
463                    h->params.write_validation_enabled,
464                    h->params.mprotect_enabled);
465            h->read_buffer_mgr = new ReadBufferManager();
466        } catch (const std::bad_alloc&) {
467            destructor(reinterpret_cast<couch_file_handle>(h));
468            return NULL;
469        }
470    }
471    return (couch_file_handle) h;
472}
473
474couch_file_handle BufferedFileOps::constructor(couchstore_error_info_t* errinfo)
475{
476    return constructor(errinfo, couchstore_get_default_file_ops(),
477                       buffered_file_ops_params());
478}
479
480couchstore_error_t BufferedFileOps::open(couchstore_error_info_t* errinfo,
481                                         couch_file_handle* handle,
482                                         const char* path,
483                                         int oflag)
484{
485    buffered_file_handle *h = (buffered_file_handle*)*handle;
486    return h->raw_ops->open(errinfo, &h->raw_ops_handle, path, oflag);
487}
488
489couchstore_error_t BufferedFileOps::close(couchstore_error_info_t* errinfo,
490                            couch_file_handle handle)
491{
492    buffered_file_handle *h = (buffered_file_handle*)handle;
493    if (!h) {
494        return COUCHSTORE_ERROR_INVALID_ARGUMENTS;
495    }
496    flush_buffer(errinfo, h->write_buffer.get());
497    return h->raw_ops->close(errinfo, h->raw_ops_handle);
498}
499
500couchstore_error_t BufferedFileOps::set_periodic_sync(couch_file_handle handle,
501                                                      uint64_t period_bytes) {
502    // Delegate to underlying file ops, given they perform the real disk
503    // writes.
504    buffered_file_handle *h = (buffered_file_handle*)handle;
505    return h->raw_ops->set_periodic_sync(h->raw_ops_handle, period_bytes);
506}
507
508couchstore_error_t BufferedFileOps::set_tracing_enabled(
509        couch_file_handle handle) {
510    // trigger setting tracing flags at the file level */
511    buffered_file_handle* h = (buffered_file_handle*)handle;
512    return h->raw_ops->set_tracing_enabled(h->raw_ops_handle);
513}
514
515couchstore_error_t BufferedFileOps::set_write_validation_enabled(
516        couch_file_handle handle) {
517    // trigger setting write validation flags at the file level */
518    buffered_file_handle* h = (buffered_file_handle*)handle;
519    return h->raw_ops->set_write_validation_enabled(h->raw_ops_handle);
520}
521
522couchstore_error_t BufferedFileOps::set_mprotect_enabled(
523        couch_file_handle handle) {
524    // trigger setting mprotect flags at the file level */
525    buffered_file_handle* h = (buffered_file_handle*)handle;
526    return h->raw_ops->set_mprotect_enabled(h->raw_ops_handle);
527}
528
// Buffered read. Serves data from the handle's read buffers, loading or
// recycling a buffer (via ReadBufferManager) whenever the requested range
// is not already cached. Returns the number of bytes read -- which may be
// short at EOF -- or a negative couchstore error code.
ssize_t BufferedFileOps::pread(couchstore_error_info_t* errinfo,
                               couch_file_handle handle,
                               void *buf,
                               size_t nbyte,
                               cs_off_t offset)
{
#if defined(LOG_BUFFER)
    //fprintf(stderr, "r");
#endif
    buffered_file_handle *h = (buffered_file_handle*)handle;
    // Flush the write buffer before trying to read anything:
    couchstore_error_t err = flush_buffer(errinfo, h->write_buffer.get());
    if (err < 0) {
        return err;
    }

    ssize_t total_read = 0;
    while (nbyte > 0) {
        file_buffer* buffer = h->read_buffer_mgr->findBuffer(h, offset);

        // Read as much as we can from the current buffer:
        ssize_t nbyte_read = read_from_buffer(buffer, buf, nbyte, offset);
        if (nbyte_read == 0) {
            // 'nbyte_read==0' means that the returned buffer contains
            // data for other offset and needs to be recycled.

            // Move the buffer to cover the remainder of the data to be read.
            cs_off_t block_start = offset -
                                   (offset % h->params.read_buffer_capacity);
            // Re-key the buffer in the read map to its new block, then
            // fill it from disk.
            h->read_buffer_mgr->relocateBuffer(buffer->offset, block_start);
            err = load_buffer_from(errinfo, buffer, block_start,
                                   (size_t)(offset + nbyte - block_start));
            if (err < 0) {
                return err;
            }

            nbyte_read = read_from_buffer(buffer, buf, nbyte, offset);
            if (nbyte_read == 0)
                break;  // must be at EOF
        }
        // Advance past the bytes just served and continue with the next
        // block if anything remains.
        buf = (char*)buf + nbyte_read;
        nbyte -= nbyte_read;
        offset += nbyte_read;
        total_read += nbyte_read;
    }
    return total_read;
}
576
// Buffered write. Appends to the single write buffer when the write is
// contiguous with it; flushes the buffer when it fills or when the write
// is not contiguous; and bypasses buffering entirely for writes larger
// than the remaining buffer space. Returns the number of bytes written
// or a negative couchstore error code.
ssize_t BufferedFileOps::pwrite(couchstore_error_info_t* errinfo,
                                couch_file_handle handle,
                                const void* buf,
                                size_t nbyte,
                                cs_off_t offset)
{
#if defined(LOG_BUFFER)
    //fprintf(stderr, "w");
#endif
    if (nbyte == 0) {
        return 0;
    }

    buffered_file_handle *h = (buffered_file_handle*)handle;
    file_buffer* buffer = h->write_buffer.get();

    // Write data to the current buffer:
    size_t nbyte_written = write_to_buffer(buffer, buf, nbyte, offset);
    if (nbyte_written > 0) {
        buf = (char*)buf + nbyte_written;
        offset += nbyte_written;
        nbyte -= nbyte_written;
    }

    // Flush the buffer if it's full, or if it isn't aligned with the current write:
    if (buffer->length == buffer->capacity || nbyte_written == 0) {
        couchstore_error_t error = flush_buffer(errinfo, buffer);
        if (error < 0)
            return error;
    }

    if (nbyte > 0) {
        ssize_t written;
        // If the remaining data will fit into the buffer, write it; else write directly:
        if (nbyte <= (buffer->capacity - buffer->length)) {
            written = write_to_buffer(buffer, buf, nbyte, offset);
        } else {
            // Too large to buffer: write straight through to the raw ops.
            written = h->raw_ops->pwrite(errinfo, h->raw_ops_handle, buf,
                                         nbyte, offset);
#if defined(LOG_BUFFER)
            fprintf(stderr, "BUFFER: passthru %zd bytes at %zd --> %zd\n",
                    nbyte, offset, written);
#endif
            if (written < 0) {
                return written;
            }
        }
        nbyte_written += written;
    }

    return nbyte_written;
}
629
630cs_off_t BufferedFileOps::goto_eof(couchstore_error_info_t* errinfo,
631                                  couch_file_handle handle)
632{
633    buffered_file_handle *h = (buffered_file_handle*)handle;
634    return h->raw_ops->goto_eof(errinfo, h->raw_ops_handle);
635}
636
637couchstore_error_t BufferedFileOps::sync(couchstore_error_info_t* errinfo,
638                                         couch_file_handle handle)
639{
640    buffered_file_handle *h = (buffered_file_handle*)handle;
641    couchstore_error_t err = flush_buffer(errinfo, h->write_buffer.get());
642    if (err == COUCHSTORE_SUCCESS) {
643        err = h->raw_ops->sync(errinfo, h->raw_ops_handle);
644    }
645    return err;
646}
647
648couchstore_error_t BufferedFileOps::advise(couchstore_error_info_t* errinfo,
649                                           couch_file_handle handle,
650                                           cs_off_t offs,
651                                           cs_off_t len,
652                                           couchstore_file_advice_t adv)
653{
654    buffered_file_handle *h = (buffered_file_handle*)handle;
655    return h->raw_ops->advise(errinfo, h->raw_ops_handle, offs, len, adv);
656}
657
658FileOpsInterface::FHStats* BufferedFileOps::get_stats(
659        couch_file_handle handle) {
660    // Not implemeted ourselves, just forward to wrapped ops.
661    buffered_file_handle* h = (buffered_file_handle*)handle;
662    return h->raw_ops->get_stats(h->raw_ops_handle);
663}
664
665static BufferedFileOps ops;
666
667FileOpsInterface* couch_get_buffered_file_ops(couchstore_error_info_t* errinfo,
668                                              FileOpsInterface* raw_ops,
669                                              couch_file_handle* handle,
670                                              buffered_file_ops_params params)
671{
672    *handle = ops.constructor(errinfo, raw_ops, params);
673
674    if (*handle) {
675        return &ops;
676    } else {
677        return NULL;
678    }
679}
680