1package gocbcore
2
3import (
4	"container/list"
5	"errors"
6	"fmt"
7	"sync"
8	"sync/atomic"
9	"unsafe"
10)
11
// Sentinel errors reported by memdOpQueue.Push.
var (
	// errAlreadyQueued is returned when the request is already owned by a
	// queue. Push also returns it for requests that were cancelled before
	// they could be inserted.
	errAlreadyQueued = errors.New("request was already queued somewhere else")

	// errOpQueueClosed is returned when pushing onto a queue that has been
	// closed.
	errOpQueueClosed = errors.New("queue is closed")

	// errOpQueueFull is returned when the queue already holds the maximum
	// number of items allowed by the caller.
	errOpQueueFull = errors.New("queue is full")
)
17
// memdOpConsumer is a handle through which requests are popped from a
// memdOpQueue. Closing the consumer makes its pending and future Pop calls
// return nil without closing the queue itself.
type memdOpConsumer struct {
	// parent is the queue this consumer pops from.
	parent *memdOpQueue
	// isClosed is set by the parent's closeConsumer and read by the parent's
	// pop, both while holding the parent's lock.
	isClosed bool
}
22
// Queue returns the queue this consumer was created from.
func (c *memdOpConsumer) Queue() *memdOpQueue {
	return c.parent
}
26
// Pop delegates to the parent queue's pop for this consumer, which blocks
// until a request is available and returns nil once the queue or this
// consumer has been closed.
func (c *memdOpConsumer) Pop() *memdQRequest {
	return c.parent.pop(c)
}
30
// Close marks this consumer as closed via the parent queue, which also wakes
// any goroutine currently blocked in Pop.
func (c *memdOpConsumer) Close() {
	c.parent.closeConsumer(c)
}
34
// memdOpQueue is a FIFO queue of *memdQRequest values guarded by a mutex,
// with a condition variable used to wake consumers blocked waiting for work.
type memdOpQueue struct {
	// lock guards items, isOpen and the isClosed flag of this queue's consumers.
	lock sync.Mutex
	// signal is a condition variable bound to lock; it is broadcast whenever an
	// item is pushed, a consumer is closed, or the queue is closed.
	signal *sync.Cond
	// items holds the queued requests in insertion order.
	items *list.List
	// isOpen is true from construction until Close is called.
	isOpen bool
}
41
42func newMemdOpQueue() *memdOpQueue {
43	q := memdOpQueue{
44		isOpen: true,
45		items:  list.New(),
46	}
47	q.signal = sync.NewCond(&q.lock)
48	return &q
49}
50
51func (q *memdOpQueue) debugString() string {
52	var outStr string
53	q.lock.Lock()
54
55	outStr += fmt.Sprintf("Num Items: %d\n", q.items.Len())
56
57	if q.isOpen {
58		outStr += fmt.Sprintf("Is Open: true")
59	} else {
60		outStr += fmt.Sprintf("Is Open: false")
61	}
62
63	q.lock.Unlock()
64	return outStr
65}
66
67func (q *memdOpQueue) Remove(req *memdQRequest) bool {
68	q.lock.Lock()
69
70	if !atomic.CompareAndSwapPointer(&req.queuedWith, unsafe.Pointer(q), nil) {
71		q.lock.Unlock()
72		return false
73	}
74
75	for e := q.items.Front(); e != nil; e = e.Next() {
76		if e.Value.(*memdQRequest) == req {
77			q.items.Remove(e)
78			break
79		}
80	}
81
82	q.lock.Unlock()
83
84	return true
85}
86
87func (q *memdOpQueue) Push(req *memdQRequest, maxItems int) error {
88	q.lock.Lock()
89	if !q.isOpen {
90		q.lock.Unlock()
91		return errOpQueueClosed
92	}
93
94	if maxItems > 0 && q.items.Len() >= maxItems {
95		q.lock.Unlock()
96		return errOpQueueFull
97	}
98
99	if !atomic.CompareAndSwapPointer(&req.queuedWith, nil, unsafe.Pointer(q)) {
100		q.lock.Unlock()
101		return errAlreadyQueued
102	}
103
104	if req.isCancelled() {
105		// TODO(brett19): Maybe should ensure this was meant to be in this opqueue.
106		atomic.CompareAndSwapPointer(&req.queuedWith, unsafe.Pointer(q), nil)
107		q.lock.Unlock()
108		// TODO(brett19): Better error for cancelled requests pending insertion on an opqueue.
109		return errAlreadyQueued
110	}
111
112	q.items.PushBack(req)
113	q.lock.Unlock()
114
115	q.signal.Broadcast()
116	return nil
117}
118
119func (q *memdOpQueue) Consumer() *memdOpConsumer {
120	return &memdOpConsumer{
121		parent:   q,
122		isClosed: false,
123	}
124}
125
// closeConsumer marks c as closed under the queue lock and then wakes every
// goroutine waiting on the queue so that a pop blocked on behalf of c can
// observe the flag and return.
func (q *memdOpQueue) closeConsumer(c *memdOpConsumer) {
	q.lock.Lock()
	c.isClosed = true
	q.lock.Unlock()

	// Broadcast rather than Signal: the waiter belonging to c may not be the
	// one a single Signal would wake.
	q.signal.Broadcast()
}
133
134func (q *memdOpQueue) pop(c *memdOpConsumer) *memdQRequest {
135	q.lock.Lock()
136
137	for q.isOpen && !c.isClosed && q.items.Len() == 0 {
138		q.signal.Wait()
139	}
140
141	if !q.isOpen || c.isClosed {
142		q.lock.Unlock()
143		return nil
144	}
145
146	e := q.items.Front()
147	q.items.Remove(e)
148
149	req, ok := e.Value.(*memdQRequest)
150	if !ok {
151		logErrorf("Encountered incorrect type in memdOpQueue")
152		return q.pop(c)
153	}
154
155	atomic.CompareAndSwapPointer(&req.queuedWith, unsafe.Pointer(q), nil)
156
157	q.lock.Unlock()
158
159	return req
160}
161
// drainCallback is the function Drain invokes once for every request that is
// still queued.
type drainCallback func(*memdQRequest)
163
164func (q *memdOpQueue) Drain(cb drainCallback) {
165	q.lock.Lock()
166
167	if q.isOpen {
168		logErrorf("Attempted to Drain open memdOpQueue, ignoring")
169		q.lock.Unlock()
170		return
171	}
172
173	for e := q.items.Front(); e != nil; e = e.Next() {
174		req, ok := e.Value.(*memdQRequest)
175		if !ok {
176			logErrorf("Encountered incorrect type in memdOpQueue")
177			continue
178		}
179
180		atomic.CompareAndSwapPointer(&req.queuedWith, unsafe.Pointer(q), nil)
181
182		cb(req)
183	}
184
185	q.lock.Unlock()
186}
187
// Close marks the queue as closed and wakes every waiting goroutine. After
// Close, Push returns errOpQueueClosed and pop returns nil. Close does not
// touch the items that are still queued.
func (q *memdOpQueue) Close() {
	q.lock.Lock()
	q.isOpen = false
	q.lock.Unlock()

	// Wake all blocked consumers so they can observe the closed flag.
	q.signal.Broadcast()
}
195