1package gocb
2
3import (
4	"encoding/json"
5	"fmt"
6	"net/http"
7	"net/url"
8
9	"github.com/opentracing/opentracing-go"
10)
11
12type viewError struct {
13	Message string `json:"message"`
14	Reason  string `json:"reason"`
15}
16
17type viewResponse struct {
18	TotalRows int               `json:"total_rows,omitempty"`
19	Rows      []json.RawMessage `json:"rows,omitempty"`
20	Error     string            `json:"error,omitempty"`
21	Reason    string            `json:"reason,omitempty"`
22	Errors    []viewError       `json:"errors,omitempty"`
23}
24
25func (e *viewError) Error() string {
26	return e.Message + " - " + e.Reason
27}
28
29// ViewResults implements an iterator interface which can be used to iterate over the rows of the query results.
30type ViewResults interface {
31	One(valuePtr interface{}) error
32	Next(valuePtr interface{}) bool
33	NextBytes() []byte
34	Close() error
35}
36
37// ViewResultMetrics allows access to the TotalRows value from the view response.  This is
38// implemented as an additional interface to maintain ABI compatibility for the 1.x series.
39type ViewResultMetrics interface {
40	TotalRows() int
41}
42
43type viewResults struct {
44	index     int
45	rows      []json.RawMessage
46	totalRows int
47	err       error
48	endErr    error
49}
50
51func (r *viewResults) Next(valuePtr interface{}) bool {
52	if r.err != nil {
53		return false
54	}
55
56	row := r.NextBytes()
57	if row == nil {
58		return false
59	}
60
61	r.err = json.Unmarshal(row, valuePtr)
62	if r.err != nil {
63		return false
64	}
65
66	return true
67}
68
69func (r *viewResults) NextBytes() []byte {
70	if r.err != nil {
71		return nil
72	}
73
74	if r.index+1 >= len(r.rows) {
75		return nil
76	}
77	r.index++
78
79	return r.rows[r.index]
80}
81
82func (r *viewResults) Close() error {
83	if r.err != nil {
84		return r.err
85	}
86
87	if r.endErr != nil {
88		return r.endErr
89	}
90
91	return nil
92}
93
94func (r *viewResults) One(valuePtr interface{}) error {
95	if !r.Next(valuePtr) {
96		err := r.Close()
97		if err != nil {
98			return err
99		}
100		return ErrNoResults
101	}
102
103	// Ignore any errors occurring after we already have our result
104	err := r.Close()
105	if err != nil {
106		// Return no error as we got the one result already.
107		return nil
108	}
109
110	return nil
111}
112
113func (r *viewResults) TotalRows() int {
114	return r.totalRows
115}
116
117func (b *Bucket) executeViewQuery(tracectx opentracing.SpanContext, viewType, ddoc, viewName string, options url.Values) (ViewResults, error) {
118	capiEp, err := b.getViewEp()
119	if err != nil {
120		return nil, err
121	}
122
123	reqUri := fmt.Sprintf("%s/_design/%s/%s/%s?%s", capiEp, ddoc, viewType, viewName, options.Encode())
124
125	req, err := http.NewRequest("GET", reqUri, nil)
126	if err != nil {
127		return nil, err
128	}
129
130	if b.cluster.auth != nil {
131		userPass, err := getSingleCredential(b.cluster.auth, AuthCredsRequest{
132			Service:  CapiService,
133			Endpoint: capiEp,
134			Bucket:   b.name,
135		})
136		if err != nil {
137			return nil, err
138		}
139
140		req.SetBasicAuth(userPass.Username, userPass.Password)
141	} else {
142		req.SetBasicAuth(b.name, b.password)
143	}
144
145	dtrace := b.tracer.StartSpan("dispatch",
146		opentracing.ChildOf(tracectx))
147
148	resp, err := doHttpWithTimeout(b.client.HttpClient(), req, b.viewTimeout)
149	if err != nil {
150		dtrace.Finish()
151		return nil, err
152	}
153
154	dtrace.Finish()
155
156	strace := b.tracer.StartSpan("streaming",
157		opentracing.ChildOf(tracectx))
158
159	viewResp := viewResponse{}
160	jsonDec := json.NewDecoder(resp.Body)
161	err = jsonDec.Decode(&viewResp)
162	if err != nil {
163		strace.Finish()
164		return nil, err
165	}
166
167	err = resp.Body.Close()
168	if err != nil {
169		logDebugf("Failed to close socket (%s)", err)
170	}
171
172	strace.Finish()
173
174	if resp.StatusCode != 200 {
175		if viewResp.Error != "" {
176			return nil, &viewError{
177				Message: viewResp.Error,
178				Reason:  viewResp.Reason,
179			}
180		}
181
182		return nil, &viewError{
183			Message: "HTTP Error",
184			Reason:  fmt.Sprintf("Status code was %d.", resp.StatusCode),
185		}
186	}
187
188	var endErrs MultiError
189	for _, endErr := range viewResp.Errors {
190		endErrs.add(&viewError{
191			Message: endErr.Message,
192			Reason:  endErr.Reason,
193		})
194	}
195
196	return &viewResults{
197		index:     -1,
198		rows:      viewResp.Rows,
199		totalRows: viewResp.TotalRows,
200		endErr:    endErrs.get(),
201	}, nil
202}
203
204// ExecuteViewQuery performs a view query and returns a list of rows or an error.
205func (b *Bucket) ExecuteViewQuery(q *ViewQuery) (ViewResults, error) {
206	span := b.tracer.StartSpan("ExecuteViewQuery",
207		opentracing.Tag{Key: "couchbase.service", Value: "views"})
208	defer span.Finish()
209
210	ddoc, name, opts, err := q.getInfo()
211	if err != nil {
212		return nil, err
213	}
214
215	return b.executeViewQuery(span.Context(), "_view", ddoc, name, opts)
216}
217
218// ExecuteSpatialQuery performs a spatial query and returns a list of rows or an error.
219func (b *Bucket) ExecuteSpatialQuery(q *SpatialQuery) (ViewResults, error) {
220	span := b.tracer.StartSpan("ExecuteSpatialQuery",
221		opentracing.Tag{Key: "couchbase.service", Value: "views"})
222	defer span.Finish()
223
224	ddoc, name, opts, err := q.getInfo()
225	if err != nil {
226		return nil, err
227	}
228
229	return b.executeViewQuery(span.Context(), "_spatial", ddoc, name, opts)
230}
231