1// Copyright (c) 2014 Couchbase, Inc. 2// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file 3// except in compliance with the License. You may obtain a copy of the License at 4// http://www.apache.org/licenses/LICENSE-2.0 5// Unless required by applicable law or agreed to in writing, software distributed under the 6// License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, 7// either express or implied. See the License for the specific language governing permissions 8// and limitations under the License. 9 10package server 11 12import ( 13 "encoding/json" 14 "fmt" 15 "math" 16 "os" 17 "runtime" 18 "runtime/pprof" 19 "strings" 20 "sync" 21 "time" 22 23 atomic "github.com/couchbase/go-couchbase/platform" 24 "github.com/couchbase/query/accounting" 25 "github.com/couchbase/query/algebra" 26 "github.com/couchbase/query/clustering" 27 "github.com/couchbase/query/datastore" 28 "github.com/couchbase/query/errors" 29 "github.com/couchbase/query/execution" 30 "github.com/couchbase/query/logging" 31 "github.com/couchbase/query/parser/n1ql" 32 "github.com/couchbase/query/plan" 33 "github.com/couchbase/query/planner" 34 "github.com/couchbase/query/prepareds" 35 queryMetakv "github.com/couchbase/query/server/settings/couchbase" 36 "github.com/couchbase/query/util" 37 "github.com/couchbase/query/value" 38) 39 40type Profile int 41 42const ( 43 ProfUnset = Profile(iota) 44 ProfOff 45 ProfPhases 46 ProfOn 47) 48 49var _PROFILE_MAP = map[string]Profile{ 50 "off": ProfOff, 51 "phases": ProfPhases, 52 "timings": ProfOn, 53} 54 55var _PROFILE_DEFAULT = ProfOff 56 57var _PROFILE_NAMES = []string{ 58 ProfUnset: "", 59 ProfOff: "off", 60 ProfPhases: "phases", 61 ProfOn: "timings", 62} 63 64var _IPv6 = false 65 66func (profile Profile) String() string { 67 return _PROFILE_NAMES[profile] 68} 69 70type Server struct { 71 // due to alignment issues on x86 platforms these atomic 72 // variables need to right at the beginning of the structure 73 servicers atomic.AlignedInt64 74 plusServicers atomic.AlignedInt64 75 maxParallelism atomic.AlignedInt64 76 keepAlive atomic.AlignedInt64 77 requestSize atomic.AlignedInt64 78 79 sync.RWMutex 80 datastore datastore.Datastore 81 systemstore datastore.Datastore 82 configstore clustering.ConfigurationStore 83 acctstore accounting.AccountingStore 84 namespace string 85 readonly bool 86 channel RequestChannel 87 plusChannel RequestChannel 88 done chan bool 89 plusDone chan bool 90 timeout time.Duration 91 signature bool 92 metrics bool 93 wg sync.WaitGroup 94 plusWg sync.WaitGroup 95 memprofile string 96 cpuprofile string 97 enterprise bool 98 pretty bool 99 srvprofile Profile 100 srvcontrols bool 101 whitelist map[string]interface{} 102} 103 104// Default Keep Alive Length 105 106const KEEP_ALIVE_DEFAULT = 1024 * 16 107 108func NewServer(store datastore.Datastore, sys datastore.Datastore, config clustering.ConfigurationStore, 109 acctng accounting.AccountingStore, namespace string, readonly bool, 110 channel, plusChannel RequestChannel, servicers, plusServicers, maxParallelism int, 111 timeout time.Duration, signature, metrics, enterprise, pretty bool, 112 srvprofile Profile, srvcontrols bool) (*Server, errors.Error) { 113 rv := &Server{ 114 datastore: store, 115 systemstore: sys, 116 configstore: config, 117 acctstore: acctng, 118 namespace: namespace, 119 readonly: readonly, 120 channel: channel, 121 plusChannel: plusChannel, 122 signature: signature, 123 timeout: timeout, 124 metrics: metrics, 125 done: make(chan bool), 126 plusDone: make(chan bool), 127 enterprise: enterprise, 128 pretty: pretty, 129 srvcontrols: srvcontrols, 130 srvprofile: srvprofile, 131 } 132 133 // special case handling for the atomic specfic stuff 134 atomic.StoreInt64(&rv.servicers, int64(servicers)) 135 atomic.StoreInt64(&rv.plusServicers, int64(plusServicers)) 136 137 store.SetLogLevel(logging.LogLevel()) 138 rv.SetMaxParallelism(maxParallelism) 139 140 // set default values 141 rv.SetMaxIndexAPI(datastore.INDEX_API_MAX) 142 if rv.enterprise { 143 util.SetN1qlFeatureControl(util.DEF_N1QL_FEAT_CTRL) 144 } else { 145 util.SetN1qlFeatureControl(util.DEF_N1QL_FEAT_CTRL | util.CE_N1QL_FEAT_CTRL) 146 } 147 148 // sys, err := system.NewDatastore(store) 149 // if err != nil { 150 // return nil, err 151 // } 152 // 153 // rv.systemstore = sys 154 155 // Setup callback function for metakv settings changes 156 callb := func(cfg queryMetakv.Config) { 157 logging.Infof("Settings notifier from metakv\n") 158 159 // SetParamValuesForAll accepts a full-set or subset of global configuration 160 // and updates those fields. 161 SetParamValuesForAll(cfg, rv) 162 } 163 164 queryMetakv.SetupSettingsNotifier(callb, make(chan struct{})) 165 166 return rv, nil 167} 168 169func (this *Server) Datastore() datastore.Datastore { 170 return this.datastore 171} 172 173func (this *Server) Systemstore() datastore.Datastore { 174 return this.systemstore 175} 176 177func (this *Server) Namespace() string { 178 return this.namespace 179} 180 181func (this *Server) SetWhitelist(val map[string]interface{}) { 182 this.whitelist = val 183} 184 185func (this *Server) GetWhitelist() map[string]interface{} { 186 return this.whitelist 187} 188 189func (this *Server) ConfigurationStore() clustering.ConfigurationStore { 190 return this.configstore 191} 192 193func (this *Server) AccountingStore() accounting.AccountingStore { 194 return this.acctstore 195} 196 197func (this *Server) Channel() RequestChannel { 198 return this.channel 199} 200 201func (this *Server) PlusChannel() RequestChannel { 202 return this.plusChannel 203} 204 205func (this *Server) Signature() bool { 206 return this.signature 207} 208 209func (this *Server) Metrics() bool { 210 return this.metrics 211} 212 213func (this *Server) Pretty() bool { 214 this.RLock() 215 defer this.RUnlock() 216 return this.pretty 217} 218 219func (this *Server) SetPretty(pretty bool) { 220 this.Lock() 221 defer this.Unlock() 222 this.pretty = pretty 223} 224 225func (this *Server) KeepAlive() int { 226 return int(atomic.LoadInt64(&this.keepAlive)) 227} 228 229func (this *Server) SetKeepAlive(keepAlive int) { 230 if keepAlive <= 0 { 231 keepAlive = KEEP_ALIVE_DEFAULT 232 } 233 atomic.StoreInt64(&this.keepAlive, int64(keepAlive)) 234} 235 236func (this *Server) MaxParallelism() int { 237 return int(atomic.LoadInt64(&this.maxParallelism)) 238} 239 240func (this *Server) SetMaxParallelism(maxParallelism int) { 241 if maxParallelism <= 0 { 242 maxParallelism = runtime.NumCPU() 243 } 244 atomic.StoreInt64(&this.maxParallelism, int64(maxParallelism)) 245} 246 247func (this *Server) MemProfile() string { 248 this.RLock() 249 defer this.RUnlock() 250 return this.memprofile 251} 252 253func (this *Server) SetMemProfile(memprofile string) { 254 this.Lock() 255 defer this.Unlock() 256 this.memprofile = memprofile 257} 258 259func (this *Server) CpuProfile() string { 260 this.RLock() 261 defer this.RUnlock() 262 return this.cpuprofile 263} 264 265func (this *Server) SetCpuProfile(cpuprofile string) { 266 this.Lock() 267 defer this.Unlock() 268 this.cpuprofile = cpuprofile 269 if this.cpuprofile == "" { 270 return 271 } 272 f, err := os.Create(this.cpuprofile) 273 if err != nil { 274 logging.Errorp("Cannot start cpu profiler", logging.Pair{"error", err}) 275 this.cpuprofile = "" 276 } else { 277 pprof.StartCPUProfile(f) 278 } 279} 280 281func (this *Server) ScanCap() int64 { 282 return datastore.GetScanCap() 283} 284 285func (this *Server) SetScanCap(scan_cap int64) { 286 datastore.SetScanCap(scan_cap) 287} 288 289func (this *Server) PipelineCap() int64 { 290 return execution.GetPipelineCap() 291} 292 293func (this *Server) SetPipelineCap(pipeline_cap int64) { 294 execution.SetPipelineCap(pipeline_cap) 295} 296 297func (this *Server) PipelineBatch() int { 298 return execution.PipelineBatchSize() 299} 300 301func (this *Server) SetPipelineBatch(pipeline_batch int) { 302 execution.SetPipelineBatch(pipeline_batch) 303} 304 305func (this *Server) MaxIndexAPI() int { 306 return util.GetMaxIndexAPI() 307} 308 309func (this *Server) SetMaxIndexAPI(apiVersion int) { 310 if apiVersion < datastore.INDEX_API_MIN || apiVersion > datastore.INDEX_API_MAX { 311 apiVersion = datastore.INDEX_API_MIN 312 } 313 util.SetMaxIndexAPI(apiVersion) 314} 315 316func (this *Server) Debug() bool { 317 return logging.LogLevel() == logging.DEBUG 318} 319 320func (this *Server) SetDebug(debug bool) { 321 if debug { 322 this.SetLogLevel("debug") 323 } else { 324 this.SetLogLevel("info") 325 } 326} 327 328func (this *Server) LogLevel() string { 329 return logging.LogLevel().String() 330} 331 332func (this *Server) SetLogLevel(level string) { 333 lvl, ok := logging.ParseLevel(level) 334 if !ok { 335 logging.Errorp("SetLogLevel: unrecognized level", logging.Pair{"level", level}) 336 return 337 } 338 if this.datastore != nil { 339 this.datastore.SetLogLevel(lvl) 340 } 341 logging.SetLevel(lvl) 342} 343 344const ( 345 MAX_REQUEST_SIZE = 64 * (1 << 20) 346) 347 348func (this *Server) RequestSizeCap() int { 349 return int(atomic.LoadInt64(&this.requestSize)) 350} 351 352func (this *Server) SetRequestSizeCap(requestSize int) { 353 if requestSize <= 0 { 354 requestSize = math.MaxInt32 355 } 356 atomic.StoreInt64(&this.requestSize, int64(requestSize)) 357} 358 359func (this *Server) Servicers() int { 360 return int(atomic.LoadInt64(&this.servicers)) 361} 362 363func (this *Server) SetServicers(servicers int) { 364 this.Lock() 365 defer this.Unlock() 366 367 // MB-19683 - don't restart if no change 368 if int(atomic.LoadInt64(&this.servicers)) == servicers { 369 return 370 } 371 372 // Stop the current set of servicers 373 close(this.done) 374 logging.Infop("SetServicers - waiting for current servicers to finish") 375 this.wg.Wait() 376 // Set servicer count and recreate servicers 377 atomic.StoreInt64(&this.servicers, int64(servicers)) 378 logging.Infop("SetServicers - starting new servicers") 379 // Start new set of servicers 380 this.done = make(chan bool) 381 go this.Serve() 382} 383 384func (this *Server) PlusServicers() int { 385 return int(atomic.LoadInt64(&this.plusServicers)) 386} 387 388func (this *Server) SetPlusServicers(plusServicers int) { 389 this.Lock() 390 defer this.Unlock() 391 392 // MB-19683 - don't restart if no change 393 if int(atomic.LoadInt64(&this.plusServicers)) == plusServicers { 394 return 395 } 396 397 // Stop the current set of servicers 398 close(this.plusDone) 399 logging.Infop("SetPlusServicers - waiting for current plusServicers to finish") 400 this.plusWg.Wait() 401 // Set plus servicer count and recreate plus servicers 402 atomic.StoreInt64(&this.plusServicers, int64(plusServicers)) 403 logging.Infop("SetPlusServicers - starting new plusServicers") 404 // Start new set of servicers 405 this.plusDone = make(chan bool) 406 go this.PlusServe() 407} 408 409func (this *Server) Timeout() time.Duration { 410 return this.timeout 411} 412 413func (this *Server) SetTimeout(timeout time.Duration) { 414 this.timeout = timeout 415} 416 417func (this *Server) Profile() Profile { 418 return this.srvprofile 419} 420 421func (this *Server) SetProfile(srvprofile Profile) { 422 this.srvprofile = srvprofile 423} 424 425func (this *Server) Controls() bool { 426 return this.srvcontrols 427} 428 429func (this *Server) SetControls(srvcontrols bool) { 430 this.srvcontrols = srvcontrols 431} 432 433func ParseProfile(name string) (Profile, bool) { 434 prof, ok := _PROFILE_MAP[strings.ToLower(name)] 435 if ok { 436 return prof, ok 437 } else { 438 return _PROFILE_DEFAULT, ok 439 } 440} 441 442func (this *Server) Enterprise() bool { 443 return this.enterprise 444} 445 446func (this *Server) Serve() { 447 // Use a threading model. Do not spawn a separate 448 // goroutine for each request, as that would be 449 // unbounded and could degrade performance of already 450 // executing queries. 451 servicers := this.Servicers() 452 this.wg.Add(servicers) 453 for i := 0; i < servicers; i++ { 454 go this.doServe() 455 } 456} 457 458func (this *Server) doServe() { 459 defer this.wg.Done() 460 ok := true 461 for ok { 462 select { 463 case request := <-this.channel: 464 this.serviceRequest(request) 465 case <-this.done: 466 ok = false 467 } 468 } 469} 470 471func (this *Server) PlusServe() { 472 // Use a threading model. Do not spawn a separate 473 // goroutine for each request, as that would be 474 // unbounded and could degrade performance of already 475 // executing queries. 476 plusServicers := this.PlusServicers() 477 this.plusWg.Add(plusServicers) 478 for i := 0; i < plusServicers; i++ { 479 go this.doPlusServe() 480 } 481} 482 483func (this *Server) doPlusServe() { 484 defer this.plusWg.Done() 485 ok := true 486 for ok { 487 select { 488 case request := <-this.plusChannel: 489 this.serviceRequest(request) 490 case <-this.plusDone: 491 ok = false 492 } 493 } 494} 495 496func (this *Server) serviceRequest(request Request) { 497 defer func() { 498 err := recover() 499 if err != nil { 500 buf := make([]byte, 1<<16) 501 n := runtime.Stack(buf, false) 502 s := string(buf[0:n]) 503 logging.Severep("", logging.Pair{"panic", err}, 504 logging.Pair{"stack", s}) 505 os.Stderr.WriteString(s) 506 os.Stderr.Sync() 507 } 508 }() 509 510 request.Servicing() 511 512 namespace := request.Namespace() 513 if namespace == "" { 514 namespace = this.namespace 515 } 516 517 prepared, err := this.getPrepared(request, namespace) 518 if err != nil { 519 request.Fail(err) 520 } 521 522 if (this.readonly || value.ToBool(request.Readonly())) && 523 (prepared != nil && !prepared.Readonly()) { 524 request.Fail(errors.NewServiceErrorReadonly("The server or request is read-only" + 525 " and cannot accept this write statement.")) 526 } 527 528 if request.State() == FATAL { 529 request.Failed(this) 530 return 531 } 532 533 maxParallelism := request.MaxParallelism() 534 if maxParallelism <= 0 { 535 maxParallelism = this.MaxParallelism() 536 } 537 538 context := execution.NewContext(request.Id().String(), this.datastore, this.systemstore, namespace, 539 this.readonly, maxParallelism, request.ScanCap(), request.PipelineCap(), request.PipelineBatch(), 540 request.NamedArgs(), request.PositionalArgs(), request.Credentials(), request.ScanConsistency(), 541 request.ScanVectorSource(), request.Output(), request.OriginalHttpRequest(), 542 prepared, request.IndexApiVersion(), request.FeatureControls()) 543 544 context.SetWhitelist(this.whitelist) 545 546 build := time.Now() 547 operator, er := execution.Build(prepared, context) 548 if er != nil { 549 error, ok := er.(errors.Error) 550 if ok { 551 request.Fail(error) 552 } else { 553 request.Fail(errors.NewError(er, "")) 554 } 555 } 556 557 operator.SetRoot() 558 request.SetTimings(operator) 559 request.Output().AddPhaseTime(execution.INSTANTIATE, time.Since(build)) 560 561 if request.State() == FATAL { 562 request.Failed(this) 563 return 564 } 565 566 timeout := request.Timeout() 567 568 // never allow request side timeout to be higher than 569 // server side timeout 570 if this.timeout > 0 && (this.timeout < timeout || timeout <= 0) { 571 timeout = this.timeout 572 } 573 if timeout > 0 { 574 request.SetTimer(time.AfterFunc(timeout, func() { request.Expire(TIMEOUT, timeout) })) 575 context.SetReqDeadline(time.Now().Add(timeout)) 576 } else { 577 context.SetReqDeadline(time.Time{}) 578 } 579 580 go operator.RunOnce(context, nil) 581 582 request.SetExecTime(time.Now()) 583 request.Execute(this, prepared.Signature(), operator) 584} 585 586func (this *Server) getPrepared(request Request, namespace string) (*plan.Prepared, errors.Error) { 587 prepared := request.Prepared() 588 if prepared == nil { 589 parse := time.Now() 590 stmt, err := n1ql.ParseStatement(request.Statement()) 591 request.Output().AddPhaseTime(execution.PARSE, time.Since(parse)) 592 if err != nil { 593 return nil, errors.NewParseSyntaxError(err, "") 594 } 595 596 isprepare := false 597 if _, ok := stmt.(*algebra.Prepare); ok { 598 isprepare = true 599 } 600 601 prep := time.Now() 602 namedArgs := request.NamedArgs() 603 positionalArgs := request.PositionalArgs() 604 605 // No args for a prepared statement - should we throw an error? 606 if isprepare { 607 namedArgs = nil 608 positionalArgs = nil 609 } 610 611 prepared, err = planner.BuildPrepared(stmt, this.datastore, this.systemstore, namespace, false, 612 namedArgs, positionalArgs, request.IndexApiVersion(), request.FeatureControls()) 613 request.Output().AddPhaseTime(execution.PLAN, time.Since(prep)) 614 if err != nil { 615 return nil, errors.NewPlanError(err, "") 616 } 617 618 // EXECUTE doesn't get a plan. Get the plan from the cache. 619 switch stmt.Type() { 620 case "EXECUTE": 621 var reprepTime time.Duration 622 var err errors.Error 623 624 exec, _ := stmt.(*algebra.Execute) 625 if exec.Prepared() != nil { 626 627 prepared, err = prepareds.GetPrepared(exec.Prepared(), prepareds.OPT_TRACK|prepareds.OPT_REMOTE|prepareds.OPT_VERIFY, &reprepTime) 628 if reprepTime > 0 { 629 request.Output().AddPhaseTime(execution.REPREPARE, reprepTime) 630 } 631 if err != nil { 632 return nil, err 633 } 634 request.SetPrepared(prepared) 635 636 // when executing prepared statements, we set the type to that 637 // of the prepared statement 638 request.SetType(prepared.Type()) 639 } else { 640 641 // this never happens, but for completeness 642 errors.NewPlanError(nil, "prepared not specified") 643 } 644 default: 645 646 // set the type for all statements bar prepare 647 // (doing otherwise would have accounting track prepares 648 // as if they were executions) 649 if isprepare { 650 request.SetIsPrepare(true) 651 } else { 652 request.SetType(stmt.Type()) 653 } 654 655 // even though this is not a prepared statement, add the 656 // text for the benefit of context.Recover(): we can 657 // output the text in case of crashes 658 prepared.SetText(request.Statement()) 659 } 660 } else { 661 662 // ditto 663 request.SetType(prepared.Type()) 664 } 665 666 if logging.LogLevel() >= logging.DEBUG { 667 // log EXPLAIN for the request 668 logExplain(prepared) 669 } 670 671 return prepared, nil 672} 673 674func logExplain(prepared *plan.Prepared) { 675 var pl plan.Operator = prepared 676 explain, err := json.MarshalIndent(pl, "", " ") 677 if err != nil { 678 logging.Tracep("Error logging explain", logging.Pair{"error", err}) 679 return 680 } 681 682 logging.Tracep("Explain ", logging.Pair{"explain", fmt.Sprintf("<ud>%v</ud>", string(explain))}) 683} 684 685// API for tracking server options 686type ServerOptions interface { 687 Controls() bool 688 Profile() Profile 689} 690 691var options ServerOptions 692 693func SetOptions(o ServerOptions) { 694 options = o 695} 696 697func GetControls() bool { 698 return options.Controls() 699} 700 701func GetProfile() Profile { 702 return options.Profile() 703} 704 705// Return the correct address for localhost depending on if 706// IPv4 or IPv6 707func GetIP(is_url bool) string { 708 if _IPv6 { 709 if is_url { 710 return "[::1]" 711 } else { 712 return "::1" 713 } 714 } 715 return "127.0.0.1" 716} 717 718func SetIP(val bool) { 719 _IPv6 = val 720 util.IPv6 = val 721} 722 723// This needs to support both IPv4 and IPv6 724// The prev version of impl for this function assumed 725// that node is always ip:port. It should not have a protocol component. 726func HostNameandPort(node string) (host, port string) { 727 tokens := []string{} 728 // For IPv6 729 if _IPv6 { 730 // Then the url should be of the form [::1]:8091 731 tokens = strings.Split(node, "]:") 732 host = strings.Replace(tokens[0], "[", "", 1) 733 734 } else { 735 // For IPv4 736 tokens = strings.Split(node, ":") 737 host = tokens[0] 738 } 739 740 if len(tokens) == 2 { 741 port = tokens[1] 742 } else { 743 port = "" 744 } 745 746 return 747} 748