xref: /6.6.0/platform/include/platform/pipe.h (revision cdf833f2)
1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2019 Couchbase, Inc.
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17#pragma once
18
#include <folly/portability/SysTypes.h>
#include <nlohmann/json_fwd.hpp>
#include <platform/cb_malloc.h>
#include <platform/sized_buffer.h>

#include <algorithm>
#include <cinttypes>
#include <cstdint>
#include <cstring>
#include <functional>
#include <iosfwd>
#include <limits>
#include <memory>
#include <new>
#include <stdexcept>
#include <utility>
31
32namespace cb {
33
34/**
35 * The Pipe class is a buffered pipe where you may insert data in one
36 * end, and read it back out from the other (like a normal pipe).
37 *
38 * Instead of using customized iosteram buffers and allows for that kind
39 * of use, this class provides a "produce" method and a "consume" method
40 * where you provide a callback which receives the memory area it may operate
41 * on so that it can perform more optimal IO by populating the buffer
42 * directly:
43 *
44 *    pipe.produce([&sock](void* data, size_t size) -> ssize_t {
45 *        return cb::net::recv(sock, data, size, 0);
46 *    });
47 *
48 *    And now you may use the data by calling:
49 *
50 *    pipe.consume([](const void* data, size_t size) -> ssize_t {
51 *        // do whatever we want with the data
52 *        return nbytes;
53 *    });
54 *
55 * The return value from the consume method is the number of bytes you've
56 * consumed (so that they may be dropped from the pipe). It means that if
57 * you just want to look at the bytes you would return 0, and they would
58 * still be present the next time you invoke the consume method.
59 *
60 * The pipe class is _not_ thread-safe as it provides absolutely no
61 * locking internally.
62 *
63 * Implementation details:
64 *
65 * The Pipe is implemented by using allocating an underlying buffer of a
66 * fixed size. The caller may grow this buffer by calling ensureCapacity,
67 * which would:
68 *
69 *    * return immediately if available segment at the end is big enough
70 *    * pack the buffer if the free segment at the end and the beginning
71 *      is big enough
72 *    * reallocate the underlying buffer if we need more space
73 *
74 * As you can see from the above the last two bullets would invalidate
75 * all addresses previously provided to the "produce" and "consume" methods
76 * which means that you _cannot_ keep pointers into this buffer and use
77 * ensureCapacity.
78 *
79 * Data written to the pipe is always put at the end of the allocated
80 * buffer, and the write_head is moved forward. Data is always read from
81 * the read_head, and once consumed the read head is moved forward. Whenever
82 * the read_head catch up with the write_head they're both set to 0 (the
83 * beginning of the buffer).
84 */
85class PLATFORM_PUBLIC_API Pipe {
86public:
87    /**
88     * Initialize a pipe with the given buffer size (default empty).
89     *
90     * The write end of the pipe may be increased by calling ensureCapacity.
91     *
92     * The minimum size of the buffer is 128 bytes (to avoid having to deal
93     * with 0 sized buffers and code which accidentally end up with such a
94     * buffer in a corner case and get a std::logic_error thrown in their
95     * face).
96     *
97     * @param size The initial size of the buffer in the pipe (default 2k)
98     */
99    explicit Pipe(size_t size = 2048) {
100        const size_t allocation_size = std::max(size, size_t(128));
101        memory.reset(static_cast<uint8_t*>(cb_malloc(allocation_size)));
102        if (!memory) {
103            throw std::bad_alloc();
104        }
105        buffer = {memory.get(), size};
106    }
107
108    /**
109     * Make sure that one may insert at least the specified number
110     * of bytes in the buffer.
111     *
112     * This method might reallocate the buffers and invalidate all pointers
113     * into the buffer.
114     *
115     * @param nbytes The number of bytes needed in the buffer
116     * @return The number of bytes of available space in the buffer (for the
117     *         write end)
118     * @throws std::bad_alloc if memory allocation fails
119     */
120    size_t ensureCapacity(size_t nbytes) {
121        const size_t tail_space = buffer.size() - write_head;
122        if (tail_space >= nbytes) {
123            // There is enough space available at the tail
124            return wsize();
125        }
126
127        // No; we need a bigger buffer (or "pack" our existing buffer
128        // by moving the data in the pipe to the beginning of the buffer).
129        // We don't want to allocate buffers of all kinds of sizes, so just
130        // keep on doubling the size of the buffer until we find a buffer
131        // which is big enough.
132        const size_t needed = nbytes + rsize();
133        size_t nsize = buffer.size();
134        while (needed > nsize) {
135            nsize *= 2;
136        }
137
138        if (nsize != buffer.size()) {
139            // We need to reallocate in order to satisfy the allocation
140            // request.
141            auto* newm = cb_realloc(memory.get(), nsize);
142            if (newm == nullptr) {
143                throw std::bad_alloc();
144            }
145
146            // We're using a std::unique_ptr to keep track of the allocated
147            // memory segment, but realloc may or may not perform a
148            // reallocation. If it did perform a reallocation it released
149            // the allocated memory, so we need to release that from our
150            // unique_ptr to avoid unique_ptr to free the memory when
151            // we try to reset the new memory (if not we'll try to free
152            // the memory twice).
153            memory.release();
154            memory.reset(static_cast<uint8_t*>(newm));
155            buffer = {memory.get(), nsize};
156        }
157
158        // Pack the buffer by moving all of the data to the beginning of
159        // the buffer.
160        pack();
161        return wsize();
162    }
163
164    /**
165     * Get the current allocation size of the buffer
166     */
167    size_t capacity() const {
168        return buffer.size();
169    }
170
171    /**
172     * Read the number of bytes currently available in the read end
173     * of the pipe
174     */
175    size_t rsize() const {
176        return getAvailableReadSpace().size();
177    }
178
179    /**
180     * Get the available read buffer (this may be used for simplicity
181     * rather than calling consume to have to copy it out)
182     */
183    cb::const_byte_buffer rdata() const {
184        return getAvailableReadSpace();
185    }
186
187    /**
188     * Returns the number of bytes available to be written to in the
189     * write end of the pipe
190     */
191    size_t wsize() const {
192        return getAvailableWriteSpace().size();
193    }
194
195    /**
196     * Get the available write buffer
197     */
198    cb::byte_buffer wdata() const {
199        return getAvailableWriteSpace();
200    }
201
202    /**
203     * Try to produce a number of bytes by providing a callback function
204     * which will receive the buffer where the data may be inserted
205     *
206     * @param producer a callback function to produce data into the
207     *                 continuous memory area from ptr and size bytes
208     *                 long
209     * @return the number of bytes produced
210     */
211    ssize_t produce(std::function<ssize_t(void* /* ptr */, size_t /* size */)>
212                            producer) {
213        auto avail = getAvailableWriteSpace();
214
215        const ssize_t ret =
216                producer(static_cast<void*>(avail.data()), avail.size());
217
218        if (ret > 0) {
219            produced(ret);
220        }
221
222        return ret;
223    }
224
225    /**
226     * Try to produce a number of bytes by providing a callback function
227     * which will receive the buffer where the data may be inserted
228     *
229     * @param producer a callback function to produce data into the
230     *                 provided buffer.
231     * @return the number of bytes produced
232     */
233    ssize_t produce(std::function<ssize_t(cb::byte_buffer)> producer) {
234        auto avail = getAvailableWriteSpace();
235
236        const ssize_t ret = producer({avail.data(), avail.size()});
237
238        if (ret > 0) {
239            produced(ret);
240        }
241
242        return ret;
243    }
244
245    /**
246     * A number of bytes was made available for the consumer
247     */
248    void produced(size_t nbytes);
249
250    /**
251     * Try to consume data from the buffer by providing a callback function
252     *
253     * @param producer a callback function to consume data from the provided
254     *                 continuous memory area from ptr and size bytes long.
255     *                 The number of bytes consumed should be returned.
256     * @return the number of bytes consumed
257     */
258    ssize_t consume(std::function<ssize_t(const void* /* ptr */,
259                                          size_t /* size */)> consumer) {
260        auto avail = getAvailableReadSpace();
261        const ssize_t ret =
262                consumer(static_cast<const void*>(avail.data()), avail.size());
263        if (ret > 0) {
264            consumed(ret);
265        }
266        return ret;
267    }
268
269    /**
270     * Try to consume data from the buffer by providing a callback function
271     *
272     * @param producer a callback function to consume data from the provided
273     *                 memory area. The number of bytes consumed should be
274     *                 returned.
275     * @return the number of bytes consumed
276     */
277    ssize_t consume(std::function<ssize_t(cb::const_byte_buffer)> consumer) {
278        auto avail = getAvailableReadSpace();
279        const ssize_t ret = consumer({avail.data(), avail.size()});
280        if (ret > 0) {
281            consumed(ret);
282        }
283        return ret;
284    }
285
286    /**
287     * The number of bytes just removed from the consumer end of the buffer.
288     *
289     * If the consumer catch up with the producer all pointers returned
290     * could be invalidated (most likely reset to the beginning of the
291     * buffer)
292     */
293    void consumed(size_t nbytes) {
294        if (read_head + nbytes > write_head) {
295            throw std::logic_error(
296                    "Pipe::consumed(): Consumed bytes exceeds "
297                    "the number of available bytes");
298        }
299
300        read_head += nbytes;
301        if (empty()) {
302            read_head = write_head = 0;
303        }
304    }
305
306    /**
307     * Pack this buffer
308     *
309     * Given that we use a write_head and read_head in an array instead of
310     * using a ringbuffer one may end up in the situation where the pipe
311     * contains a single byte, but we can't insert any data because that
312     * byte is at the end of the pipe.
313     *
314     * Packing the buffer moves all of the bytes in the pipe to the beginning
315     * of the internal buffer resulting in a larger available memory segment
316     * at the end.
317     *
318     * @return true if the buffer is empty after packing
319     */
320    bool pack() {
321        if (read_head == write_head) {
322            read_head = write_head = 0;
323        } else if (read_head != 0) {
324            ::memmove(buffer.data(),
325                      buffer.data() + read_head,
326                      write_head - read_head);
327            write_head -= read_head;
328            read_head = 0;
329        }
330
331        return empty();
332    }
333
334    /**
335     * Is this buffer empty (the consumer end completely caught up with
336     * the producer)
337     */
338    bool empty() const {
339        return read_head == write_head;
340    }
341
342    /**
343     * Is this buffer full or not
344     */
345    bool full() const {
346        return write_head == buffer.size();
347    }
348
349    /**
350     * Clear all of the content in the buffer
351     */
352    void clear() {
353        write_head = read_head = 0;
354    }
355
356    /**
357     * Get the (internal) properties of the pipe
358     */
359    nlohmann::json to_json();
360
361protected:
362    /**
363     * Get information of the _unused_ space in the write end of
364     * the pipe. This is a contiguous space the caller may use, and
365     * call produced() later on to mark the space as used and that it
366     * should be available for the consumer in the read end of the pipe.
367     */
368    cb::byte_buffer getAvailableWriteSpace() const {
369        return {const_cast<uint8_t*>(buffer.data()) + write_head,
370                buffer.size() - write_head};
371    }
372
373    /**
374     * Get information of the available data the consumer may read.
375     * This is a contiguous space the caller may
376     * use, and call consumed() later on to mark the space as free and
377     * available for the producer.
378     */
379    cb::const_byte_buffer getAvailableReadSpace() const {
380        return {buffer.data() + read_head, write_head - read_head};
381    }
382
383    // The information about the underlying buffer
384    cb::byte_buffer buffer;
385
386    struct cb_malloc_deleter {
387        void operator()(uint8_t* ptr) {
388            cb_free(static_cast<void*>(ptr));
389        }
390    };
391    std::unique_ptr<uint8_t, cb_malloc_deleter> memory;
392
393    // The offset in the buffer where we may start write
394    size_t write_head = 0;
395
396    // The offset in the buffer where we may start deading
397    size_t read_head = 0;
398};
399
400} // namespace cb
401