1//  Copyright (c) 2013 Couchbase, Inc.
2//  Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
3//  except in compliance with the License. You may obtain a copy of the License at
4//    http://www.apache.org/licenses/LICENSE-2.0
5//  Unless required by applicable law or agreed to in writing, software distributed under the
6//  License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
7//  either express or implied. See the License for the specific language governing permissions
8//  and limitations under the License.
9
10package system
11
12import (
13	"encoding/json"
14	"fmt"
15	"strings"
16
17	"github.com/couchbase/query/auth"
18	"github.com/couchbase/query/datastore"
19	"github.com/couchbase/query/errors"
20	"github.com/couchbase/query/expression"
21	"github.com/couchbase/query/logging"
22	"github.com/couchbase/query/timestamp"
23	"github.com/couchbase/query/value"
24)
25
26type indexKeyspace struct {
27	keyspaceBase
28	name    string
29	indexer datastore.Indexer
30}
31
32func (b *indexKeyspace) Release() {
33}
34
35func (b *indexKeyspace) NamespaceId() string {
36	return b.namespace.Id()
37}
38
39func (b *indexKeyspace) Id() string {
40	return b.Name()
41}
42
43func (b *indexKeyspace) Name() string {
44	return b.name
45}
46
47func (b *indexKeyspace) Count(context datastore.QueryContext) (int64, errors.Error) {
48	count := int64(0)
49	namespaceIds, excp := b.namespace.store.actualStore.NamespaceIds()
50	if excp == nil {
51		for _, namespaceId := range namespaceIds {
52			namespace, excp := b.namespace.store.actualStore.NamespaceById(namespaceId)
53			if excp == nil {
54				keyspaceIds, excp := namespace.KeyspaceIds()
55				if excp == nil {
56					for _, keyspaceId := range keyspaceIds {
57						excludeResults := !canRead(context, namespaceId, keyspaceId) &&
58							!canListIndexes(context, namespaceId, keyspaceId)
59						keyspace, excp := namespace.KeyspaceById(keyspaceId)
60						if excp == nil {
61							indexers, excp := keyspace.Indexers()
62							if excp == nil {
63								for _, indexer := range indexers {
64									excp = indexer.Refresh()
65									if excp != nil {
66										return 0, errors.NewSystemDatastoreError(excp, "")
67									}
68
69									indexIds, excp := indexer.IndexIds()
70									if excp == nil {
71										if excludeResults {
72											if len(indexIds) > 0 {
73												context.Warning(errors.NewSystemFilteredRowsWarning("system:indexes"))
74											}
75										} else {
76											count += int64(len(indexIds))
77										}
78									} else {
79										return 0, errors.NewSystemDatastoreError(excp, "")
80									}
81								}
82							} else {
83								return 0, errors.NewSystemDatastoreError(excp, "")
84							}
85						} else {
86							return 0, errors.NewSystemDatastoreError(excp, "")
87						}
88					}
89				} else {
90					return 0, errors.NewSystemDatastoreError(excp, "")
91				}
92			} else {
93				return 0, errors.NewSystemDatastoreError(excp, "")
94			}
95		}
96		return count, nil
97	}
98	return 0, errors.NewSystemDatastoreError(excp, "")
99}
100
101func (b *indexKeyspace) Indexer(name datastore.IndexType) (datastore.Indexer, errors.Error) {
102	return b.indexer, nil
103}
104
105func (b *indexKeyspace) Indexers() ([]datastore.Indexer, errors.Error) {
106	return []datastore.Indexer{b.indexer}, nil
107}
108
109func (b *indexKeyspace) Fetch(keys []string, keysMap map[string]value.AnnotatedValue,
110	context datastore.QueryContext, subPaths []string) (errs []errors.Error) {
111
112	for _, key := range keys {
113		ids := strings.SplitN(key, "/", 3)
114		if len(ids) < 3 {
115			errs = append(errs, errors.NewSystemMalformedKeyError(key, "system:indexes"))
116			continue
117		}
118		namespaceId := ids[0]
119		keyspaceId := ids[1]
120		indexId := ids[2]
121		if !canRead(context, namespaceId, keyspaceId) &&
122			!canListIndexes(context, namespaceId, keyspaceId) {
123			context.Warning(errors.NewSystemFilteredRowsWarning("system:indexes"))
124			continue
125		}
126
127		err := b.fetchOne(key, keysMap, namespaceId, keyspaceId, indexId)
128		if err != nil {
129			if errs == nil {
130				errs = make([]errors.Error, 0, 1)
131			}
132			errs = append(errs, err)
133			continue
134		}
135	}
136
137	return
138}
139
140func (b *indexKeyspace) fetchOne(key string, keysMap map[string]value.AnnotatedValue,
141	namespaceId string, keyspaceId string, indexId string) errors.Error {
142
143	actualStore := b.namespace.store.actualStore
144	namespace, err := actualStore.NamespaceById(namespaceId)
145	if err != nil {
146		return err
147	}
148
149	keyspace, err := namespace.KeyspaceById(keyspaceId)
150	if err != nil {
151		return err
152	}
153
154	indexers, err := keyspace.Indexers()
155	if err != nil {
156		logging.Infof("Indexer returned error %v", err)
157		return err
158	}
159
160	for _, indexer := range indexers {
161		index, err := indexer.IndexById(indexId)
162		if err != nil {
163			continue
164		}
165
166		state, msg, err := index.State()
167		if err != nil {
168			return err
169		}
170		doc := value.NewAnnotatedValue(map[string]interface{}{
171			"id":           index.Id(),
172			"name":         index.Name(),
173			"keyspace_id":  keyspace.Id(),
174			"namespace_id": namespace.Id(),
175			"datastore_id": actualStore.URL(),
176			"index_key":    datastoreObjectToJSONSafe(indexKeyToIndexKeyStringArray(index)),
177			"using":        datastoreObjectToJSONSafe(index.Type()),
178			"state":        string(state),
179		})
180
181		doc.SetAttachment("meta", map[string]interface{}{
182			"id": key,
183		})
184
185		partition := indexPartitionToString(index)
186		if partition != "" {
187			doc.SetField("partition", partition)
188		}
189
190		if msg != "" {
191			doc.SetField("message", msg)
192		}
193
194		cond := index.Condition()
195		if cond != nil {
196			doc.SetField("condition", cond.String())
197		}
198
199		if index.IsPrimary() {
200			doc.SetField("is_primary", true)
201		}
202
203		keysMap[key] = doc
204	}
205
206	return nil
207}
208
209func indexKeyToIndexKeyStringArray(index datastore.Index) (rv []string) {
210	if index2, ok2 := index.(datastore.Index2); ok2 {
211		keys := index2.RangeKey2()
212		rv = make([]string, len(keys))
213		for i, kp := range keys {
214			s := expression.NewStringer().Visit(kp.Expr)
215			if kp.Desc {
216				s += " DESC"
217			}
218			rv[i] = s
219		}
220
221	} else {
222		rv = make([]string, len(index.RangeKey()))
223		for i, kp := range index.RangeKey() {
224			rv[i] = expression.NewStringer().Visit(kp)
225		}
226	}
227	return
228}
229
230func indexPartitionToString(index datastore.Index) (rv string) {
231	index3, ok3 := index.(datastore.Index3)
232	if !ok3 {
233		return
234	}
235	partition, _ := index3.PartitionKeys()
236	if partition == nil || partition.Strategy == datastore.NO_PARTITION {
237		return
238	}
239
240	rv = string(partition.Strategy) + "("
241	for i, expr := range partition.Exprs {
242		if i > 0 {
243			rv += ","
244		}
245		rv += expression.NewStringer().Visit(expr)
246	}
247	rv += ")"
248	return
249}
250
251func datastoreObjectToJSONSafe(catobj interface{}) interface{} {
252	var rv interface{}
253	bytes, err := json.Marshal(catobj)
254	if err == nil {
255		json.Unmarshal(bytes, &rv)
256	}
257	return rv
258}
259
260func newIndexesKeyspace(p *namespace) (*indexKeyspace, errors.Error) {
261	b := new(indexKeyspace)
262	setKeyspaceBase(&b.keyspaceBase, p)
263	b.name = KEYSPACE_NAME_INDEXES
264
265	primary := &indexIndex{name: "#primary", keyspace: b}
266	b.indexer = newSystemIndexer(b, primary)
267	setIndexBase(&primary.indexBase, b.indexer)
268
269	return b, nil
270}
271
272func (b *indexKeyspace) Insert(inserts []value.Pair) ([]value.Pair, errors.Error) {
273	// FIXME
274	return nil, errors.NewSystemNotImplementedError(nil, "")
275}
276
277func (b *indexKeyspace) Update(updates []value.Pair) ([]value.Pair, errors.Error) {
278	// FIXME
279	return nil, errors.NewSystemNotImplementedError(nil, "Not yet implemented.")
280}
281
282func (b *indexKeyspace) Upsert(upserts []value.Pair) ([]value.Pair, errors.Error) {
283	// FIXME
284	return nil, errors.NewSystemNotImplementedError(nil, "Not yet implemented.")
285}
286
287func (b *indexKeyspace) Delete(deletes []string, context datastore.QueryContext) ([]string, errors.Error) {
288	// FIXME
289	return nil, errors.NewSystemNotImplementedError(nil, "Not yet implemented.")
290}
291
292type indexIndex struct {
293	indexBase
294	name     string
295	keyspace *indexKeyspace
296}
297
298func (pi *indexIndex) KeyspaceId() string {
299	return pi.keyspace.Id()
300}
301
302func (pi *indexIndex) Id() string {
303	return pi.Name()
304}
305
306func (pi *indexIndex) Name() string {
307	return pi.name
308}
309
310func (pi *indexIndex) Type() datastore.IndexType {
311	return datastore.SYSTEM
312}
313
314func (pi *indexIndex) SeekKey() expression.Expressions {
315	return nil
316}
317
318func (pi *indexIndex) RangeKey() expression.Expressions {
319	return nil
320}
321
322func (pi *indexIndex) Condition() expression.Expression {
323	return nil
324}
325
326func (pi *indexIndex) IsPrimary() bool {
327	return true
328}
329
330func (pi *indexIndex) State() (state datastore.IndexState, msg string, err errors.Error) {
331	return datastore.ONLINE, "", nil
332}
333
334func (pi *indexIndex) Statistics(requestId string, span *datastore.Span) (
335	datastore.Statistics, errors.Error) {
336	return nil, nil
337}
338
339func (pi *indexIndex) Drop(requestId string) errors.Error {
340	return errors.NewSystemIdxNoDropError(nil, "")
341}
342
343func (pi *indexIndex) Scan(requestId string, span *datastore.Span, distinct bool, limit int64,
344	cons datastore.ScanConsistency, vector timestamp.Vector, conn *datastore.IndexConnection) {
345	pi.ScanEntries(requestId, limit, cons, vector, conn)
346}
347
348// Do the presented credentials authorize the user to read the namespace/keyspace bucket?
349func canRead(context datastore.QueryContext, namespace string, keyspace string) bool {
350	privs := auth.NewPrivileges()
351	privs.Add(namespace+":"+keyspace, auth.PRIV_QUERY_SELECT)
352	_, err := datastore.GetDatastore().Authorize(privs, context.Credentials(), context.OriginalHttpRequest())
353	res := err == nil
354	return res
355}
356
357// Do the presented credentials authorize the user to list indexes of the namespace/keyspace bucket?
358func canListIndexes(context datastore.QueryContext, namespace string, keyspace string) bool {
359	privs := auth.NewPrivileges()
360	privs.Add(namespace+":"+keyspace, auth.PRIV_QUERY_LIST_INDEX)
361	_, err := datastore.GetDatastore().Authorize(privs, context.Credentials(), context.OriginalHttpRequest())
362	res := err == nil
363	return res
364}
365
366func (pi *indexIndex) ScanEntries(requestId string, limit int64, cons datastore.ScanConsistency,
367	vector timestamp.Vector, conn *datastore.IndexConnection) {
368	defer close(conn.EntryChannel())
369
370	// eliminate duplicate keys
371	keys := make(map[string]string, 64)
372
373	actualStore := pi.keyspace.namespace.store.actualStore
374	namespaceIds, err := actualStore.NamespaceIds()
375	if err == nil {
376		for _, namespaceId := range namespaceIds {
377			namespace, err := actualStore.NamespaceById(namespaceId)
378			if err == nil {
379				keyspaceIds, err := namespace.KeyspaceIds()
380				if err == nil {
381					for _, keyspaceId := range keyspaceIds {
382						keyspace, err := namespace.KeyspaceById(keyspaceId)
383						if err == nil {
384							indexers, err := keyspace.Indexers()
385							if err == nil {
386								for _, indexer := range indexers {
387									err = indexer.Refresh()
388									if err != nil {
389										logging.Errorf("Refreshing indexes failed %v", err)
390
391										// MB-23555, don't throw errors, or the scan will be terminated
392										conn.Warning(errors.NewSystemDatastoreError(err, ""))
393										// don't return here but continue processing, because other keyspaces may still be responsive. MB-15834
394										continue
395									}
396
397									indexIds, err := indexer.IndexIds()
398									if err == nil {
399										for _, indexId := range indexIds {
400											key := fmt.Sprintf("%s/%s/%s", namespaceId, keyspaceId, indexId)
401											keys[key] = key
402										}
403									}
404								}
405							}
406						}
407					}
408				}
409			}
410		}
411	}
412
413	for k, _ := range keys {
414		entry := datastore.IndexEntry{PrimaryKey: k}
415		if !sendSystemKey(conn, &entry) {
416			return
417		}
418	}
419}
420