1// @author Couchbase <info@couchbase.com> 2// @copyright 2014 Couchbase, Inc. 3// 4// Licensed under the Apache License, Version 2.0 (the "License"); 5// you may not use this file except in compliance with the License. 6// You may obtain a copy of the License at 7// 8// http://www.apache.org/licenses/LICENSE-2.0 9// 10// Unless required by applicable law or agreed to in writing, software 11// distributed under the License is distributed on an "AS IS" BASIS, 12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13// See the License for the specific language governing permissions and 14// limitations under the License. 15 16package protocol 17 18import ( 19 "github.com/couchbase/gometa/common" 20 "github.com/couchbase/gometa/log" 21 "net" 22 "sync" 23 "time" 24) 25 26///////////////////////////////////////////////////////////////////////////// 27// Type Declaration 28///////////////////////////////////////////////////////////////////////////// 29 30// 31// The ElectionSite controls all the participants of an election. 32// 1) messenger - responsible for sending messages to other voters 33// 2) ballot master - manages a ballot originated from this node. This includes 34// re-balloting if there is no convergence on the votes. 
// 3) poll worker - receives votes from other voters and determines if a majority is reached
//
type ElectionSite struct {
	// messenger sends votes to the peers and delivers incoming votes.
	messenger *common.PeerMessenger
	// master manages the ballot that originates from this node.
	master *ballotMaster
	// worker collects the votes received for the current ballot.
	worker *pollWorker

	// solicitOnly is passed into every vote created by this site.
	solicitOnly bool
	// ensemble holds the resolved UDP addresses of the remote peers.
	ensemble []net.Addr
	// fullEnsemble holds the string form of every peer address plus the
	// local address.
	fullEnsemble []string
	// factory creates the vote messages.
	factory MsgFactory
	// handler supplies the epoch, txids, status and quorum verifier.
	handler ActionHandler

	// mutex guards isClosed.
	mutex    sync.Mutex
	isClosed bool
}

// ballotResult is the state accumulated while a ballot is running.
type ballotResult struct {
	// proposed is the vote this node currently stands behind.
	proposed       VoteMsg
	winningEpoch   uint32             // winning epoch : can be updated after follower has sync with leader
	receivedVotes  map[string]VoteMsg // the map key is voter UDP address
	activePeers    map[string]VoteMsg // the map key is voter UDP address
}

// Ballot couples a ballotResult with the channel used to report the outcome.
type Ballot struct {
	result   *ballotResult
	resultch chan bool // should only be closed by pollWorker
}

// ballotMaster tracks the ballot originated from this node: the election
// round, whether a ballot is in progress, and the winning result (if any).
type ballotMaster struct {
	site *ElectionSite

	// mutex protected state
	mutex  sync.Mutex
	winner *ballotResult
	inProg bool
	round  uint64
}

// pollWorker runs the goroutine that receives votes for the current ballot.
type pollWorker struct {
	site *ElectionSite
	// ballot is the ballot currently being observed (nil when there is none).
	ballot *Ballot
	// listench hands a new ballot over to the listening goroutine.
	listench chan *Ballot
	// killch tells the listening goroutine to terminate.
	killch chan bool
}

//
// The election round is incremented for every new election being run in
// this process. If an ensemble of peers is running an election,
// these peers will need to be in the same round in order to achieve quorum.
// Essentially, if a peer joins an electing ensemble, it can either join
// the current round of voting or start a new round. If it starts a new round,
// then it must have enough peers to join its round before a quorum can be reached.
// If a peer leaves an ensemble, its vote still counts (ZK does not take away votes).
// The synchronization (recovery) phase will double check if a quorum of followers
// agree to the leader before the algorithm is fully converged.
91// 92var gElectionRound uint64 = 0 93 94///////////////////////////////////////////////////////////////////////////// 95// ElectionSite (Public API) 96///////////////////////////////////////////////////////////////////////////// 97 98// 99// Create ElectionSite 100// 101func CreateElectionSite(laddr string, 102 peers []string, 103 factory MsgFactory, 104 handler ActionHandler, 105 solicitOnly bool) (election *ElectionSite, err error) { 106 107 // create a full ensemble (including the local host) 108 en, fullEn, err := cloneEnsemble(peers, laddr) 109 if err != nil { 110 return nil, err 111 } 112 113 election = &ElectionSite{isClosed: false, 114 factory: factory, 115 handler: handler, 116 ensemble: en, 117 fullEnsemble: fullEn, 118 solicitOnly: solicitOnly} 119 120 // Create a new messenger 121 election.messenger, err = newMessenger(laddr) 122 if err != nil { 123 return nil, err 124 } 125 126 // Create a new ballot master 127 election.master = newBallotMaster(election) 128 129 // Create a new poll worker. This will start the 130 // goroutine for the pollWorker. 131 election.worker = startPollWorker(election) 132 133 return election, nil 134} 135 136// 137// Start a new Election. If there is a ballot in progress, this function 138// will return a nil channel. The ballot will happen indefinitely until a winner 139// emerge or there is an error. The winner will be returned through winnerch. 140// If there is an error, the channel will be closed without sending a value. 141// 142func (e *ElectionSite) StartElection() <-chan string { 143 144 // ballot in progress 145 if !e.master.setBallotInProg(true) || e.IsClosed() { 146 return nil 147 } 148 149 // create a buffered channel so sender won't block. 150 winnerch := make(chan string, 1) 151 152 go e.master.castBallot(winnerch) 153 154 return (<-chan string)(winnerch) 155} 156 157// 158// Close ElectionSite. Any pending ballot will be closed immediately. 
159// 160// 161func (e *ElectionSite) Close() { 162 e.mutex.Lock() 163 defer e.mutex.Unlock() 164 165 if !e.isClosed { 166 log.Current.Debugf("ElectionSite.Close() : Diagnostic Stack ...") 167 log.Current.LazyDebug(log.Current.StackTrace) 168 169 e.isClosed = true 170 171 e.messenger.Close() 172 e.master.close() 173 e.worker.close() 174 } 175} 176 177// 178// Tell if the ElectionSite is closed. 179// 180func (e *ElectionSite) IsClosed() bool { 181 e.mutex.Lock() 182 defer e.mutex.Unlock() 183 184 return e.isClosed 185} 186 187// 188// Update the winning epoch. The epoch can change after 189// the synchronization phase (when leader tells the 190// follower what is the actual epoch value -- after the 191// leader gets a quorum of followers). There are other 192// possible implementations (e.g. keeping the winning 193// vote with the server -- not the ballotMaster), but 194// for now, let's just have this API to update the 195// epoch. Note that this is just a public wrapper 196// method on top of ballotMaster. 197// 198func (s *ElectionSite) UpdateWinningEpoch(epoch uint32) { 199 200 s.master.updateWinningEpoch(epoch) 201} 202 203///////////////////////////////////////////////////////////////////////////// 204// ElectionSite (Private) 205///////////////////////////////////////////////////////////////////////////// 206 207// 208// Create Vote from State 209// 210func (s *ElectionSite) createVoteFromCurState() VoteMsg { 211 212 epoch, err := s.handler.GetCurrentEpoch() 213 if err != nil { 214 // if epoch is missing, set the epoch to the smallest possible 215 // number. This is to allow the voting peers to tell me what 216 // the right epoch would be during balloting. This allows me 217 // to proceed leader election. After leader election, this 218 // node will either be a leader or follower, and it will need 219 // to synchornize with the peer's state (acceptedEpoch, currentEpoch). 
220 epoch = common.BOOTSTRAP_CURRENT_EPOCH 221 } 222 223 lastLoggedTxid, err := s.handler.GetLastLoggedTxid() 224 if err != nil { 225 // if txid is missing, set the txid to the smallest possible 226 // number. This likely will cause the peer to ignore my vote. 227 lastLoggedTxid = common.BOOTSTRAP_LAST_LOGGED_TXID 228 } 229 230 lastCommittedTxid, err := s.handler.GetLastCommittedTxid() 231 if err != nil { 232 // if txid is missing, set the txid to the smallest possible 233 // number. This likely will cause the peer to ignore my vote. 234 lastCommittedTxid = common.BOOTSTRAP_LAST_COMMITTED_TXID 235 } 236 237 vote := s.factory.CreateVote(s.master.round, 238 uint32(s.handler.GetStatus()), 239 epoch, 240 s.messenger.GetLocalAddr(), // this is localhost UDP port 241 uint64(lastLoggedTxid), 242 uint64(lastCommittedTxid), 243 s.solicitOnly) 244 245 return vote 246} 247 248// 249// Tell if a particular voter is in the ensemble 250// 251func (s *ElectionSite) inEnsemble(voter net.Addr) bool { 252 for _, peer := range s.fullEnsemble { 253 if peer == voter.String() { 254 return true 255 } 256 } 257 return false 258} 259 260// 261// Create an ensemble for voting 262// 263func cloneEnsemble(peers []string, laddr string) ([]net.Addr, []string, error) { 264 265 en := make([]net.Addr, 0, len(peers)) 266 fullEn := make([]string, 0, len(peers)+1) 267 268 for i := 0; i < len(peers); i++ { 269 rAddr, err := net.ResolveUDPAddr("udp", peers[i]) 270 if err != nil { 271 return nil, nil, err 272 } 273 en = append(en, rAddr) 274 fullEn = append(fullEn, rAddr.String()) 275 } 276 277 fullEn = append(fullEn, laddr) 278 279 return en, fullEn, nil 280} 281 282///////////////////////////////////////////////////////////////////////////// 283// ballotMaster 284///////////////////////////////////////////////////////////////////////////// 285 286// 287// Create a new ballotMaster. 
288// 289func newBallotMaster(site *ElectionSite) *ballotMaster { 290 291 master := &ballotMaster{site: site, 292 winner: nil, 293 round: gElectionRound, 294 inProg: false} 295 296 return master 297} 298 299// 300// Start a new round of ballot. 301// 302func (b *ballotMaster) castBallot(winnerch chan string) { 303 304 // close the channel to make sure that the caller won't be 305 // block forever. If the balltot is successful, a value would 306 // have sent to the channel before being closed. Otherwise, 307 // a closed channel without value means the ballot is not 308 // successful. 309 defer func() { 310 if r := recover(); r != nil { 311 log.Current.Errorf("panic in ballotMaster.castBallot() : %s\n", r) 312 common.SafeRun("ballotMaster.castBallot()", 313 func() { 314 b.site.Close() 315 }) 316 } 317 318 common.SafeRun("ballotMaster.castBallot()", 319 func() { 320 close(winnerch) // unblock caller 321 322 // balloting complete 323 b.setBallotInProg(false) 324 }) 325 }() 326 327 // create a channel to receive the ballot result 328 // should only be closed by Poll Worker. Make 329 // if buffered so the sender won't block. 330 resultch := make(chan bool, 1) 331 332 // Create a new ballot 333 ballot := b.createInitialBallot(resultch) 334 335 // Tell the worker to observe this ballot. This forces 336 // the worker to start collecting new ballot result. 337 b.site.worker.observe(ballot) 338 339 // let the peer to know about this ballot. It is expected 340 // that the peer will reply with a vote. 341 b.site.messenger.Multicast(ballot.result.proposed, b.site.ensemble) 342 343 success, ok := <-resultch 344 if !ok { 345 // channel close. Ballot done 346 success = false 347 } 348 349 // Announce the winner 350 if success { 351 winner, ok := b.GetWinner() 352 if ok { 353 common.SafeRun("ballotMaster.castBallot()", 354 func() { 355 // Remember the last round. 
356 gElectionRound = b.round 357 // Announce the result 358 winnerch <- winner 359 }) 360 } 361 } 362} 363 364// 365// close the ballot master. 366// 367func (b *ballotMaster) close() { 368 // Nothing to do now (jsut placeholder). The current ballot 369 // is closed when the ballot resultch is closed. 370 // Instead of doing it in this method, should 371 // do it in the poll worker to avoid race condition. 372} 373 374// 375// Update the flag indicating if there is a balllot in progress. Return value 376// is true if the flag value was changed. 377// 378func (b *ballotMaster) setBallotInProg(value bool) bool { 379 b.mutex.Lock() 380 defer b.mutex.Unlock() 381 382 if b.inProg == value { 383 return false 384 } 385 386 b.inProg = value 387 return true 388} 389 390// 391// Get the next id for ballot. 392// 393func (b *ballotMaster) getNextRound() uint64 { 394 395 result := b.round 396 b.round++ 397 return result 398} 399 400// 401// Create a ballot 402// 403func (b *ballotMaster) createInitialBallot(resultch chan bool) *Ballot { 404 405 result := &ballotResult{winningEpoch: 0, 406 receivedVotes: make(map[string]VoteMsg), 407 activePeers: make(map[string]VoteMsg)} 408 409 ballot := &Ballot{result: result, 410 resultch: resultch} 411 412 b.getNextRound() 413 newVote := b.site.createVoteFromCurState() 414 ballot.updateProposed(newVote, b.site) 415 416 return ballot 417} 418 419// 420// Copy a winning vote. This function is called when 421// there is no active ballot going on. 422// 423func (b *ballotMaster) cloneWinningVote() VoteMsg { 424 425 b.mutex.Lock() 426 defer b.mutex.Unlock() 427 428 // If b.winner is not nil, then it indicates that I have concluded my leader 429 // election. 
430 if b.winner != nil { 431 return b.site.factory.CreateVote( 432 b.winner.proposed.GetRound(), 433 uint32(b.site.handler.GetStatus()), 434 b.winner.winningEpoch, 435 b.winner.proposed.GetCndId(), 436 b.winner.proposed.GetCndLoggedTxnId(), 437 b.winner.proposed.GetCndCommittedTxnId(), 438 b.site.solicitOnly) 439 } 440 441 return nil 442} 443 444///////////////////////////////////////////////////////////////////////////// 445// Function for upkeeping the election state. These covers function from 446// ballotMaster and Ballot. 447///////////////////////////////////////////////////////////////////////////// 448 449// 450// Return the winner 451// 452func (b *ballotMaster) GetWinner() (string, bool) { 453 b.mutex.Lock() 454 defer b.mutex.Unlock() 455 456 if b.winner != nil { 457 return b.winner.proposed.GetCndId(), true 458 } 459 460 return "", false 461} 462 463// 464// Set the winner 465// 466func (b *ballotMaster) setWinner(result *ballotResult) { 467 b.mutex.Lock() 468 defer b.mutex.Unlock() 469 470 b.winner = result 471 b.winner.winningEpoch = result.proposed.GetEpoch() 472} 473 474// 475// Update the epcoh of the winning vote 476// 477func (b *ballotMaster) updateWinningEpoch(epoch uint32) { 478 b.mutex.Lock() 479 defer b.mutex.Unlock() 480 481 b.winner.winningEpoch = epoch 482} 483 484// 485// Set the current round. This function is there just for 486// easier to keep track of different places that set 487// the ballotMaster.round. ballotMaster.round should 488// always be in sycn with the ballot.result.proposed.round or 489// the master.winner.proposed.round. 490// 491func (b *ballotMaster) setCurrentRound(round uint64) { 492 b.mutex.Lock() 493 defer b.mutex.Unlock() 494 495 b.round = round 496} 497 498// 499// Get the current round 500// 501func (b *ballotMaster) getCurrentRound() uint64 { 502 b.mutex.Lock() 503 defer b.mutex.Unlock() 504 505 return b.round 506} 507 508// 509// Make the given vote as the proposed vote. 
Since pollWorker 510// executes serially, this does not need mutex. 511// 512func (b *Ballot) updateProposed(proposed VoteMsg, site *ElectionSite) { 513 514 // update the ballot 515 b.result.proposed = proposed 516 517 // esnure the ballotMaster's round matches the proposed vote. 518 // These 2 values should be always in sync. 519 site.master.setCurrentRound(proposed.GetRound()) 520 521 // update the recieved votes (for quorum) 522 b.result.receivedVotes[site.messenger.GetLocalAddr()] = proposed 523} 524 525// 526// Reset the ballot with a new proposed vote 527// 528func (b *Ballot) resetAndUpdateProposed(proposed VoteMsg, site *ElectionSite) { 529 b.result.receivedVotes = make(map[string]VoteMsg) 530 // To be safe, clean up the active peers as well. This is just to ensure 531 // when an active peers becomes an electing peer, we don't keep old votes 532 // around. This deviates from ZK (which does not clear the active votes 533 // -- possibly for faster convergence to quorum). 534 b.result.activePeers = make(map[string]VoteMsg) 535 536 // update the proposed 537 b.updateProposed(proposed, site) 538} 539 540///////////////////////////////////////////////////////////////////////////// 541// pollWorker 542///////////////////////////////////////////////////////////////////////////// 543 544// 545// Create a new pollWorker. The pollWorker listens to the Vote receving from 546// the peers for a particular ballot. 547// 548func startPollWorker(site *ElectionSite) *pollWorker { 549 550 worker := &pollWorker{site: site, 551 ballot: nil, 552 killch: make(chan bool, 1), // make sure sender won't block 553 listench: make(chan *Ballot, 1)} // make sure sender won't block 554 555 go worker.listen() 556 557 return worker 558} 559 560// 561// Notify the pollWorker that there is a new ballot. 562// 563func (w *pollWorker) observe(ballot *Ballot) { 564 // This synchronous. This is to ensure that listen() receives the ballot 565 // before this function return to the ballotMaster. 
566 w.ballot = ballot 567 w.listench <- ballot 568} 569 570// 571// Close the pollWorker 572// 573func (p *pollWorker) close() { 574 p.killch <- true 575} 576 577// 578// Goroutine. Listen to vote coming from the peer for a 579// particular ballot. This is the only goroutine that 580// handle all incoming requests. 581// 582// Voter -> the peer that replies the ballot with a vote 583// Candidate -> the peer that is voted for by the voter. 584// It is the peer (CndId) that is inside the vote. 585// 586func (w *pollWorker) listen() { 587 588 // If this loop terminates (e.g. due to panic), then make sure 589 // there is no outstanding ballot waiting for a result. Close 590 // any channel for outstanding ballot such that the caller 591 // won't get blocked forever. 592 defer func() { 593 if r := recover(); r != nil { 594 log.Current.Errorf("panic in pollWorker.listen() : %s\n", r) 595 } 596 597 // make sure we close the ElectionSite first such that 598 // there is no new ballot coming while we are shutting 599 // down the pollWorker. If not, then the some go-routine 600 // may be waiting forever for the new ballot to complete. 601 common.SafeRun("pollWorker.listen()", 602 func() { 603 w.site.Close() 604 }) 605 606 // unlock anyone waiting for existing ballot to complete. 607 common.SafeRun("pollWorker.listen()", 608 func() { 609 if w.ballot != nil { 610 close(w.ballot.resultch) 611 w.ballot = nil 612 } 613 }) 614 }() 615 616 // Get the channel for receiving votes from the peer. 617 reqch := w.site.messenger.DefaultReceiveChannel() 618 619 timeout := common.NewBackoffTimer( 620 common.BALLOT_TIMEOUT*time.Millisecond, 621 common.BALLOT_MAX_TIMEOUT*time.Millisecond, 622 2, 623 ) 624 625 inFinalize := false 626 finalizeTimer := common.NewStoppedResettableTimer(common.BALLOT_FINALIZE_WAIT * time.Millisecond) 627 628 for { 629 select { 630 case w.ballot = <-w.listench: // listench should never close 631 { 632 // Before listening to any vote, see if we reach quorum already. 
633 // This should only happen if there is only one server in the 634 // ensemble. If this election is for solicit purpose, then 635 // run election all the time. 636 if !w.site.solicitOnly && 637 w.checkQuorum(w.ballot.result.receivedVotes, w.ballot.result.proposed) { 638 w.site.master.setWinner(w.ballot.result) 639 w.ballot.resultch <- true 640 w.ballot = nil 641 } else { 642 // There is a new ballot. 643 timeout.Reset() 644 inFinalize = false 645 finalizeTimer.Stop() 646 } 647 } 648 // Receiving a vote 649 case msg, ok := <-reqch: 650 { 651 if !ok { 652 return 653 } 654 655 // Receive a new vote. The voter is identified by its UDP port, 656 // which must remain the same during the election phase. 657 vote := msg.Content.(VoteMsg) 658 voter := msg.Peer 659 660 // If I am receiving a vote that just for soliciting my response, 661 // then respond with my winning vote only after I am confirmed as 662 // either a leader or follower. This ensure that the watcher will 663 // only find a leader from a stable ensemble. This also ensures 664 // that the watcher will only count the votes from active participant, 665 // therefore, it will not count from other watcher as well as its 666 // own vote (code path for handling votes from electing member will 667 // never called for watcher). 668 if vote.GetSolicit() { 669 status := w.site.handler.GetStatus() 670 if status == LEADING || status == FOLLOWING { 671 w.respondInquiry(voter, vote) 672 } 673 continue 674 } 675 676 // Check if the voter is in the ensemble 677 if !w.site.inEnsemble(voter) { 678 continue 679 } 680 681 if w.ballot == nil { 682 // If there is no ballot or the vote is from a watcher, 683 // then just need to respond if I have a winner. 
684 w.respondInquiry(voter, vote) 685 continue 686 } 687 688 timeout.Reset() 689 690 proposed := w.cloneProposedVote() 691 if w.handleVote(voter, vote) { 692 proposedUpdated := 693 w.compareVote(w.ballot.result.proposed, proposed) != common.EQUAL 694 695 if !inFinalize || proposedUpdated { 696 inFinalize = true 697 finalizeTimer.Reset() 698 } 699 } else { 700 if inFinalize { 701 // we had a quorum but not anymore 702 inFinalize = false 703 finalizeTimer.Stop() 704 } 705 } 706 } 707 case <-finalizeTimer.C: 708 { 709 // we achieve quorum, set the winner. 710 // setting the winner and usetting the ballot 711 // should be done together. 712 // NOTE: ZK does not notify other peers when this node has 713 // select a leader 714 w.site.master.setWinner(w.ballot.result) 715 w.ballot.resultch <- true 716 w.ballot = nil 717 timeout.Stop() 718 } 719 case <-timeout.GetChannel(): 720 { 721 // If there is a timeout but no response, send vote again. 722 if w.ballot != nil { 723 w.site.messenger.Multicast(w.cloneProposedVote(), w.site.ensemble) 724 timeout.Backoff() 725 } 726 } 727 case <-w.killch: 728 { 729 return 730 } 731 } 732 } 733} 734 735// 736// The pollWorker is no longer in election. Respond to inquiry from 737// the peer. 738// 739func (w *pollWorker) respondInquiry(voter net.Addr, vote VoteMsg) { 740 741 if PeerStatus(vote.GetStatus()) == ELECTING { 742 // Make sure that we only send this when there is a winning vote, such 743 // that the vote has a majority support. 744 msg := w.site.master.cloneWinningVote() 745 if msg != nil { 746 // send the winning vote if there is no error 747 w.site.messenger.Send(msg, voter) 748 } 749 } 750} 751 752// 753// Handle a new vote. 
754// 755func (w *pollWorker) handleVote(voter net.Addr, vote VoteMsg) bool { 756 757 if PeerStatus(vote.GetStatus()) == ELECTING { 758 // if peer is still in election 759 return w.handleVoteForElectingPeer(voter, vote) 760 } else { 761 // if peer is either leading or following 762 return w.handleVoteForActivePeer(voter, vote) 763 } 764} 765 766// 767// Handle a new vote if peer is electing. 768// 769func (w *pollWorker) handleVoteForElectingPeer(voter net.Addr, vote VoteMsg) bool { 770 771 // compare the round. When there are electing peers, they will eventually 772 // converge to the same round when quorum is reached. This implies that 773 // an established ensemble should share the same round, and this value 774 // remains stable for the ensemble. 775 compareRound := w.compareRound(vote) 776 777 // if the incoming vote has a greater round, re-ballot. 778 if compareRound == common.GREATER { 779 780 // update the current round. This need to be done 781 // before updateProposed() is called. 782 w.site.master.setCurrentRound(vote.GetRound()) 783 784 if w.compareVoteWithCurState(vote) == common.GREATER { 785 // Update my vote if the incoming vote is larger. 
786 w.ballot.resetAndUpdateProposed(vote, w.site) 787 } else { 788 // otherwise udpate my vote using lastLoggedTxid 789 w.ballot.resetAndUpdateProposed(w.site.createVoteFromCurState(), w.site) 790 } 791 792 // notify that our new vote 793 w.site.messenger.Multicast(w.cloneProposedVote(), w.site.ensemble) 794 795 // if we reach quorum with this vote, announce the result 796 // and stop election 797 return w.acceptAndCheckQuorum(voter, vote) 798 799 } else if compareRound == common.EQUAL { 800 // if it is the same round and the incoming vote has higher epoch or txid, 801 // update myself to the incoming vote and broadcast my new vote 802 switch w.compareVoteWithProposed(vote) { 803 case common.GREATER: 804 // update and notify that our new vote 805 w.ballot.updateProposed(vote, w.site) 806 w.site.messenger.Multicast(w.cloneProposedVote(), w.site.ensemble) 807 808 // Add this vote to the received list. Note that even if 809 // the peer went down there is network partition after the 810 // vote is being sent by peer, we still count this vote. 811 // If somehow we got the wrong leader because of this, we 812 // not be able to finish in the discovery/sync phase anyway, 813 // and a new election will get started. 814 815 // If I believe I am chosen as a leader in the election 816 // and the network is partitioned afterwards. The 817 // sychonization phase will check if I do get a majorty 818 // of followers connecting to me before proceeding. So 819 // for now, I can return as long as I reach quorum and 820 // let subsequent phase to do more checking. 821 822 return w.acceptAndCheckQuorum(voter, vote) 823 case common.EQUAL: 824 return w.acceptAndCheckQuorum(voter, vote) 825 } 826 } else { 827 // My round is higher. Send back the notification to the sender with my round 828 w.site.messenger.Send(w.cloneProposedVote(), voter) 829 } 830 831 return false 832} 833 834// 835// Handle a new vote from a leader or follower. 
This implies that this vote 836// has already reached quorum and this node belongs to the quorum. When we 837// reach this method, it can be: 838// 1) An new ensemble is converging from a set of electing nodes. So nodes are 839// reaching this conclusion faster than I am. 840// 2) I am joining an established ensemble (I rejoin the network or restart). 841// 3) A node from an established ensemble responds to me, but the ensemble 842// could soon be dissolve (lose majority) after the node sends the message. 843// 4) An rogue node re-join the network while I am running election 844// (due to bug/race condition?). 845// 846func (w *pollWorker) handleVoteForActivePeer(voter net.Addr, vote VoteMsg) bool { 847 848 // compare the round 849 compareRound := w.compareRound(vote) 850 851 if compareRound == common.EQUAL { 852 // If I recieve a vote with the same round, then it could mean 853 // that an esemble is forming from a set of electing peers. Add 854 // this vote to the list of received votes. All the received votes 855 // are from the same round. If we get a quorum from the received 856 // votes, then announce the result. 857 // NOTE: ZK does not check the epoch nor update the proposed vote upon 858 // receiving a vote from an active member (unlike receiving a vote from 859 // electing peer). This implies that if this is a rogue vote (a node 860 // sends out a vote and the ensemble loses majority), the election alogrithm 861 // will not get affected -- it can still converge if there is majority of 862 // electing peer to reach quorum. If the established ensmeble remains stable, 863 // then there should be enough active member responds to me and I will 864 // eventually reach quorum (based on ballot.result.activePeers -- see below). 
865 w.ballot.result.receivedVotes[voter.String()] = vote 866 867 if w.checkQuorum(w.ballot.result.receivedVotes, vote) && w.certifyLeader(vote) { 868 // accept this vote from the peer 869 w.ballot.updateProposed(vote, w.site) 870 return true 871 } 872 } 873 874 // The active peer has chosen a leader, but we cannot confirm it yet. 875 // Keep the active peer onto a different list, since receivedVotes 876 // can be reset (all received votes must be from the same round). 877 // If this peer goes down after sending us his vote, his vote still count 878 // in this ballot. By calling certifyLeader(), we can also makes sure that 879 // the candidate has established itself to us as a leader. 880 w.ballot.result.activePeers[voter.String()] = vote 881 882 // Check the quorum only for the active peers. In this case, the vote 883 // can have a different round than mime. There may already be an established 884 // ensemble and I am merely trying to join them. 885 if w.checkQuorum(w.ballot.result.activePeers, vote) && w.certifyLeader(vote) { 886 887 w.ballot.updateProposed(vote, w.site) 888 return true 889 } 890 891 return false 892} 893 894// 895// Compare the current round with the given vote 896// 897func (w *pollWorker) compareRound(vote VoteMsg) common.CompareResult { 898 899 currentRound := w.site.master.round 900 901 if vote.GetRound() == currentRound { 902 return common.EQUAL 903 } 904 905 if vote.GetRound() > currentRound { 906 return common.GREATER 907 } 908 909 return common.LESSER 910} 911 912// 913// Compare two votes. Return true if vote1 is larger than vote2. 
914// 915func (w *pollWorker) compareVote(vote1, vote2 VoteMsg) common.CompareResult { 916 917 // Vote with the larger epoch always is larger 918 result := common.CompareEpoch(vote1.GetEpoch(), vote2.GetEpoch()) 919 920 if result == common.MORE_RECENT { 921 return common.GREATER 922 } 923 924 if result == common.LESS_RECENT { 925 return common.LESSER 926 } 927 928 // If a candidate has a larger logged txid, it means the candidate 929 // has processed more proposals. This vote is larger. 930 if vote1.GetCndLoggedTxnId() > vote2.GetCndLoggedTxnId() { 931 return common.GREATER 932 } 933 934 if vote1.GetCndLoggedTxnId() < vote2.GetCndLoggedTxnId() { 935 return common.LESSER 936 } 937 938 // This candidate has the same number of proposals in his committed log as 939 // the other one. But if a candidate has a larger committed txid, 940 // it means this candidate also has processed more commit messages from the 941 // previous leader. This vote is larger. 942 if vote1.GetCndCommittedTxnId() > vote2.GetCndCommittedTxnId() { 943 return common.GREATER 944 } 945 946 if vote1.GetCndCommittedTxnId() < vote2.GetCndCommittedTxnId() { 947 return common.LESSER 948 } 949 950 // All else is equal (e.g. during inital system startup -- repository is emtpy), 951 // use the ip address. 
952 if vote1.GetCndId() > vote2.GetCndId() { 953 return common.GREATER 954 } 955 956 if vote1.GetCndId() < vote2.GetCndId() { 957 return common.LESSER 958 } 959 960 return common.EQUAL 961} 962 963// 964// Compare the given vote with currennt state (epoch, lastLoggedTxnid) 965// 966func (w *pollWorker) compareVoteWithCurState(vote VoteMsg) common.CompareResult { 967 968 vote2 := w.site.createVoteFromCurState() 969 return w.compareVote(vote, vote2) 970} 971 972// 973// Compare the given vote with proposed vote 974// 975func (w *pollWorker) compareVoteWithProposed(vote VoteMsg) common.CompareResult { 976 977 return w.compareVote(vote, w.ballot.result.proposed) 978} 979 980// 981// Accept the check quorum 982// 983func (w *pollWorker) acceptAndCheckQuorum(voter net.Addr, vote VoteMsg) bool { 984 985 // Remember this peer's vote. Note that ZK never takes away a voter's votes 986 // even if the voter has gone down (ZK would not know). But ZK will ensure 987 // that the new leader will have a quorum of followers (in synchronization/recovery 988 // phase) before the ensemble become stable. 989 w.ballot.result.receivedVotes[voter.String()] = vote 990 return w.checkQuorum(w.ballot.result.receivedVotes, w.ballot.result.proposed) 991 992 //TODO: After quorum is reached, ZK will wait to see if there is any additional 993 //messages that will change leader. It is possibly an optimization because if 994 //it is a wrong leader, it will fail during discovery/sync and force a new 995 //re-election. 
996} 997 998// 999// Check Quorum 1000// 1001func (w *pollWorker) checkQuorum(votes map[string]VoteMsg, candidate VoteMsg) bool { 1002 1003 count := 0 1004 for _, vote := range votes { 1005 if PeerStatus(vote.GetStatus()) == ELECTING || 1006 PeerStatus(candidate.GetStatus()) == ELECTING { 1007 if w.compareVote(vote, candidate) == common.EQUAL && 1008 vote.GetRound() == candidate.GetRound() { 1009 count++ 1010 } 1011 } else if vote.GetCndId() == candidate.GetCndId() && 1012 vote.GetEpoch() == candidate.GetEpoch() { 1013 count++ 1014 } 1015 } 1016 1017 return w.site.handler.GetQuorumVerifier().HasQuorum(count) 1018} 1019 1020// 1021// Copy a proposed vote 1022// 1023func (w *pollWorker) cloneProposedVote() VoteMsg { 1024 1025 // w.site.master.round should be in sycn with 1026 // w.ballot.result.proposed.round. Use w.site.master.round 1027 // to be consistent. 1028 return w.site.factory.CreateVote(w.site.master.round, 1029 uint32(w.site.handler.GetStatus()), 1030 uint32(w.ballot.result.proposed.GetEpoch()), 1031 w.ballot.result.proposed.GetCndId(), 1032 w.ballot.result.proposed.GetCndLoggedTxnId(), 1033 w.ballot.result.proposed.GetCndCommittedTxnId(), 1034 w.site.solicitOnly) 1035} 1036 1037// 1038// Certify the leader before declaring followship 1039// 1040func (w *pollWorker) certifyLeader(vote VoteMsg) bool { 1041 1042 // I am not voted as leader 1043 if vote.GetCndId() != w.site.messenger.GetLocalAddr() { 1044 // The leader must be known to me as active 1045 leaderVote, ok := w.ballot.result.activePeers[vote.GetCndId()] 1046 if ok && PeerStatus(leaderVote.GetStatus()) == LEADING { 1047 return true 1048 } 1049 return false 1050 } 1051 1052 // If someone voting me as a leader, make sure that we have the same round 1053 return w.site.master.round == vote.GetRound() 1054} 1055 1056///////////////////////////////////////////////////////////////////////////// 1057// Private Function 1058///////////////////////////////////////////////////////////////////////////// 1059 
1060func newMessenger(laddr string) (*common.PeerMessenger, error) { 1061 1062 messenger, err := common.NewPeerMessenger(laddr, nil) 1063 if err != nil { 1064 return nil, err 1065 } 1066 1067 return messenger, nil 1068} 1069