//  Copyright (c) 2014 Couchbase, Inc.
//  Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
//  except in compliance with the License. You may obtain a copy of the License at
//    http://www.apache.org/licenses/LICENSE-2.0
//  Unless required by applicable law or agreed to in writing, software distributed under the
//  License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
//  either express or implied. See the License for the specific language governing permissions
//  and limitations under the License.

package server

import (
	"encoding/json"
	"fmt"
	"math"
	"os"
	"runtime"
	"runtime/pprof"
	"strings"
	"sync"
	"time"

	atomic "github.com/couchbase/go-couchbase/platform"
	"github.com/couchbase/query/accounting"
	"github.com/couchbase/query/algebra"
	"github.com/couchbase/query/clustering"
	"github.com/couchbase/query/datastore"
	"github.com/couchbase/query/errors"
	"github.com/couchbase/query/execution"
	"github.com/couchbase/query/logging"
	"github.com/couchbase/query/parser/n1ql"
	"github.com/couchbase/query/plan"
	"github.com/couchbase/query/planner"
	"github.com/couchbase/query/prepareds"
	queryMetakv "github.com/couchbase/query/server/settings/couchbase"
	"github.com/couchbase/query/util"
	"github.com/couchbase/query/value"
)

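// Profile represents the profiling level of a request: unset, off, phases or
// timings.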
type Profile int

const (
	ProfUnset = Profile(iota)
	ProfOff
	ProfPhases
	ProfOn
)

var _PROFILE_MAP = map[string]Profile{
	"off":     ProfOff,
	"phases":  ProfPhases,
	"timings": ProfOn,
}

var _PROFILE_DEFAULT = ProfOff

var _PROFILE_NAMES = []string{
	ProfUnset:  "",
	ProfOff:    "off",
	ProfPhases: "phases",
	ProfOn:     "timings",
}

var _IPv6 = false

func (profile Profile) String() string {
	return _PROFILE_NAMES[profile]
}

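// Server holds the global state of the query service: its datastores,
// configuration and accounting stores, the servicer pools with their request
// channels, and the run-time settings.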
type Server struct {
	// due to alignment issues on x86 platforms these atomic
	// variables need to be right at the beginning of the structure
	servicers      atomic.AlignedInt64
	plusServicers  atomic.AlignedInt64
	maxParallelism atomic.AlignedInt64
	keepAlive      atomic.AlignedInt64
	requestSize    atomic.AlignedInt64

	sync.RWMutex
	datastore   datastore.Datastore
	systemstore datastore.Datastore
	configstore clustering.ConfigurationStore
	acctstore   accounting.AccountingStore
	namespace   string
	readonly    bool
	channel     RequestChannel
	plusChannel RequestChannel
	done        chan bool
	plusDone    chan bool
	timeout     time.Duration
	signature   bool
	metrics     bool
	wg          sync.WaitGroup
	plusWg      sync.WaitGroup
	memprofile  string
	cpuprofile  string
	enterprise  bool
	pretty      bool
	srvprofile  Profile
	srvcontrols bool
	whitelist   map[string]interface{}
}

// Default Keep Alive Length

const KEEP_ALIVE_DEFAULT = 1024 * 16

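// NewServer creates a Server from the given stores and settings, and registers
// a metakv callback so that later configuration changes are applied to it.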
func NewServer(store datastore.Datastore, sys datastore.Datastore, config clustering.ConfigurationStore,
	acctng accounting.AccountingStore, namespace string, readonly bool,
	channel, plusChannel RequestChannel, servicers, plusServicers, maxParallelism int,
	timeout time.Duration, signature, metrics, enterprise, pretty bool,
	srvprofile Profile, srvcontrols bool) (*Server, errors.Error) {
	rv := &Server{
		datastore:   store,
		systemstore: sys,
		configstore: config,
		acctstore:   acctng,
		namespace:   namespace,
		readonly:    readonly,
		channel:     channel,
		plusChannel: plusChannel,
		signature:   signature,
		timeout:     timeout,
		metrics:     metrics,
		done:        make(chan bool),
		plusDone:    make(chan bool),
		enterprise:  enterprise,
		pretty:      pretty,
		srvcontrols: srvcontrols,
		srvprofile:  srvprofile,
	}

	// special case handling for the atomic-specific fields
	atomic.StoreInt64(&rv.servicers, int64(servicers))
	atomic.StoreInt64(&rv.plusServicers, int64(plusServicers))

	store.SetLogLevel(logging.LogLevel())
	rv.SetMaxParallelism(maxParallelism)

	// set default values
	rv.SetMaxIndexAPI(datastore.INDEX_API_MAX)
	if rv.enterprise {
		util.SetN1qlFeatureControl(util.DEF_N1QL_FEAT_CTRL)
	} else {
		util.SetN1qlFeatureControl(util.DEF_N1QL_FEAT_CTRL | util.CE_N1QL_FEAT_CTRL)
	}

	//	sys, err := system.NewDatastore(store)
	//	if err != nil {
	//		return nil, err
	//	}
	//
	//	rv.systemstore = sys

	// Setup callback function for metakv settings changes
	callb := func(cfg queryMetakv.Config) {
		logging.Infof("Settings notifier from metakv\n")

		// SetParamValuesForAll accepts a full set or a subset of the global
		// configuration and updates those fields.
		SetParamValuesForAll(cfg, rv)
	}

	queryMetakv.SetupSettingsNotifier(callb, make(chan struct{}))

	return rv, nil
}

func (this *Server) Datastore() datastore.Datastore {
	return this.datastore
}

func (this *Server) Systemstore() datastore.Datastore {
	return this.systemstore
}

func (this *Server) Namespace() string {
	return this.namespace
}

func (this *Server) SetWhitelist(val map[string]interface{}) {
	this.whitelist = val
}

func (this *Server) GetWhitelist() map[string]interface{} {
	return this.whitelist
}

func (this *Server) ConfigurationStore() clustering.ConfigurationStore {
	return this.configstore
}

func (this *Server) AccountingStore() accounting.AccountingStore {
	return this.acctstore
}

func (this *Server) Channel() RequestChannel {
	return this.channel
}

func (this *Server) PlusChannel() RequestChannel {
	return this.plusChannel
}

func (this *Server) Signature() bool {
	return this.signature
}

func (this *Server) Metrics() bool {
	return this.metrics
}

func (this *Server) Pretty() bool {
	this.RLock()
	defer this.RUnlock()
	return this.pretty
}

func (this *Server) SetPretty(pretty bool) {
	this.Lock()
	defer this.Unlock()
	this.pretty = pretty
}

func (this *Server) KeepAlive() int {
	return int(atomic.LoadInt64(&this.keepAlive))
}

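// SetKeepAlive sets the keep-alive buffer length; values <= 0 fall back to
// KEEP_ALIVE_DEFAULT.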
func (this *Server) SetKeepAlive(keepAlive int) {
	if keepAlive <= 0 {
		keepAlive = KEEP_ALIVE_DEFAULT
	}
	atomic.StoreInt64(&this.keepAlive, int64(keepAlive))
}

func (this *Server) MaxParallelism() int {
	return int(atomic.LoadInt64(&this.maxParallelism))
}

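// SetMaxParallelism sets the server-wide parallelism limit; values <= 0 fall
// back to the number of CPUs.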
func (this *Server) SetMaxParallelism(maxParallelism int) {
	if maxParallelism <= 0 {
		maxParallelism = runtime.NumCPU()
	}
	atomic.StoreInt64(&this.maxParallelism, int64(maxParallelism))
}

func (this *Server) MemProfile() string {
	this.RLock()
	defer this.RUnlock()
	return this.memprofile
}

func (this *Server) SetMemProfile(memprofile string) {
	this.Lock()
	defer this.Unlock()
	this.memprofile = memprofile
}

func (this *Server) CpuProfile() string {
	this.RLock()
	defer this.RUnlock()
	return this.cpuprofile
}

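// SetCpuProfile starts a CPU profile writing to the named file; an empty name
// leaves profiling off, and a file creation error clears the setting.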
func (this *Server) SetCpuProfile(cpuprofile string) {
	this.Lock()
	defer this.Unlock()
	this.cpuprofile = cpuprofile
	if this.cpuprofile == "" {
		return
	}
	f, err := os.Create(this.cpuprofile)
	if err != nil {
		logging.Errorp("Cannot start cpu profiler", logging.Pair{"error", err})
		this.cpuprofile = ""
	} else {
		pprof.StartCPUProfile(f)
	}
}

func (this *Server) ScanCap() int64 {
	return datastore.GetScanCap()
}

func (this *Server) SetScanCap(scan_cap int64) {
	datastore.SetScanCap(scan_cap)
}

func (this *Server) PipelineCap() int64 {
	return execution.GetPipelineCap()
}

func (this *Server) SetPipelineCap(pipeline_cap int64) {
	execution.SetPipelineCap(pipeline_cap)
}

func (this *Server) PipelineBatch() int {
	return execution.PipelineBatchSize()
}

func (this *Server) SetPipelineBatch(pipeline_batch int) {
	execution.SetPipelineBatch(pipeline_batch)
}

func (this *Server) MaxIndexAPI() int {
	return util.GetMaxIndexAPI()
}

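// SetMaxIndexAPI sets the maximum index API version; out-of-range values fall
// back to INDEX_API_MIN.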
func (this *Server) SetMaxIndexAPI(apiVersion int) {
	if apiVersion < datastore.INDEX_API_MIN || apiVersion > datastore.INDEX_API_MAX {
		apiVersion = datastore.INDEX_API_MIN
	}
	util.SetMaxIndexAPI(apiVersion)
}

func (this *Server) Debug() bool {
	return logging.LogLevel() == logging.DEBUG
}

func (this *Server) SetDebug(debug bool) {
	if debug {
		this.SetLogLevel("debug")
	} else {
		this.SetLogLevel("info")
	}
}

func (this *Server) LogLevel() string {
	return logging.LogLevel().String()
}

func (this *Server) SetLogLevel(level string) {
	lvl, ok := logging.ParseLevel(level)
	if !ok {
		logging.Errorp("SetLogLevel: unrecognized level", logging.Pair{"level", level})
		return
	}
	if this.datastore != nil {
		this.datastore.SetLogLevel(lvl)
	}
	logging.SetLevel(lvl)
}

const (
	MAX_REQUEST_SIZE = 64 * (1 << 20)
)

func (this *Server) RequestSizeCap() int {
	return int(atomic.LoadInt64(&this.requestSize))
}

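// SetRequestSizeCap sets the maximum request size; values <= 0 mean
// effectively unlimited (MaxInt32).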
func (this *Server) SetRequestSizeCap(requestSize int) {
	if requestSize <= 0 {
		requestSize = math.MaxInt32
	}
	atomic.StoreInt64(&this.requestSize, int64(requestSize))
}

func (this *Server) Servicers() int {
	return int(atomic.LoadInt64(&this.servicers))
}

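// SetServicers changes the number of request servicers: the current servicers
// are told to stop, the call waits for them to drain, and a new set is started.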
func (this *Server) SetServicers(servicers int) {
	this.Lock()
	defer this.Unlock()

	// MB-19683 - don't restart if no change
	if int(atomic.LoadInt64(&this.servicers)) == servicers {
		return
	}

	// Stop the current set of servicers
	close(this.done)
	logging.Infop("SetServicers - waiting for current servicers to finish")
	this.wg.Wait()
	// Set servicer count and recreate servicers
	atomic.StoreInt64(&this.servicers, int64(servicers))
	logging.Infop("SetServicers - starting new servicers")
	// Start new set of servicers
	this.done = make(chan bool)
	go this.Serve()
}

func (this *Server) PlusServicers() int {
	return int(atomic.LoadInt64(&this.plusServicers))
}

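// SetPlusServicers changes the number of plus servicers, restarting the plus
// servicer pool in the same way as SetServicers.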
func (this *Server) SetPlusServicers(plusServicers int) {
	this.Lock()
	defer this.Unlock()

	// MB-19683 - don't restart if no change
	if int(atomic.LoadInt64(&this.plusServicers)) == plusServicers {
		return
	}

	// Stop the current set of servicers
	close(this.plusDone)
	logging.Infop("SetPlusServicers - waiting for current plusServicers to finish")
	this.plusWg.Wait()
	// Set plus servicer count and recreate plus servicers
	atomic.StoreInt64(&this.plusServicers, int64(plusServicers))
	logging.Infop("SetPlusServicers - starting new plusServicers")
	// Start new set of servicers
	this.plusDone = make(chan bool)
	go this.PlusServe()
}

func (this *Server) Timeout() time.Duration {
	return this.timeout
}

func (this *Server) SetTimeout(timeout time.Duration) {
	this.timeout = timeout
}

func (this *Server) Profile() Profile {
	return this.srvprofile
}

func (this *Server) SetProfile(srvprofile Profile) {
	this.srvprofile = srvprofile
}

func (this *Server) Controls() bool {
	return this.srvcontrols
}

func (this *Server) SetControls(srvcontrols bool) {
	this.srvcontrols = srvcontrols
}

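// ParseProfile maps a setting name ("off", "phases" or "timings") to a Profile
// value; unrecognized names return the default and false.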
func ParseProfile(name string) (Profile, bool) {
	prof, ok := _PROFILE_MAP[strings.ToLower(name)]
	if ok {
		return prof, ok
	} else {
		return _PROFILE_DEFAULT, ok
	}
}

func (this *Server) Enterprise() bool {
	return this.enterprise
}

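// Serve starts the configured number of servicer goroutines reading from the
// request channel.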
func (this *Server) Serve() {
	// Use a threading model. Do not spawn a separate
	// goroutine for each request, as that would be
	// unbounded and could degrade performance of already
	// executing queries.
	servicers := this.Servicers()
	this.wg.Add(servicers)
	for i := 0; i < servicers; i++ {
		go this.doServe()
	}
}

func (this *Server) doServe() {
	defer this.wg.Done()
	ok := true
	for ok {
		select {
		case request := <-this.channel:
			this.serviceRequest(request)
		case <-this.done:
			ok = false
		}
	}
}

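// PlusServe starts the configured number of plus servicer goroutines reading
// from the plus request channel.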
func (this *Server) PlusServe() {
	// Use a threading model. Do not spawn a separate
	// goroutine for each request, as that would be
	// unbounded and could degrade performance of already
	// executing queries.
	plusServicers := this.PlusServicers()
	this.plusWg.Add(plusServicers)
	for i := 0; i < plusServicers; i++ {
		go this.doPlusServe()
	}
}

func (this *Server) doPlusServe() {
	defer this.plusWg.Done()
	ok := true
	for ok {
		select {
		case request := <-this.plusChannel:
			this.serviceRequest(request)
		case <-this.plusDone:
			ok = false
		}
	}
}

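// serviceRequest runs a single request: it resolves the plan, builds the
// execution tree, applies the server and request timeouts, and executes the
// operator tree.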
func (this *Server) serviceRequest(request Request) {
	defer func() {
		err := recover()
		if err != nil {
			buf := make([]byte, 1<<16)
			n := runtime.Stack(buf, false)
			s := string(buf[0:n])
			logging.Severep("", logging.Pair{"panic", err},
				logging.Pair{"stack", s})
			os.Stderr.WriteString(s)
			os.Stderr.Sync()
		}
	}()

	request.Servicing()

	namespace := request.Namespace()
	if namespace == "" {
		namespace = this.namespace
	}

	prepared, err := this.getPrepared(request, namespace)
	if err != nil {
		request.Fail(err)
	}

	if (this.readonly || value.ToBool(request.Readonly())) &&
		(prepared != nil && !prepared.Readonly()) {
		request.Fail(errors.NewServiceErrorReadonly("The server or request is read-only" +
			" and cannot accept this write statement."))
	}

	if request.State() == FATAL {
		request.Failed(this)
		return
	}

	maxParallelism := request.MaxParallelism()
	if maxParallelism <= 0 {
		maxParallelism = this.MaxParallelism()
	}

	context := execution.NewContext(request.Id().String(), this.datastore, this.systemstore, namespace,
		this.readonly, maxParallelism, request.ScanCap(), request.PipelineCap(), request.PipelineBatch(),
		request.NamedArgs(), request.PositionalArgs(), request.Credentials(), request.ScanConsistency(),
		request.ScanVectorSource(), request.Output(), request.OriginalHttpRequest(),
		prepared, request.IndexApiVersion(), request.FeatureControls())

	context.SetWhitelist(this.whitelist)

	build := time.Now()
	operator, er := execution.Build(prepared, context)
	if er != nil {
		error, ok := er.(errors.Error)
		if ok {
			request.Fail(error)
		} else {
			request.Fail(errors.NewError(er, ""))
		}
	}

	// if the plan failed to build, the request has been failed above and
	// operator is nil: complete the request here rather than dereferencing it
	if request.State() == FATAL {
		request.Failed(this)
		return
	}

	operator.SetRoot()
	request.SetTimings(operator)
	request.Output().AddPhaseTime(execution.INSTANTIATE, time.Since(build))

	timeout := request.Timeout()

	// never allow request side timeout to be higher than
	// server side timeout
	if this.timeout > 0 && (this.timeout < timeout || timeout <= 0) {
		timeout = this.timeout
	}
	if timeout > 0 {
		request.SetTimer(time.AfterFunc(timeout, func() { request.Expire(TIMEOUT, timeout) }))
		context.SetReqDeadline(time.Now().Add(timeout))
	} else {
		context.SetReqDeadline(time.Time{})
	}

	go operator.RunOnce(context, nil)

	request.SetExecTime(time.Now())
	request.Execute(this, prepared.Signature(), operator)
}

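// getPrepared returns the plan for a request: the attached prepared plan if
// one is present, otherwise the statement is parsed and planned; EXECUTE
// statements are resolved against the prepared cache.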
func (this *Server) getPrepared(request Request, namespace string) (*plan.Prepared, errors.Error) {
	prepared := request.Prepared()
	if prepared == nil {
		parse := time.Now()
		stmt, err := n1ql.ParseStatement(request.Statement())
		request.Output().AddPhaseTime(execution.PARSE, time.Since(parse))
		if err != nil {
			return nil, errors.NewParseSyntaxError(err, "")
		}

		isprepare := false
		if _, ok := stmt.(*algebra.Prepare); ok {
			isprepare = true
		}

		prep := time.Now()
		namedArgs := request.NamedArgs()
		positionalArgs := request.PositionalArgs()

		// No args for a prepared statement - should we throw an error?
		if isprepare {
			namedArgs = nil
			positionalArgs = nil
		}

		prepared, err = planner.BuildPrepared(stmt, this.datastore, this.systemstore, namespace, false,
			namedArgs, positionalArgs, request.IndexApiVersion(), request.FeatureControls())
		request.Output().AddPhaseTime(execution.PLAN, time.Since(prep))
		if err != nil {
			return nil, errors.NewPlanError(err, "")
		}

		// EXECUTE doesn't get a plan. Get the plan from the cache.
		switch stmt.Type() {
		case "EXECUTE":
			var reprepTime time.Duration
			var err errors.Error

			exec, _ := stmt.(*algebra.Execute)
			if exec.Prepared() != nil {

				prepared, err = prepareds.GetPrepared(exec.Prepared(), prepareds.OPT_TRACK|prepareds.OPT_REMOTE|prepareds.OPT_VERIFY, &reprepTime)
				if reprepTime > 0 {
					request.Output().AddPhaseTime(execution.REPREPARE, reprepTime)
				}
				if err != nil {
					return nil, err
				}
				request.SetPrepared(prepared)

				// when executing prepared statements, we set the type to that
				// of the prepared statement
				request.SetType(prepared.Type())
			} else {

				// this never happens, but for completeness
				return nil, errors.NewPlanError(nil, "prepared not specified")
			}
		default:

			// set the type for all statements bar prepare
			// (doing otherwise would have accounting track prepares
			// as if they were executions)
			if isprepare {
				request.SetIsPrepare(true)
			} else {
				request.SetType(stmt.Type())
			}

			// even though this is not a prepared statement, add the
			// text for the benefit of context.Recover(): we can
			// output the text in case of crashes
			prepared.SetText(request.Statement())
		}
	} else {

		// ditto
		request.SetType(prepared.Type())
	}

	if logging.LogLevel() >= logging.DEBUG {
		// log EXPLAIN for the request
		logExplain(prepared)
	}

	return prepared, nil
}

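// logExplain logs the prepared plan, marshalled as indented JSON, at Trace
// level.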
func logExplain(prepared *plan.Prepared) {
	var pl plan.Operator = prepared
	explain, err := json.MarshalIndent(pl, "", "    ")
	if err != nil {
		logging.Tracep("Error logging explain", logging.Pair{"error", err})
		return
	}

	logging.Tracep("Explain ", logging.Pair{"explain", fmt.Sprintf("<ud>%v</ud>", string(explain))})
}

// API for tracking server options
type ServerOptions interface {
	Controls() bool
	Profile() Profile
}

var options ServerOptions

func SetOptions(o ServerOptions) {
	options = o
}

func GetControls() bool {
	return options.Controls()
}

func GetProfile() Profile {
	return options.Profile()
}

// Return the correct address for localhost depending on whether
// IPv4 or IPv6 is in use
func GetIP(is_url bool) string {
	if _IPv6 {
		if is_url {
			return "[::1]"
		} else {
			return "::1"
		}
	}
	return "127.0.0.1"
}

func SetIP(val bool) {
	_IPv6 = val
	util.IPv6 = val
}

// This needs to support both IPv4 and IPv6.
// The previous implementation assumed that node is always of the form
// ip:port; node must not contain a protocol component.
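// For example, "127.0.0.1:8091" yields ("127.0.0.1", "8091"), and with IPv6
// enabled "[::1]:8091" yields ("::1", "8091").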
func HostNameandPort(node string) (host, port string) {
	tokens := []string{}
	// For IPv6
	if _IPv6 {
		// Then the node should be of the form [::1]:8091
		tokens = strings.Split(node, "]:")
		host = strings.Replace(tokens[0], "[", "", 1)

	} else {
		// For IPv4
		tokens = strings.Split(node, ":")
		host = tokens[0]
	}

	if len(tokens) == 2 {
		port = tokens[1]
	} else {
		port = ""
	}

	return
}