1// Copyright (c) 2014 Couchbase, Inc.
2// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
3// except in compliance with the License. You may obtain a copy of the License at
4//   http://www.apache.org/licenses/LICENSE-2.0
5// Unless required by applicable law or agreed to in writing, software distributed under the
6// License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
7// either express or implied. See the License for the specific language governing permissions
8// and limitations under the License.
9
10package manager
11
12import (
13	"github.com/couchbase/gometa/action"
14	"github.com/couchbase/gometa/common"
15	"github.com/couchbase/gometa/message"
16	"github.com/couchbase/gometa/protocol"
17	repo "github.com/couchbase/gometa/repository"
18	"github.com/couchbase/indexing/secondary/logging"
19	"sync"
20)
21
22///////////////////////////////////////////////////////
23// Type Definition
24///////////////////////////////////////////////////////
25
26type watcher struct {
27	mgr         *IndexManager
28	leaderAddr  string
29	watcherAddr string
30	txn         *common.TxnState
31	repo        *repo.Repository
32	factory     protocol.MsgFactory
33	handler     *action.ServerAction
34	killch      chan bool
35	status      protocol.PeerStatus
36
37	mutex         sync.Mutex
38	isClosed      bool
39	observes      map[string]*observeHandle
40	notifications map[common.Txnid]*notificationHandle
41}
42
43type observeHandle struct {
44	key         string
45	w           *watcher
46	mutex       sync.Mutex
47	condVar     *sync.Cond
48	checkForAdd bool
49	done        bool
50}
51
52type notificationHandle struct {
53	key     string
54	content []byte
55	evtType EventType
56}
57
58///////////////////////////////////////////////////////
59// private function : Watcher
60///////////////////////////////////////////////////////
61
62func startWatcher(mgr *IndexManager,
63	repo *repo.Repository,
64	leaderAddr string,
65	watcherId string) (s *watcher, err error) {
66
67	s = new(watcher)
68
69	s.mgr = mgr
70	s.leaderAddr = leaderAddr
71	s.repo = repo
72	s.isClosed = false
73	s.observes = make(map[string]*observeHandle)
74	s.notifications = make(map[common.Txnid]*notificationHandle)
75
76	s.watcherAddr = watcherId
77	if err != nil {
78		return nil, err
79	}
80	logging.Debugf("watcher.startWatcher(): watcher follower ID %s", s.watcherAddr)
81
82	s.txn = common.NewTxnState()
83	s.factory = message.NewConcreteMsgFactory()
84	// TODO: Using DefaultServerAction, but need a callback on LogAndCommit
85	s.handler = action.NewDefaultServerAction(s.repo, s, s.txn)
86	s.killch = make(chan bool, 1) // make it buffered to unblock sender
87	s.status = protocol.ELECTING
88
89	readych := make(chan bool)
90
91	// TODO: call Close() to cleanup the state upon retry by the watcher server
92	go protocol.RunWatcherServer(
93		leaderAddr,
94		s.handler,
95		s.factory,
96		s.killch,
97		readych)
98
99	// TODO: timeout
100	<-readych
101
102	return s, nil
103}
104
105func (s *watcher) Close() {
106	s.mutex.Lock()
107	defer s.mutex.Unlock()
108
109	if !s.isClosed {
110		s.isClosed = true
111		s.killch <- true
112	}
113
114	for _, handle := range s.observes {
115		handle.signal(false)
116	}
117}
118
119func (s *watcher) Get(key string) ([]byte, error) {
120	return s.handler.Get(key)
121}
122
123func (s *watcher) Set(key string, content []byte) error {
124	return s.handler.Set(key, content)
125}
126
127/////////////////////////////////////////////////////////////////////////////
128// Private Function : Metadata Observe
129/////////////////////////////////////////////////////////////////////////////
130
131//
132// Observe will first for existence of the local repository in the watcher.
133// If the condition is satisied, then this function will just return.  Otherwise,
134// this function will wait until condition arrives when dictionary notifies the
135// watcher on new metadata update.
136// TODO: Support timeout
137//
138func (s *watcher) observeForAdd(key string) {
139
140	handle := func() *observeHandle {
141		s.mutex.Lock()
142		defer s.mutex.Unlock()
143
144		// This function now has the mutex.  Check if the key if it already exists.
145		value, err := s.Get(key)
146		if err == nil && value != nil {
147			return nil
148		}
149
150		// Create a handle to observe the key when it is committed.  Note that the watcher
151		// will need to acquire the mutex for processing the commit.  Therefore, we don't
152		// have to worry about race condition.
153		handle, ok := s.observes[key]
154		if !ok {
155			handle = newObserveHandle(key)
156			s.observes[key] = handle
157		}
158
159		return handle
160	}()
161
162	if handle != nil {
163		// wait to get notified
164		handle.wait()
165
166		// Double check if the key exist
167		value, err := s.Get(key)
168		if err == nil && value != nil {
169			return
170		}
171
172		// If key still does not exist, then continue to wait
173		s.observeForAdd(key)
174	}
175}
176
177func (s *watcher) observeForDelete(key string) {
178
179	handle := func() *observeHandle {
180		s.mutex.Lock()
181		defer s.mutex.Unlock()
182
183		// TODO : Check for the real error for non-existence (from goforestDB)
184		value, err := s.Get(key)
185		if err != nil || value == nil {
186			return nil
187		}
188
189		// Create a handle to observe the key when it is committed.  Note that the watcher
190		// will need to acquire the mutex for processing the commit.  Therefore, we don't
191		// have to worry about race condition.
192		handle, ok := s.observes[key]
193		if !ok {
194			handle = newObserveHandle(key)
195			s.observes[key] = handle
196		}
197
198		return handle
199	}()
200
201	if handle != nil {
202		handle.wait()
203
204		// Double check if the key exist
205		value, err := s.Get(key)
206		if err != nil || value == nil {
207			return
208		}
209
210		// If key still exist, then continue to wait
211		s.observeForDelete(key)
212	}
213}
214
215func (s *watcher) removeObserve(handle *observeHandle) {
216	s.mutex.Lock()
217	defer s.mutex.Unlock()
218
219	h, ok := s.observes[handle.key]
220	if ok && h == handle {
221		delete(s.observes, handle.key)
222	}
223}
224
225func newObserveHandle(key string) *observeHandle {
226	handle := new(observeHandle)
227	handle.condVar = sync.NewCond(&handle.mutex)
228	handle.key = key
229	handle.done = false
230
231	return handle
232}
233
234func (o *observeHandle) wait() {
235	o.condVar.L.Lock()
236	defer o.condVar.L.Unlock()
237	o.condVar.Wait()
238}
239
240func (o *observeHandle) signal(done bool) {
241	o.w.removeObserve(o)
242
243	o.condVar.L.Lock()
244	defer o.condVar.L.Unlock()
245	o.done = done
246	o.condVar.Broadcast()
247}
248
249/////////////////////////////////////////////////////////////////////////////
250// Private Function : Metadata Notification
251/////////////////////////////////////////////////////////////////////////////
252
253func newNotificationHandle(key string, evtType EventType, content []byte) *notificationHandle {
254	handle := new(notificationHandle)
255	handle.key = key
256	handle.evtType = evtType
257	handle.content = content
258
259	return handle
260}
261
262/////////////////////////////////////////////////////////////////////////////
263// ServerCallback Interface
264/////////////////////////////////////////////////////////////////////////////
265
266func (s *watcher) UpdateStateOnNewProposal(proposal protocol.ProposalMsg) {
267	s.mutex.Lock()
268	defer s.mutex.Unlock()
269
270	opCode := common.OpCode(proposal.GetOpCode())
271	logging.Debugf("Watcher.UpdateStateOnNewProposal(): receive proposal on metadata kind %d", findTypeFromKey(proposal.GetKey()))
272
273	// register the event for notification
274	var evtType EventType = EVENT_NONE
275	switch opCode {
276	case common.OPCODE_ADD:
277		metaType := findTypeFromKey(proposal.GetKey())
278		if metaType == KIND_INDEX_DEFN {
279			evtType = EVENT_CREATE_INDEX
280		} else if metaType == KIND_TOPOLOGY {
281			evtType = EVENT_UPDATE_TOPOLOGY
282		}
283	case common.OPCODE_SET:
284		metaType := findTypeFromKey(proposal.GetKey())
285		if metaType == KIND_INDEX_DEFN {
286			evtType = EVENT_CREATE_INDEX
287		} else if metaType == KIND_TOPOLOGY {
288			evtType = EVENT_UPDATE_TOPOLOGY
289		}
290	case common.OPCODE_DELETE:
291		if findTypeFromKey(proposal.GetKey()) == KIND_INDEX_DEFN {
292			evtType = EVENT_DROP_INDEX
293		}
294	default:
295		logging.Debugf("Watcher.UpdateStateOnNewProposal(): recieve proposal with opcode %d.  Skip convert proposal to event.", opCode)
296	}
297
298	logging.Debugf("Watcher.UpdateStateOnNewProposal(): convert metadata type to event  %d", evtType)
299	if evtType != EVENT_NONE {
300		logging.Debugf("Watcher.UpdateStateOnNewProposal(): register event for txid %d", proposal.GetTxnid())
301		s.notifications[common.Txnid(proposal.GetTxnid())] =
302			newNotificationHandle(proposal.GetKey(), evtType, proposal.GetContent())
303	}
304}
305
306func (s *watcher) UpdateStateOnCommit(txnid common.Txnid, key string) {
307	s.mutex.Lock()
308	defer s.mutex.Unlock()
309
310	handle, ok := s.observes[key]
311	if ok && handle != nil {
312		// Signal will remove observeHandle from watcher
313		handle.signal(true)
314	}
315
316	notification, ok := s.notifications[txnid]
317	if ok && notification != nil && s.mgr != nil {
318		logging.Debugf("Watcher.UpdateStateOnCommit(): notify event for txid %d", txnid)
319		s.mgr.notify(notification.evtType, notification.content)
320		delete(s.notifications, txnid)
321	}
322}
323
324func (s *watcher) UpdateStateOnRespond(fid string, reqId uint64, err string, content []byte) {
325}
326
327func (s *watcher) GetStatus() protocol.PeerStatus {
328	return s.status
329}
330
331func (s *watcher) UpdateWinningEpoch(epoch uint32) {
332}
333
334func (s *watcher) GetEnsembleSize() uint64 {
335	return 1 // just myself -- only used for leader election
336}
337
338func (s *watcher) GetFollowerId() string {
339	return s.watcherAddr
340}
341
342/////////////////////////////////////////////////////////////////////////////
343// QuorumVerifier
344/////////////////////////////////////////////////////////////////////////////
345
346func (s *watcher) HasQuorum(count int) bool {
347	ensembleSz := s.handler.GetEnsembleSize() - 1
348	return count > int(ensembleSz/2)
349}
350