// @author Couchbase <info@couchbase.com>
// @copyright 2014 Couchbase, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package protocol

import (
	"github.com/couchbase/gometa/common"
	"github.com/couchbase/gometa/log"
	"sync"
	"time"
)

/////////////////////////////////////////////////////////////////////////////
// Type Declaration
/////////////////////////////////////////////////////////////////////////////

type LeaderServer struct {
	leader       *Leader
	listener     *common.PeerListener
	consentState *ConsentState
	state        *LeaderState
	handler      ActionHandler
	factory      MsgFactory
}

type LeaderState struct {
	requestMgr RequestMgr

	// mutex-protected variables
	mutex   sync.Mutex
	ready   bool
	readych chan bool
	proxies map[string](chan bool)
}

type ListenerState struct {
	donech chan bool
	killch chan bool
}

/////////////////////////////////////////////////////////////////////////////
// LeaderServer - Public Function
/////////////////////////////////////////////////////////////////////////////
//
// Create a new LeaderServer.  This is a blocking call until the LeaderServer
// terminates.
//
// killch should be a buffered channel to ensure the sender won't block.
//
func RunLeaderServer(naddr string,
	listener *common.PeerListener,
	ss RequestMgr,
	handler ActionHandler,
	factory MsgFactory,
	killch <-chan bool) (err error) {

	return RunLeaderServerWithCustomHandler(naddr, listener, ss, handler, factory, nil, killch)
}
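
// Usage sketch (illustrative only, not part of this package's API): one way a
// caller might drive RunLeaderServer and shut it down.  The naddr, listener,
// requestMgr, handler, and factory values are assumed to be constructed
// elsewhere.
//
//	killch := make(chan bool, 1) // buffered, per the contract above
//	donech := make(chan error, 1)
//	go func() {
//		donech <- RunLeaderServer(naddr, listener, requestMgr, handler, factory, killch)
//	}()
//	// ... later, to shut the leader down:
//	killch <- true
//	err := <-donech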

func RunLeaderServerWithCustomHandler(naddr string,
	listener *common.PeerListener,
	ss RequestMgr,
	handler ActionHandler,
	factory MsgFactory,
	reqHandler CustomRequestHandler,
	killch <-chan bool) (err error) {

	log.Current.Debugf("LeaderServer.RunLeaderServer(): start leader server %s", naddr)

	// Catch panic at the main entry point for LeaderServer
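	// Note: err is a named return value, so assigning to it inside the
	// deferred closure below propagates a recovered panic to the caller
	// as an ordinary error.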
	defer func() {
		if r := recover(); r != nil {
			log.Current.Errorf("panic in RunLeaderServer() : %s\n", r)
			log.Current.Errorf("%s", log.Current.StackTrace())
			err = r.(error)
		} else {
			log.Current.Debugf("RunLeaderServer terminates.")
			log.Current.Tracef(log.Current.StackTrace())
		}
	}()

	// create a leader
	leader, err := NewLeaderWithCustomHandler(naddr, handler, factory, reqHandler)
	if err != nil {
		return err
	}
	defer leader.Terminate()

	// create a ConsentState
	epoch, err := handler.GetAcceptedEpoch()
	if err != nil {
		return err
	}
	ensembleSize := handler.GetEnsembleSize()
	consentState := NewConsentState(naddr, epoch, ensembleSize)
	defer consentState.Terminate()

	// create the leader state
	state := newLeaderState(ss)

	// create the server
	server := &LeaderServer{leader: leader,
		listener:     listener,
		consentState: consentState,
		state:        state,
		handler:      handler,
		factory:      factory}

	// start the listener.  This goroutine continues to accept new followers
	// even while the leader is processing requests.
	listenerState := newListenerState()
	go server.listenFollower(listenerState)

	// start the main loop for processing incoming requests.  The leader will
	// process requests only after a quorum of followers has synchronized
	// with it.
	err = server.processRequest(killch, listenerState, reqHandler)

	log.Current.Debugf("LeaderServer.RunLeaderServer(): leader server %s terminates", naddr)

	return err
}
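
// Shutdown paths (informational): RunLeaderServerWithCustomHandler owns two
// cooperating goroutines, and either one can bring the pair down:
//
//	caller --killch--> processRequest --listenerState.killch--> listenFollower
//	listenFollower --listenerState.donech--> processRequest
//
// Both listener channels are buffered (see newListenerState), so whichever
// side exits first can signal the other without blocking.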

/////////////////////////////////////////////////////////////////////////////
// LeaderServer - Private Function : Discovery Phase (synchronize with follower)
/////////////////////////////////////////////////////////////////////////////

//
// Listen for new connection requests from followers/peers.
// Start a new LeaderSyncProxy to synchronize the state
// between the leader and the peer.
//
func (l *LeaderServer) listenFollower(listenerState *ListenerState) {

	defer func() {
		if r := recover(); r != nil {
			log.Current.Errorf("panic in LeaderServer.listenFollower() : %s\n", r)
			log.Current.Errorf("%s", log.Current.StackTrace())
		} else {
			log.Current.Debugf("LeaderServer.listenFollower() terminates.")
			log.Current.Tracef(log.Current.StackTrace())
		}

		common.SafeRun("LeaderServer.listenFollower()",
			func() {
				l.terminateAllOutstandingProxies()
			})

		common.SafeRun("LeaderServer.listenFollower()",
			func() {
				listenerState.donech <- true
			})
	}()

	connCh := l.listener.ConnChannel()
	if connCh == nil {
		// This should not happen unless the listener is closed
		return
	}

	// If there is a single server, we don't need to wait for a follower
	// before the server is ready to process requests.
	if l.handler.GetEnsembleSize() == 1 {
		if err := l.incrementEpoch(); err != nil {
			log.Current.Errorf("LeaderServer.listenFollower(): Error when bootstrapping leader with ensembleSize=1. Error = %s", err)
			return
		}

		l.notifyReady()
	}

	for {
		select {
		case conn, ok := <-connCh:
			{
				if !ok {
					// channel closed.  Simply return.
					return
				}

				// There is a new peer connection request from the follower.  Start a proxy to synchronize with the follower.
				// The leader does not proactively connect to the follower:
				// 1) The ensemble is stable, but a follower may have just rebooted and need to reconnect to the leader.
				// 2) Even if the leader receives votes from the follower, the leader cannot tell for sure that the follower has
				//    not changed its vote.  Only when the follower connects can the leader confirm the follower's allegiance.
				//
				log.Current.Debugf("LeaderServer.listenFollower(): Receive connection request from follower %s", conn.RemoteAddr())
				if l.registerOutstandingProxy(conn.RemoteAddr().String()) {
					pipe := common.NewPeerPipe(conn)
					go l.startProxy(pipe)
				} else {
					log.Current.Infof("LeaderServer.listenFollower(): Sync Proxy already running for %s. Ignore new request.", conn.RemoteAddr())
					conn.Close()
				}
			}
		case <-listenerState.killch:
			log.Current.Debugf("LeaderServer.listenFollower(): Receive kill signal. Terminate.")
			return
		}
	}
}

//
// Start a LeaderSyncProxy to synchronize the leader
// and follower state.
//
func (l *LeaderServer) startProxy(peer *common.PeerPipe) {

	defer func() {
		if r := recover(); r != nil {
			log.Current.Errorf("panic in LeaderServer.startProxy() : %s\n", r)
			log.Current.Errorf("%s", log.Current.StackTrace())
		} else {
			log.Current.Debugf("LeaderServer.startProxy() : Terminates.")
			log.Current.Tracef(log.Current.StackTrace())
		}

		// deregister the proxy with the leader server upon exit
		l.deregisterOutstandingProxy(peer.GetAddr())
	}()

	// create a proxy that will synchronize with the peer.
	log.Current.Debugf("LeaderServer.startProxy(): Start synchronization with follower. Peer TCP connection (%s)", peer.GetAddr())
	proxy := NewLeaderSyncProxy(l.leader, l.consentState, peer, l.handler, l.factory)
	donech := proxy.GetDoneChannel()

	// Create an observer for the leader.  The leader will put ongoing proposal and commit messages
	// onto the observer queue.  This ensures that we won't miss those mutations while the leader is
	// syncing with the follower.  The messages in the observer queue will eventually be routed to the follower.
	o := NewObserver()
	l.leader.AddObserver(peer.GetAddr(), o)
	defer l.leader.RemoveObserver(peer.GetAddr())

	// start the proxy
	go proxy.Start(o)
	defer proxy.Terminate()

	// Get the killch for this goroutine
	killch := l.getProxyKillChan(peer.GetAddr())
	if killch == nil {
		log.Current.Debugf("LeaderServer.startProxy(): Cannot find killch for proxy (TCP connection = %s).", peer.GetAddr())
		log.Current.Debugf("LeaderServer.startProxy(): Cannot start follower sync.")
		return
	}

	// This goroutine blocks until the handshake between the leader and the
	// follower is completed.  By then, the leader will also have majority
	// confirmation that it is the leader.
	select {
	case success := <-donech:
		if success {
			// Tell the leader to add this follower for processing requests.  If a follower instance is already
			// running, AddFollower() will terminate the existing instance and then create a new one.
			fid := proxy.GetFid()
			if proxy.CanFollowerVote() {
				l.leader.AddFollower(fid, peer, o)
				log.Current.Debugf("LeaderServer.startProxy(): Synchronization with follower %s done (TCP conn = %s).  Add follower.",
					fid, peer.GetAddr())

				// At this point, the follower has voted this server as the leader.
				// Notify the request processor to start processing new requests for this host.
				l.notifyReady()
			} else {
				l.leader.AddWatcher(fid, peer, o)
				log.Current.Debugf("LeaderServer.startProxy(): Sync with watcher done.  Add Watcher %s (TCP conn = %s)",
					fid, peer.GetAddr())
			}
		} else {
			log.Current.Errorf("LeaderServer:startProxy(): Leader failed to synchronize with follower (TCP conn = %s)", peer.GetAddr())
		}
	case <-killch:
		log.Current.Infof("LeaderServer:startProxy(): Sync proxy is killed while synchronizing with follower (TCP conn = %s)",
			peer.GetAddr())
	}
}
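
// Ordering note (informational): the observer is registered before the sync
// proxy starts, so any proposal/commit the leader broadcasts during the sync
// is captured in the observer queue rather than lost.  On a successful sync,
// AddFollower()/AddWatcher() takes over the same queue and drains it to the
// peer.  Schematically (addr/fid stand in for the values used above):
//
//	o := NewObserver()                 // 1. start buffering mutations
//	l.leader.AddObserver(addr, o)
//	go proxy.Start(o)                  // 2. run the handshake/sync
//	<-donech                           // 3. sync done
//	l.leader.AddFollower(fid, peer, o) // 4. queued mutations flow to the peer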

//
// Create a new LeaderState
//
func newLeaderState(ss RequestMgr) *LeaderState {
	state := &LeaderState{requestMgr: ss,
		ready:   false,
		readych: make(chan bool, 1), // buffered so sender won't wait
		proxies: make(map[string](chan bool))}

	return state
}

//
// Increment the epoch.  Only call this method if the ensemble size is 1 (single server).
// This function can panic if the epoch reaches its limit.
//
func (l *LeaderServer) incrementEpoch() error {

	epoch, err := l.handler.GetCurrentEpoch()
	if err != nil {
		return err
	}

	epoch = common.CompareAndIncrementEpoch(epoch, epoch)

	log.Current.Debugf("LeaderServer.incrementEpoch(): new epoch %d", epoch)

	if err := l.handler.NotifyNewAcceptedEpoch(epoch); err != nil {
		return err
	}

	if err := l.handler.NotifyNewCurrentEpoch(epoch); err != nil {
		return err
	}

	return nil
}
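
// Informational: common.CompareAndIncrementEpoch is invoked above with the
// current epoch as both arguments.  Assuming the helper returns an epoch one
// greater than the larger of its inputs (an inference from its name and this
// call site, not from its definition), the net effect is simply epoch+1,
// e.g. epoch 5 becomes 6, which is then persisted as both the accepted and
// the current epoch.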

/////////////////////////////////////////////////////////////////////////////
// LeaderServer - Private Function : Broadcast Phase (handle request)
/////////////////////////////////////////////////////////////////////////////

//
// Goroutine for processing each request one by one
//
func (s *LeaderServer) processRequest(killch <-chan bool,
	listenerState *ListenerState,
	reqHandler CustomRequestHandler) (err error) {

	defer func() {
		if r := recover(); r != nil {
			log.Current.Errorf("panic in LeaderServer.processRequest() : %s\n", r)
			log.Current.Errorf("%s", log.Current.StackTrace())
			err = r.(error)
		} else {
			log.Current.Debugf("LeaderServer.processRequest() : Terminates.")
			log.Current.Tracef(log.Current.StackTrace())
		}

		common.SafeRun("LeaderServer.processRequest()",
			func() {
				listenerState.killch <- true
			})
	}()

	// Start the processing loop only after this node is confirmed as leader
	// (i.e., a quorum of followers has synced with it).
	if !s.waitTillReady() {
		return common.NewError(common.ELECTION_ERROR,
			"LeaderServer.processRequest(): Leader times out waiting for quorum of followers. Terminate")
	}

	// At this point, the leader has gotten a majority of followers to follow, so it
	// can proceed.  It may later lose its quorum of followers, but in that case
	// the leader will not be able to process any request.
	log.Current.Debugf("LeaderServer.processRequest(): Leader Server is ready to process requests")

	// The leader is ready at this time.  This implies that a quorum of followers
	// has followed this leader.  Get the change channel to keep track of the number
	// of followers.  If the leader no longer has a quorum, it needs to let go of
	// its leadership.
	leaderchangech := s.leader.GetEnsembleChangeChannel()
	ensembleSize := s.handler.GetEnsembleSize()

	// notify the request processor to start processing new requests
	incomings := s.state.requestMgr.GetRequestChannel()

	var outgoings <-chan common.Packet
	if reqHandler != nil {
		outgoings = reqHandler.GetResponseChannel()
	} else {
		outgoings = make(<-chan common.Packet)
	}

	for {
		select {
		case handle, ok := <-incomings:
			if ok {
				// de-queue the request
				s.state.requestMgr.AddPendingRequest(handle)

				// forward the request to the leader
				s.leader.QueueRequest(s.leader.GetFollowerId(), handle.Request)
			} else {
				// server shutdown.
				log.Current.Debugf("LeaderServer.processRequest(): channel for receiving client request is closed. Terminate.")
				return nil
			}
		case msg, ok := <-outgoings:
			if ok {
				// forward the msg to the leader
				s.leader.QueueResponse(msg)
			} else {
				log.Current.Infof("LeaderServer.processRequest(): channel for receiving custom response is closed. Ignore.")
			}
		case <-killch:
			// server shutdown
			log.Current.Debugf("LeaderServer.processRequest(): receive kill signal. Stop client request processing.")
			return nil
		case <-listenerState.donech:
			// The listener is down.  Terminate this request processing loop as well.
			log.Current.Infof("LeaderServer.processRequest(): follower listener terminates. Stop client request processing.")
			return nil
		case <-leaderchangech:
			// Listen for any change to the leader's active ensemble, and ensure that
			// the leader maintains a majority.  The active ensemble is the set of
			// running followers connected to the leader.
			numFollowers := s.leader.GetActiveEnsembleSize()
			if numFollowers <= int(ensembleSize/2) {
				// leader loses majority of followers.
				log.Current.Infof("LeaderServer.processRequest(): leader loses majority of followers. Stop client request processing.")
				return nil
			}
		}
	}
}
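
// hasQuorumSketch is an illustrative helper (not called by this package) that
// restates the majority arithmetic in processRequest above: the leader keeps
// serving only while the active ensemble is strictly larger than half of the
// full ensemble (integer division).  For example, with ensembleSize=5 at
// least 3 active members are required; with ensembleSize=4, also 3.
func hasQuorumSketch(activeSize int, ensembleSize int) bool {
	return activeSize > ensembleSize/2
}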

/////////////////////////////////////////////////////////////////////////////
// Private Function for protecting shared state
/////////////////////////////////////////////////////////////////////////////

//
// Notify when the server is ready
//
func (s *LeaderServer) notifyReady() {

	s.state.mutex.Lock()
	defer s.state.mutex.Unlock()

	if !s.state.ready {
		s.state.ready = true
		s.state.readych <- true
	}
}

//
// Wait for the ready flag to be set.  This is when the leader has gotten
// a quorum of followers to join/sync.
//
func (s *LeaderServer) waitTillReady() bool {

	timeout := time.After(common.LEADER_TIMEOUT * time.Millisecond)

	select {
	case <-s.state.readych:
		return true

	case <-timeout:
		log.Current.Infof("LeaderServer.waitTillReady(): Leader cannot get quorum of followers to follow before timing out. Terminate.")
		return false
	}
}

//
// Tell if the server is ready
//
func (s *LeaderServer) isReady() bool {
	s.state.mutex.Lock()
	defer s.state.mutex.Unlock()

	return s.state.ready
}

/////////////////////////////////////////////////////////////////////////////
// Private Function for managing proxies
/////////////////////////////////////////////////////////////////////////////

// Add Proxy
func (s *LeaderServer) registerOutstandingProxy(key string) bool {
	s.state.mutex.Lock()
	defer s.state.mutex.Unlock()

	_, ok := s.state.proxies[key]
	if !ok {
		killch := make(chan bool, 1) // buffered so the sender won't block
		s.state.proxies[key] = killch
		return true
	}

	return false
}

//
// Get proxy kill channel
//
func (s *LeaderServer) getProxyKillChan(key string) <-chan bool {
	s.state.mutex.Lock()
	defer s.state.mutex.Unlock()

	killch, ok := s.state.proxies[key]
	if ok {
		return killch
	}

	return nil
}

//
// Remove the proxy kill channel
//
func (s *LeaderServer) deregisterOutstandingProxy(key string) {
	s.state.mutex.Lock()
	defer s.state.mutex.Unlock()

	delete(s.state.proxies, key)
}

//
// Terminate all proxies
//
func (s *LeaderServer) terminateAllOutstandingProxies() {
	// TODO: Should copy the proxies and release the mutex
	// before sending to the channels (see the sketch below)
	s.state.mutex.Lock()
	defer s.state.mutex.Unlock()

	for _, killch := range s.state.proxies {
		killch <- true
	}
}
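
// terminateAllOutstandingProxiesSketch is an illustrative (unused) variant
// implementing the TODO above: snapshot the kill channels while holding the
// mutex, then signal them after releasing it, so a send can never stall other
// goroutines contending on the mutex.  With the current capacity-1 channels a
// first send cannot block anyway, which is why the simpler version above is
// still safe in practice.
func (s *LeaderServer) terminateAllOutstandingProxiesSketch() {
	s.state.mutex.Lock()
	channels := make([]chan bool, 0, len(s.state.proxies))
	for _, killch := range s.state.proxies {
		channels = append(channels, killch)
	}
	s.state.mutex.Unlock()

	// signal outside the critical section
	for _, killch := range channels {
		killch <- true
	}
}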

//
// Create the listener state
//
func newListenerState() *ListenerState {
	return &ListenerState{killch: make(chan bool, 1), // buffered so sender won't block
		donech: make(chan bool, 1)} // buffered so sender won't block
}