1// @author Couchbase <info@couchbase.com>
2// @copyright 2014 Couchbase, Inc.
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8//      http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16package protocol
17
18import (
19	"github.com/couchbase/gometa/common"
20	"sync"
21)
22
23/////////////////////////////////////////////////////////////////////////////
// PeerRole
25/////////////////////////////////////////////////////////////////////////////
26
// PeerRole identifies the role of a peer. It is stored in a single byte.
type PeerRole byte

// The roles a peer can have. The numeric values are written out explicitly
// so that inserting or reordering entries cannot silently renumber them.
const (
	LEADER   PeerRole = 0
	FOLLOWER PeerRole = 1
	WATCHER  PeerRole = 2
)
34
35/////////////////////////////////////////////////////////////////////////////
36// PeerStatus
37/////////////////////////////////////////////////////////////////////////////
38
// PeerStatus identifies the status a peer reports about itself (it is the
// return type of ActionHandler.GetStatus). It is stored in a single byte.
type PeerStatus byte

// The statuses a peer can be in. The numeric values are written out
// explicitly so that inserting or reordering entries cannot silently
// renumber them.
const (
	ELECTING  PeerStatus = 0
	LEADING   PeerStatus = 1
	FOLLOWING PeerStatus = 2
	WATCHING  PeerStatus = 3
)
47
48/////////////////////////////////////////////////////////////////////////////
49// ActionHandler
50/////////////////////////////////////////////////////////////////////////////
51
// ActionHandler is the set of callbacks through which the protocol code
// reads and updates the state of the local peer. The methods are grouped
// below by the protocol phase that uses them: election, discovery/sync and
// normal execution.
type ActionHandler interface {

	//
	// Environment API
	//

	// GetEnsembleSize returns the size of the ensemble.
	// NOTE(review): presumably the number of voting peers -- confirm with
	// the implementations.
	GetEnsembleSize() uint64

	//
	// The following API are used during election
	//

	// GetLastLoggedTxid returns the last logged transaction id, or an error.
	GetLastLoggedTxid() (common.Txnid, error)

	// GetLastCommittedTxid returns the last committed transaction id, or an
	// error.
	GetLastCommittedTxid() (common.Txnid, error)

	// GetStatus returns the PeerStatus of this peer.
	GetStatus() PeerStatus

	// GetQuorumVerifier returns the QuorumVerifier to be used.
	GetQuorumVerifier() QuorumVerifier

	// Current Epoch is set during leader/follower discovery phase.
	// It is the current epoch (term) of the leader.
	GetCurrentEpoch() (uint32, error)

	// This is the Epoch that leader/follower agrees during discovery/sync phase.
	GetAcceptedEpoch() (uint32, error)

	//
	// The following API are used during discovery/sync
	//

	// GetCommitedEntries returns, for the transaction range described by
	// txid1 and txid2, a receive-only channel of LogEntryMsg, a receive-only
	// channel of errors, a send-only bool channel, and an error.
	// NOTE(review): whether the range bounds are inclusive, and the exact
	// meaning of the bool channel (looks like a caller-side stop signal),
	// are not visible from this declaration -- confirm with the
	// implementations.
	GetCommitedEntries(txid1, txid2 common.Txnid) (<-chan LogEntryMsg, <-chan error, chan<- bool, error)

	// LogAndCommit logs the entry described by txid, op, key and content.
	// NOTE(review): toCommit presumably selects whether the entry is also
	// committed -- confirm with the implementations.
	LogAndCommit(txid common.Txnid, op uint32, key string, content []byte, toCommit bool) error

	// Set new accepted epoch as well as creating new txnid
	NotifyNewAcceptedEpoch(uint32) error

	// NotifyNewCurrentEpoch notifies the handler of a new current epoch.
	NotifyNewCurrentEpoch(uint32) error

	//
	// The following API are used during normal execution
	//

	// GetNextTxnId returns the next transaction id.
	GetNextTxnId() common.Txnid

	// GetFollowerId returns the follower id as a string.
	GetFollowerId() string

	// LogProposal logs the given proposal.
	LogProposal(proposal ProposalMsg) error

	// Commit commits the transaction identified by txid.
	Commit(txid common.Txnid) error

	// Abort reports an abort, with error text err, for request reqId of the
	// peer identified by fid.
	Abort(fid string, reqId uint64, err string) error

	// Respond reports a response, with error text err and payload content,
	// for request reqId of the peer identified by fid.
	Respond(fid string, reqId uint64, err string, content []byte) error
}
105
106/////////////////////////////////////////////////////////////////////////////
107// QuorumVerifier
108/////////////////////////////////////////////////////////////////////////////
109
// QuorumVerifier decides whether a given count constitutes a quorum.
type QuorumVerifier interface {
	// HasQuorum reports whether count is enough to form a quorum.
	// NOTE(review): what is being counted (votes, acks, ...) depends on the
	// caller -- confirm at the call sites.
	HasQuorum(count int) bool
}
113
114/////////////////////////////////////////////////////////////////////////////
115// MsgFactory
116/////////////////////////////////////////////////////////////////////////////
117
// MsgFactory creates the messages exchanged by the protocol. Each Create
// method takes the field values of one message kind and returns the
// corresponding message interface declared in this file.
type MsgFactory interface {
	// CreateProposal creates a ProposalMsg.
	CreateProposal(txnid uint64, fid string, reqId uint64, op uint32, key string, content []byte) ProposalMsg

	// CreateAccept creates an AcceptMsg.
	CreateAccept(txnid uint64, fid string) AcceptMsg

	// CreateCommit creates a CommitMsg.
	CreateCommit(txnid uint64) CommitMsg

	// CreateAbort creates an AbortMsg.
	CreateAbort(fid string, reqId uint64, err string) AbortMsg

	// CreateVote creates a VoteMsg. The cnd* arguments describe the
	// candidate the vote is for (see the GetCnd* getters on VoteMsg).
	CreateVote(round uint64, status uint32, epoch uint32, cndId string, cndLoggedTxnId uint64,
		cndCommittedTxnId uint64, solicit bool) VoteMsg

	// CreateFollowerInfo creates a FollowerInfoMsg.
	CreateFollowerInfo(epoch uint32, fid string, voting bool) FollowerInfoMsg

	// CreateEpochAck creates an EpochAckMsg.
	CreateEpochAck(lastLoggedTxid uint64, epoch uint32) EpochAckMsg

	// CreateLeaderInfo creates a LeaderInfoMsg.
	CreateLeaderInfo(epoch uint32) LeaderInfoMsg

	// CreateNewLeader creates a NewLeaderMsg.
	CreateNewLeader(epoch uint32) NewLeaderMsg

	// CreateNewLeaderAck creates a NewLeaderAckMsg; it takes no fields.
	CreateNewLeaderAck() NewLeaderAckMsg

	// CreateLogEntry creates a LogEntryMsg.
	CreateLogEntry(txnid uint64, opCode uint32, key string, content []byte) LogEntryMsg

	// CreateRequest creates a RequestMsg.
	CreateRequest(id uint64, opCode uint32, key string, content []byte) RequestMsg

	// CreateResponse creates a ResponseMsg.
	CreateResponse(fid string, reqId uint64, err string, content []byte) ResponseMsg
}
146
147/////////////////////////////////////////////////////////////////////////////
148// Message for normal execution
149/////////////////////////////////////////////////////////////////////////////
150
// ProposalMsg is the proposal message used during normal execution. Besides
// being a common.Packet it exposes the transaction id, the originating fid
// (presumably a follower id -- confirm) and request id, and the operation
// (op code, key, content) being proposed.
type ProposalMsg interface {
	common.Packet
	GetTxnid() uint64
	GetFid() string
	GetReqId() uint64
	GetOpCode() uint32
	GetKey() string
	GetContent() []byte
}
160
// AcceptMsg is the accept message used during normal execution. It carries
// a transaction id and a fid (presumably a follower id -- confirm).
type AcceptMsg interface {
	common.Packet
	GetTxnid() uint64
	GetFid() string
}
166
// CommitMsg is the commit message used during normal execution. It carries
// only a transaction id.
type CommitMsg interface {
	common.Packet
	GetTxnid() uint64
}
171
// AbortMsg is the abort message used during normal execution. It carries a
// fid, a request id and an error string.
type AbortMsg interface {
	common.Packet
	GetFid() string
	GetReqId() uint64
	GetError() string
}
178
// RequestMsg is a request message. It carries a request id and the
// requested operation (op code, key, content).
type RequestMsg interface {
	common.Packet
	GetReqId() uint64
	GetOpCode() uint32
	GetKey() string
	GetContent() []byte
}
186
// ResponseMsg is a response message. It carries a fid, the request id it
// refers to, an error string and a content payload.
type ResponseMsg interface {
	common.Packet
	GetFid() string
	GetReqId() uint64
	GetError() string
	GetContent() []byte
}
194
195/////////////////////////////////////////////////////////////////////////////
196// Message for master election
197/////////////////////////////////////////////////////////////////////////////
198
// VoteMsg is the vote message used during master election. It carries the
// election round, the sender's status and epoch, the candidate being voted
// for (id, logged txn id, committed txn id) and a solicit flag.
// NOTE(review): GetStatus returns a uint32 here while PeerStatus is a byte;
// presumably the value is a PeerStatus -- confirm with the implementations.
type VoteMsg interface {
	common.Packet
	GetRound() uint64
	GetStatus() uint32
	GetEpoch() uint32
	GetCndId() string
	GetCndLoggedTxnId() uint64
	GetCndCommittedTxnId() uint64
	GetSolicit() bool
}
209
210/////////////////////////////////////////////////////////////////////////////
211// Message for discovery
212/////////////////////////////////////////////////////////////////////////////
213
// FollowerInfoMsg is the follower-info message used during discovery. It
// carries an accepted epoch, a fid and a voting flag.
type FollowerInfoMsg interface {
	common.Packet
	GetAcceptedEpoch() uint32
	GetFid() string
	GetVoting() bool
}
220
// LeaderInfoMsg is the leader-info message used during discovery. It
// carries an accepted epoch.
type LeaderInfoMsg interface {
	common.Packet
	GetAcceptedEpoch() uint32
}
225
// EpochAckMsg is the epoch acknowledgement message used during discovery.
// It carries a last-logged transaction id and a current epoch.
type EpochAckMsg interface {
	common.Packet
	GetLastLoggedTxid() uint64
	GetCurrentEpoch() uint32
}
231
// NewLeaderMsg is the new-leader message used during discovery. It carries
// a current epoch.
type NewLeaderMsg interface {
	common.Packet
	GetCurrentEpoch() uint32
}
236
// NewLeaderAckMsg is the acknowledgement of a NewLeaderMsg. It adds no
// fields of its own on top of common.Packet.
type NewLeaderAckMsg interface {
	common.Packet
}
240
// LogEntryMsg is a message carrying one log entry: a transaction id and the
// logged operation (op code, key, content). It is the element type of the
// channel returned by ActionHandler.GetCommitedEntries.
type LogEntryMsg interface {
	common.Packet
	GetTxnid() uint64
	GetOpCode() uint32
	GetKey() string
	GetContent() []byte
}
248
249/////////////////////////////////////////////////////////////////////////////
250// Request Management
251/////////////////////////////////////////////////////////////////////////////
252
// RequestHandle bundles a request with the fields used to report its
// outcome (Err, Content) and to synchronize on it (Mutex, CondVar).
//
// The struct contains a sync.Mutex and therefore must not be copied after
// first use; it is passed around by pointer (see RequestMgr).
type RequestHandle struct {
	// Request is the request message this handle refers to.
	Request   RequestMsg
	// Err holds an error associated with the request.
	Err       error
	// Mutex is the lock belonging to this handle.
	Mutex     sync.Mutex
	// CondVar is a condition variable for this handle.
	// NOTE(review): presumably created over Mutex by whoever builds the
	// handle -- confirm at the construction site.
	CondVar   *sync.Cond
	// StartTime is a timestamp for the request.
	// NOTE(review): the unit is not visible here (looks like it could be
	// nanoseconds since the Unix epoch) -- confirm.
	StartTime int64
	// Content is a byte payload associated with the request.
	// NOTE(review): presumably the response content -- confirm.
	Content   []byte
}
261
// RequestMgr manages request handles.
type RequestMgr interface {
	// GetRequestChannel returns a receive-only channel of request handles.
	GetRequestChannel() <-chan *RequestHandle
	// AddPendingRequest registers handle as a pending request.
	AddPendingRequest(handle *RequestHandle)
	// CleanupOnError performs cleanup after an error.
	// NOTE(review): what exactly is cleaned up (presumably the pending
	// requests) is implementation-defined -- confirm.
	CleanupOnError()
}
267
// CustomRequestHandler is a hook for handling requests outside of the
// built-in request path.
type CustomRequestHandler interface {
	// OnNewRequest is called with a new request and the fid it is
	// associated with.
	OnNewRequest(fid string, request RequestMsg)
	// GetResponseChannel returns a receive-only channel of packets.
	// NOTE(review): presumably the responses produced for requests passed
	// to OnNewRequest -- confirm.
	GetResponseChannel() <-chan common.Packet
}
272