1package querycmd
2
3import json "github.com/couchbase/indexing/secondary/common/json"
4import "flag"
5import "fmt"
6import "io"
7import "bytes"
8import "strings"
9import "strconv"
10import "net"
11import "errors"
12import "time"
13import "net/http"
14import "io/ioutil"
15import "os"
16
17import "github.com/couchbase/cbauth"
18import "github.com/couchbase/indexing/secondary/logging"
19import c "github.com/couchbase/indexing/secondary/common"
20import mclient "github.com/couchbase/indexing/secondary/manager/client"
21import qclient "github.com/couchbase/indexing/secondary/queryport/client"
22import "github.com/couchbase/query/expression"
23import "github.com/couchbase/query/parser/n1ql"
24
25// Command object containing parsed result from command-line
26// or program constructued list of args.
27type Command struct {
28	OpType string
29	// basic options.
30	Server    string
31	IndexName string
32	Bucket    string
33	AdminPort string
34	QueryPort string
35	Auth      string
36	// options for create-index.
37	Using     string
38	ExprType  string
39	PartnStr  string
40	WhereStr  string
41	SecStrs   []string
42	IsPrimary bool
43	With      string
44	WithPlan  map[string]interface{}
45	// options for build index
46	Bindexes []string
47	// options for Range, Statistics, Count
48	Low         c.SecondaryKey
49	High        c.SecondaryKey
50	Equal       c.SecondaryKey
51	Inclusion   qclient.Inclusion
52	Limit       int64
53	Distinct    bool
54	Consistency c.Consistency
55	// Configuration
56	ConfigKey string
57	ConfigVal string
58	Help      bool
59}
60
61// ParseArgs into Command object, return the list of arguments,
62// flagset used for parseing and error if any.
63func ParseArgs(arguments []string) (*Command, []string, *flag.FlagSet, error) {
64	var fields, bindexes string
65	var inclusion uint
66	var equal, low, high string
67	var useSessionCons bool
68
69	cmdOptions := &Command{Consistency: c.AnyConsistency}
70	fset := flag.NewFlagSet("cmd", flag.ExitOnError)
71
72	// basic options
73	fset.StringVar(&cmdOptions.Server, "server", "127.0.0.1:8091", "Cluster server address")
74	fset.StringVar(&cmdOptions.Auth, "auth", "", "Auth user and password")
75	fset.StringVar(&cmdOptions.Bucket, "bucket", "", "Bucket name")
76	fset.StringVar(&cmdOptions.OpType, "type", "", "Command: scan|stats|scanAll|count|nodes|create|build|move|drop|list|config")
77	fset.StringVar(&cmdOptions.IndexName, "index", "", "Index name")
78	// options for create-index
79	fset.StringVar(&cmdOptions.WhereStr, "where", "", "where clause for create index")
80	fset.StringVar(&fields, "fields", "", "Comma separated on-index fields") // secStrs
81	fset.BoolVar(&cmdOptions.IsPrimary, "primary", false, "Is primary index")
82	fset.StringVar(&cmdOptions.With, "with", "", "index specific properties")
83	// options for build-indexes, move-indexes, drop-indexes
84	fset.StringVar(&bindexes, "indexes", "", "csv list of bucket:index to build")
85	// options for Range, Statistics, Count
86	fset.StringVar(&low, "low", "[]", "Span.Range: [low]")
87	fset.StringVar(&high, "high", "[]", "Span.Range: [high]")
88	fset.StringVar(&equal, "equal", "", "Span.Lookup: [key]")
89	fset.UintVar(&inclusion, "incl", 0, "Range: 0|1|2|3")
90	fset.Int64Var(&cmdOptions.Limit, "limit", 10, "Row limit")
91	fset.BoolVar(&cmdOptions.Distinct, "distinct", false, "Only distinct entries")
92	fset.BoolVar(&cmdOptions.Help, "h", false, "print help")
93	fset.BoolVar(&useSessionCons, "consistency", false, "Use session consistency")
94	// options for setting configuration
95	fset.StringVar(&cmdOptions.ConfigKey, "ckey", "", "Config key")
96	fset.StringVar(&cmdOptions.ConfigVal, "cval", "", "Config value")
97	fset.StringVar(&cmdOptions.Using, "using", c.PlasmaDB, "storage type to use")
98
99	// not useful to expose in sherlock
100	cmdOptions.ExprType = "N1QL"
101	cmdOptions.PartnStr = ""
102
103	if err := fset.Parse(arguments); err != nil {
104		return nil, nil, fset, err
105	}
106
107	if useSessionCons {
108		cmdOptions.Consistency = c.SessionConsistency
109	}
110
111	// validate combinations
112	err := validate(cmdOptions, fset)
113	if err != nil {
114		return nil, nil, fset, err
115	}
116	// bindexes
117	if len(bindexes) > 0 {
118		cmdOptions.Bindexes = strings.Split(bindexes, ",")
119	}
120
121	// inclusion, secStrs, equal, low, high
122	cmdOptions.Inclusion = qclient.Inclusion(inclusion)
123	cmdOptions.SecStrs = make([]string, 0)
124	if fields != "" {
125		for _, field := range strings.Split(fields, ",") {
126			expr, err := n1ql.ParseExpression(field)
127			if err != nil {
128				msgf := "Error occured: Invalid field (%v) %v\n"
129				return nil, nil, fset, fmt.Errorf(msgf, field, err)
130			}
131			secStr := expression.NewStringer().Visit(expr)
132			cmdOptions.SecStrs = append(cmdOptions.SecStrs, secStr)
133		}
134	}
135	if equal != "" {
136		cmdOptions.Equal = c.SecondaryKey(Arg2Key([]byte(equal)))
137	}
138	cmdOptions.Low = c.SecondaryKey(Arg2Key([]byte(low)))
139	cmdOptions.High = c.SecondaryKey(Arg2Key([]byte(high)))
140
141	// with
142	if len(cmdOptions.With) > 0 {
143		err := json.Unmarshal([]byte(cmdOptions.With), &cmdOptions.WithPlan)
144		if err != nil {
145			logging.Fatalf("%v\n", err)
146			os.Exit(1)
147		}
148	}
149
150	// setup cbauth
151	if cmdOptions.Auth != "" {
152		up := strings.Split(cmdOptions.Auth, ":")
153		_, err := cbauth.InternalRetryDefaultInit(cmdOptions.Server, up[0], up[1])
154		if err != nil {
155			logging.Fatalf("Failed to initialize cbauth: %s\n", err)
156			os.Exit(1)
157		}
158	}
159
160	return cmdOptions, fset.Args(), fset, err
161}
162
163// HandleCommand after parsing it with ParseArgs().
164func HandleCommand(
165	client *qclient.GsiClient,
166	cmd *Command,
167	verbose bool,
168	w io.Writer) (err error) {
169
170	iname, bucket, limit, distinct := cmd.IndexName, cmd.Bucket, cmd.Limit, cmd.Distinct
171	low, high, equal, incl := cmd.Low, cmd.High, cmd.Equal, cmd.Inclusion
172	cons := cmd.Consistency
173
174	indexes, _, _, err := client.Refresh()
175
176	entries := 0
177	callb := func(res qclient.ResponseReader) bool {
178		if res.Error() != nil {
179			fmt.Fprintln(w, "Error: ", res)
180		} else if skeys, pkeys, err := res.GetEntries(); err != nil {
181			fmt.Fprintln(w, "Error: ", err)
182		} else {
183			if verbose == false {
184				for i, pkey := range pkeys {
185					fmt.Fprintf(w, "%v ... %v\n", skeys[i], string(pkey))
186				}
187			}
188			entries += len(pkeys)
189		}
190		return true
191	}
192
193	switch cmd.OpType {
194	case "nodes":
195		fmt.Fprintln(w, "List of nodes:")
196		nodes, err := client.Nodes()
197		if err != nil {
198			return err
199		}
200		for _, n := range nodes {
201			fmsg := "    {%v, %v, %q}\n"
202			fmt.Fprintf(w, fmsg, n.Adminport, n.Queryport, n.Status)
203		}
204
205	case "list":
206		time.Sleep(2 * time.Second)
207		indexes, _, _, err = client.Refresh()
208		if err != nil {
209			return err
210		}
211		fmt.Fprintln(w, "List of indexes:")
212		for _, index := range indexes {
213			printIndexInfo(w, index)
214		}
215
216	case "create":
217		var defnID uint64
218		if len(cmd.SecStrs) == 0 && !cmd.IsPrimary || cmd.IndexName == "" {
219			return fmt.Errorf("createIndex(): required fields missing")
220		}
221		defnID, err = client.CreateIndex(
222			iname, bucket, cmd.Using, cmd.ExprType,
223			cmd.PartnStr, cmd.WhereStr, cmd.SecStrs, cmd.IsPrimary,
224			[]byte(cmd.With))
225		if err == nil {
226			fmt.Fprintf(w, "Index created: %v with %q\n", defnID, cmd.With)
227		}
228
229	case "build":
230		defnIDs := make([]uint64, 0, len(cmd.Bindexes))
231		for _, bindex := range cmd.Bindexes {
232			v := strings.Split(bindex, ":")
233			if len(v) < 0 {
234				return fmt.Errorf("invalid index specified : %v", bindex)
235			}
236			bucket, iname = v[0], v[1]
237			index, ok := GetIndex(client, bucket, iname)
238			if ok {
239				defnIDs = append(defnIDs, uint64(index.Definition.DefnId))
240			} else {
241				err = fmt.Errorf("index %v/%v unknown", bucket, iname)
242				break
243			}
244		}
245		if err == nil {
246			err = client.BuildIndexes(defnIDs)
247			fmt.Fprintf(w, "Index building for: %v\n", defnIDs)
248		}
249
250	case "move":
251		index, ok := GetIndex(client, cmd.Bucket, cmd.IndexName)
252		if !ok {
253			return fmt.Errorf("invalid index specified : %v", cmd.IndexName)
254		}
255
256		if err == nil {
257			fmt.Fprintf(w, "Moving Index for: %v %v\n", index.Definition.DefnId, cmd.With)
258			err = client.MoveIndex(uint64(index.Definition.DefnId), cmd.WithPlan)
259			if err == nil {
260				fmt.Fprintf(w, "Move Index has started. Check Indexes UI for progress and Logs UI for any error\n")
261			}
262		}
263
264	case "drop":
265		index, ok := GetIndex(client, cmd.Bucket, cmd.IndexName)
266		if !ok {
267			return fmt.Errorf("invalid index specified : %v", cmd.IndexName)
268		}
269		err = client.DropIndex(uint64(index.Definition.DefnId))
270		if err == nil {
271			fmt.Fprintf(w, "Index dropped %v/%v\n", bucket, iname)
272		} else {
273			err = fmt.Errorf("index %v/%v drop failed", bucket, iname)
274			break
275		}
276
277	case "scan":
278		var state c.IndexState
279
280		index, _ := GetIndex(client, bucket, iname)
281		defnID := uint64(index.Definition.DefnId)
282		fmt.Fprintln(w, "Scan index:")
283		_, err = WaitUntilIndexState(
284			client, []uint64{defnID}, c.INDEX_STATE_ACTIVE,
285			100 /*period*/, 20000 /*timeout*/)
286
287		if err != nil {
288			state, err = client.IndexState(defnID)
289			fmt.Fprintf(w, "Index state: {%v, %v}\n", state, err)
290		} else if cmd.Equal != nil {
291			equals := []c.SecondaryKey{cmd.Equal}
292			client.Lookup(
293				uint64(defnID), "", equals, distinct, limit,
294				cons, nil, callb)
295		} else {
296			err = client.Range(
297				uint64(defnID), "", low, high, incl, distinct, limit,
298				cons, nil, callb)
299		}
300		if err == nil {
301			fmt.Fprintln(w, "Total number of entries: ", entries)
302		}
303
304	case "scanAll":
305		var state c.IndexState
306
307		index, found := GetIndex(client, bucket, iname)
308		if !found {
309			fmt.Fprintln(w, "Index not found")
310			os.Exit(1)
311		}
312
313		defnID := uint64(index.Definition.DefnId)
314		fmt.Fprintln(w, "ScanAll index:")
315		_, err = WaitUntilIndexState(
316			client, []uint64{defnID}, c.INDEX_STATE_ACTIVE,
317			100 /*period*/, 20000 /*timeout*/)
318		if err != nil {
319			state, err = client.IndexState(defnID)
320			fmt.Fprintf(w, "Index state: {%v, %v} \n", state, err)
321		} else {
322			err = client.ScanAll(
323				uint64(defnID), "", limit, cons, nil, callb)
324		}
325		if err == nil {
326			fmt.Fprintln(w, "Total number of entries: ", entries)
327		}
328
329	case "stats":
330		var state c.IndexState
331		var statsResp c.IndexStatistics
332
333		index, _ := GetIndex(client, bucket, iname)
334		defnID := uint64(index.Definition.DefnId)
335		_, err = WaitUntilIndexState(
336			client, []uint64{defnID}, c.INDEX_STATE_ACTIVE,
337			100 /*period*/, 20000 /*timeout*/)
338		if err != nil {
339			state, err = client.IndexState(defnID)
340			fmt.Fprintf(w, "Index state: {%v, %v} \n", state, err)
341		} else if cmd.Equal != nil {
342			statsResp, err = client.LookupStatistics(uint64(defnID), "", equal)
343		} else {
344			statsResp, err = client.RangeStatistics(
345				uint64(defnID), "", low, high, incl)
346		}
347		if err == nil {
348			fmt.Fprintln(w, "Stats: ", statsResp)
349		}
350
351	case "count":
352		var state c.IndexState
353		var count int64
354
355		index, _ := GetIndex(client, bucket, iname)
356		defnID := uint64(index.Definition.DefnId)
357		_, err = WaitUntilIndexState(
358			client, []uint64{defnID}, c.INDEX_STATE_ACTIVE,
359			100 /*period*/, 20000 /*timeout*/)
360		if err != nil {
361			state, err = client.IndexState(defnID)
362			fmt.Fprintf(w, "Index state: {%v, %v} \n", state, err)
363		} else if cmd.Equal != nil {
364			fmt.Fprintln(w, "CountLookup:")
365			equals := []c.SecondaryKey{cmd.Equal}
366			count, err := client.CountLookup(uint64(defnID), "", equals, cons, nil)
367			if err == nil {
368				fmt.Fprintf(w, "Index %q/%q has %v entries\n", bucket, iname, count)
369			}
370
371		} else {
372			fmt.Fprintln(w, "CountRange:")
373			count, err = client.CountRange(uint64(defnID), "", low, high, incl, cons, nil)
374			if err == nil {
375				fmt.Fprintf(w, "Index %q/%q has %v entries\n", bucket, iname, count)
376			}
377		}
378
379	case "config":
380		nodes, err := client.Nodes()
381		if err != nil {
382			return err
383		}
384		var adminurl string
385		for _, indexer := range nodes {
386			adminurl = indexer.Adminport
387			break
388		}
389		host, sport, _ := net.SplitHostPort(adminurl)
390		iport, _ := strconv.Atoi(sport)
391		client := http.Client{}
392
393		//
394		// hack, fix this
395		//
396		ihttp := iport + 2
397		url := "http://" + host + ":" + strconv.Itoa(ihttp) + "/settings"
398
399		oreq, err := http.NewRequest("GET", url, nil)
400		if cmd.Auth != "" {
401			up := strings.Split(cmd.Auth, ":")
402			oreq.SetBasicAuth(up[0], up[1])
403		}
404
405		oresp, err := client.Do(oreq)
406		if err != nil {
407			return err
408		}
409		obody, err := ioutil.ReadAll(oresp.Body)
410		if err != nil {
411			return err
412		}
413		oresp.Body.Close()
414
415		pretty := strings.Replace(string(obody), ",\"", ",\n\"", -1)
416		fmt.Printf("Current Settings:\n%s\n", string(pretty))
417
418		var jbody map[string]interface{}
419		err = json.Unmarshal(obody, &jbody)
420		if err != nil {
421			return err
422		}
423
424		if len(cmd.ConfigKey) > 0 {
425			fmt.Printf("Changing config key '%s' to value '%s'\n", cmd.ConfigKey, cmd.ConfigVal)
426			jbody[cmd.ConfigKey] = cmd.ConfigVal
427
428			pbody, err := json.Marshal(jbody)
429			if err != nil {
430				return err
431			}
432			preq, err := http.NewRequest("POST", url, bytes.NewBuffer(pbody))
433			if cmd.Auth != "" {
434				up := strings.Split(cmd.Auth, ":")
435				preq.SetBasicAuth(up[0], up[1])
436			}
437			_, err = client.Do(preq)
438			if err != nil {
439				return err
440			}
441			nresp, err := client.Do(oreq)
442			if err != nil {
443				return err
444			}
445			nbody, err := ioutil.ReadAll(nresp.Body)
446			if err != nil {
447				return err
448			}
449			pretty = strings.Replace(string(nbody), ",\"", ",\n\"", -1)
450			fmt.Printf("New Settings:\n%s\n", string(pretty))
451		}
452	}
453	return err
454}
455
456func printIndexInfo(w io.Writer, index *mclient.IndexMetadata) {
457	defn := index.Definition
458	fmt.Fprintf(w, "Index:%s/%s, Id:%v, Using:%s, Exprs:%v, isPrimary:%v\n",
459		defn.Bucket, defn.Name, defn.DefnId, defn.Using, defn.SecExprs,
460		defn.IsPrimary)
461	insts := index.Instances
462	if len(insts) < 1 {
463		fmt.Fprintf(w, "    Error: zero instances")
464	} else {
465		fmt.Fprintf(w, "    State:%s, Error:%v\n", insts[0].State, insts[0].Error)
466	}
467}
468
469// GetIndex for bucket/indexName.
470func GetIndex(
471	client *qclient.GsiClient,
472	bucket, indexName string) (*mclient.IndexMetadata, bool) {
473
474	indexes, _, _, err := client.Refresh()
475	if err != nil {
476		logging.Fatalf("%v\n", err)
477		os.Exit(1)
478	}
479	for _, index := range indexes {
480		defn := index.Definition
481		if defn.Bucket == bucket && defn.Name == indexName {
482			return index, true
483			//return uint64(index.Definition.DefnId), true
484		}
485	}
486	return nil, false
487}
488
489// WaitUntilIndexState comes to desired `state`,
490// retry for every `period` mS until `timeout` mS.
491func WaitUntilIndexState(
492	client *qclient.GsiClient, defnIDs []uint64,
493	state c.IndexState, period, timeout time.Duration) ([]c.IndexState, error) {
494
495	expired := time.After(timeout * time.Millisecond)
496	states := make([]c.IndexState, len(defnIDs))
497	pending := len(defnIDs)
498	for {
499		select {
500		case <-expired:
501			return nil, errors.New("timeout")
502
503		default:
504		}
505		for i, defnID := range defnIDs {
506			if states[i] != state {
507				st, err := client.IndexState(defnID)
508				if err != nil {
509					return nil, err
510				} else if st == state {
511					states[i] = state
512					pending--
513					continue
514				}
515			}
516		}
517		if pending == 0 {
518			return states, nil
519		}
520		time.Sleep(period * time.Millisecond)
521	}
522}
523
524//----------------
525// local functions
526//----------------
527
528// Arg2Key convert JSON string to golang-native.
529func Arg2Key(arg []byte) []interface{} {
530	var key []interface{}
531	if err := json.Unmarshal(arg, &key); err != nil {
532		logging.Fatalf("%v\n", err)
533		os.Exit(1)
534	}
535	return key
536}
537
538func first(key c.SecondaryKey) []byte {
539	if key == nil || len(key) == 0 {
540		return nil
541	}
542	return []byte(key[0].(string))
543}
544
545func validate(cmd *Command, fset *flag.FlagSet) error {
546	var have []string
547	var dont []string
548
549	switch cmd.OpType {
550	case "":
551		have = []string{}
552		dont = []string{"type", "index", "bucket", "where", "fields", "primary", "with", "indexes", "low", "high", "equal", "incl", "limit", "distinct", "ckey", "cval"}
553
554	case "nodes":
555		have = []string{"type", "server", "auth"}
556		dont = []string{"h", "index", "bucket", "where", "fields", "primary", "with", "indexes", "low", "high", "equal", "incl", "limit", "distinct", "ckey", "cval"}
557
558	case "list":
559		have = []string{"type", "server", "auth"}
560		dont = []string{"h", "index", "bucket", "where", "fields", "primary", "with", "indexes", "low", "high", "equal", "incl", "limit", "distinct", "ckey", "cval"}
561
562	case "create":
563		have = []string{"type", "server", "auth", "index", "bucket", "primary"}
564		dont = []string{"h", "indexes", "low", "high", "equal", "incl", "limit", "distinct", "ckey", "cval"}
565
566	case "build":
567		have = []string{"type", "server", "auth", "indexes"}
568		dont = []string{"h", "index", "bucket", "where", "fields", "primary", "with", "low", "high", "equal", "incl", "limit", "distinct", "ckey", "cval"}
569
570	case "move":
571		have = []string{"type", "server", "auth", "index", "bucket"}
572		dont = []string{"h", "indexes", "where", "fields", "primary", "low", "high", "equal", "incl", "limit", "distinct", "ckey", "cval"}
573
574	case "drop":
575		have = []string{"type", "server", "auth", "index", "bucket"}
576		dont = []string{"h", "where", "fields", "primary", "with", "indexes", "low", "high", "equal", "incl", "limit", "distinct", "ckey", "cval"}
577
578	case "scan":
579		have = []string{"type", "server", "auth", "index", "bucket"}
580		dont = []string{"h", "where", "fields", "primary", "with", "indexes", "ckey", "cval"}
581
582	case "scanAll":
583		have = []string{"type", "server", "auth", "index", "bucket"}
584		dont = []string{"h", "where", "fields", "primary", "with", "indexes", "low", "high", "equal", "incl", "ckey", "cval"}
585
586	case "stats":
587		have = []string{"type", "server", "auth", "index", "bucket"}
588		dont = []string{"h", "where", "fields", "primary", "with", "indexes", "limit", "distinct", "ckey", "cval"}
589
590	case "count":
591		have = []string{"type", "server", "auth", "index", "bucket"}
592		dont = []string{"h", "where", "fields", "primary", "with", "indexes", "ckey", "cval"}
593
594	case "config":
595		have = []string{"type", "server", "auth"}
596		dont = []string{"h", "index", "bucket", "where", "fields", "primary", "with", "indexes", "low", "high", "equal", "incl", "limit", "distinct"}
597
598	default:
599		return fmt.Errorf("Specified operation type '%s' has no validation rule. Please add one to use.", cmd.OpType)
600	}
601
602	err := mustHave(fset, have...)
603	if err != nil {
604		return err
605	}
606
607	err = mustNotHave(fset, dont...)
608	if err != nil {
609		return err
610	}
611
612	return nil
613}
614
// mustHave reports an error for the first key, in argument order, that is
// neither set explicitly in fset nor defined with a non-empty default value.
// A key that is not defined in fset at all counts as missing.
func mustHave(fset *flag.FlagSet, keys ...string) error {
	// Names of all flags that were explicitly set.
	given := make(map[string]bool)
	fset.Visit(func(f *flag.Flag) {
		given[f.Name] = true
	})

	for _, name := range keys {
		if given[name] {
			continue
		}
		// Not set explicitly: acceptable only when a non-empty default
		// stands in for it.
		def := fset.Lookup(name)
		if def == nil || def.DefValue == "" {
			return fmt.Errorf("Invalid flags. Flag '%s' is required for this operation", name)
		}
	}
	return nil
}
633
// mustNotHave reports an error for the first key, in argument order, that was
// set explicitly in fset. Flags left at their default are not considered
// present.
func mustNotHave(fset *flag.FlagSet, keys ...string) error {
	// Names of all flags that were explicitly set.
	given := make(map[string]bool)
	fset.Visit(func(f *flag.Flag) {
		given[f.Name] = true
	})

	for _, name := range keys {
		if !given[name] {
			continue
		}
		return fmt.Errorf("Invalid flags. Flag '%s' cannot appear for this operation", name)
	}
	return nil
}
649