1// @author Couchbase <info@couchbase.com>
2// @copyright 2014 Couchbase, Inc.
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8//      http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16package main
17
import (
	"fmt"
	"io"

	"github.com/couchbase/gometa/action"
	"github.com/couchbase/gometa/common"
	"github.com/couchbase/gometa/message"
	"github.com/couchbase/gometa/protocol"
	repo "github.com/couchbase/gometa/repository"
	"github.com/couchbase/gometa/server"
)
27
28type fakeServer struct {
29	repo    *repo.Repository
30	factory protocol.MsgFactory
31	handler *action.ServerAction
32	txn     *common.TxnState
33	killch  chan bool
34	status  protocol.PeerStatus
35}
36
37func runWatcher(path string) {
38
39	if path == "" {
40		fmt.Printf("Missing configuration")
41		return
42
43	}
44
45	// setup env
46	if err := server.NewEnv(path); err != nil {
47		return
48	}
49
50	// create a fake server
51	fs := new(fakeServer)
52	fs.bootstrap()
53
54	readych := make(chan bool) // blocking
55
56	go protocol.RunWatcherServerWithElection(
57		server.GetHostUDPAddr(),
58		server.GetPeerUDPAddr(),
59		server.GetPeerTCPAddr(),
60		nil,
61		fs.handler,
62		fs.factory,
63		fs.killch,
64		readych)
65
66	<-readych
67
68	runConsole(fs)
69}
70
71func runConsole(fs *fakeServer) {
72
73	for {
74		// read command from console
75		var key string
76
77		fmt.Printf("Enter Key to Retrieve\n")
78		_, err := fmt.Scanf("%s", &key)
79		if err != nil {
80			fmt.Printf("Error : %s", err.Error())
81			continue
82		}
83
84		value, err := fs.handler.Get(key)
85		if err != nil {
86			fmt.Printf("Error : %s", err.Error())
87			continue
88		}
89
90		if value != nil {
91			fmt.Printf("Result = %s \n", string(value))
92		} else {
93			fmt.Printf("Result not found\n")
94		}
95	}
96}
97
98func (s *fakeServer) bootstrap() (err error) {
99
100	// Initialize repository service
101	s.repo, err = repo.OpenRepository()
102	if err != nil {
103		return err
104	}
105
106	s.txn = common.NewTxnState()
107	s.factory = message.NewConcreteMsgFactory()
108	s.handler = action.NewDefaultServerAction(s.repo, s, s.txn)
109	s.killch = make(chan bool, 1) // make it buffered to unblock sender
110	s.status = protocol.ELECTING
111
112	return nil
113}
114
115/////////////////////////////////////////////////////////////////////////////
116// ServerCallback Interface
117/////////////////////////////////////////////////////////////////////////////
118
119func (s *fakeServer) UpdateStateOnNewProposal(proposal protocol.ProposalMsg) {
120}
121
122func (s *fakeServer) UpdateStateOnRespond(fid string, reqId uint64, err string, content []byte) {
123}
124
125func (s *fakeServer) UpdateStateOnCommit(txnid common.Txnid, key string) {
126}
127
128func (s *fakeServer) GetStatus() protocol.PeerStatus {
129	return s.status
130}
131
132func (s *fakeServer) UpdateWinningEpoch(epoch uint32) {
133}
134
135func (s *fakeServer) GetEnsembleSize() uint64 {
136	return uint64(len(server.GetPeerUDPAddr())) + 1 // including myself
137}
138
139func (s *fakeServer) GetFollowerId() string {
140	return server.GetHostTCPAddr()
141}
142
143/////////////////////////////////////////////////////////////////////////////
144// QuorumVerifier
145/////////////////////////////////////////////////////////////////////////////
146
147func (s *fakeServer) HasQuorum(count int) bool {
148	ensembleSz := s.handler.GetEnsembleSize() - 1
149	return count > int(ensembleSz/2)
150}
151