// @author Couchbase <info@couchbase.com>
// @copyright 2014 Couchbase, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package protocol

import (
	"fmt"
	"github.com/couchbase/gometa/common"
	"github.com/couchbase/gometa/log"
	"sync"
	"sync/atomic"
)

/////////////////////////////////////////////////
// Proposal Processing for ZK
/////////////////////////////////////////////////

// 1) TCP is used as the network protocol (guarantees packet ordering).
// 2) The leader sends proposals sequentially based on the order of
//    received requests. Due to (1), it expects the proposals to arrive
//    at each follower in the same order.
// 3) Each follower processes the proposals in the same sequential order.
//    Therefore, the ack/accept is returned in the same order as the proposal.
// 4) In the follower, proposal and commit can be processed out-of-order.
//    These 2 messages go through different queues on the follower side.
// 5) The leader has a dedicated go-routine (2 threads in ZK) to send/receive
//    proposal/commit for a specific follower. If messaging errors out,
//    it will close the socket. This, in turn, will terminate both the sending
//    and receiving threads and force the go-routine to shut down.
//    In doing so, the leader will also remove the follower. The leader will
//    listen for any new socket connection to re-establish communication with the follower.
// 6) When the follower fails to send an Ack/Accept to the leader, it will close the socket.
//    This, in turn, will shut down the follower. The main thread will go back to
//    election (QuorumPeer in "looking" state).
// 7) When the follower re-connects to the leader, the leader will go through the following:
//    a) open the commit log and re-send {proposals, committed messages} to the follower.
//       The leader will hold the read lock on the commit log so as to avoid any concurrent
//       commit being written to the log.
//    b) new pending {proposal, commit} will be sent to the follower.
//    c) new pending proposals will also be sent (note this is synchronized such that
//       no pending proposal is added during this step).
//    d) add the follower as a participant of future proposals.
// 8) Due to (7), proposals must be committed serially in order. A proposal cannot
//    be skipped. The voting can stall if the leader loses the majority (by design).
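//
// An illustrative (not normative) message flow for a single request,
// assuming a 3-node ensemble with leader L and followers F1, F2:
//
//   client -> L   : Request(op)
//   L             : LogProposal(txnid t); leader votes for itself
//   L -> F1, F2   : Proposal(t)
//   F1 -> L       : Accept(t)   -- quorum reached (L + F1 = 2 of 3)
//   L             : Commit(t) locally; lastCommitted = t
//   L -> F1, F2   : Commit(t)
//   F2 -> L       : Accept(t)   -- late ack; ignored since t <= lastCommitted
//
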
/////////////////////////////////////////////////
// Type Declaration
/////////////////////////////////////////////////

type Leader struct {
	naddr      string
	handler    ActionHandler
	factory    MsgFactory
	reqHandler CustomRequestHandler

	notifications chan *notification
	lastCommitted common.Txnid
	quorums       map[common.Txnid][]string
	proposals     map[common.Txnid]ProposalMsg

	// mutex protected variables
	mutex     sync.Mutex
	followers map[string]*messageListener
	watchers  map[string]*messageListener
	observers map[string]*observer
	isClosed  bool
	changech  chan bool // notify when the membership of active followers has changed
	idGen     *IdGen
}

// AddWatcher and AddFollower should safely handle the situation where
// there is an existing watcher/follower with the same fid. If a listener
// with the same "fid" already exists, the termination of the existing
// listener is triggered and the new listener is added to the list
// of watchers/followers. As the termination of the existing listener is
// an asynchronous operation, there is a small time interval during which
// there will be two listeners in the system with the same fid. To tell
// these two listeners apart, "listenerid" is used.

type messageListener struct {
	fid        string
	pipe       *common.PeerPipe
	leader     *Leader
	killch     chan bool
	listenerid uint64
}

type notification struct {
	// follower message
	fid     string
	payload common.Packet
}

/////////////////////////////////////////////////
// Leader - Public Function
/////////////////////////////////////////////////

//
// Create a new leader
//
func NewLeader(naddr string,
	handler ActionHandler,
	factory MsgFactory) (leader *Leader, err error) {

	leader = &Leader{naddr: naddr,
		followers:     make(map[string]*messageListener),
		watchers:      make(map[string]*messageListener),
		observers:     make(map[string]*observer),
		quorums:       make(map[common.Txnid][]string),
		proposals:     make(map[common.Txnid]ProposalMsg),
		notifications: make(chan *notification, common.MAX_PROPOSALS),
		handler:       handler,
		factory:       factory,
		isClosed:      false,
		reqHandler:    nil,
		changech:      make(chan bool, common.MAX_PEERS), // make it buffered so the sender won't block
		idGen:         newIdGen(),
	}

	// This is initialized to the lastCommitted in the repository. Subsequent commit() will update this
	// field to the latest committed txnid. This field is used for ensuring the commit order is preserved.
	// The leader commits proposals in the strict order in which they arrive, so as to preserve serializability.
	leader.lastCommitted, err = handler.GetLastCommittedTxid()
	if err != nil {
		return nil, err
	}

	// Start a listener go-routine. This will be closed when the leader terminates.
	go leader.listen()

	return leader, nil
}
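
//
// A minimal usage sketch (hypothetical; myHandler and myFactory stand in
// for concrete ActionHandler and MsgFactory implementations, which are
// not defined in this file):
//
//   leader, err := NewLeader("127.0.0.1:9999", myHandler, myFactory)
//   if err != nil {
//       return err
//   }
//   defer leader.Terminate()
//
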
func NewLeaderWithCustomHandler(naddr string,
	handler ActionHandler,
	factory MsgFactory,
	reqHandler CustomRequestHandler) (leader *Leader, err error) {

	leader = &Leader{naddr: naddr,
		followers:     make(map[string]*messageListener),
		watchers:      make(map[string]*messageListener),
		observers:     make(map[string]*observer),
		quorums:       make(map[common.Txnid][]string),
		proposals:     make(map[common.Txnid]ProposalMsg),
		notifications: make(chan *notification, common.MAX_PROPOSALS),
		handler:       handler,
		factory:       factory,
		isClosed:      false,
		reqHandler:    reqHandler,
		changech:      make(chan bool, common.MAX_PEERS), // make it buffered so the sender won't block
		idGen:         newIdGen(),
	}

	// This is initialized to the lastCommitted in the repository. Subsequent commit() will update this
	// field to the latest committed txnid. This field is used for ensuring the commit order is preserved.
	// The leader commits proposals in the strict order in which they arrive, so as to preserve serializability.
	leader.lastCommitted, err = handler.GetLastCommittedTxid()
	if err != nil {
		return nil, err
	}

	// Start a listener go-routine. This will be closed when the leader terminates.
	go leader.listen()

	return leader, nil
}

//
// Terminate the leader. It is a no-op if the leader has already
// been terminated.
//
func (l *Leader) Terminate() {

	l.mutex.Lock()
	defer l.mutex.Unlock()

	if !l.isClosed {
		l.isClosed = true
		for _, listener := range l.followers {
			listener.terminate()
		}
		for _, listener := range l.watchers {
			listener.terminate()
		}
		common.SafeRun("Leader.Terminate()",
			func() {
				close(l.notifications)
			})
	}
}

//
// Has the leader terminated/closed?
//
func (l *Leader) IsClosed() bool {
	l.mutex.Lock()
	defer l.mutex.Unlock()

	return l.isClosed
}

//
// Get the channel that notifies when the ensemble of followers
// changes. The receiver of the channel can then tell
// whether the leader has a quorum of followers.
//
func (l *Leader) GetEnsembleChangeChannel() <-chan bool {
	l.mutex.Lock()
	defer l.mutex.Unlock()

	return l.changech
}

//
// Get the current ensemble size of the leader.
// It is the number of followers + 1 (including the leader).
//
func (l *Leader) GetActiveEnsembleSize() int {
	l.mutex.Lock()
	defer l.mutex.Unlock()

	return len(l.followers) + 1
}

//
// Add a watcher. If the leader is terminated, the pipe between leader
// and watcher will also be closed.
//
func (l *Leader) AddWatcher(fid string,
	peer *common.PeerPipe,
	o *observer) {
	l.mutex.Lock()
	defer l.mutex.Unlock()

	// AddWatcher requires holding the mutex such that the leader thread
	// will not be sending new proposals or commits (see sendProposal() and
	// sendCommit()) to watchers. This allows this function to copy the
	// proposals and commits from the observer queue into the pipe, before
	// the leader has a chance to send new messages.
	for packet := o.getNext(); packet != nil; packet = o.getNext() {

		switch request := packet.(type) {
		case ProposalMsg:
			txid := common.Txnid(request.GetTxnid())
			log.Current.Debugf("Leader.AddWatcher() : send observer's packet %s, txid %d", packet.Name(), txid)
		case CommitMsg:
			txid := common.Txnid(request.GetTxnid())
			log.Current.Debugf("Leader.AddWatcher() : send observer's packet %s, txid %d", packet.Name(), txid)
		}

		peer.Send(packet)
	}

	oldListener, ok := l.watchers[fid]
	listener := newListener(fid, peer, l, l.idGen.getNextId())
	l.watchers[fid] = listener
	go listener.start()

	// kill the old message listener
	if ok && oldListener != nil {
		log.Current.Debugf("Leader.AddWatcher() : old listener found for watcher %s. Terminating old listener", fid)
		oldListener.terminate()
	}
}
//
// Add a follower and start a listener for the follower.
// If the leader is terminated, the pipe between leader
// and follower will also be closed.
//
func (l *Leader) AddFollower(fid string,
	peer *common.PeerPipe,
	o *observer) {
	l.mutex.Lock()
	defer l.mutex.Unlock()

	// AddFollower requires holding the mutex such that the leader thread
	// will not be sending new proposals or commits (see sendProposal() and
	// sendCommit()) to followers. This allows this function to copy the
	// proposals and commits from the observer queue into the pipe, before
	// the leader has a chance to send new messages.
	for packet := o.getNext(); packet != nil; packet = o.getNext() {

		switch request := packet.(type) {
		case ProposalMsg:
			txid := common.Txnid(request.GetTxnid())
			log.Current.Debugf("Leader.AddFollower() : send observer's packet %s, txid %d", packet.Name(), txid)
		case CommitMsg:
			txid := common.Txnid(request.GetTxnid())
			log.Current.Debugf("Leader.AddFollower() : send observer's packet %s, txid %d", packet.Name(), txid)
		}

		peer.Send(packet)
	}

	oldListener, ok := l.followers[fid]
	listener := newListener(fid, peer, l, l.idGen.getNextId())
	l.followers[fid] = listener
	go listener.start()

	// kill the old message listener
	if ok && oldListener != nil {
		log.Current.Debugf("Leader.AddFollower() : old listener found for follower %s. Terminating old listener", fid)
		oldListener.terminate()
	} else {
		// notify about a brand new follower (not just a replacement of an existing one)
		l.changech <- true
	}
}
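
//
// A sketch of how a caller might consume the ensemble change channel to
// detect loss of quorum (hypothetical; handler is the ActionHandler that
// was given to NewLeader):
//
//   for range leader.GetEnsembleChangeChannel() {
//       if uint64(leader.GetActiveEnsembleSize()) <= handler.GetEnsembleSize()/2 {
//           // The leader no longer has a majority; time to re-elect.
//       }
//   }
//
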
//
// Return the follower ID. The leader is a follower to itself.
//
func (l *Leader) GetFollowerId() string {
	return l.handler.GetFollowerId()
}

/////////////////////////////////////////////////////////
// Leader - Public Function : Observer
/////////////////////////////////////////////////////////

//
// Add observer
//
func (l *Leader) AddObserver(id string, o *observer) {
	l.mutex.Lock()
	defer l.mutex.Unlock()

	l.observers[id] = o
}

//
// Remove observer
//
func (l *Leader) RemoveObserver(id string) {
	l.mutex.Lock()
	defer l.mutex.Unlock()

	observer, ok := l.observers[id]
	if ok {
		observer.close()
	}
	delete(l.observers, id)
}

func (l *Leader) QueueRequest(fid string, req common.Packet) {
	n := &notification{fid: fid, payload: req}
	l.notifications <- n
}

func (l *Leader) QueueResponse(req common.Packet) {
	n := &notification{fid: l.GetFollowerId(), payload: req}
	l.notifications <- n
}

/////////////////////////////////////////////////
// messageListener
/////////////////////////////////////////////////

// Create a new listener
func newListener(fid string, pipe *common.PeerPipe, leader *Leader,
	listenerid uint64) *messageListener {

	return &messageListener{fid: fid,
		pipe:       pipe,
		leader:     leader,
		killch:     make(chan bool, 1),
		listenerid: listenerid,
	}
}

//
// Goroutine. Start the listener to listen for messages from the follower.
// Note that each follower has its own receive queue. This ensures that
// if the queue fills up for a single follower, only that follower's
// connection is affected. The listener can be killed by calling
// terminate() or by closing the PeerPipe.
//
func (l *messageListener) start() {

	defer func() {
		if r := recover(); r != nil {
			log.Current.Errorf("panic in messageListener.start() : %s\n", r)
			log.Current.Errorf("%s", log.Current.StackTrace())
		} else {
			log.Current.Debugf("leader's messageListener.start() terminates.")
			log.Current.Tracef(log.Current.StackTrace())
		}

		common.SafeRun("messageListener.start()",
			func() {
				l.leader.removeListener(l)
			})

		common.SafeRun("messageListener.start()",
			func() {
				l.pipe.Close()
			})
	}()

	log.Current.Debugf("messageListener.start(): start listening to messages from peer %s", l.fid)
	reqch := l.pipe.ReceiveChannel()

	for {
		select {
		case req, ok := <-reqch:
			if ok {
				// TODO: If this send blocks because l.leader.notifications is full,
				// will it unblock once the leader drains the channel?
				l.leader.QueueRequest(l.fid, req)
			} else {
				// The channel is closed. Need to shut down the listener.
				log.Current.Infof("messageListener.start(): message channel closed. Remove peer %s as follower.", l.fid)
				return
			}
		case <-l.killch:
			log.Current.Debugf("messageListener.start(): Listener for %s received kill signal. Terminate.", l.fid)
			return
		}
	}
}

//
// Terminate the listener. This should only be called by the leader.
//
func (l *messageListener) terminate() {
	l.killch <- true
}

func (l *messageListener) getListenerid() uint64 {
	return l.listenerid
}
/////////////////////////////////////////////////
// IdGen
// In-memory unique ID generator using a counter.
// Used for message listener ids.
/////////////////////////////////////////////////

type IdGen struct {
	id uint64
}

func newIdGen() *IdGen {
	return &IdGen{}
}

// getNextId returns a new unique id. atomic.AddUint64 returns the
// incremented value, so the first id handed out is 1.
func (g *IdGen) getNextId() uint64 {
	return atomic.AddUint64(&g.id, uint64(1))
}

/////////////////////////////////////////////////////
// Leader - Private Function : Listener Management
/////////////////////////////////////////////////////

//
// Remove the follower from being tracked
//
func (l *Leader) removeListener(peer *messageListener) {
	l.mutex.Lock()
	defer l.mutex.Unlock()

	follower, ok := l.followers[peer.fid]
	if ok && follower != nil {
		if follower.getListenerid() == peer.getListenerid() {
			delete(l.followers, peer.fid)
		}
	}

	watcher, ok := l.watchers[peer.fid]
	if ok && watcher != nil {
		if watcher.getListenerid() == peer.getListenerid() {
			delete(l.watchers, peer.fid)
		}
	}

	l.changech <- true
}

/////////////////////////////////////////////////////////
// Leader - Private Function : Message Processing
/////////////////////////////////////////////////////////

//
// Main message processing loop for the leader.
//
func (l *Leader) listen() {
	defer func() {
		if r := recover(); r != nil {
			log.Current.Errorf("panic in Leader.listen() : %s\n", r)
			log.Current.Errorf("%s", log.Current.StackTrace())
		} else {
			log.Current.Debugf("Leader.listen() terminates.")
			log.Current.Tracef(log.Current.StackTrace())
		}

		common.SafeRun("Leader.listen()",
			func() {
				l.Terminate()
			})
	}()

	log.Current.Debugf("Leader.listen(): start listening to messages for leader")

	for {
		select {
		case msg, ok := <-l.notifications:
			if ok {
				if !l.IsClosed() {
					err := l.handleMessage(msg.payload, msg.fid)
					if err != nil {
						log.Current.Errorf("Leader.listen(): Encountered error when processing message from %s. Error %s. Terminate",
							msg.fid, err.Error())
						return
					}
				} else {
					log.Current.Debugf("Leader.listen(): Leader is closed. Terminate message processing loop.")
					return
				}
			} else {
				// The channel is closed.
				log.Current.Debugf("Leader.listen(): message channel closed. Terminate message processing loop for leader.")
				return
			}
		}
	}
}
//
// Handle an incoming message based on its type. All incoming messages from followers are processed serially
// (by Leader.listen()). Therefore, the corresponding outbound messages (proposal, commit) will be placed
// in the same order into each follower's pipe. This implies that we can use 2 state variables (LastLoggedTxid and
// LastCommittedTxid) to determine which peer has the latest repository. This, in turn, enforces stronger
// serializability semantics, since we won't have a case where one peer has a higher LastLoggedTxid while another
// has a higher LastCommittedTxid.
//
func (l *Leader) handleMessage(msg common.Packet, follower string) (err error) {

	err = nil
	switch request := msg.(type) {
	case RequestMsg:
		if common.IsCustomOpCode(common.OpCode(request.GetOpCode())) {
			if l.reqHandler != nil {
				l.reqHandler.OnNewRequest(follower, request)
			} else {
				log.Current.Debugf("Leader.handleMessage(): No custom request handler registered to handle custom request.")
				response := l.factory.CreateResponse(follower, request.GetReqId(), "No custom request handler", nil)
				l.sendResponse(response)
			}
		} else {
			err = l.createProposal(follower, request)
		}
	case AcceptMsg:
		err = l.handleAccept(request)
	case ResponseMsg:
		l.sendResponse(request)
	default:
		// TODO: Should return an error. There is a possibility that there is another leader.
		log.Current.Infof("Leader.handleMessage(): Leader unable to process message of type %s. Ignore message.", request.Name())
	}
	return err
}

/////////////////////////////////////////////////////////////////////////
// Leader - Private Function : Handle Request Message (New Proposal)
/////////////////////////////////////////////////////////////////////////

//
// Create a new proposal from a request
//
func (l *Leader) createProposal(host string, req RequestMsg) error {

	// This should be the only place to call GetNextTxnId(). This function
	// can panic if the txnid overflows. In that case, it should terminate
	// the leader and force a new election for getting a new epoch. ZK has the
	// same behavior.
	txnid := l.handler.GetNextTxnId()
	log.Current.Debugf("Leader.createProposal(): New Proposal : Txnid %d (Epoch %d, Counter %d)",
		txnid, txnid.GetEpoch(), txnid.GetCounter())

	// Create a new proposal
	proposal := l.factory.CreateProposal(uint64(txnid),
		host, // this is the host that originates the request
		req.GetReqId(),
		req.GetOpCode(),
		req.GetKey(),
		req.GetContent())

	return l.newProposal(proposal)
}
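
//
// For illustration: assuming the conventional ZXID-style layout in which the
// epoch occupies the high 32 bits of the txnid and the counter the low 32
// bits, a txnid with epoch 3 and counter 42 would be (3 << 32) | 42, and
// GetEpoch()/GetCounter() would return 3 and 42 respectively. An overflowing
// counter is what forces the new election (and hence a new epoch) mentioned
// in createProposal() above.
//
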
//
// Handle a new proposal
//
func (l *Leader) newProposal(proposal ProposalMsg) error {

	// Call out to log the proposal. Always do this first before
	// sending to followers.
	err := l.handler.LogProposal(proposal)
	if err != nil {
		if _, ok := err.(*common.RecoverableError); ok {
			// Update the last committed to advance the txnid.
			l.lastCommitted = common.Txnid(proposal.GetTxnid())
			l.sendAbort(proposal.GetFid(), proposal.GetReqId(), err.Error())
			return nil
		}

		// If logging the proposal fails, return the error.
		// This can cause the leader to re-elect. This handles the
		// case where there is hardware failure or repository
		// corruption.
		return err
	}

	// The leader votes for itself
	l.updateQuorum(common.Txnid(proposal.GetTxnid()), l.GetFollowerId())

	// remember this proposal so I can commit it later
	l.proposals[common.Txnid(proposal.GetTxnid())] = proposal

	// Send the proposal to the followers
	l.sendProposal(proposal)

	// Check if the proposal already has quorum (if ensembleSize <= 2). Make sure that this
	// is after sendProposal() so that we still send the proposal to the followers BEFORE
	// we send the commit message.
	if l.hasQuorum(common.Txnid(proposal.GetTxnid())) {
		// The proposal has quorum. Now commit. If we cannot commit, then return an error,
		// which will cause the leader to go through re-election. This handles the
		// case where there is hardware failure or repository corruption.
		err := l.commit(common.Txnid(proposal.GetTxnid()))
		if err != nil {
			return err
		}
	}

	return nil
}

//
// Send the proposal to the followers
//
func (l *Leader) sendProposal(proposal ProposalMsg) {

	l.mutex.Lock()
	defer l.mutex.Unlock()

	// Send the request to the followers. Each follower
	// has a pipe (channel), and a separate go-routine
	// will read from the channel and send to the follower
	// through a reliable connection (TCP). If the connection is
	// broken, then the follower will go through election again,
	// and the follower will re-sync its repository with the leader.
	// Therefore, we don't have to worry about re-sending proposals
	// to a disconnected follower here.
	//
	// If the leader cannot send to a follower, then the pipe will
	// be broken and the leader will take the follower out. If
	// the leader loses a majority of followers, it will bring
	// itself into election again. So we can process this
	// asynchronously without worrying about sending the message
	// to a majority of followers. If that can't be done, the
	// leader re-elects.
	//
	msg := l.factory.CreateProposal(proposal.GetTxnid(),
		proposal.GetFid(),
		proposal.GetReqId(),
		proposal.GetOpCode(),
		proposal.GetKey(),
		proposal.GetContent())

	for _, f := range l.followers {
		f.pipe.Send(msg)
	}

	for _, w := range l.watchers {
		w.pipe.Send(msg)
	}

	for _, o := range l.observers {
		o.send(msg)
	}
}

//
// Send an abort to the requesting peer
//
func (l *Leader) sendAbort(fid string, reqId uint64, err string) {

	log.Current.Tracef("leader.sendAbort(): Send Abort to %s", fid)

	l.mutex.Lock()
	defer l.mutex.Unlock()

	for _, f := range l.followers {
		if f.fid == fid {
			msg := l.factory.CreateAbort(fid, reqId, err)
			f.pipe.Send(msg)
			return
		}
	}

	for _, w := range l.watchers {
		if w.fid == fid {
			msg := l.factory.CreateAbort(fid, reqId, err)
			w.pipe.Send(msg)
			return
		}
	}

	if l.GetFollowerId() == fid {
		l.handler.Abort(fid, reqId, err)
	}
}

//
// Send the response to the requesting peer
//
func (l *Leader) sendResponse(msg ResponseMsg) {

	log.Current.Tracef("leader.sendResponse(): Send Response to %s", msg.GetFid())

	l.mutex.Lock()
	defer l.mutex.Unlock()

	for _, f := range l.followers {
		if f.fid == msg.GetFid() {
			f.pipe.Send(msg)
			return
		}
	}

	for _, w := range l.watchers {
		if w.fid == msg.GetFid() {
			w.pipe.Send(msg)
			return
		}
	}

	if l.GetFollowerId() == msg.GetFid() {
		l.handler.Respond(msg.GetFid(), msg.GetReqId(), msg.GetError(), msg.GetContent())
	}
}
/////////////////////////////////////////////////////////
// Leader - Private Function : Handle Accept Message
/////////////////////////////////////////////////////////

//
// Handle an accept message from a follower
//
func (l *Leader) handleAccept(msg AcceptMsg) error {

	// If this Ack is on an old proposal, then ignore it.
	// This indicates that the follower may be slower
	// than others. Therefore, the proposal may have been
	// committed before the follower could Ack.
	mtxid := common.Txnid(msg.GetTxnid())
	if l.lastCommitted >= mtxid {
		// Cleanup. l.quorums should not have mtxid.
		// This is just in case, since we will never commit
		// this proposal.
		_, ok := l.quorums[mtxid]
		if ok {
			delete(l.quorums, mtxid)
			delete(l.proposals, mtxid)
		}
		return nil
	}

	// update quorum
	l.updateQuorum(mtxid, msg.GetFid())
	if l.hasQuorum(mtxid) {
		// The proposal has quorum. Now commit. If we cannot commit, then return an error,
		// which will cause the leader to go through re-election. This handles the
		// case where there is hardware failure or repository corruption.
		err := l.commit(mtxid)
		if err != nil {
			return err
		}
	}

	return nil
}

//
// Update the quorum of a proposal
//
func (l *Leader) updateQuorum(txid common.Txnid, fid string) {

	if l.quorums[txid] == nil {
		l.quorums[txid] = make([]string, 0, common.MAX_FOLLOWERS)
	}
	log.Current.Tracef("Leader.updateQuorum: current quorum for txid %d : %d", uint64(txid), len(l.quorums[txid]))

	// Double check whether the follower has already voted on this proposal.
	var found bool
	for i := 0; i < len(l.quorums[txid]); i++ {
		a := l.quorums[txid][i]
		if a == fid {
			found = true
			break
		}
	}

	if !found {
		l.quorums[txid] = append(l.quorums[txid], fid)
	}

	log.Current.Tracef("Leader.updateQuorum: new quorum for txid %d : %d", uint64(txid), len(l.quorums[txid]))
}

//
// Check if a proposal has reached quorum
//
func (l *Leader) hasQuorum(txid common.Txnid) bool {
	// This uses a simple majority quorum. ZK also has
	// hierarchical quorums for scalability (servers are put into different
	// groups and quorums are obtained within a group).

	log.Current.Tracef("Leader.hasQuorum: accepted responses for txid %d = %d, ensemble size = %d",
		uint64(txid), len(l.quorums[txid]), l.handler.GetEnsembleSize())

	accepted, ok := l.quorums[txid]
	if !ok {
		// no one has voted yet on this proposal
		return false
	}

	return uint64(len(accepted)) > (l.handler.GetEnsembleSize() / 2)
}
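
//
// For example, with GetEnsembleSize() == 5 a proposal needs at least 3
// accepted responses (the leader's own vote plus 2 followers), since
// 3 > 5/2 under integer division. With an ensemble of 2, both peers
// must accept before the proposal can commit.
//
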
//
// Commit a proposal
//
func (l *Leader) commit(txid common.Txnid) error {

	// Sanity check that we are not skipping a proposal. The protocol expects
	// each follower to send Accept messages in the order the proposals were
	// received. Since messages are sent over a reliable TCP connection, it is
	// not possible to reach quorum out of order. In particular, if a follower
	// leaves and rejoins, the leader is responsible for resending all the
	// pending proposals to that follower. So if we see an out-of-order txid
	// here, it is a fatal condition due to a protocol error.
	//
	if !common.IsNextInSequence(txid, l.lastCommitted) {
		log.Current.Debugf("Proposals must be committed in sequential order for the same leader term. "+
			"Found out-of-order commit. Leader last committed txid %d, commit msg %d", l.lastCommitted, txid)

		return common.NewError(common.PROTOCOL_ERROR,
			fmt.Sprintf("Proposals must be committed in sequential order for the same leader term. "+
				"Found out-of-order commit. Leader last committed txid %d, commit msg %d", l.lastCommitted, txid))
	}

	_, ok := l.proposals[txid]
	if !ok {
		return common.NewError(common.SERVER_ERROR,
			fmt.Sprintf("Cannot find a proposal for the txid %d. Failed to commit the proposal.", txid))
	}

	// Mark the proposal as committed. Always do this first before sending to followers.
	err := l.handler.Commit(txid)
	if err != nil {
		return err
	}

	// remove the votes
	delete(l.quorums, txid)
	delete(l.proposals, txid)

	// Update lastCommitted
	l.lastCommitted = txid

	// Send the commit to the followers
	l.sendCommit(txid)

	// TODO: do we need to update the election site? I don't think so, but need to double check.

	return nil
}

//
// Send commit messages to all followers
//
func (l *Leader) sendCommit(txnid common.Txnid) error {

	l.mutex.Lock()
	defer l.mutex.Unlock()

	msg := l.factory.CreateCommit(uint64(txnid))

	// Send the request to the followers. See the same
	// comment as in sendProposal().
	//
	for _, f := range l.followers {
		f.pipe.Send(msg)
	}

	// send the message to the watchers
	for _, w := range l.watchers {
		w.pipe.Send(msg)
	}

	// send the message to the observers
	for _, o := range l.observers {
		o.send(msg)
	}

	return nil
}