// Temporary implementation of the Create, Drop and Refresh operations on a
// GSI cluster. It will eventually be replaced by MetadataProvider.
3
4package client
5
6import "net/http"
7import "encoding/json"
8import "bytes"
9import "fmt"
10import "io/ioutil"
11import "errors"
12import "strings"
13import "sync"
14import "math"
15
16import "github.com/couchbase/indexing/secondary/logging"
17import "github.com/couchbase/indexing/secondary/common"
18import mclient "github.com/couchbase/indexing/secondary/manager/client"
19
// indexError describes one failure reported for an index-request. Both
// fields are left out of the JSON encoding when they are empty.
type indexError struct {
	// Code is carried under the JSON key "code".
	Code string `json:"code,omitempty"`
	// Msg is carried under the JSON key "msg".
	Msg string `json:"msg,omitempty"`
}
25
// indexRequest is the JSON message posted to the adminport. The requests
// built in this file use Type "list", "create" or "drop".
type indexRequest struct {
	// Version is not set by any request constructed in the visible code.
	Version uint64 `json:"version,omitempty"`
	// Type names the requested operation.
	Type string `json:"type,omitempty"`
	// Index describes the index the operation applies to. Being a struct
	// value it is always encoded; encoding/json's omitempty never treats a
	// struct as empty.
	Index indexInfo `json:"index,omitempty"`
}
32
// indexMetaResponse is the JSON reply decoded for an indexRequest.
type indexMetaResponse struct {
	// Version is decoded from the JSON key "version".
	Version uint64 `json:"version,omitempty"`
	// Status is inspected by metaResponse(): a value containing the
	// substring "error" marks the request as failed.
	Status string `json:"status,omitempty"`
	// Indexes lists the indexes described by the reply.
	Indexes []indexInfo `json:"indexes,omitempty"`
	// Errors lists the failures reported by the reply.
	Errors []indexError `json:"errors,omitempty"`
}
40
// cbqClient to access cbq-agent for admin operation on index.
type cbqClient struct {
	rw        sync.RWMutex // protects `indexes` field
	adminport string       // base URL for admin requests ("http://" + address)
	queryport string       // address of the "indexScan" service
	httpc     *http.Client // client used to post admin requests
	indexes   []*mclient.IndexMetadata // index list stored by the last successful Refresh()
	logPrefix string                   // prefix prepended to log messages
}
50
51// newCbqClient create cbq-cluster client.
52func newCbqClient(cluster string) (*cbqClient, error) {
53	clusterUrl, err := common.ClusterAuthUrl(cluster)
54	if err != nil {
55		return nil, err
56	}
57	cinfo, err := common.NewClusterInfoCache(clusterUrl, "default" /*pooln*/)
58	if err != nil {
59		return nil, err
60	}
61	if err = cinfo.Fetch(); err != nil {
62		return nil, err
63	}
64	nodes := cinfo.GetNodesByServiceType("indexAdmin")
65	if l := len(nodes); l < 1 {
66		err := fmt.Errorf("cinfo.GetNodesByServiceType() returns %d nodes", l)
67		return nil, err
68	}
69	adminport, err := cinfo.GetServiceAddress(nodes[0], "indexAdmin")
70	if err != nil {
71		return nil, err
72	}
73	queryport, err := cinfo.GetServiceAddress(nodes[0], "indexScan")
74	if err != nil {
75		return nil, err
76	}
77
78	b := &cbqClient{
79		adminport: "http://" + adminport,
80		queryport: queryport,
81		httpc:     http.DefaultClient,
82	}
83	b.logPrefix = fmt.Sprintf("[cbqClient %v]", b.adminport)
84	return b, nil
85}
86
// Sync is a no-op for cbqClient; it always returns nil.
func (b *cbqClient) Sync() error {
	return nil
}
90
91// Refresh implement BridgeAccessor{} interface.
92func (b *cbqClient) Refresh() ([]*mclient.IndexMetadata, uint64, uint64, error) {
93	var resp *http.Response
94	var mresp indexMetaResponse
95
96	// Construct request body.
97	req := indexRequest{Type: "list"}
98	body, err := json.Marshal(req)
99	if err == nil { // Post HTTP request.
100		bodybuf := bytes.NewBuffer(body)
101		url := b.adminport + "/list"
102		logging.Infof("%v posting %v to URL %v", b.logPrefix, bodybuf, url)
103		resp, err = b.httpc.Post(url, "application/json", bodybuf)
104		if err == nil {
105			defer resp.Body.Close()
106			mresp, err = b.metaResponse(resp)
107			if err == nil {
108				indexes := make([]*mclient.IndexMetadata, 0)
109				for _, info := range mresp.Indexes {
110					indexes = append(
111						indexes, newIndexMetaData(&info, b.queryport))
112				}
113				b.rw.Lock()
114				defer b.rw.Unlock()
115				b.indexes = indexes
116				return indexes, 0, common.INDEXER_CUR_VERSION, nil
117			}
118			return nil, 0, common.INDEXER_CUR_VERSION, err
119		}
120	}
121	return nil, 0, common.INDEXER_CUR_VERSION, err
122}
123
124// Nodes implement BridgeAccessor{} interface.
125func (b *cbqClient) Nodes() ([]*IndexerService, error) {
126	node := &IndexerService{
127		Adminport: b.adminport,
128		Queryport: b.queryport,
129		Status:    "online",
130	}
131	return []*IndexerService{node}, nil
132}
133
134// CreateIndex implement BridgeAccessor{} interface.
135func (b *cbqClient) CreateIndex(
136	name, bucket, using, exprType, whereExpr string,
137	secExprs []string, desc []bool, isPrimary bool,
138	scheme common.PartitionScheme, partitionKeys []string,
139	with []byte) (defnID uint64, err error) {
140
141	var resp *http.Response
142	var mresp indexMetaResponse
143
144	var withJSON = make(map[string]interface{})
145	if err := json.Unmarshal(with, &withJSON); err != nil {
146		panic(err)
147	}
148	retainDeletedXATTR := withJSON["retain_deleted_xattr"].(bool)
149
150	// Construct request body.
151	info := indexInfo{
152		Name:               name,
153		Bucket:             bucket,
154		Using:              using,
155		ExprType:           exprType,
156		WhereExpr:          whereExpr,
157		SecExprs:           secExprs,
158		IsPrimary:          isPrimary,
159		RetainDeletedXATTR: retainDeletedXATTR,
160	}
161	req := indexRequest{Type: "create", Index: info}
162	body, err := json.Marshal(req)
163	if err == nil { // Post HTTP request.
164		bodybuf := bytes.NewBuffer(body)
165		url := b.adminport + "/create"
166		logging.Infof("%v posting %v to URL %v", b.logPrefix, bodybuf, url)
167		resp, err = b.httpc.Post(url, "application/json", bodybuf)
168		if err == nil {
169			defer resp.Body.Close()
170			mresp, err = b.metaResponse(resp)
171			if err == nil {
172				defnID := mresp.Indexes[0].DefnID
173				b.Refresh()
174				return defnID, nil
175			}
176			return 0, err
177		}
178	}
179	return 0, err
180}
181
// BuildIndexes implement BridgeAccessor{} interface.
// Not supported by cbqClient: every call panics.
func (b *cbqClient) BuildIndexes(defnID []uint64) error {
	panic("cbqClient does not implement build-indexes")
}
186
// MoveIndex implement BridgeAccessor{} interface.
// Not supported by cbqClient: every call panics.
func (b *cbqClient) MoveIndex(defnID uint64, plan map[string]interface{}) error {
	panic("cbqClient does not implement move index")
}
191
192// DropIndex implement BridgeAccessor{} interface.
193func (b *cbqClient) DropIndex(defnID uint64) error {
194	var resp *http.Response
195
196	// Construct request body.
197	req := indexRequest{
198		Type: "drop", Index: indexInfo{DefnID: uint64(defnID)},
199	}
200	body, err := json.Marshal(req)
201	if err == nil {
202		// Post HTTP request.
203		bodybuf := bytes.NewBuffer(body)
204		url := b.adminport + "/drop"
205		logging.Infof("%v posting %v to URL %v", b.logPrefix, bodybuf, url)
206		resp, err = b.httpc.Post(url, "application/json", bodybuf)
207		if err == nil {
208			defer resp.Body.Close()
209			_, err = b.metaResponse(resp)
210			if err == nil {
211				b.Refresh()
212				return nil
213			}
214			return err
215		}
216	}
217	return err
218}
219
220// GetScanports implement BridgeAccessor{} interface.
221func (b *cbqClient) GetScanports() (queryports []string) {
222	return []string{b.queryport}
223}
224
225// GetScanport implement BridgeAccessor{} interface.
226func (b *cbqClient) GetScanport(
227	defnID uint64,
228	excludes map[common.IndexDefnId]map[common.PartitionId]map[uint64]bool,
229	skips map[common.IndexDefnId]bool) (queryport []string,
230	targetDefnID uint64, targetIndstID []uint64, rollbackTime []int64,
231	partition [][]common.PartitionId, numPartition uint32, ok bool) {
232
233	return []string{b.queryport}, defnID, nil, []int64{math.MaxInt64}, nil, 0, true
234}
235
// GetIndexDefn implements BridgeAccessor{} interface.
// Not supported by cbqClient: every call panics.
func (b *cbqClient) GetIndexDefn(defnID uint64) *common.IndexDefn {
	panic("cbqClient does not implement GetIndexDefn")
}
240
// GetIndexInst implements BridgeAccessor{} interface.
// Not supported by cbqClient: every call panics.
func (b *cbqClient) GetIndexInst(instId uint64) *mclient.InstanceDefn {
	panic("cbqClient does not implement GetIndexInst")
}
245
// GetIndexReplica implements BridgeAccessor{} interface.
// Not supported by cbqClient: every call panics.
func (b *cbqClient) GetIndexReplica(defnId uint64) []*mclient.InstanceDefn {
	panic("cbqClient does not implement GetIndexReplica")
}
250
// Timeit implement BridgeAccessor{} interface.
// Currently a no-op: the supplied value is discarded.
func (b *cbqClient) Timeit(defnID uint64, partitionId common.PartitionId, value float64) {
	// TODO: do nothing ?
}
255
// IndexState implement BridgeAccessor{} interface.
// Always reports common.INDEX_STATE_ACTIVE with a nil error, whatever
// defnID is passed.
func (b *cbqClient) IndexState(defnID uint64) (common.IndexState, error) {
	return common.INDEX_STATE_ACTIVE, nil
}
260
// IsPrimary implement BridgeAccessor{} interface.
// Always returns false; defnID is not looked up.
func (b *cbqClient) IsPrimary(defnID uint64) bool {
	return false
}
265
// NumReplica implement BridgeAccessor{} interface.
// Always returns 0; defnID is not looked up.
func (b *cbqClient) NumReplica(defnID uint64) int {
	return 0
}
270
// Close implement BridgeAccessor{} interface.
// Currently a no-op: nothing is released.
func (b *cbqClient) Close() {
	// TODO: do nothing ?
}
275
276// Gather index meta response from http response.
277func (b *cbqClient) metaResponse(
278	resp *http.Response) (mresp indexMetaResponse, err error) {
279
280	var body []byte
281	body, err = ioutil.ReadAll(resp.Body)
282	if err == nil {
283		if err = json.Unmarshal(body, &mresp); err == nil {
284			logging.Tracef("%v received raw response %s", b.logPrefix, string(body))
285			if strings.Contains(mresp.Status, "error") {
286				err = errors.New(mresp.Errors[0].Msg)
287			}
288		}
289	}
290	return mresp, err
291}
292
// indexInfo describes an index as exchanged with the adminport. Every field
// is omitted from the JSON encoding when it holds its zero value.
type indexInfo struct {
	Name   string `json:"name,omitempty"`
	Bucket string `json:"bucket,omitempty"`
	// The tag must not contain a space after the comma: encoding/json
	// matches options literally, so ", omitempty" is silently ignored.
	DefnID             uint64   `json:"defnID,omitempty"`
	Using              string   `json:"using,omitempty"`
	ExprType           string   `json:"exprType,omitempty"`
	PartnExpr          string   `json:"partnExpr,omitempty"`
	SecExprs           []string `json:"secExprs,omitempty"`
	WhereExpr          string   `json:"whereExpr,omitempty"`
	IsPrimary          bool     `json:"isPrimary,omitempty"`
	RetainDeletedXATTR bool     `json:"retainDeletedXATTR,omitempty"`
}
306
307func newIndexMetaData(info *indexInfo, queryport string) *mclient.IndexMetadata {
308	defn := &common.IndexDefn{
309		DefnId:             common.IndexDefnId(info.DefnID),
310		Name:               info.Name,
311		Using:              common.IndexType(info.Using),
312		Bucket:             info.Bucket,
313		IsPrimary:          info.IsPrimary,
314		ExprType:           common.ExprType(info.ExprType),
315		SecExprs:           info.SecExprs,
316		RetainDeletedXATTR: info.RetainDeletedXATTR,
317		//PartitionKey: info.PartnExpr,
318	}
319	instances := []*mclient.InstanceDefn{
320		&mclient.InstanceDefn{
321			InstId: common.IndexInstId(info.DefnID), // TODO: defnID as InstID
322			State:  common.INDEX_STATE_READY,
323		},
324	}
325	imeta := &mclient.IndexMetadata{
326		Definition: defn,
327		Instances:  instances,
328	}
329	return imeta
330}
331