1// @author Couchbase <info@couchbase.com>
2// @copyright 2014 Couchbase, Inc.
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8//      http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16package protocol
17
18import (
19	"github.com/couchbase/gometa/common"
20	"github.com/couchbase/gometa/log"
21	"net"
22	"sync"
23	"time"
24)
25
26/////////////////////////////////////////////////////////////////////////////
27// Type Declaration
28/////////////////////////////////////////////////////////////////////////////
29
30//
31// The ElectionSite controls all the participants of a election.
32// 1) messenger - repsonsible for sending messages to other voter
33// 2) ballot master - manages a ballot orginated from this node.   This includes
34//    re-balloting if there is no convergence on the votes.
35// 3) poll worker - recieve votes from other voters and determine if majority is reached
36//
37type ElectionSite struct {
38	messenger *common.PeerMessenger
39	master    *ballotMaster
40	worker    *pollWorker
41
42	solicitOnly  bool
43	ensemble     []net.Addr
44	fullEnsemble []string
45	factory      MsgFactory
46	handler      ActionHandler
47
48	mutex    sync.Mutex
49	isClosed bool
50}
51
// ballotResult is the state collected for one ballot: the vote this node
// currently proposes, the votes received for the current round (including
// this node's own vote, keyed by its local address), and the votes received
// from peers that are already LEADING or FOLLOWING.
type ballotResult struct {
	proposed      VoteMsg
	winningEpoch  uint32             // winning epoch : can be updated after follower has sync with leader
	receivedVotes map[string]VoteMsg // the map key is voter UDP address
	activePeers   map[string]VoteMsg // the map key is voter UDP address
}
58
// Ballot is a single election attempt.  The pollWorker sends true on
// resultch once a winner is set, or closes resultch without a value when it
// shuts down with the ballot still outstanding.
type Ballot struct {
	result   *ballotResult
	resultch chan bool // should only be closed by pollWorker
}
63
// ballotMaster runs the ballots originated from this node (see castBallot).
// The fields below mutex are guarded by it: winner is the concluded ballot's
// result (nil until a winner is set), inProg tells whether a ballot is
// running, and round is the current election round, which must stay in sync
// with the proposed vote's round.
type ballotMaster struct {
	site *ElectionSite

	// mutex protected state
	mutex  sync.Mutex
	winner *ballotResult
	inProg bool
	round  uint64
}
73
// pollWorker owns the listen() goroutine that processes incoming votes for
// the current ballot.  New ballots arrive on listench; a value on killch
// stops the goroutine.
type pollWorker struct {
	site     *ElectionSite
	ballot   *Ballot
	listench chan *Ballot
	killch   chan bool
}
80
81//
82// The election round is incremented for every new election being run in
83// this process.    If there is an ensemble of peers are running election,
84// these peers will need to be in the same round in order to achieve quorum.
85// Essentially, if a peer joins an electing ensemble, it can either join
86// the current round of voting or start a new round.  If it start a new round,
87// then it must have enough peers to join his round before a quorum can be reached.
88// If a peer leaves an ensemble, its vote still count (ZK does not take away vote).
89// The sycnhronization (recovery) phase will double check if a quorum of followers
90// agree to the leader before the algorithm is fully converged.
91//
92var gElectionRound uint64 = 0
93
94/////////////////////////////////////////////////////////////////////////////
95// ElectionSite (Public API)
96/////////////////////////////////////////////////////////////////////////////
97
98//
99// Create ElectionSite
100//
101func CreateElectionSite(laddr string,
102	peers []string,
103	factory MsgFactory,
104	handler ActionHandler,
105	solicitOnly bool) (election *ElectionSite, err error) {
106
107	// create a full ensemble (including the local host)
108	en, fullEn, err := cloneEnsemble(peers, laddr)
109	if err != nil {
110		return nil, err
111	}
112
113	election = &ElectionSite{isClosed: false,
114		factory:      factory,
115		handler:      handler,
116		ensemble:     en,
117		fullEnsemble: fullEn,
118		solicitOnly:  solicitOnly}
119
120	// Create a new messenger
121	election.messenger, err = newMessenger(laddr)
122	if err != nil {
123		return nil, err
124	}
125
126	// Create a new ballot master
127	election.master = newBallotMaster(election)
128
129	// Create a new poll worker.  This will start the
130	// goroutine for the pollWorker.
131	election.worker = startPollWorker(election)
132
133	return election, nil
134}
135
136//
137// Start a new Election.  If there is a ballot in progress, this function
138// will return a nil channel.  The ballot will happen indefinitely until a winner
139// emerge or there is an error.   The winner will be returned through winnerch.
140// If there is an error, the channel will be closed without sending a value.
141//
142func (e *ElectionSite) StartElection() <-chan string {
143
144	// ballot in progress
145	if !e.master.setBallotInProg(true) || e.IsClosed() {
146		return nil
147	}
148
149	// create a buffered channel so sender won't block.
150	winnerch := make(chan string, 1)
151
152	go e.master.castBallot(winnerch)
153
154	return (<-chan string)(winnerch)
155}
156
157//
158// Close ElectionSite.  Any pending ballot will be closed immediately.
159//
160//
161func (e *ElectionSite) Close() {
162	e.mutex.Lock()
163	defer e.mutex.Unlock()
164
165	if !e.isClosed {
166		log.Current.Debugf("ElectionSite.Close() : Diagnostic Stack ...")
167		log.Current.LazyDebug(log.Current.StackTrace)
168
169		e.isClosed = true
170
171		e.messenger.Close()
172		e.master.close()
173		e.worker.close()
174	}
175}
176
177//
178// Tell if the ElectionSite is closed.
179//
180func (e *ElectionSite) IsClosed() bool {
181	e.mutex.Lock()
182	defer e.mutex.Unlock()
183
184	return e.isClosed
185}
186
187//
188// Update the winning epoch. The epoch can change after
189// the synchronization phase (when leader tells the
190// follower what is the actual epoch value -- after the
191// leader gets a quorum of followers).  There are other
192// possible implementations (e.g. keeping the winning
193// vote with the server -- not the ballotMaster), but
194// for now, let's just have this API to update the
195// epoch. Note that this is just a public wrapper
196// method on top of ballotMaster.
197//
198func (s *ElectionSite) UpdateWinningEpoch(epoch uint32) {
199
200	s.master.updateWinningEpoch(epoch)
201}
202
203/////////////////////////////////////////////////////////////////////////////
204// ElectionSite (Private)
205/////////////////////////////////////////////////////////////////////////////
206
207//
208// Create Vote from State
209//
210func (s *ElectionSite) createVoteFromCurState() VoteMsg {
211
212	epoch, err := s.handler.GetCurrentEpoch()
213	if err != nil {
214		// if epoch is missing, set the epoch to the smallest possible
215		// number.  This is to allow the voting peers to tell me what
216		// the right epoch would be during balloting.  This allows me
217		// to proceed leader election.  After leader election, this
218		// node will either be a leader or follower, and it will need
219		// to synchornize with the peer's state (acceptedEpoch, currentEpoch).
220		epoch = common.BOOTSTRAP_CURRENT_EPOCH
221	}
222
223	lastLoggedTxid, err := s.handler.GetLastLoggedTxid()
224	if err != nil {
225		// if txid is missing, set the txid to the smallest possible
226		// number.  This likely will cause the peer to ignore my vote.
227		lastLoggedTxid = common.BOOTSTRAP_LAST_LOGGED_TXID
228	}
229
230	lastCommittedTxid, err := s.handler.GetLastCommittedTxid()
231	if err != nil {
232		// if txid is missing, set the txid to the smallest possible
233		// number.  This likely will cause the peer to ignore my vote.
234		lastCommittedTxid = common.BOOTSTRAP_LAST_COMMITTED_TXID
235	}
236
237	vote := s.factory.CreateVote(s.master.round,
238		uint32(s.handler.GetStatus()),
239		epoch,
240		s.messenger.GetLocalAddr(), // this is localhost UDP port
241		uint64(lastLoggedTxid),
242		uint64(lastCommittedTxid),
243		s.solicitOnly)
244
245	return vote
246}
247
248//
249// Tell if a particular voter is in the ensemble
250//
251func (s *ElectionSite) inEnsemble(voter net.Addr) bool {
252	for _, peer := range s.fullEnsemble {
253		if peer == voter.String() {
254			return true
255		}
256	}
257	return false
258}
259
260//
261// Create an ensemble for voting
262//
263func cloneEnsemble(peers []string, laddr string) ([]net.Addr, []string, error) {
264
265	en := make([]net.Addr, 0, len(peers))
266	fullEn := make([]string, 0, len(peers)+1)
267
268	for i := 0; i < len(peers); i++ {
269		rAddr, err := net.ResolveUDPAddr("udp", peers[i])
270		if err != nil {
271			return nil, nil, err
272		}
273		en = append(en, rAddr)
274		fullEn = append(fullEn, rAddr.String())
275	}
276
277	fullEn = append(fullEn, laddr)
278
279	return en, fullEn, nil
280}
281
282/////////////////////////////////////////////////////////////////////////////
283// ballotMaster
284/////////////////////////////////////////////////////////////////////////////
285
286//
287// Create a new ballotMaster.
288//
289func newBallotMaster(site *ElectionSite) *ballotMaster {
290
291	master := &ballotMaster{site: site,
292		winner: nil,
293		round:  gElectionRound,
294		inProg: false}
295
296	return master
297}
298
299//
300// Start a new round of ballot.
301//
302func (b *ballotMaster) castBallot(winnerch chan string) {
303
304	// close the channel to make sure that the caller won't be
305	// block forever.  If the balltot is successful, a value would
306	// have sent to the channel before being closed. Otherwise,
307	// a closed channel without value means the ballot is not
308	// successful.
309	defer func() {
310		if r := recover(); r != nil {
311			log.Current.Errorf("panic in ballotMaster.castBallot() : %s\n", r)
312			common.SafeRun("ballotMaster.castBallot()",
313				func() {
314					b.site.Close()
315				})
316		}
317
318		common.SafeRun("ballotMaster.castBallot()",
319			func() {
320				close(winnerch) // unblock caller
321
322				// balloting complete
323				b.setBallotInProg(false)
324			})
325	}()
326
327	// create a channel to receive the ballot result
328	// should only be closed by Poll Worker.  Make
329	// if buffered so the sender won't block.
330	resultch := make(chan bool, 1)
331
332	// Create a new ballot
333	ballot := b.createInitialBallot(resultch)
334
335	// Tell the worker to observe this ballot.  This forces
336	// the worker to start collecting new ballot result.
337	b.site.worker.observe(ballot)
338
339	// let the peer to know about this ballot.  It is expected
340	// that the peer will reply with a vote.
341	b.site.messenger.Multicast(ballot.result.proposed, b.site.ensemble)
342
343	success, ok := <-resultch
344	if !ok {
345		// channel close. Ballot done
346		success = false
347	}
348
349	// Announce the winner
350	if success {
351		winner, ok := b.GetWinner()
352		if ok {
353			common.SafeRun("ballotMaster.castBallot()",
354				func() {
355					// Remember the last round.
356					gElectionRound = b.round
357					// Announce the result
358					winnerch <- winner
359				})
360		}
361	}
362}
363
364//
365// close the ballot master.
366//
367func (b *ballotMaster) close() {
368	// Nothing to do now (jsut placeholder).   The current ballot
369	// is closed when the ballot resultch is closed.
370	// Instead of doing it in this method, should
371	// do it in the poll worker to avoid race condition.
372}
373
374//
375// Update the flag indicating if there is a balllot in progress. Return value
376// is true if the flag value was changed.
377//
378func (b *ballotMaster) setBallotInProg(value bool) bool {
379	b.mutex.Lock()
380	defer b.mutex.Unlock()
381
382	if b.inProg == value {
383		return false
384	}
385
386	b.inProg = value
387	return true
388}
389
390//
391// Get the next id for ballot.
392//
393func (b *ballotMaster) getNextRound() uint64 {
394
395	result := b.round
396	b.round++
397	return result
398}
399
400//
401// Create a ballot
402//
403func (b *ballotMaster) createInitialBallot(resultch chan bool) *Ballot {
404
405	result := &ballotResult{winningEpoch: 0,
406		receivedVotes: make(map[string]VoteMsg),
407		activePeers:   make(map[string]VoteMsg)}
408
409	ballot := &Ballot{result: result,
410		resultch: resultch}
411
412	b.getNextRound()
413	newVote := b.site.createVoteFromCurState()
414	ballot.updateProposed(newVote, b.site)
415
416	return ballot
417}
418
419//
420// Copy a winning vote.  This function is called when
421// there is no active ballot going on.
422//
423func (b *ballotMaster) cloneWinningVote() VoteMsg {
424
425	b.mutex.Lock()
426	defer b.mutex.Unlock()
427
428	// If b.winner is not nil, then it indicates that I have concluded my leader
429	// election.
430	if b.winner != nil {
431		return b.site.factory.CreateVote(
432			b.winner.proposed.GetRound(),
433			uint32(b.site.handler.GetStatus()),
434			b.winner.winningEpoch,
435			b.winner.proposed.GetCndId(),
436			b.winner.proposed.GetCndLoggedTxnId(),
437			b.winner.proposed.GetCndCommittedTxnId(),
438			b.site.solicitOnly)
439	}
440
441	return nil
442}
443
444/////////////////////////////////////////////////////////////////////////////
445// Function for upkeeping the election state.  These covers function from
446// ballotMaster and Ballot.
447/////////////////////////////////////////////////////////////////////////////
448
449//
450// Return the winner
451//
452func (b *ballotMaster) GetWinner() (string, bool) {
453	b.mutex.Lock()
454	defer b.mutex.Unlock()
455
456	if b.winner != nil {
457		return b.winner.proposed.GetCndId(), true
458	}
459
460	return "", false
461}
462
463//
464// Set the winner
465//
466func (b *ballotMaster) setWinner(result *ballotResult) {
467	b.mutex.Lock()
468	defer b.mutex.Unlock()
469
470	b.winner = result
471	b.winner.winningEpoch = result.proposed.GetEpoch()
472}
473
474//
475// Update the epcoh of the winning vote
476//
477func (b *ballotMaster) updateWinningEpoch(epoch uint32) {
478	b.mutex.Lock()
479	defer b.mutex.Unlock()
480
481	b.winner.winningEpoch = epoch
482}
483
484//
485// Set the current round.  This function is there just for
486// easier to keep track of different places that set
487// the ballotMaster.round.   ballotMaster.round should
488// always be in sycn with the ballot.result.proposed.round or
489// the master.winner.proposed.round.
490//
491func (b *ballotMaster) setCurrentRound(round uint64) {
492	b.mutex.Lock()
493	defer b.mutex.Unlock()
494
495	b.round = round
496}
497
498//
499// Get the current round
500//
501func (b *ballotMaster) getCurrentRound() uint64 {
502	b.mutex.Lock()
503	defer b.mutex.Unlock()
504
505	return b.round
506}
507
508//
509// Make the given vote as the proposed vote. Since pollWorker
510// executes serially, this does not need mutex.
511//
512func (b *Ballot) updateProposed(proposed VoteMsg, site *ElectionSite) {
513
514	// update the ballot
515	b.result.proposed = proposed
516
517	// esnure the ballotMaster's round matches the proposed vote.
518	// These 2 values should be always in sync.
519	site.master.setCurrentRound(proposed.GetRound())
520
521	// update the recieved votes (for quorum)
522	b.result.receivedVotes[site.messenger.GetLocalAddr()] = proposed
523}
524
525//
526// Reset the ballot with a new proposed vote
527//
528func (b *Ballot) resetAndUpdateProposed(proposed VoteMsg, site *ElectionSite) {
529	b.result.receivedVotes = make(map[string]VoteMsg)
530	// To be safe, clean up the active peers as well.  This is just to ensure
531	// when an active peers becomes an electing peer, we don't keep old votes
532	// around.  This deviates from ZK (which does not clear the active votes
533	// -- possibly for faster convergence to quorum).
534	b.result.activePeers = make(map[string]VoteMsg)
535
536	// update the proposed
537	b.updateProposed(proposed, site)
538}
539
540/////////////////////////////////////////////////////////////////////////////
541// pollWorker
542/////////////////////////////////////////////////////////////////////////////
543
544//
545// Create a new pollWorker.  The pollWorker listens to the Vote receving from
546// the peers for a particular ballot.
547//
548func startPollWorker(site *ElectionSite) *pollWorker {
549
550	worker := &pollWorker{site: site,
551		ballot:   nil,
552		killch:   make(chan bool, 1),    // make sure sender won't block
553		listench: make(chan *Ballot, 1)} // make sure sender won't block
554
555	go worker.listen()
556
557	return worker
558}
559
560//
561// Notify the pollWorker that there is a new ballot.
562//
563func (w *pollWorker) observe(ballot *Ballot) {
564	// This synchronous.  This is to ensure that listen() receives the ballot
565	// before this function return to the ballotMaster.
566	w.ballot = ballot
567	w.listench <- ballot
568}
569
570//
571// Close the pollWorker
572//
573func (p *pollWorker) close() {
574	p.killch <- true
575}
576
577//
578// Goroutine.  Listen to vote coming from the peer for a
579// particular ballot.  This is the only goroutine that
580// handle all incoming requests.
581//
582// Voter -> the peer that replies the ballot with a vote
583// Candidate -> the peer that is voted for by the voter.
584// It is the peer (CndId) that is inside the vote.
585//
586func (w *pollWorker) listen() {
587
588	// If this loop terminates (e.g. due to panic), then make sure
589	// there is no outstanding ballot waiting for a result.   Close
590	// any channel for outstanding ballot such that the caller
591	// won't get blocked forever.
592	defer func() {
593		if r := recover(); r != nil {
594			log.Current.Errorf("panic in pollWorker.listen() : %s\n", r)
595		}
596
597		// make sure we close the ElectionSite first such that
598		// there is no new ballot coming while we are shutting
599		// down the pollWorker. If not, then the some go-routine
600		// may be waiting forever for the new ballot to complete.
601		common.SafeRun("pollWorker.listen()",
602			func() {
603				w.site.Close()
604			})
605
606		// unlock anyone waiting for existing ballot to complete.
607		common.SafeRun("pollWorker.listen()",
608			func() {
609				if w.ballot != nil {
610					close(w.ballot.resultch)
611					w.ballot = nil
612				}
613			})
614	}()
615
616	// Get the channel for receiving votes from the peer.
617	reqch := w.site.messenger.DefaultReceiveChannel()
618
619	timeout := common.NewBackoffTimer(
620		common.BALLOT_TIMEOUT*time.Millisecond,
621		common.BALLOT_MAX_TIMEOUT*time.Millisecond,
622		2,
623	)
624
625	inFinalize := false
626	finalizeTimer := common.NewStoppedResettableTimer(common.BALLOT_FINALIZE_WAIT * time.Millisecond)
627
628	for {
629		select {
630		case w.ballot = <-w.listench: // listench should never close
631			{
632				// Before listening to any vote, see if we reach quorum already.
633				// This should only happen if there is only one server in the
634				// ensemble.  If this election is for solicit purpose, then
635				// run election all the time.
636				if !w.site.solicitOnly &&
637					w.checkQuorum(w.ballot.result.receivedVotes, w.ballot.result.proposed) {
638					w.site.master.setWinner(w.ballot.result)
639					w.ballot.resultch <- true
640					w.ballot = nil
641				} else {
642					// There is a new ballot.
643					timeout.Reset()
644					inFinalize = false
645					finalizeTimer.Stop()
646				}
647			}
648		// Receiving a vote
649		case msg, ok := <-reqch:
650			{
651				if !ok {
652					return
653				}
654
655				// Receive a new vote.  The voter is identified by its UDP port,
656				// which must remain the same during the election phase.
657				vote := msg.Content.(VoteMsg)
658				voter := msg.Peer
659
660				// If I am receiving a vote that just for soliciting my response,
661				// then respond with my winning vote only after I am confirmed as
662				// either a leader or follower.  This ensure that the watcher will
663				// only find a leader from a stable ensemble.  This also ensures
664				// that the watcher will only count the votes from active participant,
665				// therefore, it will not count from other watcher as well as its
666				// own vote (code path for handling votes from electing member will
667				// never called for watcher).
668				if vote.GetSolicit() {
669					status := w.site.handler.GetStatus()
670					if status == LEADING || status == FOLLOWING {
671						w.respondInquiry(voter, vote)
672					}
673					continue
674				}
675
676				// Check if the voter is in the ensemble
677				if !w.site.inEnsemble(voter) {
678					continue
679				}
680
681				if w.ballot == nil {
682					// If there is no ballot or the vote is from a watcher,
683					// then just need to respond if I have a winner.
684					w.respondInquiry(voter, vote)
685					continue
686				}
687
688				timeout.Reset()
689
690				proposed := w.cloneProposedVote()
691				if w.handleVote(voter, vote) {
692					proposedUpdated :=
693						w.compareVote(w.ballot.result.proposed, proposed) != common.EQUAL
694
695					if !inFinalize || proposedUpdated {
696						inFinalize = true
697						finalizeTimer.Reset()
698					}
699				} else {
700					if inFinalize {
701						// we had a quorum but not anymore
702						inFinalize = false
703						finalizeTimer.Stop()
704					}
705				}
706			}
707		case <-finalizeTimer.C:
708			{
709				// we achieve quorum, set the winner.
710				// setting the winner and usetting the ballot
711				// should be done together.
712				// NOTE: ZK does not notify other peers when this node has
713				// select a leader
714				w.site.master.setWinner(w.ballot.result)
715				w.ballot.resultch <- true
716				w.ballot = nil
717				timeout.Stop()
718			}
719		case <-timeout.GetChannel():
720			{
721				// If there is a timeout but no response, send vote again.
722				if w.ballot != nil {
723					w.site.messenger.Multicast(w.cloneProposedVote(), w.site.ensemble)
724					timeout.Backoff()
725				}
726			}
727		case <-w.killch:
728			{
729				return
730			}
731		}
732	}
733}
734
735//
736// The pollWorker is no longer in election.  Respond to inquiry from
737// the peer.
738//
739func (w *pollWorker) respondInquiry(voter net.Addr, vote VoteMsg) {
740
741	if PeerStatus(vote.GetStatus()) == ELECTING {
742		// Make sure that we only send this when there is a winning vote, such
743		// that the vote has a majority support.
744		msg := w.site.master.cloneWinningVote()
745		if msg != nil {
746			// send the winning vote if there is no error
747			w.site.messenger.Send(msg, voter)
748		}
749	}
750}
751
752//
753// Handle a new vote.
754//
755func (w *pollWorker) handleVote(voter net.Addr, vote VoteMsg) bool {
756
757	if PeerStatus(vote.GetStatus()) == ELECTING {
758		// if peer is still in election
759		return w.handleVoteForElectingPeer(voter, vote)
760	} else {
761		// if peer is either leading or following
762		return w.handleVoteForActivePeer(voter, vote)
763	}
764}
765
766//
767// Handle a new vote if peer is electing.
768//
769func (w *pollWorker) handleVoteForElectingPeer(voter net.Addr, vote VoteMsg) bool {
770
771	// compare the round.  When there are electing peers, they will eventually
772	// converge to the same round when quorum is reached.  This implies that
773	// an established ensemble should share the same round, and this value
774	// remains stable for the ensemble.
775	compareRound := w.compareRound(vote)
776
777	// if the incoming vote has a greater round, re-ballot.
778	if compareRound == common.GREATER {
779
780		// update the current round.  This need to be done
781		// before updateProposed() is called.
782		w.site.master.setCurrentRound(vote.GetRound())
783
784		if w.compareVoteWithCurState(vote) == common.GREATER {
785			// Update my vote if the incoming vote is larger.
786			w.ballot.resetAndUpdateProposed(vote, w.site)
787		} else {
788			// otherwise udpate my vote using lastLoggedTxid
789			w.ballot.resetAndUpdateProposed(w.site.createVoteFromCurState(), w.site)
790		}
791
792		// notify that our new vote
793		w.site.messenger.Multicast(w.cloneProposedVote(), w.site.ensemble)
794
795		// if we reach quorum with this vote, announce the result
796		// and stop election
797		return w.acceptAndCheckQuorum(voter, vote)
798
799	} else if compareRound == common.EQUAL {
800		// if it is the same round and the incoming vote has higher epoch or txid,
801		// update myself to the incoming vote and broadcast my new vote
802		switch w.compareVoteWithProposed(vote) {
803		case common.GREATER:
804			// update and notify that our new vote
805			w.ballot.updateProposed(vote, w.site)
806			w.site.messenger.Multicast(w.cloneProposedVote(), w.site.ensemble)
807
808			// Add this vote to the received list.  Note that even if
809			// the peer went down there is network partition after the
810			// vote is being sent by peer, we still count this vote.
811			// If somehow we got the wrong leader because of this, we
812			// not be able to finish in the discovery/sync phase anyway,
813			// and a new election will get started.
814
815			// If I believe I am chosen as a leader in the election
816			// and the network is partitioned afterwards.  The
817			// sychonization phase will check if I do get a majorty
818			// of followers connecting to me before proceeding.  So
819			// for now, I can return as long as I reach quorum and
820			// let subsequent phase to do more checking.
821
822			return w.acceptAndCheckQuorum(voter, vote)
823		case common.EQUAL:
824			return w.acceptAndCheckQuorum(voter, vote)
825		}
826	} else {
827		// My round is higher. Send back the notification to the sender with my round
828		w.site.messenger.Send(w.cloneProposedVote(), voter)
829	}
830
831	return false
832}
833
834//
835// Handle a new vote from a leader or follower.   This implies that this vote
836// has already reached quorum and this node belongs to the quorum.  When we
837// reach this method, it can be:
838// 1) An new ensemble is converging from a set of electing nodes.  So nodes are
839//    reaching this conclusion faster than I am.
840// 2) I am joining an established ensemble (I rejoin the network or restart).
841// 3) A node from an established ensemble responds to me, but the ensemble
842//    could soon be dissolve (lose majority) after the node sends the message.
843// 4) An rogue node re-join the network while I am running election
844//    (due to bug/race condition?).
845//
846func (w *pollWorker) handleVoteForActivePeer(voter net.Addr, vote VoteMsg) bool {
847
848	// compare the round
849	compareRound := w.compareRound(vote)
850
851	if compareRound == common.EQUAL {
852		// If I recieve a vote with the same round, then it could mean
853		// that an esemble is forming from a set of electing peers.  Add
854		// this vote to the list of received votes.  All the received votes
855		// are from the same round.  If we get a quorum from the received
856		// votes, then announce the result.
857		// NOTE: ZK does not check the epoch nor update the proposed vote upon
858		// receiving a vote from an active member (unlike receiving a vote from
859		// electing peer).  This implies that if this is a rogue vote (a node
860		// sends out a vote and the ensemble loses majority), the election alogrithm
861		// will not get affected -- it can still converge if there is majority of
862		// electing peer to reach quorum.  If the established ensmeble remains stable,
863		// then there should be enough active member responds to me and I will
864		// eventually reach quorum (based on ballot.result.activePeers -- see below).
865		w.ballot.result.receivedVotes[voter.String()] = vote
866
867		if w.checkQuorum(w.ballot.result.receivedVotes, vote) && w.certifyLeader(vote) {
868			// accept this vote from the peer
869			w.ballot.updateProposed(vote, w.site)
870			return true
871		}
872	}
873
874	// The active peer has chosen a leader, but we cannot confirm it yet.
875	// Keep the active peer onto a different list, since receivedVotes
876	// can be reset (all received votes must be from the same round).
877	// If this peer goes down after sending us his vote, his vote still count
878	// in this ballot.  By calling certifyLeader(), we can also makes sure that
879	// the candidate has established itself to us as a leader.
880	w.ballot.result.activePeers[voter.String()] = vote
881
882	// Check the quorum only for the active peers.   In this case, the vote
883	// can have a different round than mime.   There may already be an established
884	// ensemble and I am merely trying to join them.
885	if w.checkQuorum(w.ballot.result.activePeers, vote) && w.certifyLeader(vote) {
886
887		w.ballot.updateProposed(vote, w.site)
888		return true
889	}
890
891	return false
892}
893
894//
895// Compare the current round with the given vote
896//
897func (w *pollWorker) compareRound(vote VoteMsg) common.CompareResult {
898
899	currentRound := w.site.master.round
900
901	if vote.GetRound() == currentRound {
902		return common.EQUAL
903	}
904
905	if vote.GetRound() > currentRound {
906		return common.GREATER
907	}
908
909	return common.LESSER
910}
911
912//
913// Compare two votes.  Return true if vote1 is larger than vote2.
914//
915func (w *pollWorker) compareVote(vote1, vote2 VoteMsg) common.CompareResult {
916
917	// Vote with the larger epoch always is larger
918	result := common.CompareEpoch(vote1.GetEpoch(), vote2.GetEpoch())
919
920	if result == common.MORE_RECENT {
921		return common.GREATER
922	}
923
924	if result == common.LESS_RECENT {
925		return common.LESSER
926	}
927
928	// If a candidate has a larger logged txid, it means the candidate
929	// has processed more proposals.   This vote is larger.
930	if vote1.GetCndLoggedTxnId() > vote2.GetCndLoggedTxnId() {
931		return common.GREATER
932	}
933
934	if vote1.GetCndLoggedTxnId() < vote2.GetCndLoggedTxnId() {
935		return common.LESSER
936	}
937
938	// This candidate has the same number of proposals in his committed log as
939	// the other one. But if a candidate has a larger committed txid,
940	// it means this candidate also has processed more commit messages from the
941	// previous leader.   This vote is larger.
942	if vote1.GetCndCommittedTxnId() > vote2.GetCndCommittedTxnId() {
943		return common.GREATER
944	}
945
946	if vote1.GetCndCommittedTxnId() < vote2.GetCndCommittedTxnId() {
947		return common.LESSER
948	}
949
950	// All else is equal (e.g. during inital system startup -- repository is emtpy),
951	// use the ip address.
952	if vote1.GetCndId() > vote2.GetCndId() {
953		return common.GREATER
954	}
955
956	if vote1.GetCndId() < vote2.GetCndId() {
957		return common.LESSER
958	}
959
960	return common.EQUAL
961}
962
963//
964// Compare the given vote with currennt state (epoch, lastLoggedTxnid)
965//
966func (w *pollWorker) compareVoteWithCurState(vote VoteMsg) common.CompareResult {
967
968	vote2 := w.site.createVoteFromCurState()
969	return w.compareVote(vote, vote2)
970}
971
972//
973// Compare the given vote with proposed vote
974//
975func (w *pollWorker) compareVoteWithProposed(vote VoteMsg) common.CompareResult {
976
977	return w.compareVote(vote, w.ballot.result.proposed)
978}
979
980//
981// Accept the check quorum
982//
983func (w *pollWorker) acceptAndCheckQuorum(voter net.Addr, vote VoteMsg) bool {
984
985	// Remember this peer's vote.  Note that ZK never takes away a voter's votes
986	// even if the voter has gone down (ZK would not know).  But ZK will ensure
987	// that the new leader will have a quorum of followers (in synchronization/recovery
988	// phase) before the ensemble become stable.
989	w.ballot.result.receivedVotes[voter.String()] = vote
990	return w.checkQuorum(w.ballot.result.receivedVotes, w.ballot.result.proposed)
991
992	//TODO: After quorum is reached, ZK will wait to see if there is any additional
993	//messages that will change leader.  It is possibly an optimization because if
994	//it is a wrong leader, it will fail during discovery/sync and force a new
995	//re-election.
996}
997
998//
999// Check Quorum
1000//
1001func (w *pollWorker) checkQuorum(votes map[string]VoteMsg, candidate VoteMsg) bool {
1002
1003	count := 0
1004	for _, vote := range votes {
1005		if PeerStatus(vote.GetStatus()) == ELECTING ||
1006			PeerStatus(candidate.GetStatus()) == ELECTING {
1007			if w.compareVote(vote, candidate) == common.EQUAL &&
1008				vote.GetRound() == candidate.GetRound() {
1009				count++
1010			}
1011		} else if vote.GetCndId() == candidate.GetCndId() &&
1012			vote.GetEpoch() == candidate.GetEpoch() {
1013			count++
1014		}
1015	}
1016
1017	return w.site.handler.GetQuorumVerifier().HasQuorum(count)
1018}
1019
1020//
1021// Copy a proposed vote
1022//
1023func (w *pollWorker) cloneProposedVote() VoteMsg {
1024
1025	// w.site.master.round should be in sycn with
1026	// w.ballot.result.proposed.round. Use w.site.master.round
1027	// to be consistent.
1028	return w.site.factory.CreateVote(w.site.master.round,
1029		uint32(w.site.handler.GetStatus()),
1030		uint32(w.ballot.result.proposed.GetEpoch()),
1031		w.ballot.result.proposed.GetCndId(),
1032		w.ballot.result.proposed.GetCndLoggedTxnId(),
1033		w.ballot.result.proposed.GetCndCommittedTxnId(),
1034		w.site.solicitOnly)
1035}
1036
1037//
1038// Certify the leader before declaring followship
1039//
1040func (w *pollWorker) certifyLeader(vote VoteMsg) bool {
1041
1042	// I am not voted as leader
1043	if vote.GetCndId() != w.site.messenger.GetLocalAddr() {
1044		// 	The leader must be known to me as active
1045		leaderVote, ok := w.ballot.result.activePeers[vote.GetCndId()]
1046		if ok && PeerStatus(leaderVote.GetStatus()) == LEADING {
1047			return true
1048		}
1049		return false
1050	}
1051
1052	// If someone voting me as a leader, make sure that we have the same round
1053	return w.site.master.round == vote.GetRound()
1054}
1055
1056/////////////////////////////////////////////////////////////////////////////
1057// Private Function
1058/////////////////////////////////////////////////////////////////////////////
1059
1060func newMessenger(laddr string) (*common.PeerMessenger, error) {
1061
1062	messenger, err := common.NewPeerMessenger(laddr, nil)
1063	if err != nil {
1064		return nil, err
1065	}
1066
1067	return messenger, nil
1068}
1069