1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2014 Couchbase, Inc
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17 
18 #pragma once
19 
#include <atomic>
#include <cstddef>
#include <memory>
#include <queue>
#include <stdexcept>
#include <utility>

#include "atomic.h"
#include "threadlocal.h"
#include "utility.h"
27 
28 /**
29  * Efficient approximate-FIFO queue optimize for concurrent writers.
30  */
31 template <typename T>
32 class AtomicQueue {
33 public:
AtomicQueue()34     AtomicQueue() : counter(0), numItems(0) {}
35 
~AtomicQueue()36     ~AtomicQueue() {
37         size_t i;
38         for (i = 0; i < counter; ++i) {
39             delete queues[i].load();
40         }
41     }
42 
43     /**
44      * Place an item in the queue.
45      */
push(const T& value)46     void push(const T& value) {
47         std::queue<T> *q = swapQueue(); // steal our queue
48         q->push(value);
49         ++numItems;
50         q = swapQueue(q);
51     }
52 
53     /**
54      * Place an item in the queue.
55      */
push(T&& value)56     void push(T&& value) {
57         std::queue<T>* q = swapQueue(); // steal our queue
58         q->push(std::move(value));
59         ++numItems;
60         q = swapQueue(q);
61     }
62 
63     /**
64      * Grab all items from this queue an place them into the provided
65      * output queue.
66      *
67      * @param outQueue a destination queue to fill
68      */
getAll(std::queue<T> &outQueue)69     void getAll(std::queue<T> &outQueue) {
70         std::queue<T> *q(swapQueue()); // Grab my own queue
71         std::queue<T> *newQueue(NULL);
72         int count(0);
73 
74         // Will start empty unless this thread is adding stuff
75         while (!q->empty()) {
76             outQueue.push(q->front());
77             q->pop();
78             ++count;
79         }
80 
81         size_t c(counter);
82         for (size_t i = 0; i < c; ++i) {
83             // Swap with another thread
84             std::queue<T> *nullQueue(NULL);
85             newQueue = atomic_swapIfNot(queues[i], nullQueue, q);
86             // Empty the queue
87             if (newQueue != NULL) {
88                 q = newQueue;
89                 while (!q->empty()) {
90                     outQueue.push(q->front());
91                     q->pop();
92                     ++count;
93                 }
94             }
95         }
96 
97         q = swapQueue(q);
98         numItems.fetch_sub(count);
99     }
100 
101     /**
102      * True if this queue is empty.
103      */
empty() const104     bool empty() const {
105         return size() == 0;
106     }
107 
108     /**
109      * Return the number of queued items.
110      */
size() const111     size_t size() const {
112         return numItems;
113     }
114 private:
115     static constexpr size_t MAX_THREADS = 500;
116 
initialize()117     AtomicPtr<std::queue<T> > *initialize() {
118         std::queue<T> *q = new std::queue<T>;
119         size_t i(counter++);
120         if (counter > MAX_THREADS) {
121             throw std::overflow_error("AtomicQueue::initialize: exceeded maximum allowed threads");
122         }
123         queues[i].store(q);
124         threadQueue = &queues[i];
125         return &queues[i];
126     }
127 
swapQueue(std::queue<T> *newQueue = NULL)128     std::queue<T> *swapQueue(std::queue<T> *newQueue = NULL) {
129         AtomicPtr<std::queue<T> > *qPtr(threadQueue);
130         if (qPtr == NULL) {
131             qPtr = initialize();
132         }
133         return qPtr->exchange(newQueue);
134     }
135 
136     ThreadLocalPtr<AtomicPtr<std::queue<T> > > threadQueue;
137     AtomicPtr<std::queue<T> > queues[MAX_THREADS];
138     std::atomic<size_t> counter;
139     std::atomic<size_t> numItems;
140     DISALLOW_COPY_AND_ASSIGN(AtomicQueue);
141 };
142