1// @author Couchbase <info@couchbase.com>
2// @copyright 2014 Couchbase, Inc.
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8//      http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16package protocol
17
18import (
19	"fmt"
20	"github.com/couchbase/gometa/common"
21	"github.com/couchbase/gometa/log"
22	"net"
23)
24
25//////////////////////////////////////////////////////////////////////////////
26// Type Declaration
27/////////////////////////////////////////////////////////////////////////////
28
29type FollowerServer struct {
30	follower *Follower
31	state    *FollowerState
32}
33
34type FollowerState struct {
35	requestMgr RequestMgr
36}
37
38/////////////////////////////////////////////////////////////////////////////
39// FollowerServer - Public Function
40/////////////////////////////////////////////////////////////////////////////
41
42//
43// Create a new FollowerServer. This is a blocking call until
44// the FollowerServer terminates. Make sure the kilch is a buffered
45// channel such that if the goroutine running RunFollowerServer goes
46// away, the sender won't get blocked.
47//
48func RunFollowerServer(naddr string,
49	leader string,
50	ss RequestMgr,
51	handler ActionHandler,
52	factory MsgFactory,
53	killch <-chan bool) (err error) {
54
55	// Catch panic at the main entry point for FollowerServer
56	defer func() {
57		if r := recover(); r != nil {
58			log.Current.Errorf("panic in RunFollowerServer() : %s\n", r)
59			log.Current.Errorf("%s", log.Current.StackTrace())
60			err = r.(error)
61		} else {
62			log.Current.Debugf("%s", "RunFollowerServer terminates.")
63			log.Current.Tracef(log.Current.StackTrace())
64		}
65	}()
66
67	// create connection to leader
68	conn, err := createConnection(leader)
69	if err != nil {
70		return err
71	}
72
73	pipe := common.NewPeerPipe(conn)
74	log.Current.Debugf("FollowerServer.RunFollowerServer() : Follower %s successfully "+
75		"created TCP connection to leader %s, local address %s", naddr, leader, conn.LocalAddr())
76
77	// close the connection to the leader. If connection is closed,
78	// sync proxy and follower will also terminate by err-ing out.
79	// If sync proxy and follower terminates the pipe upon termination,
80	// it is ok to close it again here.
81	defer common.SafeRun("FollowerServer.runFollowerServer()",
82		func() {
83			pipe.Close()
84		})
85
86	// start syncrhorniziing with the leader
87	success := syncWithLeader(naddr, pipe, handler, factory, killch)
88
89	// run server after synchronization
90	if success {
91		runFollower(pipe, ss, handler, factory, killch)
92		log.Current.Debugf("FollowerServer.RunFollowerServer() : Follower Server %s terminate", naddr)
93		err = nil
94	} else {
95		err = common.NewError(common.SERVER_ERROR, fmt.Sprintf("Follower %s fail to synchronized with leader %s",
96			naddr, leader))
97	}
98
99	return err
100}
101
102/////////////////////////////////////////////////////////////////////////////
103// FollowerServer - Private Function
104/////////////////////////////////////////////////////////////////////////////
105
106//
107// Synchronize with the leader.
108//
109func syncWithLeader(naddr string,
110	pipe *common.PeerPipe,
111	handler ActionHandler,
112	factory MsgFactory,
113	killch <-chan bool) bool {
114
115	log.Current.Debugf("FollowerServer.syncWithLeader(): Follower %s start synchronization with leader (TCP %s)",
116		naddr, pipe.GetAddr())
117	proxy := NewFollowerSyncProxy(pipe, handler, factory, true)
118	donech := proxy.GetDoneChannel()
119	go proxy.Start()
120	defer proxy.Terminate()
121
122	// This will block until NewFollowerSyncProxy has sychronized with the leader (a bool is pushed to donech)
123	select {
124	case success := <-donech:
125		if success {
126			log.Current.Debugf("FollowerServer.syncWithLeader(): Follower %s done synchronization with leader (TCP %s)",
127				naddr, pipe.GetAddr())
128		}
129		return success
130	case <-killch:
131		// simply return. The pipe will eventually be closed and
132		// cause FollowerSyncProxy to err out.
133		log.Current.Debugf("FollowerServer.syncWithLeader(): Recieve kill singal.  Synchronization with leader (TCP %s) terminated.",
134			pipe.GetAddr())
135	}
136
137	return false
138}
139
140//
141// Run Follower Protocol
142//
143func runFollower(pipe *common.PeerPipe,
144	ss RequestMgr,
145	handler ActionHandler,
146	factory MsgFactory,
147	killch <-chan bool) {
148
149	// create the server
150	server := new(FollowerServer)
151
152	// create the follower state
153	server.state = newFollowerState(ss)
154
155	// Create a follower.  The follower will start a go-rountine, listening to messages coming from leader.
156	log.Current.Debugf("FollowerServer.runFollower(): Start Follower Protocol")
157	server.follower = NewFollower(FOLLOWER, pipe, handler, factory)
158	donech := server.follower.Start()
159	defer server.follower.Terminate()
160
161	//start main processing loop
162	server.processRequest(handler, factory, killch, donech)
163}
164
165//
166// main processing loop
167//
168func (s *FollowerServer) processRequest(handler ActionHandler,
169	factory MsgFactory,
170	killch <-chan bool,
171	donech <-chan bool) {
172
173	log.Current.Debugf("FollowerServer.processRequest(): Ready to process request")
174
175	incomings := s.state.requestMgr.GetRequestChannel()
176	for {
177		select {
178		case handle, ok := <-incomings:
179			if ok {
180				// move request to pending queue (waiting for proposal)
181				s.state.requestMgr.AddPendingRequest(handle)
182
183				// forward the request to the leader
184				if !s.follower.ForwardRequest(handle.Request) {
185					log.Current.Errorf("FollowerServer.processRequest(): fail to send client request to leader. Terminate.")
186					return
187				}
188			} else {
189				log.Current.Debugf("FollowerServer.processRequest(): channel for receiving client request is closed. Terminate.")
190				return
191			}
192		case <-killch:
193			// server is being explicitly terminated.  Terminate the follower go-rountine as well.
194			log.Current.Debugf("FollowerServer.processRequest(): receive kill signal. Terminate.")
195			return
196		case <-donech:
197			// follower is done.  Just return.
198			log.Current.Debugf("FollowerServer.processRequest(): Follower go-routine terminates. Terminate.")
199			return
200		}
201	}
202}
203
204//
205// Create a new FollowerState
206//
207func newFollowerState(ss RequestMgr) *FollowerState {
208
209	state := &FollowerState{requestMgr: ss}
210	return state
211}
212
213//
214// Create a connection
215//
216func createConnection(leader string) (net.Conn, error) {
217
218	leaderAddr, err := net.ResolveTCPAddr("tcp", leader)
219	if err != nil {
220		return nil, err
221	}
222
223	conn, err := net.DialTCP("tcp", nil, leaderAddr)
224	if err != nil {
225		return nil, err
226	}
227
228	conn.SetKeepAlive(true)
229	conn.SetKeepAlivePeriod(common.TCP_KEEP_ALIVE_PERIOD)
230
231	return conn, nil
232}
233