1// @author Couchbase <info@couchbase.com>
2// @copyright 2014 Couchbase, Inc.
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8//      http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16package protocol
17
18import (
19	"github.com/couchbase/gometa/common"
20	"github.com/couchbase/gometa/log"
21	"sync"
22	"time"
23)
24
25/////////////////////////////////////////////////////////////////////////////
26// WatcherServer - Public Function
27/////////////////////////////////////////////////////////////////////////////
28
29//
30// Create a new WatcherServer. This is a blocking call until
31// the WatcherServer terminates. Make sure the kilch is a buffered
32// channel such that if the goroutine running RunWatcherServer goes
33// away, the sender won't get blocked.
34//
35func RunWatcherServerWithRequest(leader string,
36	requestMgr RequestMgr,
37	handler ActionHandler,
38	factory MsgFactory,
39	killch <-chan bool,
40	readych chan<- bool,
41	alivech chan<- bool,
42	pingch <-chan bool) {
43
44	var once sync.Once
45	backoff := common.RETRY_BACKOFF
46	retry := true
47	for retry {
48		log.Current.Debugf("WatcherServer.runWatcherServer() : runOnce() returns with error.  Retry ...")
49		if runOnce(leader, requestMgr, handler, factory, killch, readych, alivech, pingch, &once) {
50			retry = false
51		}
52
53		if retry {
54			timer := time.NewTimer(backoff * time.Millisecond)
55			select {
56			case <-timer.C:
57			case <-killch:
58				return
59			}
60
61			backoff += backoff
62			if backoff > common.MAX_RETRY_BACKOFF {
63				backoff = common.MAX_RETRY_BACKOFF
64			}
65		}
66	}
67}
68
69//
70// Create a new WatcherServer. This is a blocking call until
71// the WatcherServer terminates. Make sure the kilch is a buffered
72// channel such that if the goroutine running RunWatcherServer goes
73// away, the sender won't get blocked.
74//
75func RunWatcherServer(leader string,
76	handler ActionHandler,
77	factory MsgFactory,
78	killch <-chan bool,
79	readych chan<- bool) {
80
81	RunWatcherServerWithRequest(leader, nil, handler, factory, killch, readych, make(chan bool, 1), make(chan bool, 1))
82}
83
84//
85// Create a new WatcherServer. This is a blocking call until
86// the WatcherServer terminates. Make sure the kilch is a buffered
87// channel such that if the goroutine running RunWatcherServer goes
88// away, the sender won't get blocked.
89//
90func RunWatcherServerWithElection(host string,
91	peerUDP []string,
92	peerTCP []string,
93	requestMgr RequestMgr,
94	handler ActionHandler,
95	factory MsgFactory,
96	killch <-chan bool,
97	readych chan<- bool) {
98
99	var once sync.Once
100	backoff := common.RETRY_BACKOFF
101	retry := true
102	for retry {
103		peer, isKilled := findPeerToConnect(host, peerUDP, peerTCP, factory, handler, killch)
104		if isKilled {
105			return
106		}
107
108		if peer != "" && runOnce(peer, requestMgr, handler, factory, killch, readych, make(chan bool, 1), make(chan bool, 1), &once) {
109			retry = false
110		}
111
112		if retry {
113			timer := time.NewTimer(backoff * time.Millisecond)
114			<-timer.C
115
116			backoff += backoff
117			if backoff > common.MAX_RETRY_BACKOFF {
118				backoff = common.MAX_RETRY_BACKOFF
119			}
120		}
121	}
122}
123
124/////////////////////////////////////////////////////////////////////////////
125// WatcherServer - Execution Loop
126/////////////////////////////////////////////////////////////////////////////
127
128func runOnce(peer string,
129	requestMgr RequestMgr,
130	handler ActionHandler,
131	factory MsgFactory,
132	killch <-chan bool,
133	readych chan<- bool,
134	alivech chan<- bool,
135	pingch <-chan bool,
136	once *sync.Once) (isKilled bool) {
137
138	// Catch panic at the main entry point for WatcherServer
139	defer func() {
140		if r := recover(); r != nil {
141			log.Current.Errorf("panic in WatcherServer.runOnce() : %s\n", r)
142			log.Current.Errorf("%s", log.Current.StackTrace())
143		} else {
144			log.Current.Debugf("WatcherServer.runOnce() terminates.")
145			log.Current.Tracef(log.Current.StackTrace())
146		}
147
148		if requestMgr != nil {
149			requestMgr.CleanupOnError()
150		}
151	}()
152
153	// create connection with a peer
154	conn, err := createConnection(peer)
155	if err != nil {
156		log.Current.Errorf("WatcherServer.runOnce() error : %s", err)
157		return false
158	}
159	pipe := common.NewPeerPipe(conn)
160	log.Current.Debugf("WatcherServer.runOnce() : Watcher successfully created TCP connection to peer %s", peer)
161
162	// close the connection to the peer. If connection is closed,
163	// sync proxy and watcher will also terminate by err-ing out.
164	// If sync proxy and watcher terminates the pipe upon termination,
165	// it is ok to close it again here.
166	defer common.SafeRun("WatcherServer.runOnce()",
167		func() {
168			pipe.Close()
169		})
170
171	// start syncrhorniziing with the metadata server
172	success, isKilled := syncWithPeer(pipe, handler, factory, killch)
173
174	// run watcher after synchronization
175	if success {
176		if !runWatcher(pipe, requestMgr, handler, factory, killch, readych, alivech, pingch, once) {
177			log.Current.Errorf("WatcherServer.runOnce() : Watcher terminated unexpectedly.")
178			return false
179		}
180
181	} else if !isKilled {
182		log.Current.Errorf("WatcherServer.runOnce() : Watcher fail to synchronized with peer %s", peer)
183		return false
184	}
185
186	return true
187}
188
189/////////////////////////////////////////////////////////////////////////////
190// WatcherServer - Election and Synchronization
191/////////////////////////////////////////////////////////////////////////////
192
193//
194// Synchronize with the leader.
195//
196func syncWithPeer(pipe *common.PeerPipe,
197	handler ActionHandler,
198	factory MsgFactory,
199	killch <-chan bool) (success bool, isKilled bool) {
200
201	log.Current.Debugf("WatcherServer.syncWithPeer(): Watcher start synchronization with peer (TCP %s)", pipe.GetAddr())
202	proxy := NewFollowerSyncProxy(pipe, handler, factory, false)
203	donech := proxy.GetDoneChannel()
204	go proxy.Start()
205	defer proxy.Terminate()
206
207	// This will block until NewWatcherSyncProxy has sychronized with the peer (a bool is pushed to donech)
208	select {
209	case success = <-donech:
210		if success {
211			log.Current.Debugf("WatcherServer.syncWithPeer(): Watcher done synchronization with peer (TCP %s)", pipe.GetAddr())
212		}
213		return success, false
214	case <-killch:
215		// simply return. The pipe will eventually be closed and
216		// cause WatcherSyncProxy to err out.
217		log.Current.Debugf("WatcherServer.syncWithPeer(): Recieve kill singal.  Synchronization with peer (TCP %s) terminated.",
218			pipe.GetAddr())
219		return false, true
220	}
221}
222
223//
224// Find which peer to connect to
225//
226func findPeerToConnect(host string,
227	peerUDP []string,
228	peerTCP []string,
229	factory MsgFactory,
230	handler ActionHandler,
231	killch <-chan bool) (leader string, isKilled bool) {
232
233	defer func() {
234		if r := recover(); r != nil {
235			log.Current.Errorf("panic in findPeerToConnect() : %s\n", r)
236			log.Current.Errorf("%s", log.Current.StackTrace())
237		} else {
238			log.Current.Debugf("findPeerToConnect() terminates : Diagnostic Stack ...")
239			log.Current.LazyDebug(log.Current.StackTrace)
240		}
241	}()
242
243	// Run master election to figure out who is the leader.  Only connect to leader for now.
244	site, err := CreateElectionSite(host, peerUDP, factory, handler, true)
245	if err != nil {
246		log.Current.Errorf("WatcherServer.findPeerToConnect() error : %s", err)
247		return "", false
248	}
249
250	defer func() {
251		common.SafeRun("Server.cleanupState()",
252			func() {
253				site.Close()
254			})
255	}()
256
257	resultCh := site.StartElection()
258	if resultCh == nil {
259		log.Current.Errorf("WatcherServer.findPeerToConnect: Election Site is in progress or is closed.")
260		return "", false
261	}
262
263	select {
264	case leader, ok := <-resultCh:
265		if !ok {
266			log.Current.Errorf("WatcherServer.findPeerToConnect: Election Fails")
267			return "", false
268		}
269
270		for i, peer := range peerUDP {
271			if peer == leader {
272				return peerTCP[i], false
273			}
274		}
275
276		log.Current.Errorf("WatcherServer.findPeerToConnect : Cannot find matching port for peer. Peer UPD port = %s", leader)
277		return "", false
278
279	case <-killch:
280		return "", true
281	}
282}
283
284/////////////////////////////////////////////////////////////////////////////
285// WatcherServer - Watcher Protocol
286/////////////////////////////////////////////////////////////////////////////
287
288//
289// Run Watcher Protocol
290//
291func runWatcher(pipe *common.PeerPipe,
292	requestMgr RequestMgr,
293	handler ActionHandler,
294	factory MsgFactory,
295	killch <-chan bool,
296	readych chan<- bool,
297	alivech chan<- bool,
298	pingch <-chan bool,
299	once *sync.Once) (isKilled bool) {
300
301	// Create a watcher.  The watcher will start a go-rountine, listening to messages coming from peer.
302	log.Current.Debugf("WatcherServer.runWatcher(): Start Watcher Protocol")
303	watcher := NewFollower(WATCHER, pipe, handler, factory)
304	donech := watcher.Start()
305	defer watcher.Terminate()
306
307	// notify that the watcher is starting to run.  Only do this once.
308	once.Do(func() { readych <- true })
309
310	log.Current.Debugf("WatcherServer.runWatcher(): Watcher is ready to process request")
311
312	var incomings <-chan *RequestHandle
313	if requestMgr != nil {
314		incomings = requestMgr.GetRequestChannel()
315	} else {
316		incomings = make(chan *RequestHandle)
317	}
318
319	for {
320		select {
321		case handle, ok := <-incomings:
322			if ok {
323				// move request to pending queue (waiting for proposal)
324				requestMgr.AddPendingRequest(handle)
325
326				// forward the request to the leader
327				if !watcher.ForwardRequest(handle.Request) {
328					log.Current.Errorf("WatcherServer.processRequest(): fail to send client request to leader. Terminate.")
329					return
330				}
331			} else {
332				log.Current.Debugf("WatcherServer.processRequest(): channel for receiving client request is closed. Terminate.")
333				return
334			}
335		case <-killch:
336			// server is being explicitly terminated.  Terminate the watcher go-rountine as well.
337			log.Current.Debugf("WatcherServer.runTillEnd(): receive kill signal. Terminate.")
338			return true
339		case <-donech:
340			// watcher is done.  Just return.
341			log.Current.Debugf("WatcherServer.runTillEnd(): Watcher go-routine terminates. Terminate.")
342			return false
343		case <-pingch:
344			if len(alivech) == 0 {
345				alivech <- true
346			}
347		}
348	}
349}
350