1// @author Couchbase <info@couchbase.com>
2// @copyright 2014 Couchbase, Inc.
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8//      http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16package protocol
17
18import (
19	"fmt"
20	"github.com/couchbase/gometa/common"
21	"github.com/couchbase/gometa/log"
22	"sync"
23	"time"
24)
25
26/////////////////////////////////////////////////////////////////////////////
27// Type Declaration
28/////////////////////////////////////////////////////////////////////////////
29
// LeaderSyncProxy manages the leader-side synchronization handshake with a
// single follower.  One proxy is created per follower TCP connection.
type LeaderSyncProxy struct {
	state         *ConsentState    // quorum-tracking state shared by all LeaderSyncProxy instances
	follower      *common.PeerPipe // TCP pipe to the follower being synchronized
	handler       ActionHandler
	factory       MsgFactory
	followerState *followerState // populated when the FollowerInfo message arrives; nil before that
	leader        *Leader

	mutex    sync.Mutex // guards isClosed
	isClosed bool       // set once synchronization has terminated (success or failure)
	donech   chan bool  // buffered (cap 1); receives the final success/failure result
	killch   chan bool  // buffered (cap 1); signaled by Terminate()
}
43
// FollowerSyncProxy manages the follower-side synchronization handshake
// with the leader over a single TCP connection.
type FollowerSyncProxy struct {
	leader  *common.PeerPipe // TCP pipe to the leader
	handler ActionHandler
	factory MsgFactory
	state   *followerState // this follower's own state (epoch, last logged txid, voting)

	mutex    sync.Mutex // guards isClosed
	isClosed bool       // set once synchronization has terminated (success or failure)
	donech   chan bool  // buffered (cap 1); receives the final success/failure result
	killch   chan bool  // buffered (cap 1); signaled by Terminate()
}
55
// ConsentState tracks the three quorum votes the leader needs during
// synchronization: the accepted-epoch vote, the epoch ack, and the
// new-leader ack.  Each vote set has its own condition variable so
// per-follower go-routines can block until quorum is reached.
type ConsentState struct {
	acceptedEpoch      uint32            // highest accepted epoch seen so far among voters
	acceptedEpochSet   map[string]uint32 // voter id -> accepted epoch (stage 1 votes)
	acceptedEpochCond  *sync.Cond
	acceptedEpochMutex sync.Mutex

	ackEpochSet   map[string]string // voters that have ack'ed the new epoch (stage 2)
	ackEpochCond  *sync.Cond
	ackEpochMutex sync.Mutex

	newLeaderAckSet   map[string]string // voters that have ack'ed the new leader (stage 4)
	newLeaderAckCond  *sync.Cond
	newLeaderAckMutex sync.Mutex

	// ensembleSize counts the leader itself; quorum is a strict majority.
	ensembleSize uint64
}
72
// followerState holds per-follower information collected during the
// synchronization handshake.
type followerState struct {
	currentEpoch   uint32       // follower's current epoch (from EpochAck on the leader side)
	lastLoggedTxid common.Txnid // follower's last logged txid (from EpochAck on the leader side)
	fid            string       // follower id
	voting         bool         // whether this follower is a voting member of the ensemble
}
79
// LeaderStageCode enumerates the stages of the leader-side synchronization
// state machine.  The stages are traversed linearly in the order declared.
type LeaderStageCode uint16

const (
	UPDATE_ACCEPTED_EPOCH_AFTER_QUORUM LeaderStageCode = iota
	NOTIFY_NEW_EPOCH
	UPDATE_CURRENT_EPOCH_AFTER_QUORUM
	SYNC_SEND
	DECLARE_NEW_LEADER_AFTER_QUORUM
	LEADER_SYNC_DONE // terminal stage
)
90
// FollowerStageCode enumerates the stages of the follower-side
// synchronization state machine, traversed linearly in the order declared.
type FollowerStageCode uint16

const (
	SEND_FOLLOWERINFO FollowerStageCode = iota
	RECEIVE_UPDATE_ACCEPTED_EPOCH
	SYNC_RECEIVE
	RECEIVE_UPDATE_CURRENT_EPOCH
	FOLLOWER_SYNC_DONE // terminal stage
)
100
101/////////////////////////////////////////////////////////////////////////////
102// ConsentState
103/////////////////////////////////////////////////////////////////////////////
104
105//
106// Create a new ConsentState for synchronization.   The leader must proceed in
107// 4 stages:
108// 1) Reach quourm of followers for sending its persisted acceptedEpoch to the leader.
109//    The leader uses followers acceptedEpoch to determine the next epoch.
110// 2) Reach quorum of followers to accept the new epoch.
111// 3) Synchronizes the commit log between leader and each follower
112// 4) Reach quorum of followers to accepts this leader (NewLeaderAck)
113//
114// The ConsentState is used for keep that state for stages (1), (2) and (4) where
115// quorum is required to proceed to next stage.
116//
117// The ConsentState is using the physical host (actual port) which is different for each TCP connection.  This requires
118// the ConsentState to be cleaned up if synchronization with a particular follower aborts.   After synchronization
119// with a follower succeeds, the follower's vote will stay in the ConsentState, since the main purpose of the
120// ConsentState is for voting on a new epoch, as well as establishing that a majority of followers are going
121// to follower the leader.   A node can only establish leadership until stage 4 passes.  Once leadership is
122// established, if the node looses majority of followers, the server should abort and go through re-election again
123// with a new ConsentState.
124//
125func NewConsentState(sid string, epoch uint32, ensemble uint64) *ConsentState {
126
127	epoch = common.CompareAndIncrementEpoch(epoch, 0) // increment epoch to next value
128
129	state := &ConsentState{
130		acceptedEpoch:    epoch,
131		acceptedEpochSet: make(map[string]uint32),
132		ackEpochSet:      make(map[string]string),
133		newLeaderAckSet:  make(map[string]string),
134		ensembleSize:     ensemble}
135
136	state.acceptedEpochCond = sync.NewCond(&state.acceptedEpochMutex)
137	state.ackEpochCond = sync.NewCond(&state.ackEpochMutex)
138	state.newLeaderAckCond = sync.NewCond(&state.newLeaderAckMutex)
139
140	// add the leader to both sets, since enemble size can count the leader as well.
141	state.acceptedEpochSet[sid] = epoch
142	state.ackEpochSet[sid] = sid
143	state.newLeaderAckSet[sid] = sid
144
145	return state
146}
147
// voteAcceptedEpoch casts the given voter's accepted-epoch vote and blocks
// until a quorum (strict majority of ensembleSize) of votes exists.  It
// returns the resulting accepted epoch and whether quorum was reached.
// A non-voting member does not register a vote but still waits for quorum.
func (s *ConsentState) voteAcceptedEpoch(voter string, newEpoch uint32, voting bool) (uint32, bool) {
	s.acceptedEpochCond.L.Lock()
	defer s.acceptedEpochCond.L.Unlock()

	// Reach quorum. Just Return
	if len(s.acceptedEpochSet) > int(s.ensembleSize/2) {
		return s.acceptedEpoch, true
	}

	if voting {
		s.acceptedEpochSet[voter] = newEpoch

		// This function can panic if we exceed epoch limit
		s.acceptedEpoch = common.CompareAndIncrementEpoch(newEpoch, s.acceptedEpoch)

		if len(s.acceptedEpochSet) > int(s.ensembleSize/2) {
			// reach quorum. Notify
			s.acceptedEpochCond.Broadcast()
			return s.acceptedEpoch, true
		}
	}

	// wait for quorum to be reached.  It is possible
	// that the go-routine is woken up before quorum is
	// reached (if Terminate() is called).   It is
	// also possible that a concurrent go-routine has
	// remove the voter after reaching quorum.  In these
	// cases, return false.
	// NOTE: Wait() is deliberately not wrapped in a re-check loop; a
	// wakeup without quorum means termination and is reported as false.
	s.acceptedEpochCond.Wait()
	return s.acceptedEpoch, len(s.acceptedEpochSet) > int(s.ensembleSize/2)
}
179
180func (s *ConsentState) removeAcceptedEpoch(voter string) {
181	s.acceptedEpochCond.L.Lock()
182	defer s.acceptedEpochCond.L.Unlock()
183
184	delete(s.acceptedEpochSet, voter)
185}
186
// voteEpochAck records the given voter's ack of the new epoch and blocks
// until a quorum (strict majority of ensembleSize) of acks exists.  It
// returns whether quorum was reached.  A non-voting member does not
// register an ack but still waits for quorum.
func (s *ConsentState) voteEpochAck(voter string, voting bool) bool {
	s.ackEpochCond.L.Lock()
	defer s.ackEpochCond.L.Unlock()

	// Reach quorum. Just Return
	if len(s.ackEpochSet) > int(s.ensembleSize/2) {
		return true
	}

	if voting {
		s.ackEpochSet[voter] = voter

		if len(s.ackEpochSet) > int(s.ensembleSize/2) {
			// reach quorum. Notify
			s.ackEpochCond.Broadcast()
			return true
		}
	}

	// wait for quorum to be reached.  It is possible
	// that the go-routine is woken up before quorum is
	// reached (if Terminate() is called).   It is
	// also possible that a concurrent go-routine has
	// remove the voter after reaching quorum.  In these
	// cases, return false.
	// NOTE: Wait() is deliberately not wrapped in a re-check loop; a
	// wakeup without quorum means termination and is reported as false.
	s.ackEpochCond.Wait()
	return len(s.ackEpochSet) > int(s.ensembleSize/2)
}
215
216func (s *ConsentState) removeEpochAck(voter string) {
217	s.ackEpochCond.L.Lock()
218	defer s.ackEpochCond.L.Unlock()
219
220	delete(s.ackEpochSet, voter)
221}
222
// voteNewLeaderAck records the given voter's ack of the new leader and
// blocks until a quorum (strict majority of ensembleSize) of acks exists.
// It returns whether quorum was reached.  A non-voting member does not
// register an ack but still waits for quorum.
func (s *ConsentState) voteNewLeaderAck(voter string, voting bool) bool {
	s.newLeaderAckCond.L.Lock()
	defer s.newLeaderAckCond.L.Unlock()

	// Reach quorum. Just Return
	if len(s.newLeaderAckSet) > int(s.ensembleSize/2) {
		return true
	}

	if voting {
		s.newLeaderAckSet[voter] = voter

		if len(s.newLeaderAckSet) > int(s.ensembleSize/2) {
			// reach quorum. Notify
			s.newLeaderAckCond.Broadcast()
			return true
		}
	}

	// wait for quorum to be reached.  It is possible
	// that the go-routine is woken up before quorum is
	// reached (if Terminate() is called).   It is
	// also possible that a concurrent go-routine has
	// remove the voter after reaching quorum.  In these
	// cases, return false.
	// NOTE: Wait() is deliberately not wrapped in a re-check loop; a
	// wakeup without quorum means termination and is reported as false.
	s.newLeaderAckCond.Wait()
	return len(s.newLeaderAckSet) > int(s.ensembleSize/2)
}
251
252func (s *ConsentState) removeNewLeaderAck(voter string) {
253	s.newLeaderAckCond.L.Lock()
254	defer s.newLeaderAckCond.L.Unlock()
255
256	delete(s.newLeaderAckSet, voter)
257}
258
259func (s *ConsentState) Terminate() {
260	s.acceptedEpochCond.L.Lock()
261	s.acceptedEpochCond.Broadcast()
262	s.acceptedEpochCond.L.Unlock()
263
264	s.ackEpochCond.L.Lock()
265	s.ackEpochCond.Broadcast()
266	s.ackEpochCond.L.Unlock()
267
268	s.newLeaderAckCond.L.Lock()
269	s.newLeaderAckCond.Broadcast()
270	s.newLeaderAckCond.L.Unlock()
271}
272
273/////////////////////////////////////////////////////////////////////////////
274// LeaderSyncProxy - Public Function
275/////////////////////////////////////////////////////////////////////////////
276
277//
278// Create a LeaderSyncProxy to synchronize with a follower.  The proxy
279// requires 2 stateful variables to be provided as inputs:
280// 1) ConsentState:  The LeaderSyncProxy requires a quorum of followers
281//    to follow before leader can process client request.  The consentState
282//    is a shared state (shared among multiple LeaderSyncProxy) to keep track of
283//    the followers following this leader during synchronziation. Note
284//    that a follower may leave the leader after synchronziation, but the
285//    ConsentState will not keep track of follower leaving.
286// 2) Follower: The follower is a PeerPipe (TCP connection).    This is
287//    used to exchange messages with the follower node.
288//
289func NewLeaderSyncProxy(leader *Leader,
290	state *ConsentState,
291	follower *common.PeerPipe,
292	handler ActionHandler,
293	factory MsgFactory) *LeaderSyncProxy {
294
295	sync := &LeaderSyncProxy{
296		followerState: nil,
297		leader:        leader,
298		state:         state,
299		follower:      follower,
300		handler:       handler,
301		factory:       factory,
302		donech:        make(chan bool, 1), // donech should not be closed
303		killch:        make(chan bool, 1), // donech should not be closed
304		isClosed:      false}
305
306	return sync
307}
308
309//
310// Start synchronization with a speicfic follower.   This function
311// can be run as regular function or go-routine.   If the caller runs this
312// as a go-routine, the caller should use GetDoneChannel()
313// to tell when this function completes.
314//
315// There are 3 cases when this function sends "false" to donech:
316// 1) When there is an error during synchronization
317// 2) When synchronization timeout
318// 3) Terminate() is called
319//
320// When a failure (false) result is sent to the donech, this function
321// will also close the PeerPipe to the follower.  This will force
322// the follower to restart election.
323//
324// This function will catch panic.
325//
func (l *LeaderSyncProxy) Start(o *observer) bool {

	defer func() {
		if r := recover(); r != nil {
			log.Current.Errorf("panic in LeaderSyncProxy.Start() : %s\n", r)
			log.Current.Errorf("LeaderSyncProxy.Start() terminates : Diagnostic Stack ...")
			log.Current.Errorf("%s", log.Current.StackTrace())

			l.abort() // ensure proper cleanup and unblock caller
		}

		// Mark the proxy closed regardless of outcome so Terminate()
		// becomes a no-op afterwards.
		l.close()
	}()

	timeout := time.After(common.SYNC_TIMEOUT * time.Millisecond)

	// spawn a go-routine to perform synchronziation.  Do not close donech2, just
	// let it garbage collect when the go-routine is done.	 Make sure using
	// buffered channel since this go-routine may go away before execute() does.
	donech2 := make(chan bool, 1)
	go l.execute(donech2, o)

	select {
	case success, ok := <-donech2:
		if !ok {
			// channel is not going to close but just to be safe ...
			success = false
		}
		l.donech <- success
		return success
	case <-timeout:
		// abort() closes the follower pipe, which forces the blocked
		// execute() go-routine to error out eventually.
		log.Current.Infof("LeaderSyncProxy.Start(): Synchronization timeout for peer (TCP %s). Terminate.", l.follower.GetAddr())
		l.abort()
	case <-l.killch:
		log.Current.Debugf("LeaderSyncProxy.Start(): Receive kill signal for peer (TCP %s).  Terminate.", l.follower.GetAddr())
		l.abort()
	}

	return false
}
366
367//
368// Terminate the syncrhonization with this follower.  Upon temrination, the follower
369// will enter into election again.    This function cannot guarantee that the go-routine
370// will terminate until the given ConsentState is terminated as well.
371// This function is an no-op if the LeaderSyncProxy already completes successfully.
372//
373func (l *LeaderSyncProxy) Terminate() {
374	l.mutex.Lock()
375	defer l.mutex.Unlock()
376
377	if !l.isClosed {
378		l.isClosed = true
379		l.killch <- true
380	}
381}
382
383//
384// Return a channel that tells when the syncrhonization is done.
385// This is unbuffered channel such that LeaderSyncProxy will not be blocked
386// upon completion of synchronization (whether successful or not).
387//
388func (l *LeaderSyncProxy) GetDoneChannel() <-chan bool {
389	// do not return nil (can cause caller block forever)
390	return (<-chan bool)(l.donech)
391}
392
393//
394// Return the fid (follower id)
395//
396func (l *LeaderSyncProxy) GetFid() string {
397	if l.followerState != nil {
398		return l.followerState.fid
399	}
400	return ""
401}
402
403//
404// Can the follower vote?
405//
406func (l *LeaderSyncProxy) CanFollowerVote() bool {
407	if l.followerState != nil {
408		return l.followerState.voting
409	}
410
411	return false
412}
413
414/////////////////////////////////////////////////////////////////////////////
415// LeaderSyncProxy - Private Function
416/////////////////////////////////////////////////////////////////////////////
417
418//
419// Abort the LeaderSyncProxy.
420//
421func (l *LeaderSyncProxy) abort() {
422
423	voter := l.GetFid()
424
425	common.SafeRun("LeaderSyncProxy.abort()",
426		func() {
427			// terminate any on-going messaging with follower.  This will force
428			// the follower to go through election again
429			l.follower.Close()
430		})
431
432	common.SafeRun("LeaderSyncProxy.abort()",
433		func() {
434			// clean up the ConsentState
435			l.state.removeAcceptedEpoch(voter)
436			l.state.removeEpochAck(voter)
437			l.state.removeNewLeaderAck(voter)
438		})
439
440	// donech should never be closed.  But just to be safe ...
441	common.SafeRun("LeaderSyncProxy.abort()",
442		func() {
443			l.donech <- false
444		})
445}
446
447//
448// close the proxy
449//
450func (l *LeaderSyncProxy) close() {
451	l.mutex.Lock()
452	defer l.mutex.Unlock()
453
454	l.isClosed = true
455}
456
457//
458// Main go-routine for handling sycnrhonization with a follower.  Note that
459// this go-routine may be blocked by a non-interuptable condition variable,
460// in which the caller may have aborted.  donech must be a buffered channel
461// to ensure that this go-routine will not get blocked if the caller dies first.
462//
463func (l *LeaderSyncProxy) execute(donech chan bool, o *observer) {
464
465	defer func() {
466		if r := recover(); r != nil {
467			log.Current.Errorf("panic in LeaderSyncProxy.execute() : %s\n", r)
468			log.Current.Errorf("LeaderSyncProxy.execute() terminates : Diagnostic Stack ...")
469			log.Current.Errorf("%s", log.Current.StackTrace())
470
471			donech <- false // unblock caller
472		}
473	}()
474
475	var stage LeaderStageCode = UPDATE_ACCEPTED_EPOCH_AFTER_QUORUM
476
477	for stage != LEADER_SYNC_DONE {
478		switch stage {
479		case UPDATE_ACCEPTED_EPOCH_AFTER_QUORUM:
480			{
481				err := l.updateAcceptedEpochAfterQuorum()
482				if err != nil {
483					log.Current.Errorf("LeaderSyncProxy.updateAcceptEpochAfterQuorum(): Error encountered = %s", err.Error())
484					safeSend("LeaderSyncProxy:execute()", donech, false)
485					return
486				}
487				stage = NOTIFY_NEW_EPOCH
488			}
489		case NOTIFY_NEW_EPOCH:
490			{
491				err := l.notifyNewEpoch()
492				if err != nil {
493					log.Current.Errorf("LeaderSyncProxy.notifyNewEpoch(): Error encountered = %s", err.Error())
494					safeSend("LeaderSyncProxy:execute()", donech, false)
495					return
496				}
497				stage = UPDATE_CURRENT_EPOCH_AFTER_QUORUM
498			}
499		case UPDATE_CURRENT_EPOCH_AFTER_QUORUM:
500			{
501				err := l.updateCurrentEpochAfterQuorum()
502				if err != nil {
503					log.Current.Errorf("LeaderSyncProxy.updateCurrentEpochAfterQuorum(): Error encountered = %s", err.Error())
504					safeSend("LeaderSyncProxy:execute()", donech, false)
505					return
506				}
507				stage = SYNC_SEND
508			}
509		case SYNC_SEND:
510			{
511				err := l.syncWithLeader(o)
512				if err != nil {
513					log.Current.Errorf("LeaderSyncProxy.syncWithLeader(): Error encountered = %s", err.Error())
514					safeSend("LeaderSyncProxy:execute()", donech, false)
515					return
516				}
517				stage = DECLARE_NEW_LEADER_AFTER_QUORUM
518			}
519		case DECLARE_NEW_LEADER_AFTER_QUORUM:
520			{
521				err := l.declareNewLeaderAfterQuorum()
522				if err != nil {
523					log.Current.Errorf("LeaderSyncProxy.declareNewLeaderAfterQuorum(): Error encountered = %s", err.Error())
524					safeSend("LeaderSyncProxy:execute()", donech, false)
525					return
526				}
527				stage = LEADER_SYNC_DONE
528			}
529		}
530	}
531
532	// Use SafeReturn just to be sure, even though donech should not be closed
533	safeSend("LeaderSyncProxy:execute()", donech, true)
534}
535
536func (l *LeaderSyncProxy) updateAcceptedEpochAfterQuorum() error {
537
538	log.Current.Debugf("LeaderSyncProxy.updateAcceptedEpochAfterQuroum()")
539
540	// Get my follower's vote for the accepted epoch
541	packet, err := listen("FollowerInfo", l.follower)
542	if err != nil {
543		return err
544	}
545
546	// Get epoch from follower message
547	info := packet.(FollowerInfoMsg)
548	epoch := info.GetAcceptedEpoch()
549	fid := info.GetFid()
550	voting := info.GetVoting()
551
552	// initialize the follower state
553	l.followerState = &followerState{lastLoggedTxid: 0, currentEpoch: 0, fid: fid, voting: voting}
554
555	// update my vote and wait for epoch to reach quorum
556	newEpoch, ok := l.state.voteAcceptedEpoch(l.GetFid(), epoch, l.followerState.voting)
557	if !ok {
558		return common.NewError(common.ELECTION_ERROR,
559			"LeaderSyncProxy.updateAcceptedEpochAfterQuorum(): Fail to reach quorum on accepted epoch (FollowerInfo)")
560	}
561
562	// update the accepted epoch based on the quorum result.   This function
563	// will perform update only if the new epoch is larger than existing value.
564	err = l.handler.NotifyNewAcceptedEpoch(newEpoch)
565	if err != nil {
566		return err
567	}
568
569	return nil
570}
571
572func (l *LeaderSyncProxy) notifyNewEpoch() error {
573
574	log.Current.Debugf("LeaderSyncProxy.notifyNewEpoch()")
575
576	epoch, err := l.handler.GetAcceptedEpoch()
577	if err != nil {
578		return err
579	}
580	packet := l.factory.CreateLeaderInfo(epoch)
581	return send(packet, l.follower)
582}
583
// updateCurrentEpochAfterQuorum performs stage 2/3 of the handshake: receive
// the follower's EpochAck, record its current epoch and last logged txid,
// wait for a quorum of acks, then promote the accepted epoch to be the
// current epoch.
func (l *LeaderSyncProxy) updateCurrentEpochAfterQuorum() error {

	log.Current.Debugf("LeaderSyncProxy.updateCurrentEpochAfterQuorum()")

	// Get my follower's vote for the epoch ack
	packet, err := listen("EpochAck", l.follower)
	if err != nil {
		return err
	}

	// Get epoch from follower message
	// TODO : Validate follower epoch
	info := packet.(EpochAckMsg)
	l.followerState.currentEpoch = info.GetCurrentEpoch()
	l.followerState.lastLoggedTxid = common.Txnid(info.GetLastLoggedTxid())

	// update my vote and wait for quorum of ack from followers
	ok := l.state.voteEpochAck(l.GetFid(), l.followerState.voting)
	if !ok {
		return common.NewError(common.ELECTION_ERROR,
			"LeaderSyncProxy.updateCurrentEpochAfterQuorum(): Fail to reach quorum on current epoch (EpochAck)")
	}

	// update the current epock after quorum of followers have ack'ed
	// (the accepted epoch decided in stage 1 becomes the current epoch)
	epoch, err := l.handler.GetAcceptedEpoch()
	if err != nil {
		return err
	}

	// update the current epoch based on the quorum result.   This function
	// will perform update only if the new epoch is larger than existing value.
	err = l.handler.NotifyNewCurrentEpoch(epoch)
	if err != nil {
		return err
	}

	return nil
}
622
623func (l *LeaderSyncProxy) declareNewLeaderAfterQuorum() error {
624
625	log.Current.Debugf("LeaderSyncProxy.declareNewLeaderAfterQuorum()")
626
627	// return the new epoch to the follower
628	epoch, err := l.handler.GetCurrentEpoch()
629	if err != nil {
630		return err
631	}
632	packet := l.factory.CreateNewLeader(epoch)
633	err = send(packet, l.follower)
634	if err != nil {
635		return err
636	}
637
638	// Get the new leader ack
639	ack, err := listen("NewLeaderAck", l.follower)
640	if err != nil {
641		return err
642	}
643
644	// TODO : Verify the ack
645	ack = ack // TODO : just to get around compile error
646
647	// update my vote and wait for quorum of ack from followers
648	ok := l.state.voteNewLeaderAck(l.GetFid(), l.followerState.voting)
649	if !ok {
650		return common.NewError(common.ELECTION_ERROR,
651			"LeaderSyncProxy.declareNewLeaderAfterQuorum(): Fail to reach quorum on NewLeaderAck")
652	}
653
654	return nil
655}
656
// syncWithLeader streams the leader's commit log to the follower (stage 3):
// a StreamBegin header, then the log entries from the follower's last logged
// txid up to the head of the observer queue, then a StreamEnd trailer.
func (l *LeaderSyncProxy) syncWithLeader(o *observer) error {

	log.Current.Debugf("LeaderSyncProxy.syncWithLeader()")

	// Figure out the data that needs to be read from commit log.
	// The start key is the last logged txid from the follower
	// The end key is the first txid in the observer queue.
	// If observer queue is empty, endTxid is 0 which will stream
	// until either there is no more data in repository or there
	// is a new entry added to the observer queue.
	startTxid := l.followerState.lastLoggedTxid
	endTxid := l.firstTxnIdInObserver(o) // inclusive

	// First, Send the header with the last committed txid being seen so far.
	if err := l.sendHeader(); err != nil {
		return err
	}

	// Second, Now stream the entry from the log
	lastSeen, err := l.sendEntriesInCommittedLog(startTxid, endTxid, o)
	if err != nil {
		return err
	}

	// Third, stream the trailer with the committed txid
	if err = l.sendTrailer(); err != nil {
		return err
	}

	// Forth, if lastSeen matches first entry in observer, remove
	// that entry since it has been sent.  Commit messages are kept
	// so the follower still receives the commit.
	packet := o.peekFirst()
	if packet != nil {
		txid := l.getPacketTxnId(packet)
		if txid == lastSeen && packet.Name() != "Commit" {
			o.getNext() // skip any entry that has been sent
		}
	}

	return nil
}
698
699//
700// Send the header with the last committed txid that has seen so far.
701// The last committed txid could have changed by the time we finish
702// synchronization, since the leaders can commit proposal concurrently.
703// The final commit txid would be sent by the last log entry (StreamEnd).
704//
705func (l *LeaderSyncProxy) sendHeader() error {
706
707	lastCommittedTxid, err := l.handler.GetLastCommittedTxid()
708	if err != nil {
709		return err
710	}
711
712	msg := l.factory.CreateLogEntry(
713		uint64(lastCommittedTxid),
714		uint32(common.OPCODE_STREAM_BEGIN_MARKER),
715		"StreamBegin",
716		([]byte)("StreamBegin"))
717
718	return send(msg, l.follower)
719}
720
721//
722// Send the entries in the committed log to the follower.  Termination condition:
723// 1) There is no more entry in commit log
724// 2) The entry in commit log matches the first entry in the observer queue
725// This function returns the txid of the last entry sent from the log. Return 0 if nothing is sent from commit log.
726//
727func (l *LeaderSyncProxy) sendEntriesInCommittedLog(startTxid, endTxid common.Txnid, o *observer) (common.Txnid, error) {
728
729	log.Current.Debugf("LeaderSyncProxy.sendEntriesInCommittedLog(): startTxid %d endTxid %d observer first txid %d",
730		startTxid, endTxid, l.firstTxnIdInObserver(o))
731
732	var lastSeen common.Txnid = common.BOOTSTRAP_LAST_LOGGED_TXID
733
734	logChan, errChan, killch, err := l.handler.GetCommitedEntries(startTxid, endTxid)
735	if logChan == nil || errChan == nil || err != nil {
736		return lastSeen, err
737	}
738
739	for {
740		select {
741		case entry, ok := <-logChan:
742			if !ok {
743				killch <- true
744				return lastSeen, nil // no more data
745			}
746
747			// send the entry to follower
748			err = send(entry, l.follower)
749			if err != nil {
750				// lastSeen can be 0 if there is no new entry in repo
751				killch <- true
752				return lastSeen, err
753			}
754
755			lastSeen = common.Txnid(entry.GetTxnid())
756
757			// we found the committed entries matches what's in observer queue
758			if l.hasSeenEntryInObserver(o, common.Txnid(entry.GetTxnid())) {
759				killch <- true
760				return lastSeen, nil
761			}
762
763		case err := <-errChan:
764			if err != nil {
765				return lastSeen, err
766			}
767			break
768		}
769	}
770
771	return lastSeen, nil
772}
773
774//
775// Send the trailer with the last committed txid
776//
777func (l *LeaderSyncProxy) sendTrailer() (err error) {
778
779	lastCommittedTxid, err := l.handler.GetLastCommittedTxid()
780	if err != nil {
781		return err
782	}
783
784	msg := l.factory.CreateLogEntry(
785		uint64(lastCommittedTxid),
786		uint32(common.OPCODE_STREAM_END_MARKER),
787		"StreamEnd",
788		([]byte)("StreamEnd"))
789
790	return send(msg, l.follower)
791}
792
793//
794// Check if I see the given Txnid in observer
795//
796func (l *LeaderSyncProxy) hasSeenEntryInObserver(o *observer, lastSeen common.Txnid) bool {
797
798	txnid := l.firstTxnIdInObserver(o)
799	return txnid != common.BOOTSTRAP_LAST_LOGGED_TXID && txnid <= lastSeen
800}
801
802//
803// Get the txnid from the head of the observer
804//
805func (l *LeaderSyncProxy) firstTxnIdInObserver(o *observer) common.Txnid {
806
807	packet := o.peekFirst()
808	return l.getPacketTxnId(packet)
809}
810
811//
812// Get the txnid from the packet if the packet is a ProposalMsg or CommitMsg.
813//
814func (l *LeaderSyncProxy) getPacketTxnId(packet common.Packet) common.Txnid {
815
816	txid := common.BOOTSTRAP_LAST_LOGGED_TXID
817	if packet != nil {
818		switch request := packet.(type) {
819		case ProposalMsg:
820			txid = common.Txnid(request.GetTxnid())
821		case CommitMsg:
822			txid = common.Txnid(request.GetTxnid())
823		}
824	}
825
826	return txid
827}
828
829/////////////////////////////////////////////////////////////////////////////
830// FollowerSyncProxy - Public Function
831/////////////////////////////////////////////////////////////////////////////
832
833//
834// Create a FollowerSyncProxy to synchronize with a leader.  The proxy
835// requires 1 stateful variables to be provided as inputs:
836// 1) Leader : The leader is a PeerPipe (TCP connection).    This is
837//    used to exchange messages with the leader node.
838//
839func NewFollowerSyncProxy(leader *common.PeerPipe,
840	handler ActionHandler,
841	factory MsgFactory,
842	voting bool) *FollowerSyncProxy {
843
844	sync := &FollowerSyncProxy{leader: leader,
845		handler:  handler,
846		factory:  factory,
847		donech:   make(chan bool, 1), // donech should not be closed
848		killch:   make(chan bool, 1), // donech should not be closed
849		isClosed: false}
850
851	sync.state = &followerState{
852		lastLoggedTxid: common.BOOTSTRAP_LAST_LOGGED_TXID,
853		currentEpoch:   common.BOOTSTRAP_CURRENT_EPOCH,
854		voting:         voting}
855
856	return sync
857}
858
859//
860// Start synchronization with a speicfic leader.   This function
861// can be run as regular function or go-routine.   If the caller runs this
862// as a go-routine, the caller should use GetDoneChannel()
863// to tell when this function completes.
864//
865// There are 3 cases when this function sends "false" to donech:
866// 1) When there is an error during synchronization
867// 2) When synchronization timeout
868// 3) Terminate() is called
869//
870// When a failure (false) result is sent to the donech, this function
871// will also close the PeerPipe to the leader.  This will force
872// the leader to skip this follower.
873//
874// This function will catch panic.
875//
func (f *FollowerSyncProxy) Start() bool {

	defer func() {
		if r := recover(); r != nil {
			log.Current.Errorf("panic in FollowerSyncProxy.Start() : %s\n", r)
			log.Current.Errorf("FollowerSyncProxy.Start() terminates : Diagnostic Stack ...")
			log.Current.Errorf("%s", log.Current.StackTrace())

			f.abort() // ensure proper cleanup and unblock caller
		}

		// Mark the proxy closed regardless of outcome so Terminate()
		// becomes a no-op afterwards.
		f.close()
	}()

	timeout := time.After(common.SYNC_TIMEOUT * time.Millisecond)

	// spawn a go-routine to perform synchronziation.  Do not close donech2, just
	// let it garbage collect when the go-routine is done.	 Make sure using
	// buffered channel since this go-routine may go away before execute() does.
	donech2 := make(chan bool, 1)
	go f.execute(donech2)

	select {
	case success, ok := <-donech2:
		if !ok {
			// channel is not going to close but just to be safe ...
			success = false
		}
		f.donech <- success
		return success
	case <-timeout:
		// abort() closes the leader pipe, which forces the blocked
		// execute() go-routine to error out eventually.
		log.Current.Infof("FollowerSyncProxy.Start(): Synchronization timeout for peer %s. Terminate.", f.leader.GetAddr())
		f.abort()
	case <-f.killch:
		log.Current.Debugf("FollowerSyncProxy.Start(): Receive kill signal for peer %s.  Terminate.", f.leader.GetAddr())
		f.abort()
	}

	return false
}
915
916//
917// Terminate the syncrhonization with this leader.  Upon temrination, the leader
918// will skip this follower.  This function is an no-op if the FollowerSyncProxy
919// already completes successfully.
920//
921func (l *FollowerSyncProxy) Terminate() {
922	l.mutex.Lock()
923	defer l.mutex.Unlock()
924
925	if !l.isClosed {
926		l.isClosed = true
927		l.killch <- true
928	}
929}
930
931//
932// Return a channel that tells when the syncrhonization is done.
933// This is unbuffered channel such that FollowerSyncProxy will not be blocked
934// upon completion of synchronization (whether successful or not).
935//
936func (l *FollowerSyncProxy) GetDoneChannel() <-chan bool {
937	// do not return nil (can cause caller block forever)
938	return (<-chan bool)(l.donech)
939}
940
941/////////////////////////////////////////////////////////////////////////////
942// FollowerSyncProxy - Private Function
943/////////////////////////////////////////////////////////////////////////////
944
945//
946// Abort the FollowerSyncProxy.  By killing the leader's PeerPipe,
947// the execution go-rountine will eventually error out and terminate by itself.
948//
949func (f *FollowerSyncProxy) abort() {
950
951	common.SafeRun("FollowerSyncProxy.abort()",
952		func() {
953			// terminate any on-going messaging with follower
954			f.leader.Close()
955		})
956
957	common.SafeRun("FollowerSyncProxy.abort()",
958		func() {
959			f.donech <- false
960		})
961}
962
963//
964// close the proxy
965//
966func (l *FollowerSyncProxy) close() {
967	l.mutex.Lock()
968	defer l.mutex.Unlock()
969
970	l.isClosed = true
971}
972
973func (l *FollowerSyncProxy) execute(donech chan bool) {
974
975	defer func() {
976		if r := recover(); r != nil {
977			log.Current.Errorf("panic in FollowerSyncProxy.execute() : %s\n", r)
978			log.Current.Errorf("FollowerSyncProxy.execute() terminates : Diagnostic Stack ...")
979			log.Current.Errorf("%s", log.Current.StackTrace())
980
981			donech <- false // unblock caller
982		}
983	}()
984
985	var stage FollowerStageCode = SEND_FOLLOWERINFO
986
987	for stage != FOLLOWER_SYNC_DONE {
988		switch stage {
989		case SEND_FOLLOWERINFO:
990			{
991				err := l.sendFollowerInfo()
992				if err != nil {
993					log.Current.Errorf("FollowerSyncProxy.sendFollowerInfo(): Error encountered = %s", err.Error())
994					safeSend("FollowerSyncProxy:execute()", donech, false)
995					return
996				}
997				stage = RECEIVE_UPDATE_ACCEPTED_EPOCH
998			}
999		case RECEIVE_UPDATE_ACCEPTED_EPOCH:
1000			{
1001				err := l.receiveAndUpdateAcceptedEpoch()
1002				if err != nil {
1003					log.Current.Errorf("FollowerSyncProxy.receiveAndUpdateAcceptedEpoch(): Error encountered = %s", err.Error())
1004					safeSend("FollowerSyncProxy:execute()", donech, false)
1005					return
1006				}
1007				stage = SYNC_RECEIVE
1008			}
1009		case SYNC_RECEIVE:
1010			{
1011				err := l.syncReceive()
1012				if err != nil {
1013					log.Current.Errorf("FollowerSyncProxy.syncReceive(): Error encountered = %s", err.Error())
1014					safeSend("FollowerSyncProxy:execute()", donech, false)
1015					return
1016				}
1017				stage = RECEIVE_UPDATE_CURRENT_EPOCH
1018			}
1019		case RECEIVE_UPDATE_CURRENT_EPOCH:
1020			{
1021				err := l.receiveAndUpdateCurrentEpoch()
1022				if err != nil {
1023					log.Current.Errorf("FollowerSyncProxy.receiveAndUpdateCurrentEpoch(): Error encountered = %s", err.Error())
1024					safeSend("FollowerSyncProxy:execute()", donech, false)
1025					return
1026				}
1027				stage = FOLLOWER_SYNC_DONE
1028			}
1029		}
1030	}
1031
1032	safeSend("FollowerSyncProxy:execute()", donech, true)
1033}
1034
1035func (l *FollowerSyncProxy) sendFollowerInfo() error {
1036
1037	log.Current.Debugf("FollowerSyncProxy.sendFollowerInfo()")
1038
1039	// Send my accepted epoch to the leader for voting (don't send current epoch)
1040	epoch, err := l.handler.GetAcceptedEpoch()
1041	if err != nil {
1042		return err
1043	}
1044	packet := l.factory.CreateFollowerInfo(epoch, l.handler.GetFollowerId(), l.state.voting)
1045	return send(packet, l.leader)
1046}
1047
1048func (l *FollowerSyncProxy) receiveAndUpdateAcceptedEpoch() error {
1049
1050	log.Current.Debugf("FollowerSyncProxy.receiveAndUpdateAcceptedEpoch()")
1051
1052	// Get the accepted epoch from the leader.   This epoch
1053	// is already being voted on by multiple followers (the highest
1054	// epoch among the quorum of followers).
1055	packet, err := listen("LeaderInfo", l.leader)
1056	if err != nil {
1057		return err
1058	}
1059
1060	// Get epoch from leader message
1061	info := packet.(LeaderInfoMsg)
1062	epoch := info.GetAcceptedEpoch()
1063	if err != nil {
1064		return err
1065	}
1066
1067	acceptedEpoch, err := l.handler.GetAcceptedEpoch()
1068	if err != nil {
1069		return err
1070	}
1071	if epoch > acceptedEpoch {
1072		// Update the accepted epoch based on the quorum result.   This function
1073		// will perform update only if the new epoch is larger than existing value.
1074		// Once the accepted epoch is updated, it will not be reset even if the
1075		// sychornization with the leader fails.  Therefore, the follower will always
1076		// remember the largest accepted epoch known to it, such that it can be used
1077		// in the next round of voting.   Note that the leader derives this new accepted
1078		// epoch only after it has polled from a quorum of followers.  So even if sync fails,
1079		// it is unlikey that in the next sync, the leader will give a new accepted epoch smaller
1080		// than what is being stored now.
1081		err = l.handler.NotifyNewAcceptedEpoch(epoch)
1082		if err != nil {
1083			return err
1084		}
1085	} else if epoch == acceptedEpoch {
1086		// In ZK, if the local epoch (acceptedEpoch) == leader's epoch (epoch), it will replly an EpochAck with epoch = -1.
1087		// This is to tell the leader that it should not count this EpockAck when computing quorum of EpochAck.
1088		// This is to ensure that this follower does not "double ack" to the leader (e.g. when this follower rejoins a
1089		// stable ensemble).   In our implementation for ConsentState, it should not be affected by double ack from the same host.
1090	} else {
1091		return common.NewError(common.PROTOCOL_ERROR, "Accepted Epoch from leader is smaller or equal to my epoch.")
1092	}
1093
1094	// Notify the leader that I have accepted the epoch.  Send
1095	// the last logged txid and current epoch to the leader.
1096	txid, err := l.handler.GetLastLoggedTxid()
1097	if err != nil {
1098		return err
1099	}
1100	currentEpoch, err := l.handler.GetCurrentEpoch()
1101	if err != nil {
1102		return err
1103	}
1104
1105	l.state.lastLoggedTxid = common.Txnid(txid)
1106	l.state.currentEpoch = currentEpoch
1107
1108	packet = l.factory.CreateEpochAck(uint64(txid), currentEpoch)
1109	return send(packet, l.leader)
1110}
1111
1112func (l *FollowerSyncProxy) receiveAndUpdateCurrentEpoch() error {
1113
1114	log.Current.Debugf("FollowerSyncProxy.receiveAndUpdateCurrentEpoch()")
1115
1116	// Get the accepted epoch from the leader.   This epoch
1117	// is already being voted on by multiple followers (the highest
1118	// epoch among the quorum of followers).
1119	packet, err := listen("NewLeader", l.leader)
1120	if err != nil {
1121		return err
1122	}
1123
1124	// Get epoch from follower message
1125	info := packet.(NewLeaderMsg)
1126	epoch := info.GetCurrentEpoch()
1127
1128	// TODO : validate the epoch from leader
1129
1130	// Update the current epoch based on the quorum result.   This function
1131	// will perform update only if the new epoch is larger than existing value.
1132	// Once the current epoch is updated, it will not be reset even if the
1133	// sychornization with the leader fails.  Therefore, the follower will always
1134	// remember the largest current epoch known to it, such that it can be used
1135	// in the next round of voting.   Note that the leader derives this new current
1136	// epoch only after it has polled from a quorum of followers.  So even if sync fails,
1137	// it is unlikey that in the next sync, the leader will give a new current epoch smaller
1138	// than what is being stored now.
1139	err = l.handler.NotifyNewCurrentEpoch(epoch)
1140	if err != nil {
1141		return err
1142	}
1143
1144	// Notify the leader that I have accepted the epoch
1145	packet = l.factory.CreateNewLeaderAck()
1146	return send(packet, l.leader)
1147}
1148
1149func (l *FollowerSyncProxy) syncReceive() error {
1150
1151	log.Current.Tracef("FollowerSyncProxy.syncReceive()")
1152
1153	lastCommittedFromLeader := common.BOOTSTRAP_LAST_COMMITTED_TXID
1154	pendingCommit := make([]LogEntryMsg, 0, common.MAX_PROPOSALS)
1155
1156	for {
1157		packet, err := listen("LogEntry", l.leader)
1158		if err != nil {
1159			return err
1160		}
1161
1162		entry := packet.(LogEntryMsg)
1163		lastTxnid := common.Txnid(entry.GetTxnid())
1164
1165		// If this is the first one, skip
1166		if entry.GetOpCode() == uint32(common.OPCODE_STREAM_BEGIN_MARKER) {
1167			log.Current.Debugf("LeaderSyncProxy.syncReceive(). Receive stream_begin.  Txid : %d", lastTxnid)
1168			lastCommittedFromLeader = lastTxnid
1169			continue
1170		}
1171
1172		// If this is the last one, then flush the pending log entry as well.  The streamEnd
1173		// message has a more recent lastCommitedTxid from the leader which is retreievd after
1174		// the last log entry is sent.
1175		if entry.GetOpCode() == uint32(common.OPCODE_STREAM_END_MARKER) {
1176			log.Current.Debugf("LeaderSyncProxy.syncReceive(). Receive stream_end.  Txid : %d", lastTxnid)
1177			lastCommittedFromLeader = lastTxnid
1178
1179			// write any log entry that has not been logged.
1180			for _, entry := range pendingCommit {
1181				toCommit := entry.GetTxnid() <= uint64(lastCommittedFromLeader)
1182
1183				if err := l.handler.LogAndCommit(common.Txnid(entry.GetTxnid()),
1184					entry.GetOpCode(),
1185					entry.GetKey(),
1186					entry.GetContent(),
1187					toCommit); err != nil {
1188					return err
1189				}
1190			}
1191
1192			return nil
1193		}
1194
1195		// write the new log entry.  If the txid is less than the last known committed txid
1196		// from the leader, then commit the entry. Otherwise, keep it in a pending list.
1197		toCommit := lastTxnid <= lastCommittedFromLeader
1198		if toCommit {
1199			// This call needs to be atomic to ensure that the commit log and the data store
1200			// are updated transactionally.  This ensures that if the follower crashes, the
1201			// repository as a while remains in a consistent state.
1202			if err := l.handler.LogAndCommit(common.Txnid(entry.GetTxnid()),
1203				entry.GetOpCode(),
1204				entry.GetKey(),
1205				entry.GetContent(),
1206				true); err != nil {
1207				return err
1208			}
1209		} else {
1210			pendingCommit = append(pendingCommit, entry)
1211		}
1212	}
1213
1214	return nil
1215}
1216
1217/////////////////////////////////////////////////////////////////////////////
1218// Private Function
1219/////////////////////////////////////////////////////////////////////////////
1220
1221func listen(name string, pipe *common.PeerPipe) (common.Packet, error) {
1222
1223	reqch := pipe.ReceiveChannel()
1224	req, ok := <-reqch
1225	if !ok {
1226		return nil, common.NewError(common.SERVER_ERROR, "SyncProxy.listen(): channel closed. Terminate")
1227	}
1228
1229	if req.Name() != name {
1230		return nil, common.NewError(common.PROTOCOL_ERROR,
1231			"SyncProxy.listen(): Expect message "+name+", Receive message "+req.Name())
1232	}
1233
1234	return req, nil
1235}
1236
1237func send(packet common.Packet, pipe *common.PeerPipe) error {
1238
1239	log.Current.Tracef("SyncProxy.send(): sending packet %s to peer (TCP %s)", packet.Name(), pipe.GetAddr())
1240	if !pipe.Send(packet) {
1241		return common.NewError(common.SERVER_ERROR, fmt.Sprintf("SyncProxy.listen(): Fail to send packet %s to peer (TCP %s)",
1242			packet.Name(), pipe.GetAddr()))
1243	}
1244
1245	return nil
1246}
1247
1248func safeSend(header string, donech chan bool, result bool) {
1249	common.SafeRun(header,
1250		func() {
1251			donech <- result
1252		})
1253}
1254