1//  Copyright (c) 2014 Couchbase, Inc.
2//  Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
3//  except in compliance with the License. You may obtain a copy of the License at
4//    http://www.apache.org/licenses/LICENSE-2.0
5//  Unless required by applicable law or agreed to in writing, software distributed under the
6//  License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
7//  either express or implied. See the License for the specific language governing permissions
8//  and limitations under the License.
9
10package http
11
12import (
13	"bytes"
14	"encoding/json"
15	"fmt"
16	"io"
17	"net/http"
18	"strconv"
19	"sync"
20	"time"
21
22	"github.com/couchbase/query/errors"
23	"github.com/couchbase/query/execution"
24	"github.com/couchbase/query/logging"
25	"github.com/couchbase/query/server"
26	"github.com/couchbase/query/value"
27)
28
29const (
30	PRETTY_RESULT_PREFIX string = "        "
31	PRETTY_RESULT_INDENT string = "    "
32	PRETTY_PREFIX        string = "    "
33	PRETTY_INDENT        string = "    "
34	NO_PRETTY_PREFIX     string = ""
35	NO_PRETTY_INDENT     string = ""
36)
37
38func (this *httpRequest) Output() execution.Output {
39	return this
40}
41
42func (this *httpRequest) Fail(err errors.Error) {
43	this.SetState(server.FATAL)
44	// Determine the appropriate http response code based on the error
45	httpRespCode := mapErrorToHttpResponse(err, http.StatusInternalServerError)
46	this.setHttpCode(httpRespCode)
47	// Put the error on the errors channel
48	this.Errors() <- err
49}
50
51func mapErrorToHttpResponse(err errors.Error, def int) int {
52
53	// MB-19307: please note that setting the http status
54	// only works if the http header has not been sent.
55	// This is the case if the whole output document is
56	// smaller than the threshold beyond which the http
57	// server starts sending the output with a chunked
58	// transfer encoding, or the first chunk has not been
59	// put together yet.
60	// For this reason, be mindful that error codes mapped
61	// here should only be generated at a point in which
62	// the request has not produced any results (ie failed
63	// in some sort of non starter way)
64	switch err.Code() {
65	case 1000: // readonly violation
66		return http.StatusForbidden
67	case 1010: // unsupported http method
68		return http.StatusMethodNotAllowed
69	case 1020, 1030, 1040, 1050, 1060, 1065, 1070:
70		return http.StatusBadRequest
71	case 1120:
72		return http.StatusNotAcceptable
73	case 3000: // parse error range
74		return http.StatusBadRequest
75	case 4000, errors.NO_SUCH_PREPARED: // plan error range
76		return http.StatusNotFound
77	case 4300:
78		return http.StatusConflict
79	case 5000:
80		return http.StatusInternalServerError
81	case errors.SUBQUERY_BUILD:
82		return http.StatusUnprocessableEntity
83	case 10000:
84		return http.StatusUnauthorized
85	default:
86		return def
87	}
88}
89
90func (this *httpRequest) httpCode() int {
91	this.RLock()
92	defer this.RUnlock()
93	return this.httpRespCode
94}
95
96func (this *httpRequest) setHttpCode(httpRespCode int) {
97	this.Lock()
98	defer this.Unlock()
99	this.httpRespCode = httpRespCode
100}
101
102func (this *httpRequest) Failed(srvr *server.Server) {
103	defer this.stopAndClose(server.FATAL)
104
105	prefix, indent := this.prettyStrings(srvr.Pretty(), false)
106	this.writeString("{\n")
107	this.writeRequestID(prefix)
108	this.writeClientContextID(prefix)
109	this.writeErrors(prefix, indent)
110	this.writeWarnings(prefix, indent)
111	this.writeState("", prefix)
112
113	this.markTimeOfCompletion()
114
115	this.writeMetrics(srvr.Metrics(), prefix, indent)
116	this.writeProfile(srvr.Profile(), prefix, indent)
117	this.writeControls(srvr.Controls(), prefix, indent)
118	this.writeString("\n}\n")
119	this.writer.noMoreData()
120}
121
122func (this *httpRequest) markTimeOfCompletion() {
123	this.executionTime = time.Since(this.ServiceTime())
124	this.elapsedTime = time.Since(this.RequestTime())
125}
126
127func (this *httpRequest) Execute(srvr *server.Server, signature value.Value, stopNotify execution.Operator) {
128	this.NotifyStop(stopNotify)
129
130	prefix, indent := this.prettyStrings(srvr.Pretty(), false)
131
132	this.setHttpCode(http.StatusOK)
133	this.writePrefix(srvr, signature, prefix, indent)
134	stopped := this.writeResults(srvr.Pretty())
135	this.Output().AddPhaseTime(execution.RUN, time.Since(this.ExecTime()))
136
137	this.markTimeOfCompletion()
138
139	state := this.State()
140	this.writeSuffix(srvr, state, prefix, indent)
141	this.writer.noMoreData()
142	if stopped {
143		this.Close()
144	} else {
145		this.stopAndClose(server.COMPLETED)
146	}
147}
148
149func (this *httpRequest) Expire(state server.State, timeout time.Duration) {
150	this.Errors() <- errors.NewTimeoutError(timeout)
151	this.Stop(state)
152}
153
154func (this *httpRequest) stopAndClose(state server.State) {
155	this.Stop(state)
156	this.Close()
157}
158
159func (this *httpRequest) writePrefix(srvr *server.Server, signature value.Value, prefix, indent string) bool {
160	return this.writeString("{\n") &&
161		this.writeRequestID(prefix) &&
162		this.writeClientContextID(prefix) &&
163		this.writeSignature(srvr.Signature(), signature, prefix, indent) &&
164		this.writeString(",\n") &&
165		this.writeString(prefix) &&
166		this.writeString("\"results\": [")
167}
168
169func (this *httpRequest) writeRequestID(prefix string) bool {
170	return this.writeString(prefix) && this.writeString("\"requestID\": \"") && this.writeString(this.Id().String()) && this.writeString("\"")
171}
172
173func (this *httpRequest) writeClientContextID(prefix string) bool {
174	if !this.ClientID().IsValid() {
175		return true
176	}
177	return this.writeString(",\n") && this.writeString(prefix) &&
178		this.writeString("\"clientContextID\": \"") && this.writeString(this.ClientID().String()) && this.writeString("\"")
179}
180
181func (this *httpRequest) writeSignature(server_flag bool, signature value.Value, prefix, indent string) bool {
182	s := this.Signature()
183	if s == value.FALSE || (s == value.NONE && !server_flag) {
184		return true
185	}
186	return this.writeString(",\n") && this.writeString(prefix) && this.writeString("\"signature\": ") && this.writeValue(signature, prefix, indent)
187}
188
189func (this *httpRequest) prettyStrings(serverPretty, result bool) (string, string) {
190	p := this.Pretty()
191	if p == value.FALSE || (p == value.NONE && !serverPretty) {
192		return NO_PRETTY_PREFIX, NO_PRETTY_INDENT
193	} else if result {
194		return PRETTY_RESULT_PREFIX, PRETTY_RESULT_INDENT
195	} else {
196		return PRETTY_PREFIX, PRETTY_INDENT
197	}
198}
199
200// returns true if the request has already been stopped
201// (eg through timeout or delete)
202func (this *httpRequest) writeResults(pretty bool) bool {
203	var item value.Value
204	var buf bytes.Buffer
205
206	prefix, indent := this.prettyStrings(pretty, true)
207	ok := true
208	for ok {
209		select {
210		case <-this.StopExecute():
211			this.SetState(server.STOPPED)
212			return true
213		case <-this.httpCloseNotify:
214			this.SetState(server.CLOSED)
215			return false
216		default:
217		}
218
219		select {
220		case item, ok = <-this.Results():
221			if this.Halted() {
222				return true
223			}
224
225			if ok && !this.writeResult(item, &buf, prefix, indent) {
226				return false
227			}
228		case <-this.StopExecute():
229			this.SetState(server.STOPPED)
230			return true
231		case <-this.httpCloseNotify:
232			this.SetState(server.CLOSED)
233			return false
234		}
235	}
236
237	this.SetState(server.COMPLETED)
238	return false
239}
240
241func (this *httpRequest) writeResult(item value.Value, buf *bytes.Buffer, prefix, indent string) bool {
242	var success bool
243
244	buf.Reset()
245	err := item.WriteJSON(buf, prefix, indent)
246
247	// item won't be used past this point
248	item.Recycle()
249
250	if err != nil {
251		this.Errors() <- errors.NewServiceErrorInvalidJSON(err)
252		this.SetState(server.FATAL)
253		return false
254	}
255
256	if this.resultCount == 0 {
257		success = this.writeString("\n")
258	} else {
259		success = this.writeString(",\n")
260	}
261
262	if success {
263		success = this.writeString(prefix) && this.writeString(buf.String())
264	}
265
266	if success {
267		this.resultSize += len(buf.Bytes())
268		this.resultCount++
269	} else {
270		this.SetState(server.CLOSED)
271	}
272	return success
273}
274
275func (this *httpRequest) writeValue(item value.Value, prefix, indent string) bool {
276	var err error
277	var bytes []byte
278
279	if indent == "" && prefix == "" {
280		bytes, err = json.Marshal(item)
281	} else {
282		bytes, err = json.MarshalIndent(item, prefix, indent)
283	}
284	if err != nil {
285		return this.writeString(fmt.Sprintf("\"ERROR: %v\"", err))
286	}
287
288	return this.writeString(string(bytes))
289}
290
291func (this *httpRequest) writeSuffix(srvr *server.Server, state server.State, prefix, indent string) bool {
292	return this.writeString("\n") && this.writeString(prefix) && this.writeString("]") &&
293		this.writeErrors(prefix, indent) &&
294		this.writeWarnings(prefix, indent) &&
295		this.writeState(state, prefix) &&
296		this.writeMetrics(srvr.Metrics(), prefix, indent) &&
297		this.writeProfile(srvr.Profile(), prefix, indent) &&
298		this.writeControls(srvr.Controls(), prefix, indent) &&
299		this.writeString("\n}\n")
300}
301
302func (this *httpRequest) writeString(s string) bool {
303	return this.writer.writeString(s)
304}
305
306func (this *httpRequest) writeState(state server.State, prefix string) bool {
307	if state == "" {
308		state = this.State()
309	}
310
311	if state == server.COMPLETED {
312		if this.errorCount == 0 {
313			state = server.SUCCESS
314		} else {
315			state = server.ERRORS
316		}
317	}
318
319	return this.writeString(fmt.Sprintf(",\n%s\"status\": \"%s\"", prefix, state))
320}
321
322func (this *httpRequest) writeErrors(prefix string, indent string) bool {
323	var err errors.Error
324	ok := true
325loop:
326	for ok {
327		select {
328		case err, ok = <-this.Errors():
329			if ok {
330				if this.errorCount == 0 {
331					this.writeString(",\n")
332					this.writeString(prefix)
333					this.writeString("\"errors\": [")
334
335					// MB-19307: please check the comments
336					// in mapErrortoHttpResponse().
337					// Ideally we should set the status code
338					// only before calling writePrefix()
339					// but this is too cumbersome, having
340					// to check Execution errors as well.
341					if this.State() != server.FATAL {
342						this.setHttpCode(mapErrorToHttpResponse(err, http.StatusOK))
343					}
344				}
345				ok = this.writeError(err, this.errorCount, prefix, indent)
346				this.errorCount++
347			}
348		default:
349			break loop
350		}
351	}
352
353	if this.errorCount == 0 {
354		return true
355	}
356
357	if prefix != "" && !(this.writeString("\n") && this.writeString(prefix)) {
358		return false
359	}
360	return this.writeString("]")
361}
362
363func (this *httpRequest) writeWarnings(prefix, indent string) bool {
364	var err errors.Error
365	ok := true
366	alreadySeen := make(map[string]bool)
367loop:
368	for ok {
369		select {
370		case err, ok = <-this.Warnings():
371			if ok {
372				if err.OnceOnly() && alreadySeen[err.Error()] {
373					// do nothing for this warning
374					continue loop
375				}
376				if this.warningCount == 0 {
377					this.writeString(",\n")
378					this.writeString(prefix)
379					this.writeString("\"warnings\": [")
380				}
381				ok = this.writeError(err, this.warningCount, prefix, indent)
382				this.warningCount++
383				alreadySeen[err.Error()] = true
384			}
385		default:
386			break loop
387		}
388	}
389
390	if this.warningCount == 0 {
391		return true
392	}
393
394	if prefix != "" && !(this.writeString("\n") && this.writeString(prefix)) {
395		return false
396	}
397	return this.writeString("]")
398}
399
400func (this *httpRequest) writeError(err errors.Error, count int, prefix, indent string) bool {
401
402	newPrefix := prefix + indent
403
404	if count != 0 && !this.writeString(",") {
405		return false
406	}
407	if prefix != "" && !this.writeString("\n") {
408		return false
409	}
410
411	m := map[string]interface{}{
412		"code": err.Code(),
413		"msg":  err.Error(),
414	}
415
416	var er error
417	var bytes []byte
418
419	if newPrefix == "" && indent == "" {
420		bytes, er = json.Marshal(m)
421	} else {
422		bytes, er = json.MarshalIndent(m, newPrefix, indent)
423	}
424	if er != nil {
425		return false
426	}
427
428	return this.writeString(newPrefix) && this.writeString(string(bytes))
429}
430
431func (this *httpRequest) writeMetrics(metrics bool, prefix, indent string) bool {
432	m := this.Metrics()
433	if m == value.FALSE || (m == value.NONE && !metrics) {
434		return true
435	}
436
437	var newPrefix string
438	if prefix != "" {
439		newPrefix = "\n" + prefix + indent
440	}
441
442	rv := this.writeString(",\n") && this.writeString(prefix) && this.writeString("\"metrics\": {") &&
443		this.writeString(fmt.Sprintf("%s\"elapsedTime\": \"%v\"", newPrefix, this.elapsedTime)) &&
444		this.writeString(fmt.Sprintf(",%s\"executionTime\": \"%v\"", newPrefix, this.executionTime)) &&
445		this.writeString(fmt.Sprintf(",%s\"resultCount\": %d", newPrefix, this.resultCount)) &&
446		this.writeString(fmt.Sprintf(",%s\"resultSize\": %d", newPrefix, this.resultSize))
447	if !rv {
448		return false
449	}
450
451	if this.MutationCount() > 0 && !this.writeString(fmt.Sprintf(",%s\"mutationCount\": %d", newPrefix, this.MutationCount())) {
452		return false
453	}
454
455	if this.SortCount() > 0 && !this.writeString(fmt.Sprintf(",%s\"sortCount\": %d", newPrefix, this.SortCount())) {
456		return false
457	}
458
459	if this.errorCount > 0 && !this.writeString(fmt.Sprintf(",%s\"errorCount\": %d", newPrefix, this.errorCount)) {
460		return false
461	}
462
463	if this.warningCount > 0 && !this.writeString(fmt.Sprintf(",%s\"warningCount\": %d", newPrefix, this.warningCount)) {
464		return false
465	}
466
467	if prefix != "" && !(this.writeString("\n") && this.writeString(prefix)) {
468		return false
469	}
470	return this.writeString("}")
471
472}
473
474func (this *httpRequest) writeControls(controls bool, prefix, indent string) bool {
475	var newPrefix string
476	var e []byte
477	var err error
478
479	needComma := false
480	c := this.Controls()
481	if c == value.FALSE || (c == value.NONE && !controls) {
482		return true
483	}
484
485	namedArgs := this.NamedArgs()
486	positionalArgs := this.PositionalArgs()
487	if namedArgs == nil && positionalArgs == nil {
488		return true
489	}
490
491	if prefix != "" {
492		newPrefix = "\n" + prefix + indent
493	}
494	rv := this.writeString(",\n") && this.writeString(prefix) && this.writeString("\"controls\": {")
495	if !rv {
496		return false
497	}
498	if namedArgs != nil {
499		if indent != "" {
500			e, err = json.MarshalIndent(namedArgs, "\t", indent)
501		} else {
502			e, err = json.Marshal(namedArgs)
503		}
504		if err != nil || !this.writeString(fmt.Sprintf("%s\"namedArgs\": %s", newPrefix, e)) {
505			logging.Infop("Error writing namedArgs", logging.Pair{"error", err})
506		}
507		needComma = true
508	}
509	if positionalArgs != nil {
510		if needComma && !this.writeString(",") {
511			return false
512		}
513		if indent != "" {
514			e, err = json.MarshalIndent(positionalArgs, "\t", indent)
515		} else {
516			e, err = json.Marshal(positionalArgs)
517		}
518		if err != nil || !this.writeString(fmt.Sprintf("%s\"positionalArgs\": %s", newPrefix, e)) {
519			logging.Infop("Error writing positional args", logging.Pair{"error", err})
520		}
521	}
522	if prefix != "" && !(this.writeString("\n") && this.writeString(prefix)) {
523		return false
524	}
525	return this.writeString("}")
526}
527
528func (this *httpRequest) writeProfile(profile server.Profile, prefix, indent string) bool {
529	var newPrefix string
530	var e []byte
531	var err error
532
533	needComma := false
534	p := this.Profile()
535	if p == server.ProfUnset {
536		p = profile
537	}
538	if p == server.ProfOff {
539		return true
540	}
541
542	if prefix != "" {
543		newPrefix = "\n" + prefix + indent
544	}
545	rv := this.writeString(",\n") && this.writeString(prefix) && this.writeString("\"profile\": {")
546	if !rv {
547		return false
548	}
549	if p != server.ProfOff {
550		phaseTimes := this.FmtPhaseTimes()
551		if phaseTimes != nil {
552			if indent != "" {
553				e, err = json.MarshalIndent(phaseTimes, "\t", indent)
554			} else {
555				e, err = json.Marshal(phaseTimes)
556			}
557			if err != nil || !this.writeString(fmt.Sprintf("%s\"phaseTimes\": %s", newPrefix, e)) {
558				logging.Infop("Error writing phase times", logging.Pair{"error", err})
559			}
560			needComma = true
561		}
562		phaseCounts := this.FmtPhaseCounts()
563		if phaseCounts != nil {
564			if needComma && !this.writeString(",") {
565				return false
566			}
567			if indent != "" {
568				e, err = json.MarshalIndent(phaseCounts, "\t", indent)
569			} else {
570				e, err = json.Marshal(phaseCounts)
571			}
572			if err != nil || !this.writeString(fmt.Sprintf("%s\"phaseCounts\": %s", newPrefix, e)) {
573				logging.Infop("Error writing phase counts", logging.Pair{"error", err})
574			}
575			needComma = true
576		}
577		phaseOperators := this.FmtPhaseOperators()
578		if phaseOperators != nil {
579			if needComma && !this.writeString(",") {
580				return false
581			}
582			if indent != "" {
583				e, err = json.MarshalIndent(phaseOperators, "\t", indent)
584			} else {
585				e, err = json.Marshal(phaseOperators)
586			}
587			if err != nil || !this.writeString(fmt.Sprintf("%s\"phaseOperators\": %s", newPrefix, e)) {
588				logging.Infop("Error writing phase operators", logging.Pair{"error", err})
589			}
590		}
591	}
592	if p == server.ProfOn {
593		timings := this.GetTimings()
594		if timings != nil {
595			if indent != "" {
596				e, err = json.MarshalIndent(timings, "\t", indent)
597			} else {
598				e, err = json.Marshal(timings)
599			}
600			if err != nil || !this.writeString(fmt.Sprintf(",%s\"executionTimings\": %s", newPrefix, e)) {
601				logging.Infop("Error writing timings", logging.Pair{"error", err})
602			}
603		}
604	}
605	if prefix != "" && !(this.writeString("\n") && this.writeString(prefix)) {
606		return false
607	}
608	return this.writeString("}")
609}
610
611// responseDataManager is an interface for managing response data. It is used by httpRequest to take care of
612// the data in a response.
613type responseDataManager interface {
614	writeString(string) bool // write the given string for the response
615	noMoreData()             // action to take when there is no more data for the response
616}
617
618// bufferedWriter is an implementation of responseDataManager that writes response data to a buffer,
619// up to a threshold:
620type bufferedWriter struct {
621	sync.Mutex
622	req         *httpRequest  // the request for the response we are writing
623	buffer      *bytes.Buffer // buffer for writing response data to
624	buffer_pool BufferPool    // buffer manager for our buffer
625	closed      bool
626	header      bool // headers required
627	lastFlush   time.Time
628}
629
630func NewBufferedWriter(r *httpRequest, bp BufferPool) *bufferedWriter {
631	return &bufferedWriter{
632		req:         r,
633		buffer:      bp.GetBuffer(),
634		buffer_pool: bp,
635		closed:      false,
636		header:      true,
637		lastFlush:   time.Now(),
638	}
639}
640
641func (this *bufferedWriter) writeString(s string) bool {
642	this.Lock()
643	defer this.Unlock()
644
645	if this.closed {
646		return false
647	}
648
649	if len(s)+len(this.buffer.Bytes()) > this.buffer_pool.BufferCapacity() || // threshold exceeded
650		(!this.header && time.Since(this.lastFlush) > 100*time.Millisecond) { // time exceeded
651		w := this.req.resp // our request's response writer
652
653		// write response header and data buffered so far using request's response writer:
654		if this.header {
655			w.WriteHeader(this.req.httpCode())
656			this.header = false
657		}
658
659		// write out and empty the buffer
660		io.Copy(w, this.buffer)
661		this.buffer.Reset()
662
663		// do the flushing
664		this.lastFlush = time.Now()
665		w.(http.Flusher).Flush()
666	}
667	// under threshold - write the string to our buffer
668	_, err := this.buffer.Write([]byte(s))
669	return err == nil
670}
671
672func (this *bufferedWriter) noMoreData() {
673	this.Lock()
674	defer this.Unlock()
675
676	if this.closed {
677		return
678	}
679
680	w := this.req.resp // our request's response writer
681	r := this.req.req  // our request's http request
682
683	if this.header {
684		// calculate and set the Content-Length header:
685		content_len := strconv.Itoa(len(this.buffer.Bytes()))
686		w.Header().Set("Content-Length", content_len)
687		// write response header and data buffered so far:
688		w.WriteHeader(this.req.httpCode())
689		this.header = false
690	}
691
692	io.Copy(w, this.buffer)
693	// no more data in the response => return buffer to pool:
694	this.buffer_pool.PutBuffer(this.buffer)
695	r.Body.Close()
696	this.closed = true
697}
698