1// @author Couchbase <info@couchbase.com> 2// @copyright 2014 Couchbase, Inc. 3// 4// Licensed under the Apache License, Version 2.0 (the "License"); 5// you may not use this file except in compliance with the License. 6// You may obtain a copy of the License at 7// 8// http://www.apache.org/licenses/LICENSE-2.0 9// 10// Unless required by applicable law or agreed to in writing, software 11// distributed under the License is distributed on an "AS IS" BASIS, 12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13// See the License for the specific language governing permissions and 14// limitations under the License. 15 16package protocol 17 18import ( 19 "github.com/couchbase/gometa/common" 20 "sync" 21) 22 23///////////////////////////////////////////////////////////////////////////// 24// PeerKind 25///////////////////////////////////////////////////////////////////////////// 26 27type PeerRole byte 28 29const ( 30 LEADER PeerRole = iota 31 FOLLOWER 32 WATCHER 33) 34 35///////////////////////////////////////////////////////////////////////////// 36// PeerStatus 37///////////////////////////////////////////////////////////////////////////// 38 39type PeerStatus byte 40 41const ( 42 ELECTING PeerStatus = iota 43 LEADING 44 FOLLOWING 45 WATCHING 46) 47 48///////////////////////////////////////////////////////////////////////////// 49// ActionHandler 50///////////////////////////////////////////////////////////////////////////// 51 52type ActionHandler interface { 53 54 // 55 // Environment API 56 // 57 GetEnsembleSize() uint64 58 59 // 60 // The following API are used during election 61 // 62 GetLastLoggedTxid() (common.Txnid, error) 63 64 GetLastCommittedTxid() (common.Txnid, error) 65 66 GetStatus() PeerStatus 67 68 GetQuorumVerifier() QuorumVerifier 69 70 // Current Epoch is set during leader/followr discovery phase. 71 // It is the current epoch (term) of the leader. 72 GetCurrentEpoch() (uint32, error) 73 74 // This is the Epoch that leader/follower agrees during discovery/sync phase. 75 GetAcceptedEpoch() (uint32, error) 76 77 // 78 // The following API are used during discovery/sync 79 // 80 81 GetCommitedEntries(txid1, txid2 common.Txnid) (<-chan LogEntryMsg, <-chan error, chan<- bool, error) 82 83 LogAndCommit(txid common.Txnid, op uint32, key string, content []byte, toCommit bool) error 84 85 // Set new accepted epoch as well as creating new txnid 86 NotifyNewAcceptedEpoch(uint32) error 87 88 NotifyNewCurrentEpoch(uint32) error 89 90 // 91 // The following API are used during normal execution 92 // 93 GetNextTxnId() common.Txnid 94 95 GetFollowerId() string 96 97 LogProposal(proposal ProposalMsg) error 98 99 Commit(txid common.Txnid) error 100 101 Abort(fid string, reqId uint64, err string) error 102 103 Respond(fid string, reqId uint64, err string, content []byte) error 104} 105 106///////////////////////////////////////////////////////////////////////////// 107// QuorumVerifier 108///////////////////////////////////////////////////////////////////////////// 109 110type QuorumVerifier interface { 111 HasQuorum(count int) bool 112} 113 114///////////////////////////////////////////////////////////////////////////// 115// MsgFactory 116///////////////////////////////////////////////////////////////////////////// 117 118type MsgFactory interface { 119 CreateProposal(txnid uint64, fid string, reqId uint64, op uint32, key string, content []byte) ProposalMsg 120 121 CreateAccept(txnid uint64, fid string) AcceptMsg 122 123 CreateCommit(txnid uint64) CommitMsg 124 125 CreateAbort(fid string, reqId uint64, err string) AbortMsg 126 127 CreateVote(round uint64, status uint32, epoch uint32, cndId string, cndLoggedTxnId uint64, 128 cndCommittedTxnId uint64, solicit bool) VoteMsg 129 130 CreateFollowerInfo(epoch uint32, fid string, voting bool) FollowerInfoMsg 131 132 CreateEpochAck(lastLoggedTxid uint64, epoch uint32) EpochAckMsg 133 134 CreateLeaderInfo(epoch uint32) LeaderInfoMsg 135 136 CreateNewLeader(epoch uint32) NewLeaderMsg 137 138 CreateNewLeaderAck() NewLeaderAckMsg 139 140 CreateLogEntry(txnid uint64, opCode uint32, key string, content []byte) LogEntryMsg 141 142 CreateRequest(id uint64, opCode uint32, key string, content []byte) RequestMsg 143 144 CreateResponse(fid string, reqId uint64, err string, content []byte) ResponseMsg 145} 146 147///////////////////////////////////////////////////////////////////////////// 148// Message for normal execution 149///////////////////////////////////////////////////////////////////////////// 150 151type ProposalMsg interface { 152 common.Packet 153 GetTxnid() uint64 154 GetFid() string 155 GetReqId() uint64 156 GetOpCode() uint32 157 GetKey() string 158 GetContent() []byte 159} 160 161type AcceptMsg interface { 162 common.Packet 163 GetTxnid() uint64 164 GetFid() string 165} 166 167type CommitMsg interface { 168 common.Packet 169 GetTxnid() uint64 170} 171 172type AbortMsg interface { 173 common.Packet 174 GetFid() string 175 GetReqId() uint64 176 GetError() string 177} 178 179type RequestMsg interface { 180 common.Packet 181 GetReqId() uint64 182 GetOpCode() uint32 183 GetKey() string 184 GetContent() []byte 185} 186 187type ResponseMsg interface { 188 common.Packet 189 GetFid() string 190 GetReqId() uint64 191 GetError() string 192 GetContent() []byte 193} 194 195///////////////////////////////////////////////////////////////////////////// 196// Message for master election 197///////////////////////////////////////////////////////////////////////////// 198 199type VoteMsg interface { 200 common.Packet 201 GetRound() uint64 202 GetStatus() uint32 203 GetEpoch() uint32 204 GetCndId() string 205 GetCndLoggedTxnId() uint64 206 GetCndCommittedTxnId() uint64 207 GetSolicit() bool 208} 209 210///////////////////////////////////////////////////////////////////////////// 211// Message for discovery 212///////////////////////////////////////////////////////////////////////////// 213 214type FollowerInfoMsg interface { 215 common.Packet 216 GetAcceptedEpoch() uint32 217 GetFid() string 218 GetVoting() bool 219} 220 221type LeaderInfoMsg interface { 222 common.Packet 223 GetAcceptedEpoch() uint32 224} 225 226type EpochAckMsg interface { 227 common.Packet 228 GetLastLoggedTxid() uint64 229 GetCurrentEpoch() uint32 230} 231 232type NewLeaderMsg interface { 233 common.Packet 234 GetCurrentEpoch() uint32 235} 236 237type NewLeaderAckMsg interface { 238 common.Packet 239} 240 241type LogEntryMsg interface { 242 common.Packet 243 GetTxnid() uint64 244 GetOpCode() uint32 245 GetKey() string 246 GetContent() []byte 247} 248 249///////////////////////////////////////////////////////////////////////////// 250// Request Management 251///////////////////////////////////////////////////////////////////////////// 252 253type RequestHandle struct { 254 Request RequestMsg 255 Err error 256 Mutex sync.Mutex 257 CondVar *sync.Cond 258 StartTime int64 259 Content []byte 260} 261 262type RequestMgr interface { 263 GetRequestChannel() <-chan *RequestHandle 264 AddPendingRequest(handle *RequestHandle) 265 CleanupOnError() 266} 267 268type CustomRequestHandler interface { 269 OnNewRequest(fid string, request RequestMsg) 270 GetResponseChannel() <-chan common.Packet 271} 272