1// @author Couchbase <info@couchbase.com>
2// @copyright 2014 Couchbase, Inc.
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8//      http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16package protocol
17
18import (
19	"fmt"
20	"github.com/couchbase/gometa/common"
21	"github.com/couchbase/gometa/log"
22	"sync"
23	"sync/atomic"
24)
25
26/////////////////////////////////////////////////
27// Proposal Processing for ZK
28/////////////////////////////////////////////////
29
// 1) TCP is used as the network protocol (guarantees packet ordering).
// 2) The leader sends proposals sequentially based on the order of
//    received requests.  Due to (1), it expects the proposals to
//    arrive at each follower in the same order.
// 3) Each follower processes the proposals in the same sequential order.
//    Therefore, the ack/accept is returned in the same order as the proposal.
// 4) In the follower, proposal and commit can be processed out-of-order.
//    These 2 messages go through different queues on the follower side.
// 5) The leader has a dedicated go-routine (2 threads in ZK) to send/receive
//    proposal/commit for a specific follower.  If messaging errors out,
//    it will close the socket.  This, in turn, will terminate both the sending
//    and receiving threads and force the go-routine to shut down.
//    In doing so, the leader will also remove the follower.  The leader will
//    listen for any new socket connection to re-establish communication with the follower.
// 6) When the follower fails to send an Ack/Accept to the leader, it will close the socket.
//    This, in turn, will shut down the follower.  The main thread will go back to
//    election (QuorumPeer in "looking" state).
// 7) When the follower re-connects to the leader, the leader will go through the following:
//    a) open the commit log and re-send {proposals, committed messages} to the follower.
//       The leader will hold the read lock on the commit log so as to avoid any concurrent
//       commit being written to the log.
//    b) new pending {proposal, commit} will be sent to the follower.
//    c) new pending proposals will also be sent (note this is synchronized such that
//       no pending proposal is added during this step).
//    d) add the follower as a participant of future proposals.
// 8) Due to (7), proposals must be committed serially in order.  A proposal cannot
//    be skipped. The voting can stall if the leader loses the majority (by design).
57//
58
59/////////////////////////////////////////////////
60// Type Declaration
61/////////////////////////////////////////////////
62
63type Leader struct {
64	naddr      string
65	handler    ActionHandler
66	factory    MsgFactory
67	reqHandler CustomRequestHandler
68
69	notifications chan *notification
70	lastCommitted common.Txnid
71	quorums       map[common.Txnid][]string
72	proposals     map[common.Txnid]ProposalMsg
73
74	// mutex protected variable
75	mutex     sync.Mutex
76	followers map[string]*messageListener
77	watchers  map[string]*messageListener
78	observers map[string]*observer
79	isClosed  bool
	changech  chan bool // notified when the membership of active followers changes
81	idGen     *IdGen
82}
83
// AddWatcher and AddFollower should safely handle the situation where
// an existing watcher/follower has the same fid. If a listener
// with the same "fid" already exists, the termination of the existing
// listener is triggered and the new listener is added to the list
// of watchers/followers. As the termination of the existing listener is
// an asynchronous operation, there is a small time interval during which
// there will be two listeners in the system with the same fid. To identify
// these two listeners separately, "listenerid" is used.
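//
// For example (ids are illustrative): while the old listener
// {fid: "n1", listenerid: 7} is still shutting down asynchronously, its
// replacement {fid: "n1", listenerid: 8} is already registered. Because
// removeListener() compares the listenerid, the late teardown of
// listener 7 cannot accidentally remove listener 8 from the map.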
92
93type messageListener struct {
94	fid        string
95	pipe       *common.PeerPipe
96	leader     *Leader
97	killch     chan bool
98	listenerid uint64
99}
100
101type notification struct {
102	// follower message
103	fid     string
104	payload common.Packet
105}
106
107/////////////////////////////////////////////////
108// Leader - Public Function
109/////////////////////////////////////////////////
110
111//
112// Create a new leader
113//
114func NewLeader(naddr string,
115	handler ActionHandler,
116	factory MsgFactory) (leader *Leader, err error) {
117
118	leader = &Leader{naddr: naddr,
119		followers:     make(map[string]*messageListener),
120		watchers:      make(map[string]*messageListener),
121		observers:     make(map[string]*observer),
122		quorums:       make(map[common.Txnid][]string),
123		proposals:     make(map[common.Txnid]ProposalMsg),
124		notifications: make(chan *notification, common.MAX_PROPOSALS),
125		handler:       handler,
126		factory:       factory,
127		isClosed:      false,
128		reqHandler:    nil,
129		changech:      make(chan bool, common.MAX_PEERS), // make it buffered so sender won't block
130		idGen:         newIdGen(),
131	}
132
	// This is initialized to the lastCommitted in the repository. Subsequent commit() will update this
	// field to the latest committed txnid. This field is used for ensuring that the commit order is preserved.
	// The leader will commit the proposals in the strict order in which they arrive, so as to preserve serializability.
136	leader.lastCommitted, err = handler.GetLastCommittedTxid()
137	if err != nil {
138		return nil, err
139	}
140
	// start a listener go-routine.  This will be closed when the leader terminates.
142	go leader.listen()
143
144	return leader, nil
145}
146
147func NewLeaderWithCustomHandler(naddr string,
148	handler ActionHandler,
149	factory MsgFactory,
150	reqHandler CustomRequestHandler) (leader *Leader, err error) {
151
152	leader = &Leader{naddr: naddr,
153		followers:     make(map[string]*messageListener),
154		watchers:      make(map[string]*messageListener),
155		observers:     make(map[string]*observer),
156		quorums:       make(map[common.Txnid][]string),
157		proposals:     make(map[common.Txnid]ProposalMsg),
158		notifications: make(chan *notification, common.MAX_PROPOSALS),
159		handler:       handler,
160		factory:       factory,
161		isClosed:      false,
162		reqHandler:    reqHandler,
163		changech:      make(chan bool, common.MAX_PEERS), // make it buffered so sender won't block
164		idGen:         newIdGen(),
165	}
166
	// This is initialized to the lastCommitted in the repository. Subsequent commit() will update this
	// field to the latest committed txnid. This field is used for ensuring that the commit order is preserved.
	// The leader will commit the proposals in the strict order in which they arrive, so as to preserve serializability.
170	leader.lastCommitted, err = handler.GetLastCommittedTxid()
171	if err != nil {
172		return nil, err
173	}
174
	// start a listener go-routine.  This will be closed when the leader terminates.
176	go leader.listen()
177
178	return leader, nil
179}
180
181//
// Terminate the leader. It is a no-op if the leader has already
// terminated.
184//
185func (l *Leader) Terminate() {
186
187	l.mutex.Lock()
188	defer l.mutex.Unlock()
189
190	if !l.isClosed {
191		l.isClosed = true
192		for _, listener := range l.followers {
193			listener.terminate()
194		}
195		for _, listener := range l.watchers {
196			listener.terminate()
197		}
198		common.SafeRun("Leader.Terminate()",
199			func() {
200				close(l.notifications)
201			})
202	}
203}
204
205//
206// Has the leader terminated/closed?
207//
208func (l *Leader) IsClosed() bool {
209	l.mutex.Lock()
210	defer l.mutex.Unlock()
211
212	return l.isClosed
213}
214
215//
// Get the channel that is notified when the ensemble of followers
// changes.  The receiver of the channel can then tell
// if the leader still has a quorum of followers.
219//
220func (l *Leader) GetEnsembleChangeChannel() <-chan bool {
221	l.mutex.Lock()
222	defer l.mutex.Unlock()
223
224	return l.changech
225}
226
227//
// Get the current ensemble size of the leader.
// It is the number of followers + 1 (including the leader).
230//
231func (l *Leader) GetActiveEnsembleSize() int {
232	l.mutex.Lock()
233	defer l.mutex.Unlock()
234
235	return len(l.followers) + 1
236}
237
238//
239// Add a watcher. If the leader is terminated, the pipe between leader
240// and watcher will also be closed.
241//
242func (l *Leader) AddWatcher(fid string,
243	peer *common.PeerPipe,
244	o *observer) {
245	l.mutex.Lock()
246	defer l.mutex.Unlock()
247
	// AddWatcher requires holding the mutex so that the leader thread
	// will not be sending new proposals or commits (see sendProposal() and
	// sendCommit()) to watchers.  This allows this function to copy the
	// proposals and commits from the observer queue into the pipe, before
	// the leader has a chance to send new messages.
253	for packet := o.getNext(); packet != nil; packet = o.getNext() {
254
255		switch request := packet.(type) {
256		case ProposalMsg:
257			txid := common.Txnid(request.GetTxnid())
258			log.Current.Debugf("Leader.AddWatcher() : send observer's packet %s, txid %d", packet.Name(), txid)
259		case CommitMsg:
260			txid := common.Txnid(request.GetTxnid())
261			log.Current.Debugf("Leader.AddWatcher() : send observer's packet %s, txid %d", packet.Name(), txid)
262		}
263
264		peer.Send(packet)
265	}
266
267	oldListener, ok := l.watchers[fid]
268	listener := newListener(fid, peer, l, l.idGen.getNextId())
269	l.watchers[fid] = listener
270	go listener.start()
271
272	// kill the old message listener
273	if ok && oldListener != nil {
274		log.Current.Debugf("Leader.AddWatcher() : old Listener found for watcher %s.  Terminating old listener", fid)
275		oldListener.terminate()
276	}
277}
278
279//
280// Add a follower and starts a listener for the follower.
281// If the leader is terminated, the pipe between leader
282// and follower will also be closed.
283//
284func (l *Leader) AddFollower(fid string,
285	peer *common.PeerPipe,
286	o *observer) {
287	l.mutex.Lock()
288	defer l.mutex.Unlock()
289
	// AddFollower requires holding the mutex so that the leader thread
	// will not be sending new proposals or commits (see sendProposal() and
	// sendCommit()) to followers.  This allows this function to copy the
	// proposals and commits from the observer queue into the pipe, before
	// the leader has a chance to send new messages.
295	for packet := o.getNext(); packet != nil; packet = o.getNext() {
296
297		switch request := packet.(type) {
298		case ProposalMsg:
299			txid := common.Txnid(request.GetTxnid())
300			log.Current.Debugf("Leader.AddFollower() : send observer's packet %s, txid %d", packet.Name(), txid)
301		case CommitMsg:
302			txid := common.Txnid(request.GetTxnid())
303			log.Current.Debugf("Leader.AddFollower() : send observer's packet %s, txid %d", packet.Name(), txid)
304		}
305
306		peer.Send(packet)
307	}
308
309	oldListener, ok := l.followers[fid]
310	listener := newListener(fid, peer, l, l.idGen.getNextId())
311	l.followers[fid] = listener
312	go listener.start()
313
314	// kill the old message listener
315	if ok && oldListener != nil {
316		log.Current.Debugf("Leader.AddFollower() : old Listener found for follower %s.  Terminating old listener", fid)
317		oldListener.terminate()
318	} else {
		// notify that a brand new follower has joined (not just a replacement of an existing one)
320		l.changech <- true
321	}
322}
323
//
// Return the follower ID.  The leader is a follower to itself.
325//
326func (l *Leader) GetFollowerId() string {
327	return l.handler.GetFollowerId()
328}
329
330/////////////////////////////////////////////////////////
331// Leader - Public Function : Observer
332/////////////////////////////////////////////////////////
333
334//
335// Add observer
336//
337func (l *Leader) AddObserver(id string, o *observer) {
338	l.mutex.Lock()
339	defer l.mutex.Unlock()
340
341	l.observers[id] = o
342}
343
344//
345// Remove observer
346//
347func (l *Leader) RemoveObserver(id string) {
348	l.mutex.Lock()
349	defer l.mutex.Unlock()
350
351	observer, ok := l.observers[id]
352	if ok {
353		observer.close()
354	}
355	delete(l.observers, id)
356}
357
358func (l *Leader) QueueRequest(fid string, req common.Packet) {
359	n := &notification{fid: fid, payload: req}
360	l.notifications <- n
361}
362
363func (l *Leader) QueueResponse(req common.Packet) {
364	n := &notification{fid: l.GetFollowerId(), payload: req}
365	l.notifications <- n
366}
367
368/////////////////////////////////////////////////
369// messageListener
370/////////////////////////////////////////////////
371
372// Create a new listener
373func newListener(fid string, pipe *common.PeerPipe, leader *Leader,
374	listenerid uint64) *messageListener {
375
376	return &messageListener{fid: fid,
377		pipe:       pipe,
378		leader:     leader,
379		killch:     make(chan bool, 1),
380		listenerid: listenerid,
381	}
382}
383
384//
// Goroutine.  Start the listener to listen to messages from the follower.
// Note that each follower has its own receive queue.  This
// ensures that if the queue fills up for a single follower,
// only that follower's connection may be affected.
// The listener can be killed by calling terminate() or by closing
// the PeerPipe.
391//
392func (l *messageListener) start() {
393
394	defer func() {
395		if r := recover(); r != nil {
396			log.Current.Errorf("panic in messageListener.start() : %s\n", r)
397			log.Current.Errorf("%s", log.Current.StackTrace())
398		} else {
399			log.Current.Debugf("leader's messageListener.start() terminates.")
400			log.Current.Tracef(log.Current.StackTrace())
401		}
402
403		common.SafeRun("messageListener.start()",
404			func() {
405				l.leader.removeListener(l)
406			})
407
408		common.SafeRun("messageListener.start()",
409			func() {
410				l.pipe.Close()
411			})
412	}()
413
414	log.Current.Debugf("messageListener.start(): start listening to message from peer %s", l.fid)
415	reqch := l.pipe.ReceiveChannel()
416
417	for {
418		select {
419		case req, ok := <-reqch:
420			if ok {
				// TODO: If this send blocks because l.notifications is full, will it
				// unblock once Leader.listen() drains leader.notifications?
423				l.leader.QueueRequest(l.fid, req)
424			} else {
				// The channel is closed.  Need to shut down the listener.
426				log.Current.Infof("messageListener.start(): message channel closed. Remove peer %s as follower.", l.fid)
427				return
428			}
429		case <-l.killch:
430			log.Current.Debugf("messageListener.start(): Listener for %s receive kill signal. Terminate.", l.fid)
431			return
432
433		}
434	}
435}
436
437//
438// Terminate the listener.  This should only be called by the leader.
439//
440func (l *messageListener) terminate() {
441	l.killch <- true
442}
443
444func (l *messageListener) getListenerid() uint64 {
445	return l.listenerid
446}
447
448/////////////////////////////////////////////////
449// IdGen
450// In-memory unique ID generator using a counter.
451// Used for message listener ids.
452/////////////////////////////////////////////////
453
454type IdGen struct {
455	id uint64
456}
457
458func newIdGen() *IdGen {
459	return &IdGen{}
460}
461
462func (g *IdGen) getNextId() uint64 {
463	return atomic.AddUint64(&g.id, uint64(1))
464}
465
466/////////////////////////////////////////////////////
467// Leader - Private Function : Listener Management
468/////////////////////////////////////////////////////
469
470//
// Remove the listener (follower or watcher) from being tracked
472//
473func (l *Leader) removeListener(peer *messageListener) {
474	l.mutex.Lock()
475	defer l.mutex.Unlock()
476
477	follower, ok := l.followers[peer.fid]
478	if ok && follower != nil {
479		if follower.getListenerid() == peer.getListenerid() {
480			delete(l.followers, peer.fid)
481		}
482	}
483
484	watcher, ok := l.watchers[peer.fid]
485	if ok && watcher != nil {
486		if watcher.getListenerid() == peer.getListenerid() {
487			delete(l.watchers, peer.fid)
488		}
489	}
490
491	l.changech <- true
492}
493
494/////////////////////////////////////////////////////////
495// Leader - Private Function : Message Processing
496/////////////////////////////////////////////////////////
497
498//
// Main message processing loop for the leader.
500//
501func (l *Leader) listen() {
502	defer func() {
503		if r := recover(); r != nil {
504			log.Current.Errorf("panic in Leader.listen() : %s\n", r)
505			log.Current.Errorf("%s", log.Current.StackTrace())
506		} else {
507			log.Current.Debugf("Leader.listen() terminates.")
508			log.Current.Tracef(log.Current.StackTrace())
509		}
510
511		common.SafeRun("Leader.listen()",
512			func() {
513				l.Terminate()
514			})
515	}()
516
517	log.Current.Debugf("Leader.listen(): start listening to message for leader")
518
519	for {
520		select {
521		case msg, ok := <-l.notifications:
522			if ok {
523				if !l.IsClosed() {
524					err := l.handleMessage(msg.payload, msg.fid)
525					if err != nil {
526						log.Current.Errorf("Leader.listen(): Encounter error when processing message %s. Error %s. Terminate",
527							msg.fid, err.Error())
528						return
529					}
530				} else {
531					log.Current.Debugf("Leader.listen(): Leader is closed. Terminate message processing loop.")
532					return
533				}
534			} else {
535				// The channel is closed.
536				log.Current.Debugf("Leader.listen(): message channel closed. Terminate message processing loop for leader.")
537				return
538			}
539		}
540	}
541}
542
543//
544// Handle an incoming message based on its type.  All incoming messages from followers are processed serially
// (by Leader.listen()).  Therefore, the corresponding outbound messages (proposal, commit) are placed
// in the same order into each follower's pipe.   This implies that we can use 2 state variables (LastLoggedTxid and
547// LastCommittedTxid) to determine which peer has the latest repository.  This, in turn, enforces stronger serializability
548// semantics, since we won't have a case where one peer may have a higher LastLoggedTxid while another has a higher
549// LastCommittedTxid.
550//
551func (l *Leader) handleMessage(msg common.Packet, follower string) (err error) {
552
553	err = nil
554	switch request := msg.(type) {
555	case RequestMsg:
556		if common.IsCustomOpCode(common.OpCode(request.GetOpCode())) {
557			if l.reqHandler != nil {
558				l.reqHandler.OnNewRequest(follower, request)
559			} else {
560				log.Current.Debugf("Leader.handleMessage(): No custom request handler registered to handle custom request.")
561				response := l.factory.CreateResponse(follower, request.GetReqId(), "No custom request handler", nil)
562				l.sendResponse(response)
563			}
564		} else {
565			err = l.createProposal(follower, request)
566		}
567	case AcceptMsg:
568		err = l.handleAccept(request)
569	case ResponseMsg:
570		l.sendResponse(request)
571	default:
		// TODO: Should return an error.  There is a possibility that there is another leader.
573		log.Current.Infof("Leader.handleMessage(): Leader unable to process message of type %s. Ignore message.", request.Name())
574	}
575	return err
576}
577
578/////////////////////////////////////////////////////////////////////////
579// Leader - Private Function : Handle Request Message  (New Proposal)
580/////////////////////////////////////////////////////////////////////////
581
582//
583// Create a new proposal from request
584//
585func (l *Leader) createProposal(host string, req RequestMsg) error {
586
	// This should be the only place to call GetNextTxnId().  This function
	// can panic if the txnid overflows.   In that case, it should terminate
	// the leader and force a new election to get a new epoch. ZK has the
	// same behavior.
591	txnid := l.handler.GetNextTxnId()
592	log.Current.Debugf("Leader.createProposal(): New Proposal : Txnid %d (Epoch %d, Counter %d)",
593		txnid, txnid.GetEpoch(), txnid.GetCounter())
594
595	// Create a new proposal
596	proposal := l.factory.CreateProposal(uint64(txnid),
		host, // this is the host that originated the request
598		req.GetReqId(),
599		req.GetOpCode(),
600		req.GetKey(),
601		req.GetContent())
602
603	return l.newProposal(proposal)
604}
605
606//
607// Handle a new proposal
608//
609func (l *Leader) newProposal(proposal ProposalMsg) error {
610
611	// Call out to log the proposal.  Always do this first before
612	// sending to followers.
613	err := l.handler.LogProposal(proposal)
614	if err != nil {
615		if _, ok := err.(*common.RecoverableError); ok {
			// Update the last committed to advance the txnid.
617			l.lastCommitted = common.Txnid(proposal.GetTxnid())
618			l.sendAbort(proposal.GetFid(), proposal.GetReqId(), err.Error())
619			return nil
620		}
621
		// If logging the proposal fails, return the error.
		// This can cause the leader to go through re-election.  This handles
		// the case where there is hardware failure or repository
		// corruption.
626		return err
627	}
628
629	// The leader votes for itself
630	l.updateQuorum(common.Txnid(proposal.GetTxnid()), l.GetFollowerId())
631
	// remember this proposal so I can commit it later
633	l.proposals[common.Txnid(proposal.GetTxnid())] = proposal
634
	// Send the proposal to the followers
636	l.sendProposal(proposal)
637
	// Check if the proposal already has a quorum (possible if ensembleSize <= 2).  Make sure this
	// is done after sendProposal() so that the proposal is still sent to the followers BEFORE
	// the commit message.
641	if l.hasQuorum(common.Txnid(proposal.GetTxnid())) {
		// The proposal has quorum. Now commit. If the commit fails, return the error,
		// which will cause the leader to go through re-election.  This handles the
		// case where there is hardware failure or repository corruption.
645		err := l.commit(common.Txnid(proposal.GetTxnid()))
646		if err != nil {
647			return err
648		}
649	}
650
651	return nil
652}
653
654//
655// send the proposal to the followers
656//
657func (l *Leader) sendProposal(proposal ProposalMsg) {
658
659	l.mutex.Lock()
660	defer l.mutex.Unlock()
661
	// Send the request to the followers.  Each follower
	// has a pipe (channel) and there is a separate go-routine
	// that will read from the channel and send to the follower
	// through a reliable connection (TCP).   If the connection is
	// broken, then the follower will go through election again,
	// and the follower will re-sync the repository with the leader.
	// Therefore, we don't have to worry about re-sending proposals
	// to a disconnected follower here.
	//
	// If the leader cannot send to a follower, then the pipe will
	// be broken and the leader will take the follower out.  If
	// the leader loses a majority of followers, it will bring
	// itself into election again.  So we can process this
	// asynchronously without worrying about sending the message
	// to a majority of followers.  If that can't be done, the
	// leader re-elects.
678	//
679	msg := l.factory.CreateProposal(proposal.GetTxnid(),
680		proposal.GetFid(),
681		proposal.GetReqId(),
682		proposal.GetOpCode(),
683		proposal.GetKey(),
684		proposal.GetContent())
685
686	for _, f := range l.followers {
687		f.pipe.Send(msg)
688	}
689
690	for _, w := range l.watchers {
691		w.pipe.Send(msg)
692	}
693
694	for _, o := range l.observers {
695		o.send(msg)
696	}
697}
698
699//
// send an abort message to the peer that originated the request
701//
702func (l *Leader) sendAbort(fid string, reqId uint64, err string) {
703
704	log.Current.Tracef("leader.sendAbort(): Send Abort to %s", fid)
705
706	l.mutex.Lock()
707	defer l.mutex.Unlock()
708
709	for _, f := range l.followers {
710		if f.fid == fid {
711			msg := l.factory.CreateAbort(fid, reqId, err)
712			f.pipe.Send(msg)
713			return
714		}
715	}
716
717	for _, w := range l.watchers {
718		if w.fid == fid {
719			msg := l.factory.CreateAbort(fid, reqId, err)
720			w.pipe.Send(msg)
721			return
722		}
723	}
724
725	if l.GetFollowerId() == fid {
726		l.handler.Abort(fid, reqId, err)
727	}
728}
729
730//
// send the response to the peer that originated the request
732//
733func (l *Leader) sendResponse(msg ResponseMsg) {
734
735	log.Current.Tracef("leader.sendResponse(): Send Response to %s", msg.GetFid())
736
737	l.mutex.Lock()
738	defer l.mutex.Unlock()
739
740	for _, f := range l.followers {
741		if f.fid == msg.GetFid() {
742			f.pipe.Send(msg)
743			return
744		}
745	}
746
747	for _, w := range l.watchers {
748		if w.fid == msg.GetFid() {
749			w.pipe.Send(msg)
750			return
751		}
752	}
753
754	if l.GetFollowerId() == msg.GetFid() {
755		l.handler.Respond(msg.GetFid(), msg.GetReqId(), msg.GetError(), msg.GetContent())
756	}
757}
758
759/////////////////////////////////////////////////////////
760// Leader - Private Function : Handle Accept Message
761/////////////////////////////////////////////////////////
762
763//
// handle an accept message from a follower
765//
766func (l *Leader) handleAccept(msg AcceptMsg) error {
767
	// If this Ack is for an old proposal, then ignore it.
	// This indicates that the follower may be slower
	// than others.  Therefore, the proposal may be
	// committed before the follower can Ack.
772	mtxid := common.Txnid(msg.GetTxnid())
773	if l.lastCommitted >= mtxid {
		// Cleanup.  l.quorums should not have mtxid.
		// This is just in case, since we will never commit
		// this proposal.
777		_, ok := l.quorums[mtxid]
778		if ok {
779			delete(l.quorums, mtxid)
780			delete(l.proposals, mtxid)
781		}
782		return nil
783	}
784
785	// update quorum
786	l.updateQuorum(mtxid, msg.GetFid())
787	if l.hasQuorum(mtxid) {
788		// proposal has quorum. Now commit. If cannot commit, then return error
789		// which will cause the leader to re-election.  Just to handle
790		// case where there is hardware failure or repository corruption.
791		err := l.commit(mtxid)
792		if err != nil {
793			return err
794		}
795	}
796
797	return nil
798}
799
800//
801// update quorum of proposal
802//
803func (l *Leader) updateQuorum(txid common.Txnid, fid string) {
804
805	if l.quorums[txid] == nil {
806		l.quorums[txid] = make([]string, 0, common.MAX_FOLLOWERS)
807	}
808	log.Current.Tracef("Leader.updateQuorum: current quorum for txid %d : %d", uint64(txid), len(l.quorums[txid]))
809
	// Double-check whether the follower has already voted on this proposal.
811	var found bool
812	for i := 0; i < len(l.quorums[txid]); i++ {
813		a := l.quorums[txid][i]
814		if a == fid {
815			found = true
816			break
817		}
818	}
819
820	if !found {
821		l.quorums[txid] = append(l.quorums[txid], fid)
822	}
823
824	log.Current.Tracef("Leader.updateQuorum: new quorum for txid %d : %d", uint64(txid), len(l.quorums[txid]))
825}
826
827//
828// check if a proposal has reached quorum
829//
830func (l *Leader) hasQuorum(txid common.Txnid) bool {
	// This uses a simple majority quorum.  ZK also has
	// hierarchical quorums for scalability (servers are put into different
	// groups and quorums are obtained within a group).
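	//
	// For example, with GetEnsembleSize() == 3, integer division gives
	// 3/2 == 1, so at least 2 votes (e.g. the leader's own vote plus one
	// follower's accept) are required for a quorum.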
834
835	log.Current.Tracef("Leader.hasQuorum: accepted response for txid %d = %d, ensemble size = %d",
836		uint64(txid), len(l.quorums[txid]), l.handler.GetEnsembleSize())
837
838	accepted, ok := l.quorums[txid]
839	if !ok {
840		// no one has voted yet on this proposal
841		return false
842	}
843
844	return uint64(len(accepted)) > (l.handler.GetEnsembleSize() / 2)
845}
846
847//
848// commit proposal
849//
850func (l *Leader) commit(txid common.Txnid) error {
851
	// Check that we are not skipping a proposal.  The protocol expects each follower
	// to send Accept messages in the order the proposals were received.   Since the
	// messages are sent over a reliable TCP connection, it is not possible to reach
	// quorum out of order.  In particular, if a follower leaves and rejoins,
	// the leader is responsible for resending all the pending proposals to the
	// followers.   So if we see an out-of-order txid here, then it is
	// a fatal condition due to a protocol error.
859	//
860	if !common.IsNextInSequence(txid, l.lastCommitted) {
861		log.Current.Debugf("Proposal must committed in sequential order for the same leader term. "+
862			"Found out-of-order commit. Leader last committed txid %d, commit msg %d", l.lastCommitted, txid)
863
864		return common.NewError(common.PROTOCOL_ERROR,
865			fmt.Sprintf("Proposal must committed in sequential order for the same leader term. "+
866				"Found out-of-order commit. Leader last committed txid %d, commit msg %d", l.lastCommitted, txid))
867	}
868
869	_, ok := l.proposals[txid]
870	if !ok {
871		return common.NewError(common.SERVER_ERROR,
872			fmt.Sprintf("Cannot find a proposal for the txid %d. Fail to commit the proposal.", txid))
873	}
874
	// Mark the proposal as committed.  Always do this first before sending to followers.
876	err := l.handler.Commit(txid)
877	if err != nil {
878		return err
879	}
880
881	// remove the votes
882	delete(l.quorums, txid)
883	delete(l.proposals, txid)
884
885	// Update lastCommitted
886	l.lastCommitted = txid
887
888	// Send the commit to followers
889	l.sendCommit(txid)
890
891	// TODO: do we need to update election site?  I don't think so, but need to double check.
892
893	return nil
894}
895
896//
897// send commit messages to all followers
898//
899func (l *Leader) sendCommit(txnid common.Txnid) error {
900
901	l.mutex.Lock()
902	defer l.mutex.Unlock()
903
904	msg := l.factory.CreateCommit(uint64(txnid))
905
906	// Send the request to the followers.  See the same
907	// comment as in sendProposal()
908	//
909	for _, f := range l.followers {
910		f.pipe.Send(msg)
911	}
912
913	// send message to watchers
914	for _, w := range l.watchers {
915		w.pipe.Send(msg)
916	}
917
918	// Send the message to the observer
919	for _, o := range l.observers {
920		o.send(msg)
921	}
922
923	return nil
924}
925