1// Copyright (c) 2017 Couchbase, Inc. 2// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file 3// except in compliance with the License. You may obtain a copy of the License at 4// http://www.apache.org/licenses/LICENSE-2.0 5// Unless required by applicable law or agreed to in writing, software distributed under the 6// License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, 7// either express or implied. See the License for the specific language governing permissions 8// and limitations under the License. 9 10package gsi 11 12import ( 13 "encoding/json" 14 go_er "errors" 15 "fmt" 16 "io/ioutil" 17 http_base "net/http" 18 "os" 19 "path/filepath" 20 "reflect" 21 "strconv" 22 "testing" 23 "time" 24 25 "github.com/couchbase/query/accounting" 26 acct_resolver "github.com/couchbase/query/accounting/resolver" 27 "github.com/couchbase/query/auth" 28 config_resolver "github.com/couchbase/query/clustering/resolver" 29 "github.com/couchbase/query/datastore" 30 "github.com/couchbase/query/datastore/resolver" 31 "github.com/couchbase/query/datastore/system" 32 "github.com/couchbase/query/errors" 33 "github.com/couchbase/query/execution" 34 "github.com/couchbase/query/logging" 35 log_resolver "github.com/couchbase/query/logging/resolver" 36 "github.com/couchbase/query/plan" 37 "github.com/couchbase/query/prepareds" 38 "github.com/couchbase/query/server" 39 "github.com/couchbase/query/server/http" 40 "github.com/couchbase/query/timestamp" 41 // "github.com/couchbase/query/util" 42 "github.com/couchbase/query/value" 43) 44 45/* 46Global variables accessed by individual test cases for 47Couchbase server. Site_CBS, Auth_param, Pool_CBS 48and Namespace_CBS represent the site, server authentication 49parameters the ip of the couchbase server instance 50and the namespace. 51*/ 52var Site_CBS = "http://" 53var Username = "Administrator" 54var Password = "password" 55var Auth_param = "Administrator:password" 56var Pool_CBS = "127.0.0.1:8091/" 57var Namespace_CBS = "default" 58var Consistency_parameter = datastore.SCAN_PLUS 59var curlWhitelist = map[string]interface{}{"all_access": true} 60 61func init() { 62 63 Pool_CBS = server.GetIP(true) + ":8091/" 64 65 logger, _ := log_resolver.NewLogger("golog") 66 logging.SetLogger(logger) 67} 68 69type MockQuery struct { 70 server.BaseRequest 71 response *MockResponse 72 resultCount int 73} 74 75type MockServer struct { 76 server *server.Server 77 acctstore accounting.AccountingStore 78} 79 80func (this *MockQuery) OriginalHttpRequest() *http_base.Request { 81 return nil 82} 83 84func (this *MockQuery) Output() execution.Output { 85 return this 86} 87 88func (this *MockQuery) Fail(err errors.Error) { 89 defer this.Stop(server.FATAL) 90 this.response.err = err 91 close(this.response.done) 92} 93 94func (this *MockQuery) Error(err errors.Error) { 95 if this.response.err == nil { 96 this.response.err = err 97 } 98} 99 100func (this *MockQuery) Execute(srvr *server.Server, signature value.Value, stopNotify execution.Operator) { 101 defer this.stopAndClose(server.COMPLETED) 102 103 this.NotifyStop(stopNotify) 104 this.writeResults() 105 close(this.response.done) 106} 107 108func (this *MockQuery) Failed(srvr *server.Server) { 109 defer this.stopAndClose(server.FATAL) 110} 111 112func (this *MockQuery) Expire(state server.State, timeout time.Duration) { 113 defer this.stopAndClose(state) 114 115 this.response.err = errors.NewError(nil, "Query timed out") 116 close(this.response.done) 117} 118 119func (this *MockQuery) stopAndClose(state server.State) { 120 this.Stop(state) 121 this.Close() 122} 123 124func (this *MockQuery) writeResults() bool { 125 var item value.Value 126 127 ok := true 128 for ok { 129 select { 130 case <-this.StopExecute(): 131 this.SetState(server.STOPPED) 132 return true 133 default: 134 } 135 136 select { 137 case item, ok = <-this.Results(): 138 if ok { 139 if !this.writeResult(item) { 140 this.SetState(server.FATAL) 141 return false 142 } 143 } 144 case <-this.StopExecute(): 145 this.SetState(server.STOPPED) 146 return true 147 } 148 } 149 150 this.SetState(server.COMPLETED) 151 return true 152} 153 154func (this *MockQuery) writeResult(item value.Value) bool { 155 bytes, err := json.Marshal(item) 156 if err != nil { 157 panic(err.Error()) 158 } 159 160 this.resultCount++ 161 162 var resultLine map[string]interface{} 163 json.Unmarshal(bytes, &resultLine) 164 165 this.response.results = append(this.response.results, resultLine) 166 return true 167} 168 169type MockResponse struct { 170 err errors.Error 171 results []interface{} 172 warnings []errors.Error 173 done chan bool 174} 175 176func (this *MockResponse) NoMoreResults() { 177 close(this.done) 178} 179 180/* 181Scan consistency implementation. The default 182is set to REQUEST_PLUS. 183*/ 184type scanConfigImpl struct { 185 scan_level datastore.ScanConsistency 186} 187 188func (this *scanConfigImpl) ScanConsistency() datastore.ScanConsistency { 189 return this.scan_level 190} 191 192func (this *scanConfigImpl) ScanWait() time.Duration { 193 return 0 194} 195 196func (this *scanConfigImpl) ScanVectorSource() timestamp.ScanVectorSource { 197 return &http.ZeroScanVectorSource{} 198} 199 200func (this *MockServer) doStats(request *MockQuery) { 201 request.CompleteRequest(0, 0, request.resultCount, 0, 0, nil, this.server) 202} 203 204var _ALL_USERS = auth.Credentials{ 205 "customerowner": "customerpass", 206 "ordersowner": "orderspass", 207 "productowner": "productpass", 208 "purchaseowner": "purchasepass", 209 "reviewowner": "reviewpass", 210 "shellTestowner": "shellTestpass", 211} 212 213/* 214This method is used to execute the N1QL query represented by 215the input argument (q) string using the NewBaseRequest method 216as defined in the server request.go. 217*/ 218func Run(mockServer *MockServer, q, namespace string, namedArgs map[string]value.Value, 219 positionalArgs value.Values) ([]interface{}, []errors.Error, errors.Error) { 220 var metrics value.Tristate 221 consistency := &scanConfigImpl{scan_level: datastore.SCAN_PLUS} 222 223 mr := &MockResponse{ 224 results: []interface{}{}, warnings: []errors.Error{}, done: make(chan bool), 225 } 226 query := &MockQuery{ 227 response: mr, 228 } 229 server.NewBaseRequest(&query.BaseRequest, q, nil, namedArgs, positionalArgs, namespace, 0, 0, 0, 0, 230 value.FALSE, metrics, value.TRUE, value.TRUE, consistency, "", _ALL_USERS, "", "") 231 232 // query.BaseRequest.SetIndexApiVersion(datastore.INDEX_API_3) 233 // query.BaseRequest.SetFeatureControls(util.N1QL_GROUPAGG_PUSHDOWN) 234 defer mockServer.doStats(query) 235 236 select { 237 case mockServer.server.Channel() <- query: 238 // Wait until the request exits. 239 <-query.CloseNotify() 240 default: 241 // Timeout. 242 return nil, nil, errors.NewError(nil, "Query timed out") 243 } 244 245 // wait till all the results are ready 246 <-mr.done 247 return mr.results, mr.warnings, mr.err 248} 249 250func RunPrepared(mockServer *MockServer, q, namespace string, namedArgs map[string]value.Value, 251 positionalArgs value.Values) ([]interface{}, []errors.Error, errors.Error) { 252 var metrics value.Tristate 253 consistency := &scanConfigImpl{scan_level: datastore.SCAN_PLUS} 254 255 mr := &MockResponse{ 256 results: []interface{}{}, warnings: []errors.Error{}, done: make(chan bool), 257 } 258 query := &MockQuery{ 259 response: mr, 260 } 261 262 prepared, err := PrepareStmt(mockServer, namespace, q) 263 if err != nil { 264 return nil, nil, err 265 } 266 267 server.NewBaseRequest(&query.BaseRequest, "", prepared, namedArgs, positionalArgs, namespace, 0, 0, 0, 0, 268 value.FALSE, metrics, value.TRUE, value.TRUE, consistency, "", _ALL_USERS, "", "") 269 270 // query.BaseRequest.SetIndexApiVersion(datastore.INDEX_API_3) 271 // query.BaseRequest.SetFeatureControls(util.N1QL_GROUPAGG_PUSHDOWN) 272 defer mockServer.doStats(query) 273 274 select { 275 case mockServer.server.Channel() <- query: 276 // Wait until the request exits. 277 <-query.CloseNotify() 278 default: 279 // Timeout. 280 return nil, nil, errors.NewError(nil, "Query timed out") 281 } 282 283 // wait till all the results are ready 284 <-mr.done 285 return mr.results, mr.warnings, mr.err 286} 287 288/* 289Used to specify the N1QL nodes options using the method NewServer 290as defined in server/server.go. 291*/ 292func Start(site, pool, namespace string, setGlobals bool) *MockServer { 293 294 mockServer := &MockServer{} 295 ds, err := resolver.NewDatastore(site + pool) 296 if err != nil { 297 logging.Errorp(err.Error()) 298 os.Exit(1) 299 } 300 301 sys, err := system.NewDatastore(ds) 302 if err != nil { 303 logging.Errorp(err.Error()) 304 os.Exit(1) 305 } 306 307 if setGlobals { 308 datastore.SetDatastore(ds) 309 datastore.SetSystemstore(sys) 310 } 311 312 configstore, err := config_resolver.NewConfigstore("stub:") 313 if err != nil { 314 logging.Errorp("Could not connect to configstore", 315 logging.Pair{"error", err}, 316 ) 317 } 318 319 acctstore, err := acct_resolver.NewAcctstore("stub:") 320 if err != nil { 321 logging.Errorp("Could not connect to acctstore", 322 logging.Pair{"error", err}, 323 ) 324 } 325 326 // Start the completed requests log - keep it small and busy 327 server.RequestsInit(0, 8) 328 329 // Start the prepared statement cache 330 prepareds.PreparedsInit(1024) 331 332 channel := make(server.RequestChannel, 10) 333 plusChannel := make(server.RequestChannel, 10) 334 335 // need to do it before NewServer() or server scope's changes to 336 // the variable and not the package... 337 server.SetActives(http.NewActiveRequests()) 338 server, err := server.NewServer(ds, sys, configstore, acctstore, namespace, 339 false, channel, plusChannel, 1, 1, 1, 0, false, false, true, true, 340 server.ProfOff, false) 341 if err != nil { 342 logging.Errorp(err.Error()) 343 os.Exit(1) 344 } 345 346 server.SetWhitelist(curlWhitelist) 347 348 prepareds.PreparedsReprepareInit(ds, sys, namespace) 349 server.SetKeepAlive(1 << 10) 350 351 go server.Serve() 352 mockServer.server = server 353 mockServer.acctstore = acctstore 354 355 return mockServer 356} 357 358func dropResultEntry(result interface{}, e string) { 359 switch v := result.(type) { 360 case map[string]interface{}: 361 delete(v, e) 362 for _, f := range v { 363 dropResultEntry(f, e) 364 } 365 case []interface{}: 366 for _, f := range v { 367 dropResultEntry(f, e) 368 } 369 } 370} 371 372func dropResultsEntry(results []interface{}, entry interface{}) { 373 e := fmt.Sprintf("%v", entry) 374 for _, r := range results { 375 dropResultEntry(r, e) 376 } 377} 378 379func addResultsEntry(newResults, results []interface{}, entry interface{}) { 380 e := fmt.Sprintf("%v", entry) 381 for i, r := range results { 382 v, ok := r.(map[string]interface{}) 383 if ok { 384 newV, ok := newResults[i].(map[string]interface{}) 385 if ok { 386 newV[e] = v[e] 387 } 388 } 389 } 390} 391 392func FtestCaseFile(fname string, prepared, explain bool, qc *MockServer, namespace string) (fin_stmt string, errstring error) { 393 fin_stmt = "" 394 395 /* Reads the input file and returns its contents in the form 396 of a byte array. 397 */ 398 b, err := ioutil.ReadFile(fname) 399 if err != nil { 400 errstring = go_er.New(fmt.Sprintf("ReadFile failed: %v", err)) 401 return 402 } 403 404 var cases []map[string]interface{} 405 406 err = json.Unmarshal(b, &cases) 407 if err != nil { 408 errstring = go_er.New(fmt.Sprintf("couldn't json unmarshal: %v, err: %v", string(b), err)) 409 return 410 } 411 for i, c := range cases { 412 d, ok := c["disabled"] 413 if ok { 414 disabled := d.(bool) 415 if disabled == true { 416 continue 417 } 418 } 419 420 /* Handles all queries to be run against CBServer and Datastore */ 421 v, ok := c["statements"] 422 if !ok || v == nil { 423 errstring = go_er.New(fmt.Sprintf("missing statements for case file: %v, index: %v", fname, i)) 424 return 425 } 426 statements := v.(string) 427 428 var ordered bool 429 if o, ook := c["ordered"]; ook { 430 ordered = o.(bool) 431 } 432 433 if explain { 434 if errstring = checkExplain(qc, namespace, statements, c, ordered, fname, i); errstring != nil { 435 return 436 } 437 } 438 439 fin_stmt = strconv.Itoa(i) + ": " + statements 440 var resultsActual []interface{} 441 var errActual errors.Error 442 if prepared { 443 resultsActual, _, errActual = RunPrepared(qc, statements, namespace, nil, nil) 444 } else { 445 resultsActual, _, errActual = Run(qc, statements, namespace, nil, nil) 446 } 447 448 errExpected := "" 449 v, ok = c["error"] 450 if ok { 451 errExpected = v.(string) 452 } 453 454 if errActual != nil { 455 if errExpected == "" { 456 errstring = go_er.New(fmt.Sprintf("unexpected err: %v, statements: %v"+ 457 ", for case file: %v, index: %v", errActual, statements, fname, i)) 458 return 459 } 460 461 if errExpected != errActual.Error() { 462 errstring = go_er.New(fmt.Sprintf("Mismatched error - expected '%s' actual '%s'"+ 463 ", for case file: %v, index: %v", errExpected, errActual.Error(), fname, i)) 464 return 465 } 466 467 continue 468 } 469 470 if errExpected != "" { 471 errstring = go_er.New(fmt.Sprintf("did not see the expected err: %v, statements: %v"+ 472 ", for case file: %v, index: %v", errActual, statements, fname, i)) 473 return 474 } 475 476 // ignore certain parts of the results if we need to 477 // we handle scalars and array of scalars, ignore the rest 478 // filter only applied to first level fields 479 ignore, ok := c["ignore"] 480 if ok { 481 switch ignore.(type) { 482 case []interface{}: 483 for _, v := range ignore.([]interface{}) { 484 switch v.(type) { 485 case []interface{}: 486 case map[string]interface{}: 487 default: 488 dropResultsEntry(resultsActual, v) 489 } 490 } 491 case map[string]interface{}: 492 default: 493 dropResultsEntry(resultsActual, ignore) 494 } 495 } 496 497 // opposite of ignore - only select certain fields 498 // again, we handle scalars and the scalars in an array 499 accept, ok := c["accept"] 500 if ok { 501 newResults := make([]interface{}, len(resultsActual)) 502 switch accept.(type) { 503 case []interface{}: 504 for j, _ := range resultsActual { 505 newResults[j] = make(map[string]interface{}, len(accept.([]interface{}))) 506 } 507 for _, v := range accept.([]interface{}) { 508 switch v.(type) { 509 case []interface{}: 510 case map[string]interface{}: 511 default: 512 addResultsEntry(newResults, resultsActual, v) 513 } 514 } 515 case map[string]interface{}: 516 default: 517 for j, _ := range resultsActual { 518 newResults[j] = make(map[string]interface{}, 1) 519 } 520 addResultsEntry(newResults, resultsActual, accept) 521 } 522 resultsActual = newResults 523 } 524 v, ok = c["results"] 525 if ok { 526 resultsExpected := v.([]interface{}) 527 okres := doResultsMatch(resultsActual, resultsExpected, ordered, statements, fname, i) 528 if okres != nil { 529 errstring = okres 530 return 531 } 532 } 533 } 534 return fin_stmt, nil 535} 536 537/* 538Matches expected results with the results obtained by 539running the queries. 540*/ 541func doResultsMatch(resultsActual, resultsExpected []interface{}, ordered bool, stmt, fname string, i int) (errstring error) { 542 if len(resultsActual) != len(resultsExpected) { 543 return go_er.New(fmt.Sprintf("results len don't match, %v vs %v, %v vs %v"+ 544 ", (%v)for case file: %v, index: %v", 545 len(resultsActual), len(resultsExpected), 546 resultsActual, resultsExpected, stmt, fname, i)) 547 } 548 549 if ordered { 550 if !reflect.DeepEqual(resultsActual, resultsExpected) { 551 return go_er.New(fmt.Sprintf("results don't match, actual: %#v, expected: %#v"+ 552 ", (%v) for case file: %v, index: %v", 553 resultsActual, resultsExpected, stmt, fname, i)) 554 } 555 } else { 556 nextresult: 557 for _, re := range resultsExpected { 558 for j, ra := range resultsActual { 559 if ra != nil && reflect.DeepEqual(ra, re) { 560 resultsActual[j] = nil 561 continue nextresult 562 } 563 } 564 return go_er.New(fmt.Sprintf("results don't match: %#v is not present in : %#v"+ 565 ", (%v) for case file: %v, index: %v", 566 re, resultsActual, stmt, fname, i)) 567 } 568 569 } 570 571 return nil 572} 573 574func checkExplain(qc *MockServer, namespace string, statement string, c map[string]interface{}, ordered bool, 575 fname string, i int) (errstring error) { 576 var ev map[string]interface{} 577 578 e, ok := c["explain"] 579 if ok { 580 ev, ok = e.(map[string]interface{}) 581 } 582 583 if !ok { 584 return 585 } 586 587 var eStmt string 588 var erExpected []interface{} 589 590 ed, dok := ev["disabled"] 591 es, sok := ev["statement"] 592 er, rok := ev["results"] 593 594 if dok { 595 if disabled := ed.(bool); disabled { 596 return 597 } 598 } 599 600 if sok { 601 eStmt, sok = es.(string) 602 } 603 604 if !sok { 605 return 606 } 607 608 if rok { 609 erExpected, rok = er.([]interface{}) 610 } 611 612 explainStmt := "EXPLAIN " + statement 613 resultsActual, _, errActual := Run(qc, explainStmt, namespace, nil, nil) 614 if errActual != nil || len(resultsActual) != 1 { 615 return go_er.New(fmt.Sprintf("(%v) error actual: %#v"+ 616 ", for case file: %v, index: %v", explainStmt, resultsActual, fname, i)) 617 } 618 619 namedParams := make(map[string]value.Value, 1) 620 namedParams["explan"] = value.NewValue(resultsActual[0]) 621 622 resultsActual, _, errActual = Run(qc, eStmt, namespace, namedParams, nil) 623 if errActual != nil { 624 return go_er.New(fmt.Sprintf("unexpected err: %v, statement: %v"+ 625 ", for case file: %v, index: %v", errActual, eStmt, fname, i)) 626 } 627 628 if rok { 629 return doResultsMatch(resultsActual, erExpected, ordered, eStmt, fname, i) 630 } 631 632 return 633} 634 635func PrepareStmt(qc *MockServer, namespace, statement string) (*plan.Prepared, errors.Error) { 636 prepareStmt := "PREPARE " + statement 637 resultsActual, _, errActual := Run(qc, prepareStmt, namespace, nil, nil) 638 if errActual != nil || len(resultsActual) != 1 { 639 return nil, errors.NewError(nil, fmt.Sprintf("Error %#v FOR (%v)", prepareStmt, resultsActual)) 640 } 641 RunStmt(qc, "DELETE FROM system:prepareds") 642 ra := resultsActual[0].(map[string]interface{}) 643 return prepareds.DecodePrepared("", ra["encoded_plan"].(string), true, false, nil) 644} 645 646/* 647Method to pass in parameters for site, pool and 648namespace to Start method for Couchbase Server. 649*/ 650 651func Start_cs(setGlobals bool) *MockServer { 652 ms := Start(Site_CBS, Auth_param+"@"+Pool_CBS, Namespace_CBS, setGlobals) 653 654 return ms 655} 656 657func RunMatch(filename string, prepared, explain bool, qc *MockServer, t *testing.T) { 658 659 matches, err := filepath.Glob(filename) 660 if err != nil { 661 t.Errorf("glob failed: %v", err) 662 } 663 664 for _, m := range matches { 665 t.Logf("TestCaseFile: %v\n", m) 666 stmt, errcs := FtestCaseFile(m, prepared, explain, qc, Namespace_CBS) 667 668 if errcs != nil { 669 t.Errorf("Error : %s", errcs.Error()) 670 return 671 } 672 673 if stmt != "" { 674 t.Logf(" %v\n", stmt) 675 } 676 677 fmt.Println("\nQuery : ", m, "\n\n") 678 } 679 680} 681 682func RunStmt(mockServer *MockServer, q string) ([]interface{}, []errors.Error, errors.Error) { 683 return Run(mockServer, q, Namespace_CBS, nil, nil) 684} 685