1// @author Couchbase <info@couchbase.com>
2// @copyright 2014 Couchbase, Inc.
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8//      http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16package protocol
17
18import (
19	"github.com/couchbase/gometa/common"
20)
21
22/////////////////////////////////////////////////
23// Type Declaration
24/////////////////////////////////////////////////
25
26type observer struct {
27	packets chan common.Packet
28	head    common.Packet
29	killch  chan bool
30}
31
32func NewObserver() *observer {
33
34	return &observer{
35		packets: make(chan common.Packet, common.MAX_PROPOSALS),
36		head:    nil,
37		killch:  make(chan bool)} // buffered - unblock sender
38}
39
40func (o *observer) close() {
41	close(o.killch)
42}
43
44func (o *observer) send(msg common.Packet) {
45
46	defer common.SafeRun("observer.Send()",
47		func() {
48			select {
49			case o.packets <- msg: //no-op
50			case <-o.killch:
51				// if killch is closed, this is non-blocking.
52				return
53			}
54		})
55}
56
57func (o *observer) getNext() common.Packet {
58	if o.head != nil {
59		head := o.head
60		o.head = nil
61		return head
62	}
63
64	if len(o.packets) > 0 {
65		packet := <-o.packets
66		return packet
67	}
68
69	return nil
70}
71
72func (o *observer) peekFirst() common.Packet {
73	if o.head != nil {
74		return o.head
75	}
76
77	if len(o.packets) > 0 {
78		o.head = <-o.packets
79		return o.head
80	}
81
82	return nil
83}
84