1// @author Couchbase <info@couchbase.com>
2// @copyright 2014 Couchbase, Inc.
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8//      http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16package protocol
17
18import (
19	"fmt"
20	"github.com/couchbase/gometa/common"
21	"github.com/couchbase/gometa/log"
22	"sync"
23)
24
25/////////////////////////////////////////////////
26// Type Declaration
27/////////////////////////////////////////////////
28
// Follower runs the follower side of the protocol against the leader: it
// receives messages over a PeerPipe, logs proposals (and accepts them when
// running as a FOLLOWER), applies commits in proposal order, and relays
// abort/response messages to the ActionHandler.
type Follower struct {
	// kind is this peer's role; only FOLLOWER sends accept messages for
	// proposals (a non-FOLLOWER role such as a watcher only logs them).
	kind     PeerRole
	// pipe is the connection to the leader, used both to receive leader
	// messages and to send accepts / forwarded requests.
	pipe     *common.PeerPipe
	// pendings holds proposals that have been logged but not yet committed,
	// oldest first.  In this file it is only touched from the listener
	// goroutine (via handleProposal / handleCommit).
	pendings []ProposalMsg
	// handler receives the protocol actions (LogProposal, Commit, Abort,
	// Respond) and supplies this follower's ID.
	handler  ActionHandler
	// factory builds outgoing protocol messages (e.g. accept messages).
	factory  MsgFactory

	// mutex guards isClosed.
	mutex    sync.Mutex
	// isClosed is set by Terminate() or when the listener goroutine exits;
	// once set, Terminate() no longer signals killch.
	isClosed bool
	// donech receives true when the listener goroutine exits; Start()
	// returns it to the caller.
	donech   chan bool
	// killch is signalled by Terminate() to stop the listener goroutine.
	killch   chan bool
}
41
42/////////////////////////////////////////////////
43// Follower - Public Function
44/////////////////////////////////////////////////
45
46//
47// Create a new Follower.  This will run the
48// follower protocol to communicate with the leader
49// in voting proposal as well as sending new proposal
50// to leader.
51//
52func NewFollower(kind PeerRole,
53	pipe *common.PeerPipe,
54	handler ActionHandler,
55	factory MsgFactory) *Follower {
56
57	follower := &Follower{kind: kind,
58		pipe:     pipe,
59		pendings: make([]ProposalMsg, 0, common.MAX_PROPOSALS),
60		handler:  handler,
61		factory:  factory,
62		isClosed: false,
63		donech:   make(chan bool, 1), // make buffered channel so sender won't block
64		killch:   make(chan bool, 1)} // make buffered channel so sender won't block
65
66	return follower
67}
68
69//
70// Start the listener.  This is running in a goroutine.
71// The follower can be shutdown by calling Terminate()
72// function or by closing the PeerPipe.
73//
74func (f *Follower) Start() <-chan bool {
75
76	go f.startListener()
77
78	return f.donech
79}
80
81//
82// Return the follower ID
83//
84func (f *Follower) GetFollowerId() string {
85	return f.handler.GetFollowerId()
86}
87
88//
89// Forward the request to the leader
90//
91func (f *Follower) ForwardRequest(request RequestMsg) bool {
92	log.Current.Debugf("Follower.ForwardRequest(): Follower %s forward request to leader (TCP %s)",
93		f.GetFollowerId(), f.pipe.GetAddr())
94	return f.pipe.Send(request)
95
96	// do not process request if I am a watcher
97	return false
98}
99
100//
101// Terminate.  This function is an no-op if the
102// follower already complete successfully.
103//
104func (f *Follower) Terminate() {
105	f.mutex.Lock()
106	defer f.mutex.Unlock()
107
108	if !f.isClosed {
109		f.isClosed = true
110		f.killch <- true
111	}
112}
113
114/////////////////////////////////////////////////
115// Follower - Private Function
116/////////////////////////////////////////////////
117
118//
119// Goroutine.  Start a new listener for the follower.
120// Listen to any new message coming from the leader.
121// This is the main routine for the follower to interact
122// with the leader.  If there is any error (network
123// error commuincating to leader or internal failure),
124// this loop will terminate.   The server (that contains
125// the follower) will need to run leader election again.
126//
127func (f *Follower) startListener() {
128
129	defer func() {
130
131		if r := recover(); r != nil {
132			log.Current.Errorf("panic in Follower.startListener() : %s\n", r)
133			log.Current.Errorf("%s", log.Current.StackTrace())
134		} else {
135			log.Current.Debugf("Follower.startListener() terminates.")
136			log.Current.Tracef(log.Current.StackTrace())
137		}
138
139		common.SafeRun("Follower.startListener()",
140			func() {
141				f.close()
142				f.donech <- true
143			})
144	}()
145
146	reqch := f.pipe.ReceiveChannel()
147
148	for {
149		select {
150		case msg, ok := <-reqch:
151			if ok {
152				err := f.handleMessage(msg.(common.Packet))
153				if err != nil {
154					// If there is an error, terminate
155					log.Current.Errorf("Follower.startListener(): There is an error in handling leader message.  Error = %s.  Terminate.",
156						err.Error())
157					return
158				}
159			} else {
160				log.Current.Debugf("Follower.startListener(): message channel closed.  Terminate.")
161				return
162			}
163		case <-f.killch:
164			return
165		}
166	}
167}
168
169//
170// Signal that the follower is closed (done)
171//
172func (f *Follower) close() {
173	f.mutex.Lock()
174	defer f.mutex.Unlock()
175
176	f.isClosed = true
177}
178
179//
180// Handle message from the leader.
181//
182func (f *Follower) handleMessage(msg common.Packet) (err error) {
183	log.Current.Debugf("Follower.handleMessage(): message %s.", msg.Name())
184
185	err = nil
186	switch request := msg.(type) {
187	case ProposalMsg:
188		err = f.handleProposal(request)
189	case CommitMsg: // Commit has to be after Proposal
190		err = f.handleCommit(request)
191	case ResponseMsg:
192		err = f.handleResponse(request)
193	case AbortMsg: // Abort has to be after Proposal
194		err = f.handleAbort(request)
195	default:
196		log.Current.Infof("Follower.handleMessage(): unrecognized message %s.  Ignore.", msg.Name())
197	}
198	return err
199}
200
201//
202// Handle proposal message from the leader.
203//
204func (f *Follower) handleProposal(msg ProposalMsg) error {
205	// TODO : Check if the txnid is the next one (last txnid + 1)
206	log.Current.Debugf("Follower.handleProposal(): reqId %d", msg.GetReqId())
207
208	// Call service to log the proposal
209	err := f.handler.LogProposal(msg)
210	if err != nil {
211		return err
212	}
213
214	// Add to pending list
215	f.pendings = append(f.pendings, msg)
216
217	// Send Accept Message only if I am a follower (not watcher)
218	if f.kind == FOLLOWER {
219		return f.sendAccept(common.Txnid(msg.GetTxnid()), f.GetFollowerId())
220	}
221
222	return nil
223}
224
225//
226// Handle commit message from the leader.
227//
228func (f *Follower) handleCommit(msg CommitMsg) error {
229
230	log.Current.Debugf("Follower.handleCommit(): txnId %d", msg.GetTxnid())
231
232	// If there is pending propsoal in memory, then make sure
233	// that the commit are processed in order.  If there is no
234	// pending proposal, we may still receive commit since commit
235	// can be sent by the leader/peer after synchronization.
236	if len(f.pendings) != 0 {
237
238		// Check if the commit is the first one in the pending list.
239		// All commits are processed sequentially to ensure serializability.
240		p := f.pendings[0]
241		if p == nil || p.GetTxnid() != msg.GetTxnid() {
242			log.Current.Errorf("Proposal must committed in sequential order for the same leader term. "+
243				"Found out-of-order commit. Last proposal txid %d, commit msg %d", p.GetTxnid(), msg.GetTxnid())
244
245			return common.NewError(common.PROTOCOL_ERROR,
246				fmt.Sprintf("Proposal must committed in sequential order for the same leader term. "+
247					"Found out-of-order commit. Last proposal txid %d, commit msg %d", p.GetTxnid(), msg.GetTxnid()))
248		}
249
250		// remove proposal from pendings
251		f.pendings = f.pendings[1:]
252	}
253
254	// commit
255	err := f.handler.Commit(common.Txnid(msg.GetTxnid()))
256	if err != nil {
257		return err
258	}
259
260	// TODO: do we need to update election site?  I don't think so, but need to double check.
261
262	return nil
263}
264
265//
266// Handle abort message from the leader.
267//
268func (f *Follower) handleAbort(msg AbortMsg) error {
269
270	log.Current.Debugf("Follower.handleAbort(): reqId %d", msg.GetReqId())
271
272	return f.handler.Abort(msg.GetFid(), msg.GetReqId(), msg.GetError())
273}
274
275//
276// Handle response message from the leader.
277//
278func (f *Follower) handleResponse(msg ResponseMsg) error {
279
280	log.Current.Debugf("Follower.handleResponse(): reqId %d, len(msg.Content) %d", msg.GetReqId(), len(msg.GetContent()))
281
282	return f.handler.Respond(msg.GetFid(), msg.GetReqId(), msg.GetError(), msg.GetContent())
283}
284
285//
286// Send accept message to the leader.
287//
288func (f *Follower) sendAccept(txnid common.Txnid, fid string) error {
289	accept := f.factory.CreateAccept(uint64(txnid), fid)
290
291	// Send the message to the leader through a reliable protocol (TCP).
292	success := f.pipe.Send(accept)
293	if !success {
294		// It is a fatal error if not able to send to the leader.  It will require the server to
295		// do leader election again.
296		return common.NewError(common.FATAL_ERROR, "Fail to send accept message for to leader from "+fid)
297	}
298
299	return nil
300}
301