/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
/*
 *     Copyright 2018 Couchbase, Inc
 *
 *   Licensed under the Apache License, Version 2.0 (the "License");
 *   you may not use this file except in compliance with the License.
 *   You may obtain a copy of the License at
 *
 *       http://www.apache.org/licenses/LICENSE-2.0
 *
 *   Unless required by applicable law or agreed to in writing, software
 *   distributed under the License is distributed on an "AS IS" BASIS,
 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *   See the License for the specific language governing permissions and
 *   limitations under the License.
 */

/**
 * StreamContainer is a custom container designed for use by the DcpProducer so
 * that it can enable the multiple streams per VB feature. The class is
 * templated primarily to simplify testing and is not attempting to be a
 * generic container.
 *
 * A StreamContainer is always constructed holding a single element (default
 * construction is deleted), as that is how the DcpProducer will initialise a
 * vbucket.
 *
 * A StreamContainer exposes a thread-safe API and internally uses shared
 * locking to enable multiple readers. A read API is exposed via a
 * ReadLockedHandle obtained by calling rlock() and a write API is exposed via
 * a WriteLockedHandle obtained by calling wlock().
 *
 * The ReadLockedHandle and WriteLockedHandle both also expose the iteration
 * methods end()/next()/get() so that the elements of the container can be
 * iterated.
 *
 * To support the DcpProducer a ResumableIterationHandle can be obtained by
 * calling startResumable(). A ResumableIterationHandle is an object which
 * provides an iteration style interface, but crucially remembers the current
 * position. This allows the user to begin iterating the elements, destruct
 * their ResumableIterationHandle and later resume from where they had
 * previously iterated to (providing that the StreamContainer membership
 * hasn't changed).
 *
 * Examples of ResumableIterationHandle
 *
 * If the container stores the integers 5, 4, 3, 2 and 1, an initial loop
 * (begin to end) yields:
 *
 *    54321
 *
 * A new ResumableIterationHandle will again start from 5, but assume this time
 * we break out of the loop once 3 has been returned:
 *
 *    543 <break>
 *
 * The next ResumableIterationHandle will start from 2 and will return true
 * from complete() once a full cycle of the elements has occurred, e.g.
 *
 *    21543
 *
 * Resuming iteration this time again starts from 2:
 *
 *    21543
 *
 * Note1: only a single resume point is maintained, so if two threads were
 * creating ResumableIterationHandles from the same StreamContainer, they will
 * be interfering with each other's resume point.
 *
 * Note2: Inserting elements (push_front()) or removing them (erase()/clear())
 * resets the resume point.
 */

#pragma once

#include <cstddef>
#include <forward_list>
#include <mutex>
#include <shared_mutex>
#include <utility>

#include <folly/SharedMutex.h>

82 template <class Element>
83 class StreamContainer {
84 public:
85     using container = std::forward_list<Element>;
86     using container_itr = typename container::iterator;
87     using container_const_itr = typename container::const_iterator;
88     using element = Element;
89 
90     /**
91      * Default construction is deleted, only a StreamContainer with one element
92      * should be constructed.
93      */
94     StreamContainer() = delete;
95 
96     /**
97      * Create a new StreamContainer with one element
98      */
StreamContainer(Element e)99     StreamContainer(Element e) : c{e}, resumePosition{c.begin()} {
100     }
101 
102     StreamContainer(const StreamContainer&) = delete;
103     StreamContainer& operator=(const StreamContainer&) = delete;
104 
105     StreamContainer(StreamContainer&&) = default;
106     StreamContainer& operator=(StreamContainer&&) = default;
107 
108     /**
109      * StreamContainer::ResumableIterationHandle
110      * This object enables the caller to iterate the StreamContainer, destroy
111      * the iterator and later resume iteration from the previous element
112      */
113     class ResumableIterationHandle {
114     public:
ResumableIterationHandle(StreamContainer& c)115         ResumableIterationHandle(StreamContainer& c)
116             : sharedLock(c.rwlock),
117               container(c),
118               startPosition(container.resumePosition),
119               currentPosition(startPosition) {
120             // call next to prepare the resume position
121             container.next(sharedLock);
122         }
123 
124         /**
125          * @return true if we have completed a full cycle of the StreamContainer
126          *         elements
127          */
complete()128         bool complete() {
129             // empty or we've reached the start
130             bool completed = container.c.empty() ||
131                              (cycled && (currentPosition == startPosition));
132             if (completed) {
133                 // Update myqueue with the resume position
134                 container.resumePosition = currentPosition;
135             }
136             return completed;
137         }
138 
139         /**
140          * Advance the iterator to the next position in the StreamContainer
141          * and handle wrapping around from the end of the underlying container
142          */
next()143         void next() {
144             currentPosition++;
145             if (currentPosition == container.c.end()) {
146                 cycled = true;
147                 currentPosition = container.c.begin();
148             }
149             // Update the resume position owned by the queue
150             container.next(sharedLock);
151         }
152 
153         /// @return a const reference to the current Element
get() const154         const Element& get() const {
155             return *currentPosition;
156         }
157 
158     private:
159         bool cycled = false;
160         std::shared_lock<folly::SharedMutex> sharedLock;
161         StreamContainer& container;
162         typename container::iterator startPosition;
163         typename container::iterator currentPosition;
164     };
165 
166     /// Internal parent class for the iterable read/write handles
167     template <class Iterator>
168     class Iterable {
169     public:
end() const170         bool end() const {
171             return itr == endItr;
172         }
173 
get() const174         const Element& get() const {
175             return *itr;
176         }
177 
next()178         void next() {
179             itr++;
180             before++;
181         }
182 
183     protected:
setIterator(Iterator i)184         void setIterator(Iterator i) {
185             itr = i;
186         }
187 
setBeforeIterator(Iterator i)188         void setBeforeIterator(Iterator i) {
189             before = i;
190         }
191 
setEnd(Iterator i)192         void setEnd(Iterator i) {
193             endItr = i;
194         }
195 
196         /// The iterator we will increment and access through
197         Iterator itr;
198         /// The end iterator for implementing end()
199         Iterator endItr;
200         /// As we use forward_list we need a special 'before' iterator to allow
201         /// erase_after
202         Iterator before;
203     };
204 
205     /**
206      * StreamContainer::ReadLockedHandle
207      * This object obtains read access to the StreamContainer and exposes some
208      * methods for inspecting the StreamContainer.
209      */
210     class ReadLockedHandle : public Iterable<container_const_itr> {
211     public:
ReadLockedHandle(const StreamContainer& c)212         ReadLockedHandle(const StreamContainer& c)
213             : readLock(c.rwlock), container(c) {
214             // sets a const iterator
215             this->setIterator(container.c.begin());
216             this->setBeforeIterator(container.c.before_begin());
217             this->setEnd(container.c.end());
218         }
219 
size() const220         auto size() const {
221             return container.size;
222         }
223 
224     private:
225         std::shared_lock<folly::SharedMutex> readLock;
226         const StreamContainer& container;
227     };
228 
229     /**
230      * StreamContainer::WriteLockedHandle
231      * This object obtains write access to the StreamContainer and exposes some
232      * methods for inspecting and mutating the StreamContainer.
233      */
234     class WriteLockedHandle : public Iterable<container_itr> {
235     public:
WriteLockedHandle(StreamContainer& c)236         WriteLockedHandle(StreamContainer& c)
237             : writeLock(c.rwlock), container(c) {
238             this->setIterator(container.c.begin());
239             this->setBeforeIterator(container.c.before_begin());
240             this->setEnd(container.c.end());
241         }
242 
swap(Element& e)243         void swap(Element& e) {
244             std::swap((*this->itr), e);
245         }
246 
push_front(const Element& e)247         void push_front(const Element& e) {
248             container.push_front(e, writeLock);
249         }
250 
erase()251         void erase() {
252             container.erase_after(this->before, writeLock);
253         }
254 
clear()255         void clear() {
256             container.clear(writeLock);
257         }
258 
empty() const259         bool empty() const {
260             return container.c.empty();
261         }
262 
263     private:
264         std::unique_lock<folly::SharedMutex> writeLock;
265         StreamContainer& container;
266     };
267 
startResumable()268     ResumableIterationHandle startResumable() {
269         return {*this};
270     }
271 
rlock() const272     ReadLockedHandle rlock() const {
273         return ReadLockedHandle{*this};
274     }
275 
wlock()276     WriteLockedHandle wlock() {
277         return WriteLockedHandle{*this};
278     }
279 
280 private:
281     /**
282      * Push an element to the front of the list and reset the resume iterator
283      */
push_front(const Element& e, std::unique_lock<folly::SharedMutex>&)284     void push_front(const Element& e, std::unique_lock<folly::SharedMutex>&) {
285         c.push_front(e);
286         size++;
287         resumePosition = c.begin();
288     }
289 
erase_after(const typename container::iterator& before, std::unique_lock<folly::SharedMutex>&)290     void erase_after(const typename container::iterator& before,
291                      std::unique_lock<folly::SharedMutex>&) {
292         c.erase_after(before);
293         size--;
294         resumePosition = c.begin();
295     }
296 
clear(std::unique_lock<folly::SharedMutex>&)297     void clear(std::unique_lock<folly::SharedMutex>&) {
298         c.clear();
299         size = 0;
300         resumePosition = c.begin();
301     }
302 
next(std::shared_lock<folly::SharedMutex>&)303     void next(std::shared_lock<folly::SharedMutex>&) {
304         if (c.empty()) {
305             return;
306         }
307 
308         resumePosition++;
309         if (resumePosition == c.end()) {
310             resumePosition = c.begin();
311         }
312     }
313 
314     container c;
315     size_t size{1};
316     // StreamContainer supports 'resumable' iteration (only one resume point)
317     // this object stores where to resume from
318     typename container::iterator resumePosition{c.end()};
319     mutable folly::SharedMutex rwlock;
320 };
321