1// Copyright (c) 2014 Couchbase, Inc.
2// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
3// except in compliance with the License. You may obtain a copy of the License at
4//   http://www.apache.org/licenses/LICENSE-2.0
5// Unless required by applicable law or agreed to in writing, software distributed under the
6// License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
7// either express or implied. See the License for the specific language governing permissions
8// and limitations under the License.
9package client
10
11import (
12	"github.com/couchbase/indexing/secondary/common"
13	"github.com/couchbase/query/value"
14	"sync/atomic"
15)
16
17//-----------------------------
18// Queue with a rotating buffer
19//-----------------------------
20
// Row is the element type stored in a Queue.  Enqueue and Dequeue copy Row
// values by assignment, so the slice fields of a copy share their backing
// arrays with the original.
type Row struct {
	// NOTE(review): presumably the primary (document) key of the entry —
	// confirm against the callers that populate it.
	pkey []byte
	// Values carried by this row.
	value []value.Value
	// NOTE(review): presumably the secondary (index) key of the entry —
	// confirm against the callers that populate it.
	skey common.SecondaryKey
	// When true, Enqueue calls NotifyEnq right after adding this row.
	last bool
}
27
// Queue is a fixed-capacity rotating buffer of Row values.  Enqueue blocks
// while the buffer is full and Dequeue blocks while it is empty; Close
// releases both.
//
// NOTE(review): head and free are read and written without atomics or locks
// (head in Dequeue/Peek, free in Enqueue), which looks like it assumes a
// single producer and a single consumer — confirm against the callers.
type Queue struct {
	size    int64 // capacity of buf, fixed by NewQueue
	count   int64 // number of rows currently held; accessed through sync/atomic
	isClose int32 // swapped to 1 by Close so that donech is closed only once

	buf  []Row
	head int64 // pos of head of buf
	free int64 // pos of head of free list

	notifych chan bool // optional caller-supplied channel signaled by NotifyEnq (may be nil)
	enqch    chan bool // 1-slot channel signaled by NotifyEnq; Dequeue waits on it when empty
	deqch    chan bool // 1-slot channel signaled by NotifyDeq; Enqueue waits on it when full
	donech   chan bool // closed by Close to release blocked Enqueue and Dequeue calls
}
42
43//
44// Constructor
45//
46func NewQueue(size int64, notifych chan bool) *Queue {
47
48	rbuf := &Queue{}
49	rbuf.size = size
50	rbuf.buf = make([]Row, size)
51	rbuf.notifych = notifych
52	rbuf.enqch = make(chan bool, 1)
53	rbuf.deqch = make(chan bool, 1)
54	rbuf.donech = make(chan bool)
55
56	return rbuf
57}
58
59//
60// This funciton notifies a new row added to buffer.
61//
62func (b *Queue) NotifyEnq() {
63
64	select {
65	case b.enqch <- true:
66	default:
67	}
68
69	if b.notifych != nil {
70		select {
71		case b.notifych <- true:
72		default:
73		}
74	}
75}
76
//
// NotifyDeq signals that a row has been removed from the buffer.  It never
// blocks: the signal is dropped when deqch already has one pending.
//
func (b *Queue) NotifyDeq() {

	select {
	case b.deqch <- true:
	default:
	}
}
87
88//
89// Add a new row to the rotating buffer.  If the buffer is full,
90// this function will be blocked.
91//
92func (b *Queue) Enqueue(key *Row) {
93
94	for {
95		count := atomic.LoadInt64(&b.count)
96
97		if count < b.size {
98			next := b.free + 1
99			if next >= b.size {
100				next = 0
101			}
102
103			b.buf[b.free] = *key
104			b.free = next
105			atomic.AddInt64(&b.count, 1)
106			if key.last {
107				b.NotifyEnq()
108			}
109			return
110		}
111
112		b.NotifyEnq()
113		select {
114		case <-b.deqch:
115		case <-b.donech:
116			return
117		}
118	}
119}
120
121//
122// Remove the first row from the rotating buffer.  If the buffer is empty,
123// this function will be blocked.
124//
125func (b *Queue) Dequeue(row *Row) bool {
126
127	for {
128		count := atomic.LoadInt64(&b.count)
129
130		if count > 0 {
131			next := b.head + 1
132			if next >= b.size {
133				next = 0
134			}
135
136			*row = b.buf[b.head]
137			b.head = next
138			if atomic.AddInt64(&b.count, -1) == (b.size - 1) {
139				b.NotifyDeq()
140			}
141			return true
142		}
143
144		select {
145		case <-b.enqch:
146		case <-b.donech:
147			return false
148		}
149	}
150
151	return false
152}
153
154//
155// Get a copy of the first row in the rotating buffer.  This
156// function is not blocked and will not return a copy.
157//
158func (b *Queue) Peek(row *Row) bool {
159
160	count := atomic.LoadInt64(&b.count)
161
162	if count > 0 {
163		*row = b.buf[b.head]
164		return true
165	}
166
167	return false
168}
169
// Len returns the number of rows currently held in the buffer.  The count
// is read atomically.
func (b *Queue) Len() int64 {

	return atomic.LoadInt64(&b.count)
}
174
// Cap returns the capacity of the buffer, i.e. the size passed to NewQueue.
func (b *Queue) Cap() int64 {

	return b.size
}
179
180// Unblock all Enqueue and Dequeue calls
181func (b *Queue) Close() {
182
183	if atomic.SwapInt32(&b.isClose, 1) == 0 {
184		close(b.donech)
185	}
186}
187