/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
/*
 * Copyright 2010 Couchbase, Inc
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#ifndef SRC_ATOMIC_H_
#define SRC_ATOMIC_H_ 1

#include "config.h"

#include <algorithm>
#include <queue>
#include <vector>

#ifdef USE_CXX11_ATOMICS
#include <atomic>
#include <thread>
#define AtomicValue std::atomic
using std::memory_order;
using std::memory_order_relaxed;
using std::memory_order_consume;
using std::memory_order_acquire;
using std::memory_order_release;
using std::memory_order_acq_rel;
using std::memory_order_seq_cst;
#else
#define AtomicValue CouchbaseAtomic
enum memory_order {
    memory_order_relaxed,
    memory_order_consume,
    memory_order_acquire,
    memory_order_release,
    memory_order_acq_rel,
    memory_order_seq_cst
};
#endif

#if defined(HAVE_GCC_ATOMICS)
#include "atomic/gcc_atomics.h"
#elif defined(HAVE_ATOMIC_H)
#include "atomic/libatomic.h"
#elif _MSC_VER
#define ep_sync_synchronize() MemoryBarrier()
#else
#error "Don't know how to use atomics on your target system!"
#endif

#include "callbacks.h"
#include "locks.h"
#ifndef _MSC_VER
/**
 * Holder of atomic values.
 */
template <typename T>
class CouchbaseAtomic {
public:

    CouchbaseAtomic(const T &initial = (T)0) {
        store(initial);
    }

    ~CouchbaseAtomic() {}

    T load(memory_order sync = memory_order_acq_rel) const {
        (void) sync;
        return value;
    }

    void store(const T &newValue, memory_order sync = memory_order_acq_rel) {
        (void) sync;
        value = newValue;
        ep_sync_synchronize();
    }


    bool compare_exchange_strong(T& expected, T val,
                                 memory_order sync = memory_order_acq_rel) {
        (void) sync;
        if (ep_sync_bool_compare_and_swap(&value, expected, val)) {
            return true;
        } else {
            expected = load();
            return false;
        }
    }

    operator T() const {
        return load();
    }

    void operator =(const T &newValue) {
        store(newValue);
    }

    T operator ++() { // prefix
        return ep_sync_add_and_fetch(&value, 1);
    }

    T operator ++(int) { // postfix
        return ep_sync_fetch_and_add(&value, 1);
    }

    T operator --() { // prefix
        return ep_sync_add_and_fetch(&value, -1);
    }

    T operator --(int) { // postfix
        return ep_sync_fetch_and_add(&value, -1);
    }

    T fetch_add(const T &increment, memory_order sync = memory_order_acq_rel) {
        // Returns the old value
        (void) sync;
        return ep_sync_fetch_and_add(&value, increment);
    }

    T fetch_sub(const T &decrement, memory_order sync = memory_order_acq_rel) {
        // Returns the old value, matching fetch_add() above and the
        // std::atomic::fetch_sub semantics this class stands in for.
        (void) sync;
        return ep_sync_fetch_and_add(&value, -(signed)decrement);
    }

    T exchange(const T &newValue) {
        T rv;
        while (true) {
            rv = load();
            if (compare_exchange_strong(rv, newValue)) {
                break;
            }
        }
        return rv;
    }

    T swapIfNot(const T &badValue, const T &newValue) {
        T oldValue;
        while (true) {
            oldValue = load();
            if (oldValue != badValue) {
                if (compare_exchange_strong(oldValue, newValue)) {
                    break;
                }
            } else {
                break;
            }
        }
        return oldValue;
    }

private:

    volatile T value;
};

#endif
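
/*
 * Illustrative sketch (not part of this header): AtomicValue resolves to
 * std::atomic on C++11 toolchains and to CouchbaseAtomic otherwise, so a
 * shared counter can be declared and updated as below. The variable names
 * are hypothetical.
 *
 *     AtomicValue<size_t> numItems(0);
 *     ++numItems;                                // atomic increment
 *     size_t before = numItems.fetch_add(10);    // returns the old value
 *     size_t snapshot = numItems.load();
 */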

template <typename T>
void atomic_setIfBigger(AtomicValue<T> &obj, const T &newValue) {
    T oldValue = obj.load();
    while (newValue > oldValue) {
        if (obj.compare_exchange_strong(oldValue, newValue)) {
            break;
        }
        oldValue = obj.load();
    }
}

template <typename T>
void atomic_setIfLess(AtomicValue<T> &obj, const T &newValue) {
    T oldValue = obj.load();
    while (newValue < oldValue) {
        if (obj.compare_exchange_strong(oldValue, newValue)) {
            break;
        }
        oldValue = obj.load();
    }
}
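
/*
 * Illustrative sketch (not part of this header): these helpers run a CAS loop
 * so a value only ever moves in one direction, e.g. a high-water mark updated
 * from several threads. The names below are hypothetical.
 *
 *     AtomicValue<uint64_t> highSeqno(0);
 *
 *     void seenSeqno(uint64_t seqno) {
 *         atomic_setIfBigger(highSeqno, seqno);  // never lowers highSeqno
 *     }
 */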

/**
 * Atomic pointer.
 *
 * This does *not* make the item that's pointed to atomic.
 */
template <typename T>
class AtomicPtr : public AtomicValue<T*> {
public:
    AtomicPtr(T *initial = NULL) : AtomicValue<T*>(initial) {}

    ~AtomicPtr() {}

    T *operator ->() {
        return AtomicValue<T*>::load();
    }

    T &operator *() {
        return *AtomicValue<T*>::load();
    }

    operator bool() const {
        return AtomicValue<T*>::load() != NULL;
    }

    bool operator !() const {
        return AtomicValue<T*>::load() == NULL;
    }
};
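
/*
 * Illustrative sketch (not part of this header): AtomicPtr adds pointer-like
 * ergonomics on top of an atomic T*; the pointed-to object itself is not
 * protected. The Widget type below is hypothetical.
 *
 *     struct Widget { int refs; };
 *
 *     AtomicPtr<Widget> current(new Widget());
 *     if (current) {
 *         current->refs = 1;                  // dereferences the atomic load
 *     }
 *     Widget *prev = current.exchange(NULL);  // atomically take the pointer back
 *     delete prev;
 */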

/**
 * A lighter-weight, smaller lock than a mutex.
 *
 * This is primarily useful when contention is rare.
 */
class SpinLock {
public:
    // It seems like inlining the code caused the dtrace probe to
    // be optimized away ;)
    SpinLock();
    ~SpinLock();

    void acquire(void);
    void release(void);

private:
    bool tryAcquire(void);

#ifdef USE_CXX11_ATOMICS
    std::atomic_flag lock;
#else
    volatile int lock;
#endif
    DISALLOW_COPY_AND_ASSIGN(SpinLock);
};

/**
 * Safe LockHolder for SpinLock instances.
 */
class SpinLockHolder {
public:
    SpinLockHolder(SpinLock *theLock) : sl(theLock) {
        lock();
    }

    ~SpinLockHolder() {
        unlock();
    }

    void lock() {
        sl->acquire();
        locked = true;
    }

    void unlock() {
        if (locked) {
            sl->release();
            locked = false;
        }
    }
private:
    SpinLock *sl;
    bool locked;
};
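
/*
 * Illustrative sketch (not part of this header): SpinLockHolder is the RAII
 * companion to SpinLock, releasing the lock when the holder leaves scope.
 * The guarded counter below is hypothetical.
 *
 *     SpinLock counterLock;
 *     int counter = 0;
 *
 *     void bump() {
 *         SpinLockHolder lh(&counterLock);  // acquire()
 *         ++counter;
 *     }                                     // release() in the destructor
 */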

template <class T> class RCPtr;
template <class S> class SingleThreadedRCPtr;

/**
 * A reference counted value (used by RCPtr and SingleThreadedRCPtr).
 */
class RCValue {
public:
    RCValue() : _rc_refcount(0) {}
    RCValue(const RCValue &) : _rc_refcount(0) {}
    ~RCValue() {}
private:
    template <class MyTT> friend class RCPtr;
    template <class MySS> friend class SingleThreadedRCPtr;
    int _rc_incref() const {
        return ++_rc_refcount;
    }

    int _rc_decref() const {
        return --_rc_refcount;
    }

    mutable AtomicValue<int> _rc_refcount;
};

/**
 * Concurrent reference counted pointer.
 */
template <class C>
class RCPtr {
public:
    RCPtr(C *init = NULL) : value(init) {
        if (init != NULL) {
            static_cast<RCValue*>(value)->_rc_incref();
        }
    }

    RCPtr(const RCPtr<C> &other) : value(other.gimme()) {}

    ~RCPtr() {
        if (value && static_cast<RCValue *>(value)->_rc_decref() == 0) {
            delete get();
        }
    }

    void reset(C *newValue = NULL) {
        if (newValue != NULL) {
            static_cast<RCValue *>(newValue)->_rc_incref();
        }
        swap(newValue);
    }

    void reset(const RCPtr<C> &other) {
        swap(other.gimme());
    }

    // safe for the lifetime of this instance
    C *get() const {
        return value;
    }

    RCPtr<C> & operator =(const RCPtr<C> &other) {
        reset(other);
        return *this;
    }

    C &operator *() const {
        return *value;
    }

    C *operator ->() const {
        return value;
    }

    bool operator! () const {
        return !value;
    }

    operator bool () const {
        return (bool)value;
    }

private:
    C *gimme() const {
        SpinLockHolder lh(&lock);
        if (value) {
            static_cast<RCValue *>(value)->_rc_incref();
        }
        return value;
    }

    void swap(C *newValue) {
        SpinLockHolder lh(&lock);
        C *tmp(value.exchange(newValue));
        lh.unlock();
        if (tmp != NULL && static_cast<RCValue *>(tmp)->_rc_decref() == 0) {
            delete tmp;
        }
    }

    AtomicPtr<C> value;
    mutable SpinLock lock; // exists solely for the purpose of implementing reset() safely
};
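
/*
 * Illustrative sketch (not part of this header): a type managed by RCPtr
 * derives from RCValue so the embedded refcount is available. The StoredItem
 * type below is hypothetical.
 *
 *     class StoredItem : public RCValue {
 *     public:
 *         explicit StoredItem(int v) : payload(v) {}
 *         int payload;
 *     };
 *
 *     RCPtr<StoredItem> a(new StoredItem(42));
 *     RCPtr<StoredItem> b = a;   // refcount is now 2
 *     a.reset();                 // refcount 1; object deleted once b releases it
 */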

/**
 * Single-threaded reference counted pointer.
 *
 * "Single-threaded" means the pointer should only be accessed by one thread
 * at any time, or that accesses from multiple threads must be synchronized
 * by an external lock.
 */
template <class T>
class SingleThreadedRCPtr {
public:
    SingleThreadedRCPtr(T *init = NULL) : value(init) {
        if (init != NULL) {
            static_cast<RCValue*>(value)->_rc_incref();
        }
    }

    SingleThreadedRCPtr(const SingleThreadedRCPtr<T> &other) : value(other.gimme()) {}

    ~SingleThreadedRCPtr() {
        if (value && static_cast<RCValue *>(value)->_rc_decref() == 0) {
            delete value;
        }
    }

    void reset(T *newValue = NULL) {
        if (newValue != NULL) {
            static_cast<RCValue *>(newValue)->_rc_incref();
        }
        swap(newValue);
    }

    void reset(const SingleThreadedRCPtr<T> &other) {
        swap(other.gimme());
    }

    // safe for the lifetime of this instance
    T *get() const {
        return value;
    }

    SingleThreadedRCPtr<T> & operator =(const SingleThreadedRCPtr<T> &other) {
        reset(other);
        return *this;
    }

    T &operator *() const {
        return *value;
    }

    T *operator ->() const {
        return value;
    }

    bool operator! () const {
        return !value;
    }

    operator bool () const {
        return (bool)value;
    }

private:
    T *gimme() const {
        if (value) {
            static_cast<RCValue *>(value)->_rc_incref();
        }
        return value;
    }

    void swap(T *newValue) {
        T *old = value;
        value = newValue;
        if (old != NULL && static_cast<RCValue *>(old)->_rc_decref() == 0) {
            delete old;
        }
    }

    T *value;
};
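
/*
 * Illustrative sketch (not part of this header): SingleThreadedRCPtr is used
 * like RCPtr but has no internal SpinLock, so the caller must ensure that a
 * given pointer is touched by only one thread at a time (or guard it with an
 * external lock). StoredItem is the hypothetical RCValue subclass from the
 * RCPtr sketch above.
 *
 *     SingleThreadedRCPtr<StoredItem> item(new StoredItem(7));
 *     SingleThreadedRCPtr<StoredItem> copy = item;  // refcount bumped, no locking
 *     item.reset();                                 // object freed once copy goes away
 */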

#endif  // SRC_ATOMIC_H_