1//  Copyright (c) 2017 Couchbase, Inc.
2//  Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
3//  except in compliance with the License. You may obtain a copy of the License at
4//    http://www.apache.org/licenses/LICENSE-2.0
5//  Unless required by applicable law or agreed to in writing, software distributed under the
6//  License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
7//  either express or implied. See the License for the specific language governing permissions
8//  and limitations under the License.
9
10package gsi
11
12import (
13	"encoding/json"
14	go_er "errors"
15	"fmt"
16	"io/ioutil"
17	http_base "net/http"
18	"os"
19	"path/filepath"
20	"reflect"
21	"strconv"
22	"testing"
23	"time"
24
25	"github.com/couchbase/query/accounting"
26	acct_resolver "github.com/couchbase/query/accounting/resolver"
27	"github.com/couchbase/query/auth"
28	config_resolver "github.com/couchbase/query/clustering/resolver"
29	"github.com/couchbase/query/datastore"
30	"github.com/couchbase/query/datastore/resolver"
31	"github.com/couchbase/query/datastore/system"
32	"github.com/couchbase/query/errors"
33	"github.com/couchbase/query/execution"
34	"github.com/couchbase/query/logging"
35	log_resolver "github.com/couchbase/query/logging/resolver"
36	"github.com/couchbase/query/plan"
37	"github.com/couchbase/query/prepareds"
38	"github.com/couchbase/query/server"
39	"github.com/couchbase/query/server/http"
40	"github.com/couchbase/query/timestamp"
41	//	"github.com/couchbase/query/util"
42	"github.com/couchbase/query/value"
43)
44
45/*
46Global variables accessed by individual test cases for
47Couchbase server. Site_CBS, Auth_param, Pool_CBS
48and Namespace_CBS represent the site, server authentication
49parameters the ip of the couchbase server instance
50and the namespace.
51*/
52var Site_CBS = "http://"
53var Username = "Administrator"
54var Password = "password"
55var Auth_param = "Administrator:password"
56var Pool_CBS = "127.0.0.1:8091/"
57var Namespace_CBS = "default"
58var Consistency_parameter = datastore.SCAN_PLUS
59var curlWhitelist = map[string]interface{}{"all_access": true}
60
61func init() {
62
63	Pool_CBS = server.GetIP(true) + ":8091/"
64
65	logger, _ := log_resolver.NewLogger("golog")
66	logging.SetLogger(logger)
67}
68
69type MockQuery struct {
70	server.BaseRequest
71	response    *MockResponse
72	resultCount int
73}
74
75type MockServer struct {
76	server    *server.Server
77	acctstore accounting.AccountingStore
78}
79
80func (this *MockQuery) OriginalHttpRequest() *http_base.Request {
81	return nil
82}
83
84func (this *MockQuery) Output() execution.Output {
85	return this
86}
87
88func (this *MockQuery) Fail(err errors.Error) {
89	defer this.Stop(server.FATAL)
90	this.response.err = err
91	close(this.response.done)
92}
93
94func (this *MockQuery) Error(err errors.Error) {
95	if this.response.err == nil {
96		this.response.err = err
97	}
98}
99
100func (this *MockQuery) Execute(srvr *server.Server, signature value.Value, stopNotify execution.Operator) {
101	defer this.stopAndClose(server.COMPLETED)
102
103	this.NotifyStop(stopNotify)
104	this.writeResults()
105	close(this.response.done)
106}
107
108func (this *MockQuery) Failed(srvr *server.Server) {
109	defer this.stopAndClose(server.FATAL)
110}
111
112func (this *MockQuery) Expire(state server.State, timeout time.Duration) {
113	defer this.stopAndClose(state)
114
115	this.response.err = errors.NewError(nil, "Query timed out")
116	close(this.response.done)
117}
118
119func (this *MockQuery) stopAndClose(state server.State) {
120	this.Stop(state)
121	this.Close()
122}
123
124func (this *MockQuery) writeResults() bool {
125	var item value.Value
126
127	ok := true
128	for ok {
129		select {
130		case <-this.StopExecute():
131			this.SetState(server.STOPPED)
132			return true
133		default:
134		}
135
136		select {
137		case item, ok = <-this.Results():
138			if ok {
139				if !this.writeResult(item) {
140					this.SetState(server.FATAL)
141					return false
142				}
143			}
144		case <-this.StopExecute():
145			this.SetState(server.STOPPED)
146			return true
147		}
148	}
149
150	this.SetState(server.COMPLETED)
151	return true
152}
153
154func (this *MockQuery) writeResult(item value.Value) bool {
155	bytes, err := json.Marshal(item)
156	if err != nil {
157		panic(err.Error())
158	}
159
160	this.resultCount++
161
162	var resultLine map[string]interface{}
163	json.Unmarshal(bytes, &resultLine)
164
165	this.response.results = append(this.response.results, resultLine)
166	return true
167}
168
169type MockResponse struct {
170	err      errors.Error
171	results  []interface{}
172	warnings []errors.Error
173	done     chan bool
174}
175
176func (this *MockResponse) NoMoreResults() {
177	close(this.done)
178}
179
180/*
181Scan consistency implementation. The default
182is set to REQUEST_PLUS.
183*/
184type scanConfigImpl struct {
185	scan_level datastore.ScanConsistency
186}
187
188func (this *scanConfigImpl) ScanConsistency() datastore.ScanConsistency {
189	return this.scan_level
190}
191
192func (this *scanConfigImpl) ScanWait() time.Duration {
193	return 0
194}
195
196func (this *scanConfigImpl) ScanVectorSource() timestamp.ScanVectorSource {
197	return &http.ZeroScanVectorSource{}
198}
199
200func (this *MockServer) doStats(request *MockQuery) {
201	request.CompleteRequest(0, 0, request.resultCount, 0, 0, nil, this.server)
202}
203
204var _ALL_USERS = auth.Credentials{
205	"customerowner":  "customerpass",
206	"ordersowner":    "orderspass",
207	"productowner":   "productpass",
208	"purchaseowner":  "purchasepass",
209	"reviewowner":    "reviewpass",
210	"shellTestowner": "shellTestpass",
211}
212
213/*
214This method is used to execute the N1QL query represented by
215the input argument (q) string using the NewBaseRequest method
216as defined in the server request.go.
217*/
218func Run(mockServer *MockServer, q, namespace string, namedArgs map[string]value.Value,
219	positionalArgs value.Values) ([]interface{}, []errors.Error, errors.Error) {
220	var metrics value.Tristate
221	consistency := &scanConfigImpl{scan_level: datastore.SCAN_PLUS}
222
223	mr := &MockResponse{
224		results: []interface{}{}, warnings: []errors.Error{}, done: make(chan bool),
225	}
226	query := &MockQuery{
227		response: mr,
228	}
229	server.NewBaseRequest(&query.BaseRequest, q, nil, namedArgs, positionalArgs, namespace, 0, 0, 0, 0,
230		value.FALSE, metrics, value.TRUE, value.TRUE, consistency, "", _ALL_USERS, "", "")
231
232	//	query.BaseRequest.SetIndexApiVersion(datastore.INDEX_API_3)
233	//	query.BaseRequest.SetFeatureControls(util.N1QL_GROUPAGG_PUSHDOWN)
234	defer mockServer.doStats(query)
235
236	select {
237	case mockServer.server.Channel() <- query:
238		// Wait until the request exits.
239		<-query.CloseNotify()
240	default:
241		// Timeout.
242		return nil, nil, errors.NewError(nil, "Query timed out")
243	}
244
245	// wait till all the results are ready
246	<-mr.done
247	return mr.results, mr.warnings, mr.err
248}
249
250func RunPrepared(mockServer *MockServer, q, namespace string, namedArgs map[string]value.Value,
251	positionalArgs value.Values) ([]interface{}, []errors.Error, errors.Error) {
252	var metrics value.Tristate
253	consistency := &scanConfigImpl{scan_level: datastore.SCAN_PLUS}
254
255	mr := &MockResponse{
256		results: []interface{}{}, warnings: []errors.Error{}, done: make(chan bool),
257	}
258	query := &MockQuery{
259		response: mr,
260	}
261
262	prepared, err := PrepareStmt(mockServer, namespace, q)
263	if err != nil {
264		return nil, nil, err
265	}
266
267	server.NewBaseRequest(&query.BaseRequest, "", prepared, namedArgs, positionalArgs, namespace, 0, 0, 0, 0,
268		value.FALSE, metrics, value.TRUE, value.TRUE, consistency, "", _ALL_USERS, "", "")
269
270	//	query.BaseRequest.SetIndexApiVersion(datastore.INDEX_API_3)
271	//	query.BaseRequest.SetFeatureControls(util.N1QL_GROUPAGG_PUSHDOWN)
272	defer mockServer.doStats(query)
273
274	select {
275	case mockServer.server.Channel() <- query:
276		// Wait until the request exits.
277		<-query.CloseNotify()
278	default:
279		// Timeout.
280		return nil, nil, errors.NewError(nil, "Query timed out")
281	}
282
283	// wait till all the results are ready
284	<-mr.done
285	return mr.results, mr.warnings, mr.err
286}
287
288/*
289Used to specify the N1QL nodes options using the method NewServer
290as defined in server/server.go.
291*/
292func Start(site, pool, namespace string, setGlobals bool) *MockServer {
293
294	mockServer := &MockServer{}
295	ds, err := resolver.NewDatastore(site + pool)
296	if err != nil {
297		logging.Errorp(err.Error())
298		os.Exit(1)
299	}
300
301	sys, err := system.NewDatastore(ds)
302	if err != nil {
303		logging.Errorp(err.Error())
304		os.Exit(1)
305	}
306
307	if setGlobals {
308		datastore.SetDatastore(ds)
309		datastore.SetSystemstore(sys)
310	}
311
312	configstore, err := config_resolver.NewConfigstore("stub:")
313	if err != nil {
314		logging.Errorp("Could not connect to configstore",
315			logging.Pair{"error", err},
316		)
317	}
318
319	acctstore, err := acct_resolver.NewAcctstore("stub:")
320	if err != nil {
321		logging.Errorp("Could not connect to acctstore",
322			logging.Pair{"error", err},
323		)
324	}
325
326	// Start the completed requests log - keep it small and busy
327	server.RequestsInit(0, 8)
328
329	// Start the prepared statement cache
330	prepareds.PreparedsInit(1024)
331
332	channel := make(server.RequestChannel, 10)
333	plusChannel := make(server.RequestChannel, 10)
334
335	// need to do it before NewServer() or server scope's changes to
336	// the variable and not the package...
337	server.SetActives(http.NewActiveRequests())
338	server, err := server.NewServer(ds, sys, configstore, acctstore, namespace,
339		false, channel, plusChannel, 1, 1, 1, 0, false, false, true, true,
340		server.ProfOff, false)
341	if err != nil {
342		logging.Errorp(err.Error())
343		os.Exit(1)
344	}
345
346	server.SetWhitelist(curlWhitelist)
347
348	prepareds.PreparedsReprepareInit(ds, sys, namespace)
349	server.SetKeepAlive(1 << 10)
350
351	go server.Serve()
352	mockServer.server = server
353	mockServer.acctstore = acctstore
354
355	return mockServer
356}
357
358func dropResultEntry(result interface{}, e string) {
359	switch v := result.(type) {
360	case map[string]interface{}:
361		delete(v, e)
362		for _, f := range v {
363			dropResultEntry(f, e)
364		}
365	case []interface{}:
366		for _, f := range v {
367			dropResultEntry(f, e)
368		}
369	}
370}
371
372func dropResultsEntry(results []interface{}, entry interface{}) {
373	e := fmt.Sprintf("%v", entry)
374	for _, r := range results {
375		dropResultEntry(r, e)
376	}
377}
378
379func addResultsEntry(newResults, results []interface{}, entry interface{}) {
380	e := fmt.Sprintf("%v", entry)
381	for i, r := range results {
382		v, ok := r.(map[string]interface{})
383		if ok {
384			newV, ok := newResults[i].(map[string]interface{})
385			if ok {
386				newV[e] = v[e]
387			}
388		}
389	}
390}
391
392func FtestCaseFile(fname string, prepared, explain bool, qc *MockServer, namespace string) (fin_stmt string, errstring error) {
393	fin_stmt = ""
394
395	/* Reads the input file and returns its contents in the form
396	   of a byte array.
397	*/
398	b, err := ioutil.ReadFile(fname)
399	if err != nil {
400		errstring = go_er.New(fmt.Sprintf("ReadFile failed: %v", err))
401		return
402	}
403
404	var cases []map[string]interface{}
405
406	err = json.Unmarshal(b, &cases)
407	if err != nil {
408		errstring = go_er.New(fmt.Sprintf("couldn't json unmarshal: %v, err: %v", string(b), err))
409		return
410	}
411	for i, c := range cases {
412		d, ok := c["disabled"]
413		if ok {
414			disabled := d.(bool)
415			if disabled == true {
416				continue
417			}
418		}
419
420		/* Handles all queries to be run against CBServer and Datastore */
421		v, ok := c["statements"]
422		if !ok || v == nil {
423			errstring = go_er.New(fmt.Sprintf("missing statements for case file: %v, index: %v", fname, i))
424			return
425		}
426		statements := v.(string)
427
428		var ordered bool
429		if o, ook := c["ordered"]; ook {
430			ordered = o.(bool)
431		}
432
433		if explain {
434			if errstring = checkExplain(qc, namespace, statements, c, ordered, fname, i); errstring != nil {
435				return
436			}
437		}
438
439		fin_stmt = strconv.Itoa(i) + ": " + statements
440		var resultsActual []interface{}
441		var errActual errors.Error
442		if prepared {
443			resultsActual, _, errActual = RunPrepared(qc, statements, namespace, nil, nil)
444		} else {
445			resultsActual, _, errActual = Run(qc, statements, namespace, nil, nil)
446		}
447
448		errExpected := ""
449		v, ok = c["error"]
450		if ok {
451			errExpected = v.(string)
452		}
453
454		if errActual != nil {
455			if errExpected == "" {
456				errstring = go_er.New(fmt.Sprintf("unexpected err: %v, statements: %v"+
457					", for case file: %v, index: %v", errActual, statements, fname, i))
458				return
459			}
460
461			if errExpected != errActual.Error() {
462				errstring = go_er.New(fmt.Sprintf("Mismatched error - expected '%s' actual '%s'"+
463					", for case file: %v, index: %v", errExpected, errActual.Error(), fname, i))
464				return
465			}
466
467			continue
468		}
469
470		if errExpected != "" {
471			errstring = go_er.New(fmt.Sprintf("did not see the expected err: %v, statements: %v"+
472				", for case file: %v, index: %v", errActual, statements, fname, i))
473			return
474		}
475
476		// ignore certain parts of the results if we need to
477		// we handle scalars and array of scalars, ignore the rest
478		// filter only applied to first level fields
479		ignore, ok := c["ignore"]
480		if ok {
481			switch ignore.(type) {
482			case []interface{}:
483				for _, v := range ignore.([]interface{}) {
484					switch v.(type) {
485					case []interface{}:
486					case map[string]interface{}:
487					default:
488						dropResultsEntry(resultsActual, v)
489					}
490				}
491			case map[string]interface{}:
492			default:
493				dropResultsEntry(resultsActual, ignore)
494			}
495		}
496
497		// opposite of ignore - only select certain fields
498		// again, we handle scalars and the scalars in an array
499		accept, ok := c["accept"]
500		if ok {
501			newResults := make([]interface{}, len(resultsActual))
502			switch accept.(type) {
503			case []interface{}:
504				for j, _ := range resultsActual {
505					newResults[j] = make(map[string]interface{}, len(accept.([]interface{})))
506				}
507				for _, v := range accept.([]interface{}) {
508					switch v.(type) {
509					case []interface{}:
510					case map[string]interface{}:
511					default:
512						addResultsEntry(newResults, resultsActual, v)
513					}
514				}
515			case map[string]interface{}:
516			default:
517				for j, _ := range resultsActual {
518					newResults[j] = make(map[string]interface{}, 1)
519				}
520				addResultsEntry(newResults, resultsActual, accept)
521			}
522			resultsActual = newResults
523		}
524		v, ok = c["results"]
525		if ok {
526			resultsExpected := v.([]interface{})
527			okres := doResultsMatch(resultsActual, resultsExpected, ordered, statements, fname, i)
528			if okres != nil {
529				errstring = okres
530				return
531			}
532		}
533	}
534	return fin_stmt, nil
535}
536
537/*
538Matches expected results with the results obtained by
539running the queries.
540*/
541func doResultsMatch(resultsActual, resultsExpected []interface{}, ordered bool, stmt, fname string, i int) (errstring error) {
542	if len(resultsActual) != len(resultsExpected) {
543		return go_er.New(fmt.Sprintf("results len don't match, %v vs %v, %v vs %v"+
544			", (%v)for case file: %v, index: %v",
545			len(resultsActual), len(resultsExpected),
546			resultsActual, resultsExpected, stmt, fname, i))
547	}
548
549	if ordered {
550		if !reflect.DeepEqual(resultsActual, resultsExpected) {
551			return go_er.New(fmt.Sprintf("results don't match, actual: %#v, expected: %#v"+
552				", (%v) for case file: %v, index: %v",
553				resultsActual, resultsExpected, stmt, fname, i))
554		}
555	} else {
556	nextresult:
557		for _, re := range resultsExpected {
558			for j, ra := range resultsActual {
559				if ra != nil && reflect.DeepEqual(ra, re) {
560					resultsActual[j] = nil
561					continue nextresult
562				}
563			}
564			return go_er.New(fmt.Sprintf("results don't match: %#v is not present in : %#v"+
565				", (%v) for case file: %v, index: %v",
566				re, resultsActual, stmt, fname, i))
567		}
568
569	}
570
571	return nil
572}
573
574func checkExplain(qc *MockServer, namespace string, statement string, c map[string]interface{}, ordered bool,
575	fname string, i int) (errstring error) {
576	var ev map[string]interface{}
577
578	e, ok := c["explain"]
579	if ok {
580		ev, ok = e.(map[string]interface{})
581	}
582
583	if !ok {
584		return
585	}
586
587	var eStmt string
588	var erExpected []interface{}
589
590	ed, dok := ev["disabled"]
591	es, sok := ev["statement"]
592	er, rok := ev["results"]
593
594	if dok {
595		if disabled := ed.(bool); disabled {
596			return
597		}
598	}
599
600	if sok {
601		eStmt, sok = es.(string)
602	}
603
604	if !sok {
605		return
606	}
607
608	if rok {
609		erExpected, rok = er.([]interface{})
610	}
611
612	explainStmt := "EXPLAIN " + statement
613	resultsActual, _, errActual := Run(qc, explainStmt, namespace, nil, nil)
614	if errActual != nil || len(resultsActual) != 1 {
615		return go_er.New(fmt.Sprintf("(%v) error actual: %#v"+
616			", for case file: %v, index: %v", explainStmt, resultsActual, fname, i))
617	}
618
619	namedParams := make(map[string]value.Value, 1)
620	namedParams["explan"] = value.NewValue(resultsActual[0])
621
622	resultsActual, _, errActual = Run(qc, eStmt, namespace, namedParams, nil)
623	if errActual != nil {
624		return go_er.New(fmt.Sprintf("unexpected err: %v, statement: %v"+
625			", for case file: %v, index: %v", errActual, eStmt, fname, i))
626	}
627
628	if rok {
629		return doResultsMatch(resultsActual, erExpected, ordered, eStmt, fname, i)
630	}
631
632	return
633}
634
635func PrepareStmt(qc *MockServer, namespace, statement string) (*plan.Prepared, errors.Error) {
636	prepareStmt := "PREPARE " + statement
637	resultsActual, _, errActual := Run(qc, prepareStmt, namespace, nil, nil)
638	if errActual != nil || len(resultsActual) != 1 {
639		return nil, errors.NewError(nil, fmt.Sprintf("Error %#v FOR (%v)", prepareStmt, resultsActual))
640	}
641	RunStmt(qc, "DELETE FROM system:prepareds")
642	ra := resultsActual[0].(map[string]interface{})
643	return prepareds.DecodePrepared("", ra["encoded_plan"].(string), true, false, nil)
644}
645
646/*
647Method to pass in parameters for site, pool and
648namespace to Start method for Couchbase Server.
649*/
650
651func Start_cs(setGlobals bool) *MockServer {
652	ms := Start(Site_CBS, Auth_param+"@"+Pool_CBS, Namespace_CBS, setGlobals)
653
654	return ms
655}
656
657func RunMatch(filename string, prepared, explain bool, qc *MockServer, t *testing.T) {
658
659	matches, err := filepath.Glob(filename)
660	if err != nil {
661		t.Errorf("glob failed: %v", err)
662	}
663
664	for _, m := range matches {
665		t.Logf("TestCaseFile: %v\n", m)
666		stmt, errcs := FtestCaseFile(m, prepared, explain, qc, Namespace_CBS)
667
668		if errcs != nil {
669			t.Errorf("Error : %s", errcs.Error())
670			return
671		}
672
673		if stmt != "" {
674			t.Logf(" %v\n", stmt)
675		}
676
677		fmt.Println("\nQuery : ", m, "\n\n")
678	}
679
680}
681
682func RunStmt(mockServer *MockServer, q string) ([]interface{}, []errors.Error, errors.Error) {
683	return Run(mockServer, q, Namespace_CBS, nil, nil)
684}
685