xref: /6.0.3/platform/include/platform/pipe.h (revision 40021669)
1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2017 Couchbase, Inc.
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17#pragma once
18
19#include <cJSON_utils.h>
20#include <platform/cb_malloc.h>
21#include <platform/platform.h>
22#include <platform/sized_buffer.h>
23
24#include <algorithm>
25#include <cinttypes>
26#include <cstdint>
27#include <functional>
28#include <memory>
29#include <sstream>
30#include <stdexcept>
31
32namespace cb {
33
34/**
35 * The Pipe class is a buffered pipe where you may insert data in one
36 * end, and read it back out from the other (like a normal pipe).
37 *
38 * Instead of using customized iosteram buffers and allows for that kind
39 * of use, this class provides a "produce" method and a "consume" method
40 * where you provide a callback which receives the memory area it may operate
41 * on so that it can perform more optimal IO by populating the buffer
42 * directly:
43 *
44 *    pipe.produce([&sock](void* data, size_t size) -> ssize_t {
45 *        return cb::net::recv(sock, data, size, 0);
46 *    });
47 *
48 *    And now you may use the data by calling:
49 *
50 *    pipe.consume([](const void* data, size_t size) -> ssize_t {
51 *        // do whatever we want with the data
52 *        return nbytes;
53 *    });
54 *
55 * The return value from the consume method is the number of bytes you've
56 * consumed (so that they may be dropped from the pipe). It means that if
57 * you just want to look at the bytes you would return 0, and they would
58 * still be present the next time you invoke the consume method.
59 *
60 * The pipe class is _not_ thread-safe as it provides absolutely no
61 * locking internally.
62 *
63 * Implementation details:
64 *
65 * The Pipe is implemented by using allocating an underlying buffer of a
66 * fixed size. The caller may grow this buffer by calling ensureCapacity,
67 * which would:
68 *
69 *    * return immediately if available segment at the end is big enough
70 *    * pack the buffer if the free segment at the end and the beginning
71 *      is big enough
72 *    * reallocate the underlying buffer if we need more space
73 *
74 * As you can see from the above the last two bullets would invalidate
75 * all addresses previously provided to the "produce" and "consume" methods
76 * which means that you _cannot_ keep pointers into this buffer and use
77 * ensureCapacity.
78 *
79 * Data written to the pipe is always put at the end of the allocated
80 * buffer, and the write_head is moved forward. Data is always read from
81 * the read_head, and once consumed the read head is moved forward. Whenever
82 * the read_head catch up with the write_head they're both set to 0 (the
83 * beginning of the buffer).
84 */
85class Pipe {
86public:
87    /**
88     * Initialize a pipe with the given buffer size (default empty).
89     *
90     * The write end of the pipe may be increased by calling ensureCapacity.
91     *
92     * The minimum size of the buffer is 128 bytes (to avoid having to deal
93     * with 0 sized buffers and code which accidentally end up with such a
94     * buffer in a corner case and get a std::logic_error thrown in their
95     * face).
96     *
97     * @param size The initial size of the buffer in the pipe (default 2k)
98     */
99    explicit Pipe(size_t size = 2048) {
100        const size_t allocation_size = std::max(size, size_t(128));
101        memory.reset(static_cast<uint8_t*>(cb_malloc(allocation_size)));
102        if (!memory) {
103            throw std::bad_alloc();
104        }
105        buffer = {memory.get(), size};
106    }
107
108    /**
109     * Make sure that one may insert at least the specified number
110     * of bytes in the buffer.
111     *
112     * This method might reallocate the buffers and invalidate all pointers
113     * into the buffer.
114     *
115     * @param nbytes The number of bytes needed in the buffer
116     * @return The number of bytes of available space in the buffer (for the
117     *         write end)
118     * @throws std::bad_alloc if memory allocation fails
119     */
120    size_t ensureCapacity(size_t nbytes) {
121        const size_t tail_space = buffer.size() - write_head;
122        if (tail_space >= nbytes) {
123            // There is enough space available at the tail
124            return wsize();
125        }
126
127        // No; we need a bigger buffer (or "pack" our existing buffer
128        // by moving the data in the pipe to the beginning of the buffer).
129        // We don't want to allocate buffers of all kinds of sizes, so just
130        // keep on doubling the size of the buffer until we find a buffer
131        // which is big enough.
132        const size_t needed = nbytes + rsize();
133        size_t nsize = buffer.size();
134        while (needed > nsize) {
135            nsize *= 2;
136        }
137
138        if (nsize != buffer.size()) {
139            // We need to reallocate in order to satisfy the allocation
140            // request.
141            auto* newm = cb_realloc(memory.get(), nsize);
142            if (newm == nullptr) {
143                throw std::bad_alloc();
144            }
145
146            // We're using a std::unique_ptr to keep track of the allocated
147            // memory segment, but realloc may or may not perform a
148            // reallocation. If it did perform a reallocation it released
149            // the allocated memory, so we need to release that from our
150            // unique_ptr to avoid unique_ptr to free the memory when
151            // we try to reset the new memory (if not we'll try to free
152            // the memory twice).
153            memory.release();
154            memory.reset(static_cast<uint8_t*>(newm));
155            buffer = {memory.get(), nsize};
156        }
157
158        // Pack the buffer by moving all of the data to the beginning of
159        // the buffer.
160        pack();
161        return wsize();
162    }
163
164    /**
165     * Get the current allocation size of the buffer
166     */
167    size_t capacity() const {
168        return buffer.size();
169    }
170
171    /**
172     * Read the number of bytes currently available in the read end
173     * of the pipe
174     */
175    size_t rsize() const {
176        return getAvailableReadSpace().size();
177    }
178
179    /**
180     * Get the available read buffer (this may be used for simplicity
181     * rather than calling consume to have to copy it out)
182     */
183    cb::const_byte_buffer rdata() const {
184        return getAvailableReadSpace();
185    }
186
187    /**
188     * Returns the number of bytes available to be written to in the
189     * write end of the pipe
190     */
191    size_t wsize() const {
192        return getAvailableWriteSpace().size();
193    }
194
195    /**
196     * Get the available write buffer
197     */
198    cb::byte_buffer wdata() const {
199        return getAvailableWriteSpace();
200    }
201
202    /**
203     * Try to produce a number of bytes by providing a callback function
204     * which will receive the buffer where the data may be inserted
205     *
206     * @param producer a callback function to produce data into the
207     *                 continuous memory area from ptr and size bytes
208     *                 long
209     * @return the number of bytes produced
210     */
211    ssize_t produce(std::function<ssize_t(void* /* ptr */, size_t /* size */)>
212                            producer) {
213        auto avail = getAvailableWriteSpace();
214
215        const ssize_t ret =
216                producer(static_cast<void*>(avail.data()), avail.size());
217
218        if (ret > 0) {
219            produced(ret);
220        }
221
222        return ret;
223    }
224
225    /**
226     * Try to produce a number of bytes by providing a callback function
227     * which will receive the buffer where the data may be inserted
228     *
229     * @param producer a callback function to produce data into the
230     *                 provided buffer.
231     * @return the number of bytes produced
232     */
233    ssize_t produce(std::function<ssize_t(cb::byte_buffer)> producer) {
234        auto avail = getAvailableWriteSpace();
235
236        const ssize_t ret = producer({avail.data(), avail.size()});
237
238        if (ret > 0) {
239            produced(ret);
240        }
241
242        return ret;
243    }
244
245    /**
246     * A number of bytes was made available for the consumer
247     */
248    void produced(size_t nbytes) {
249        if (write_head + nbytes > buffer.size()) {
250            std::stringstream ss;
251            ss << "Pipe::produced(): Produced bytes exceeds the number of "
252                  "available bytes. { \"nbytes\": "
253               << std::to_string(nbytes) << ","
254               << " \"buffer.size()\": " << std::to_string(buffer.size()) << ","
255               << " \"write_head\": " << std::to_string(write_head) << "}";
256            throw std::logic_error(ss.str());
257        }
258        write_head += nbytes;
259    }
260
261    /**
262     * Try to consume data from the buffer by providing a callback function
263     *
264     * @param producer a callback function to consume data from the provided
265     *                 continuous memory area from ptr and size bytes long.
266     *                 The number of bytes consumed should be returned.
267     * @return the number of bytes consumed
268     */
269    ssize_t consume(std::function<ssize_t(const void* /* ptr */,
270                                          size_t /* size */)> consumer) {
271        auto avail = getAvailableReadSpace();
272        const ssize_t ret =
273                consumer(static_cast<const void*>(avail.data()), avail.size());
274        if (ret > 0) {
275            consumed(ret);
276        }
277        return ret;
278    }
279
280    /**
281     * Try to consume data from the buffer by providing a callback function
282     *
283     * @param producer a callback function to consume data from the provided
284     *                 memory area. The number of bytes consumed should be
285     *                 returned.
286     * @return the number of bytes consumed
287     */
288    ssize_t consume(std::function<ssize_t(cb::const_byte_buffer)> consumer) {
289        auto avail = getAvailableReadSpace();
290        const ssize_t ret = consumer({avail.data(), avail.size()});
291        if (ret > 0) {
292            consumed(ret);
293        }
294        return ret;
295    }
296
297    /**
298     * The number of bytes just removed from the consumer end of the buffer.
299     *
300     * If the consumer catch up with the producer all pointers returned
301     * could be invalidated (most likely reset to the beginning of the
302     * buffer)
303     */
304    void consumed(size_t nbytes) {
305        if (read_head + nbytes > write_head) {
306            throw std::logic_error(
307                    "Pipe::consumed(): Consumed bytes exceeds "
308                    "the number of available bytes");
309        }
310
311        read_head += nbytes;
312        if (empty()) {
313            read_head = write_head = 0;
314        }
315    }
316
317    /**
318     * Pack this buffer
319     *
320     * Given that we use a write_head and read_head in an array instead of
321     * using a ringbuffer one may end up in the situation where the pipe
322     * contains a single byte, but we can't insert any data because that
323     * byte is at the end of the pipe.
324     *
325     * Packing the buffer moves all of the bytes in the pipe to the beginning
326     * of the internal buffer resulting in a larger available memory segment
327     * at the end.
328     *
329     * @return true if the buffer is empty after packing
330     */
331    bool pack() {
332        if (read_head == write_head) {
333            read_head = write_head = 0;
334        } else if (read_head != 0) {
335            ::memmove(buffer.data(),
336                      buffer.data() + read_head,
337                      write_head - read_head);
338            write_head -= read_head;
339            read_head = 0;
340        }
341
342        return empty();
343    }
344
345    /**
346     * Is this buffer empty (the consumer end completely caught up with
347     * the producer)
348     */
349    bool empty() const {
350        return read_head == write_head;
351    }
352
353    /**
354     * Is this buffer full or not
355     */
356    bool full() const {
357        return write_head == buffer.size();
358    }
359
360    /**
361     * Clear all of the content in the buffer
362     */
363    void clear() {
364        write_head = read_head = 0;
365    }
366
367    /**
368     * Get the (internal) properties of the pipe
369     */
370    unique_cJSON_ptr to_json() const {
371        unique_cJSON_ptr ret(cJSON_CreateObject());
372        cJSON_AddUintPtrToObject(ret.get(), "buffer", uintptr_t(buffer.data()));
373        cJSON_AddNumberToObject(ret.get(), "size", buffer.size());
374        cJSON_AddNumberToObject(ret.get(), "read_head", read_head);
375        cJSON_AddNumberToObject(ret.get(), "write_head", write_head);
376        cJSON_AddBoolToObject(ret.get(), "empty", empty());
377        return ret;
378    }
379
380protected:
381    /**
382     * Get information of the _unused_ space in the write end of
383     * the pipe. This is a contiguous space the caller may use, and
384     * call produced() later on to mark the space as used and that it
385     * should be available for the consumer in the read end of the pipe.
386     */
387    cb::byte_buffer getAvailableWriteSpace() const {
388        return {const_cast<uint8_t*>(buffer.data()) + write_head,
389                buffer.size() - write_head};
390    }
391
392    /**
393     * Get information of the available data the consumer may read.
394     * This is a contiguous space the caller may
395     * use, and call consumed() later on to mark the space as free and
396     * available for the producer.
397     */
398    cb::const_byte_buffer getAvailableReadSpace() const {
399        return {buffer.data() + read_head, write_head - read_head};
400    }
401
402    // The information about the underlying buffer
403    cb::byte_buffer buffer;
404
405    struct cb_malloc_deleter {
406        void operator()(uint8_t* ptr) {
407            cb_free(static_cast<void*>(ptr));
408        }
409    };
410    std::unique_ptr<uint8_t, cb_malloc_deleter> memory;
411
412    // The offset in the buffer where we may start write
413    size_t write_head = 0;
414
415    // The offset in the buffer where we may start deading
416    size_t read_head = 0;
417};
418
419} // namespace cb
420