1package gocb
2
3import (
4	"bytes"
5	"encoding/json"
6	"fmt"
7	"net/http"
8	"net/url"
9	"time"
10
11	"github.com/opentracing/opentracing-go"
12)
13
14type n1qlCache struct {
15	name        string
16	encodedPlan string
17}
18
19type n1qlError struct {
20	Code    uint32 `json:"code"`
21	Message string `json:"msg"`
22}
23
24func (e *n1qlError) Error() string {
25	return fmt.Sprintf("[%d] %s", e.Code, e.Message)
26}
27
28type n1qlResponseMetrics struct {
29	ElapsedTime   string `json:"elapsedTime"`
30	ExecutionTime string `json:"executionTime"`
31	ResultCount   uint   `json:"resultCount"`
32	ResultSize    uint   `json:"resultSize"`
33	MutationCount uint   `json:"mutationCount,omitempty"`
34	SortCount     uint   `json:"sortCount,omitempty"`
35	ErrorCount    uint   `json:"errorCount,omitempty"`
36	WarningCount  uint   `json:"warningCount,omitempty"`
37}
38
39type n1qlResponse struct {
40	RequestId       string              `json:"requestID"`
41	ClientContextId string              `json:"clientContextID"`
42	Results         []json.RawMessage   `json:"results,omitempty"`
43	Errors          []n1qlError         `json:"errors,omitempty"`
44	Status          string              `json:"status"`
45	Metrics         n1qlResponseMetrics `json:"metrics"`
46}
47
48type n1qlMultiError []n1qlError
49
50func (e *n1qlMultiError) Error() string {
51	return (*e)[0].Error()
52}
53
54func (e *n1qlMultiError) Code() uint32 {
55	return (*e)[0].Code
56}
57
58// QueryResultMetrics encapsulates various metrics gathered during a queries execution.
59type QueryResultMetrics struct {
60	ElapsedTime   time.Duration
61	ExecutionTime time.Duration
62	ResultCount   uint
63	ResultSize    uint
64	MutationCount uint
65	SortCount     uint
66	ErrorCount    uint
67	WarningCount  uint
68}
69
70// QueryResults allows access to the results of a N1QL query.
71type QueryResults interface {
72	One(valuePtr interface{}) error
73	Next(valuePtr interface{}) bool
74	NextBytes() []byte
75	Close() error
76
77	RequestId() string
78	ClientContextId() string
79	Metrics() QueryResultMetrics
80
81	// SourceAddr returns the source endpoint where the request was sent to.
82	// VOLATILE
83	SourceEndpoint() string
84}
85
86type n1qlResults struct {
87	closed          bool
88	index           int
89	rows            []json.RawMessage
90	err             error
91	requestId       string
92	clientContextId string
93	metrics         QueryResultMetrics
94	sourceAddr      string
95}
96
97func (r *n1qlResults) Next(valuePtr interface{}) bool {
98	if r.err != nil {
99		return false
100	}
101
102	row := r.NextBytes()
103	if row == nil {
104		return false
105	}
106
107	r.err = json.Unmarshal(row, valuePtr)
108	if r.err != nil {
109		return false
110	}
111
112	return true
113}
114
115func (r *n1qlResults) NextBytes() []byte {
116	if r.err != nil {
117		return nil
118	}
119
120	if r.index+1 >= len(r.rows) {
121		r.closed = true
122		return nil
123	}
124	r.index++
125
126	return r.rows[r.index]
127}
128
129func (r *n1qlResults) Close() error {
130	r.closed = true
131	return r.err
132}
133
134func (r *n1qlResults) One(valuePtr interface{}) error {
135	if !r.Next(valuePtr) {
136		err := r.Close()
137		if err != nil {
138			return err
139		}
140		return ErrNoResults
141	}
142
143	// Ignore any errors occurring after we already have our result
144	err := r.Close()
145	if err != nil {
146		// Return no error as we got the one result already.
147		return nil
148	}
149
150	return nil
151}
152
153func (r *n1qlResults) SourceEndpoint() string {
154	return r.sourceAddr
155}
156
157func (r *n1qlResults) RequestId() string {
158	if !r.closed {
159		panic("Result must be closed before accessing meta-data")
160	}
161
162	return r.requestId
163}
164
165func (r *n1qlResults) ClientContextId() string {
166	if !r.closed {
167		panic("Result must be closed before accessing meta-data")
168	}
169
170	return r.clientContextId
171}
172
173func (r *n1qlResults) Metrics() QueryResultMetrics {
174	if !r.closed {
175		panic("Result must be closed before accessing meta-data")
176	}
177
178	return r.metrics
179}
180
181// Executes the N1QL query (in opts) on the server n1qlEp.
182// This function assumes that `opts` already contains all the required
183// settings. This function will inject any additional connection or request-level
184// settings into the `opts` map (currently this is only the timeout).
185func (c *Cluster) executeN1qlQuery(tracectx opentracing.SpanContext, n1qlEp string, opts map[string]interface{}, creds []UserPassPair, timeout time.Duration, client *http.Client) (QueryResults, error) {
186	reqUri := fmt.Sprintf("%s/query/service", n1qlEp)
187
188	tmostr, castok := opts["timeout"].(string)
189	if castok {
190		var err error
191		timeout, err = time.ParseDuration(tmostr)
192		if err != nil {
193			return nil, err
194		}
195	} else {
196		// Set the timeout string to its default variant
197		opts["timeout"] = timeout.String()
198	}
199
200	if len(creds) > 1 {
201		opts["creds"] = creds
202	}
203
204	reqJson, err := json.Marshal(opts)
205	if err != nil {
206		return nil, err
207	}
208
209	req, err := http.NewRequest("POST", reqUri, bytes.NewBuffer(reqJson))
210	if err != nil {
211		return nil, err
212	}
213	req.Header.Set("Content-Type", "application/json")
214
215	if len(creds) == 1 {
216		req.SetBasicAuth(creds[0].Username, creds[0].Password)
217	}
218
219	dtrace := c.agentConfig.Tracer.StartSpan("dispatch",
220		opentracing.ChildOf(tracectx))
221
222	resp, err := doHttpWithTimeout(client, req, timeout)
223	if err != nil {
224		dtrace.Finish()
225		return nil, err
226	}
227
228	dtrace.Finish()
229
230	strace := c.agentConfig.Tracer.StartSpan("streaming",
231		opentracing.ChildOf(tracectx))
232
233	n1qlResp := n1qlResponse{}
234	jsonDec := json.NewDecoder(resp.Body)
235	err = jsonDec.Decode(&n1qlResp)
236	if err != nil {
237		strace.Finish()
238		return nil, err
239	}
240
241	err = resp.Body.Close()
242	if err != nil {
243		logDebugf("Failed to close socket (%s)", err)
244	}
245
246	// TODO(brett19): place the server_duration in the right place...
247	//srvDuration, _ := time.ParseDuration(n1qlResp.Metrics.ExecutionTime)
248	//strace.SetTag("server_duration", srvDuration)
249
250	strace.Finish()
251
252	if len(n1qlResp.Errors) > 0 {
253		return nil, (*n1qlMultiError)(&n1qlResp.Errors)
254	}
255
256	if resp.StatusCode != 200 {
257		return nil, &viewError{
258			Message: "HTTP Error",
259			Reason:  fmt.Sprintf("Status code was %d.", resp.StatusCode),
260		}
261	}
262
263	elapsedTime, err := time.ParseDuration(n1qlResp.Metrics.ElapsedTime)
264	if err != nil {
265		logDebugf("Failed to parse elapsed time duration (%s)", err)
266	}
267
268	executionTime, err := time.ParseDuration(n1qlResp.Metrics.ExecutionTime)
269	if err != nil {
270		logDebugf("Failed to parse execution time duration (%s)", err)
271	}
272
273	epInfo, err := url.Parse(reqUri)
274	if err != nil {
275		logWarnf("Failed to parse N1QL source address")
276		epInfo = &url.URL{
277			Host: "",
278		}
279	}
280
281	return &n1qlResults{
282		sourceAddr:      epInfo.Host,
283		requestId:       n1qlResp.RequestId,
284		clientContextId: n1qlResp.ClientContextId,
285		index:           -1,
286		rows:            n1qlResp.Results,
287		metrics: QueryResultMetrics{
288			ElapsedTime:   elapsedTime,
289			ExecutionTime: executionTime,
290			ResultCount:   n1qlResp.Metrics.ResultCount,
291			ResultSize:    n1qlResp.Metrics.ResultSize,
292			MutationCount: n1qlResp.Metrics.MutationCount,
293			SortCount:     n1qlResp.Metrics.SortCount,
294			ErrorCount:    n1qlResp.Metrics.ErrorCount,
295			WarningCount:  n1qlResp.Metrics.WarningCount,
296		},
297	}, nil
298}
299
300func (c *Cluster) prepareN1qlQuery(tracectx opentracing.SpanContext, n1qlEp string, opts map[string]interface{}, creds []UserPassPair, timeout time.Duration, client *http.Client) (*n1qlCache, error) {
301	prepOpts := make(map[string]interface{})
302	for k, v := range opts {
303		prepOpts[k] = v
304	}
305	prepOpts["statement"] = "PREPARE " + opts["statement"].(string)
306
307	prepRes, err := c.executeN1qlQuery(tracectx, n1qlEp, prepOpts, creds, timeout, client)
308	if err != nil {
309		return nil, err
310	}
311
312	var preped n1qlPrepData
313	err = prepRes.One(&preped)
314	if err != nil {
315		return nil, err
316	}
317
318	return &n1qlCache{
319		name:        preped.Name,
320		encodedPlan: preped.EncodedPlan,
321	}, nil
322}
323
324type n1qlPrepData struct {
325	EncodedPlan string `json:"encoded_plan"`
326	Name        string `json:"name"`
327}
328
329// Performs a spatial query and returns a list of rows or an error.
330func (c *Cluster) doN1qlQuery(tracectx opentracing.SpanContext, b *Bucket, q *N1qlQuery, params interface{}) (QueryResults, error) {
331	var err error
332	var n1qlEp string
333	var timeout time.Duration
334	var client *http.Client
335	var creds []UserPassPair
336
337	if b != nil {
338		n1qlEp, err = b.getN1qlEp()
339		if err != nil {
340			return nil, err
341		}
342
343		if b.n1qlTimeout < c.n1qlTimeout {
344			timeout = b.n1qlTimeout
345		} else {
346			timeout = c.n1qlTimeout
347		}
348		client = b.client.HttpClient()
349		if c.auth != nil {
350			creds, err = c.auth.Credentials(AuthCredsRequest{
351				Service:  N1qlService,
352				Endpoint: n1qlEp,
353				Bucket:   b.name,
354			})
355			if err != nil {
356				return nil, err
357			}
358		} else {
359			creds = []UserPassPair{
360				{
361					Username: b.name,
362					Password: b.password,
363				},
364			}
365		}
366	} else {
367		if c.auth == nil {
368			panic("Cannot perform cluster level queries without Cluster Authenticator.")
369		}
370
371		tmpB, err := c.randomBucket()
372		if err != nil {
373			return nil, err
374		}
375
376		n1qlEp, err = tmpB.getN1qlEp()
377		if err != nil {
378			return nil, err
379		}
380
381		timeout = c.n1qlTimeout
382		client = tmpB.client.HttpClient()
383
384		creds, err = c.auth.Credentials(AuthCredsRequest{
385			Service:  N1qlService,
386			Endpoint: n1qlEp,
387		})
388		if err != nil {
389			return nil, err
390		}
391	}
392
393	execOpts := make(map[string]interface{})
394	for k, v := range q.options {
395		execOpts[k] = v
396	}
397	if params != nil {
398		args, isArray := params.([]interface{})
399		if isArray {
400			execOpts["args"] = args
401		} else {
402			mapArgs, isMap := params.(map[string]interface{})
403			if isMap {
404				for key, value := range mapArgs {
405					execOpts["$"+key] = value
406				}
407			} else {
408				panic("Invalid params argument passed")
409			}
410		}
411	}
412
413	if q.adHoc {
414		return c.executeN1qlQuery(tracectx, n1qlEp, execOpts, creds, timeout, client)
415	}
416
417	// Do Prepared Statement Logic
418	var cachedStmt *n1qlCache
419
420	stmtStr, isStr := q.options["statement"].(string)
421	if !isStr {
422		return nil, ErrCliInternalError
423	}
424
425	c.clusterLock.RLock()
426	cachedStmt = c.queryCache[stmtStr]
427	c.clusterLock.RUnlock()
428
429	if cachedStmt != nil {
430		// Attempt to execute our cached query plan
431		delete(execOpts, "statement")
432		execOpts["prepared"] = cachedStmt.name
433		execOpts["encoded_plan"] = cachedStmt.encodedPlan
434
435		etrace := c.agentConfig.Tracer.StartSpan("execute",
436			opentracing.ChildOf(tracectx))
437
438		results, err := c.executeN1qlQuery(etrace.Context(), n1qlEp, execOpts, creds, timeout, client)
439		if err == nil {
440			etrace.Finish()
441			return results, nil
442		}
443
444		etrace.Finish()
445
446		// If we get error 4050, 4070 or 5000, we should attempt
447		//   to reprepare the statement immediately before failing.
448		n1qlErr, isN1qlErr := err.(*n1qlMultiError)
449		if !isN1qlErr {
450			return nil, err
451		}
452		if n1qlErr.Code() != 4050 && n1qlErr.Code() != 4070 && n1qlErr.Code() != 5000 {
453			return nil, err
454		}
455	}
456
457	// Prepare the query
458	ptrace := c.agentConfig.Tracer.StartSpan("prepare",
459		opentracing.ChildOf(tracectx))
460
461	cachedStmt, err = c.prepareN1qlQuery(ptrace.Context(), n1qlEp, q.options, creds, timeout, client)
462	if err != nil {
463		ptrace.Finish()
464		return nil, err
465	}
466
467	ptrace.Finish()
468
469	// Save new cached statement
470	c.clusterLock.Lock()
471	c.queryCache[stmtStr] = cachedStmt
472	c.clusterLock.Unlock()
473
474	// Update with new prepared data
475	delete(execOpts, "statement")
476	execOpts["prepared"] = cachedStmt.name
477	execOpts["encoded_plan"] = cachedStmt.encodedPlan
478
479	etrace := c.agentConfig.Tracer.StartSpan("execute",
480		opentracing.ChildOf(tracectx))
481	defer etrace.Finish()
482
483	return c.executeN1qlQuery(etrace.Context(), n1qlEp, execOpts, creds, timeout, client)
484}
485
486// ExecuteN1qlQuery performs a n1ql query and returns a list of rows or an error.
487func (c *Cluster) ExecuteN1qlQuery(q *N1qlQuery, params interface{}) (QueryResults, error) {
488	span := c.agentConfig.Tracer.StartSpan("ExecuteSearchQuery",
489		opentracing.Tag{Key: "couchbase.service", Value: "n1ql"})
490	defer span.Finish()
491
492	return c.doN1qlQuery(span.Context(), nil, q, params)
493}
494