1//  Copyright (c) 2014 Couchbase, Inc.
2//  Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
3//  except in compliance with the License. You may obtain a copy of the License at
4//    http://www.apache.org/licenses/LICENSE-2.0
5//  Unless required by applicable law or agreed to in writing, software distributed under the
6//  License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
7//  either express or implied. See the License for the specific language governing permissions
8//  and limitations under the License.
9
10package http
11
12import (
13	"encoding/base64"
14	"encoding/json"
15	go_errors "errors"
16	"fmt"
17	"io"
18	"io/ioutil"
19	"math"
20	"net/http"
21	"strconv"
22	"strings"
23	"time"
24
25	adt "github.com/couchbase/goutils/go-cbaudit"
26	"github.com/couchbase/query/auth"
27	"github.com/couchbase/query/datastore"
28	"github.com/couchbase/query/errors"
29	"github.com/couchbase/query/execution"
30	"github.com/couchbase/query/plan"
31	"github.com/couchbase/query/prepareds"
32	"github.com/couchbase/query/server"
33	"github.com/couchbase/query/timestamp"
34	"github.com/couchbase/query/util"
35	"github.com/couchbase/query/value"
36)
37
// httpRequest is a query request received over HTTP. It embeds
// server.BaseRequest and adds the HTTP response writer, the originating
// *http.Request and the counters and timings returned by the audit accessor
// methods declared on this type.
type httpRequest struct {
	server.BaseRequest
	// resp and req are the response writer and request passed to newHttpRequest.
	resp            http.ResponseWriter
	req             *http.Request
	// httpCloseNotify is obtained from the response writer's
	// http.CloseNotifier in newHttpRequest.
	httpCloseNotify <-chan bool
	// writer is created with NewBufferedWriter in newHttpRequest.
	writer          responseDataManager
	httpRespCode    int
	// The counters below are returned by EventResultCount, EventResultSize,
	// EventErrorCount and EventWarningCount respectively.
	resultCount     int
	resultSize      int
	errorCount      int
	warningCount    int

	// elapsedTime and executionTime are returned by ElapsedTime and ExecutionTime.
	elapsedTime   time.Duration
	executionTime time.Duration
}
53
54func (r *httpRequest) OriginalHttpRequest() *http.Request {
55	return r.req
56}
57
58func newHttpRequest(resp http.ResponseWriter, req *http.Request, bp BufferPool, size int) *httpRequest {
59	var httpArgs httpRequestArgs
60	var err errors.Error
61	var phaseTime time.Duration
62
63	// This is literally when we become aware of the request
64	reqTime := time.Now()
65
66	// Limit body size in case of denial-of-service attack
67	req.Body = http.MaxBytesReader(resp, req.Body, int64(size))
68
69	e := req.ParseForm()
70	if e != nil {
71		err = errors.NewServiceErrorBadValue(go_errors.New("unable to parse form"), "request form")
72	}
73
74	if err != nil && req.Method != "GET" && req.Method != "POST" {
75		err = errors.NewServiceErrorHTTPMethod(req.Method)
76	}
77
78	err = contentNegotiation(resp, req)
79
80	if err == nil {
81		httpArgs, err = getRequestParams(req)
82	}
83
84	var statement string
85	if err == nil {
86		statement, err = httpArgs.getStatement()
87	}
88
89	var prepared *plan.Prepared
90
91	if err == nil {
92		var decoded_plan *plan.Prepared
93		var prepared_name string
94
95		prepared_name, prepared, err = getPrepared(httpArgs, &phaseTime)
96		encoded_plan, plan_err := getEncodedPlan(httpArgs)
97
98		// MB-18841 (encoded_plan processing affects latency)
99		// MB-19509 (encoded_plan may corrupt cache)
100		// MB-19659 (spurious 4080 on multi node reprepare)
101		// If an encoded_plan has been supplied, only decode it
102		// when the prepared statement can't be found or the plan
103		// is different.
104		// DecodePrepared() will make sure that the plan is only
105		// updated if it matches the REST API encoded_plan
106		// requirements.
107		if encoded_plan != "" && plan_err == nil &&
108			((err != nil && err.Code() == errors.NO_SUCH_PREPARED) ||
109				(err == nil && prepared != nil &&
110					prepared.MismatchingEncodedPlan(encoded_plan))) {
111
112			// Monitoring API: we only need to track the prepared
113			// statement if we couldn't do it in getPrepared()
114			// Distributed plans: we don't propagate on encoded_plan parameter
115			// because the client will be using it across all nodes anyway, and
116			// we want to avoid a plan distribution stampede
117			decoded_plan, plan_err = prepareds.DecodePrepared(prepared_name, encoded_plan, (prepared == nil), false, &phaseTime)
118			if plan_err != nil {
119				err = plan_err
120			} else if decoded_plan != nil {
121				prepared = decoded_plan
122				err = nil
123			}
124		}
125		if err == nil && plan_err != nil {
126			err = plan_err
127		}
128	}
129
130	if err == nil && statement == "" && prepared == nil {
131		err = errors.NewServiceErrorMissingValue("statement or prepared")
132	}
133
134	var namedArgs map[string]value.Value
135	if err == nil {
136		namedArgs, err = httpArgs.getNamedArgs()
137	}
138
139	var positionalArgs value.Values
140	if err == nil {
141		positionalArgs, err = httpArgs.getPositionalArgs()
142	}
143
144	var namespace string
145	if err == nil {
146		namespace, err = httpArgs.getString(NAMESPACE, "")
147	}
148
149	var timeout time.Duration
150	if err == nil {
151		timeout, err = httpArgs.getDuration(TIMEOUT)
152	}
153
154	var max_parallelism int
155	var param string
156	if err == nil {
157		param, err = httpArgs.getString(MAX_PARALLELISM, "")
158		if err == nil && param != "" {
159			var e error
160			max_parallelism, e = strconv.Atoi(param)
161			if e != nil {
162				err = errors.NewServiceErrorBadValue(go_errors.New("max_parallelism is invalid"), "max parallelism")
163			}
164		}
165	}
166
167	var scan_cap int64
168	if err == nil {
169		param, err = httpArgs.getString(SCAN_CAP, "")
170		if err == nil && param != "" {
171			var e error
172			scan_cap, e = strconv.ParseInt(param, 10, 64)
173			if e != nil {
174				err = errors.NewServiceErrorBadValue(go_errors.New("scan_cap is invalid"), "scan cap")
175			}
176		}
177	}
178
179	var pipeline_cap int64
180	if err == nil {
181		param, err = httpArgs.getString(PIPELINE_CAP, "")
182		if err == nil && param != "" {
183			var e error
184			pipeline_cap, e = strconv.ParseInt(param, 10, 64)
185			if e != nil {
186				err = errors.NewServiceErrorBadValue(go_errors.New("pipeline_cap is invalid"), "pipeline cap")
187			}
188		}
189	}
190
191	var pipeline_batch int
192	if err == nil {
193		param, err = httpArgs.getString(PIPELINE_BATCH, "")
194		if err == nil && param != "" {
195			var e error
196			pipeline_batch, e = strconv.Atoi(param)
197			if e != nil {
198				err = errors.NewServiceErrorBadValue(go_errors.New("pipeline_batch is invalid"), "pipeline batch")
199			}
200		}
201	}
202
203	var readonly value.Tristate
204	if err == nil {
205		readonly, err = getReadonly(httpArgs, req.Method == "GET")
206	}
207
208	var metrics value.Tristate
209	if err == nil {
210		metrics, err = httpArgs.getTristate(METRICS)
211	}
212
213	var format Format
214	if err == nil {
215		format, err = getFormat(httpArgs)
216	}
217
218	if err == nil && format != JSON {
219		err = errors.NewServiceErrorNotImplemented("format", format.String())
220	}
221
222	var signature value.Tristate
223	if err == nil {
224		signature, err = httpArgs.getTristate(SIGNATURE)
225	}
226
227	var compression Compression
228	if err == nil {
229		compression, err = getCompression(httpArgs)
230	}
231
232	if err == nil && compression != NONE {
233		err = errors.NewServiceErrorNotImplemented("compression", compression.String())
234	}
235
236	var encoding Encoding
237	if err == nil {
238		encoding, err = getEncoding(httpArgs)
239	}
240
241	if err == nil && encoding != UTF8 {
242		err = errors.NewServiceErrorNotImplemented("encoding", encoding.String())
243	}
244
245	var pretty value.Tristate
246	if err == nil {
247		pretty, err = httpArgs.getTristate(PRETTY)
248	}
249
250	var consistency *scanConfigImpl
251
252	if err == nil {
253		consistency, err = getScanConfiguration(httpArgs)
254	}
255
256	var creds auth.Credentials
257	if err == nil {
258		creds, err = getCredentials(httpArgs, req.Header["Authorization"])
259	}
260
261	client_id := ""
262	if err == nil {
263		client_id, err = getClientID(httpArgs)
264	}
265
266	var controls value.Tristate
267	if err == nil {
268		controls, err = getControlsRequest(httpArgs)
269	}
270
271	userAgent := req.UserAgent()
272	cbUserAgent := req.Header.Get("CB-User-Agent")
273	if cbUserAgent != "" {
274		userAgent = userAgent + " (" + cbUserAgent + ")"
275	}
276	rv := &httpRequest{
277		resp: resp,
278		req:  req,
279	}
280
281	server.NewBaseRequest(&rv.BaseRequest, statement, prepared, namedArgs, positionalArgs,
282		namespace, max_parallelism, scan_cap, pipeline_cap, pipeline_batch,
283		readonly, metrics, signature, pretty, consistency, client_id, creds,
284		req.RemoteAddr, userAgent)
285
286	rv.SetRequestTime(reqTime)
287	if phaseTime != 0 {
288		rv.Output().AddPhaseTime(execution.REPREPARE, phaseTime)
289	}
290
291	var prof server.Profile
292	if err == nil {
293		rv.SetControls(controls)
294		prof, err = getProfileRequest(httpArgs)
295		if err == nil {
296			rv.SetProfile(prof)
297		}
298	}
299
300	if err == nil {
301		param, err = httpArgs.getString(N1QL_FEAT_CTRL, "")
302		if err == nil && param != "" {
303			n1qlFeatureControl, e := strconv.ParseUint(param, 0, 64)
304			if e != nil {
305				err = errors.NewServiceErrorBadValue(go_errors.New("n1ql_feat_ctrl is invalid"), N1QL_FEAT_CTRL)
306			} else {
307				rv.SetFeatureControls(n1qlFeatureControl)
308			}
309		}
310	}
311
312	if err == nil {
313		param, err = httpArgs.getString(MAX_INDEX_API, "")
314		if err == nil && param != "" {
315			indexApiVer, e := strconv.Atoi(param)
316			if e != nil {
317				err = errors.NewServiceErrorBadValue(go_errors.New("max_index_api is invalid"), MAX_INDEX_API)
318			} else {
319				rv.SetIndexApiVersion(indexApiVer)
320			}
321		}
322	}
323
324	rv.SetTimeout(timeout)
325
326	rv.writer = NewBufferedWriter(rv, bp)
327
328	// Abort if client closes connection; alternatively, return when request completes.
329	rv.httpCloseNotify = resp.(http.CloseNotifier).CloseNotify()
330
331	if err != nil {
332		rv.Fail(err)
333	}
334
335	return rv
336}
337
338// For audit.Auditable interface.
339func (this *httpRequest) ElapsedTime() time.Duration {
340	return this.elapsedTime
341}
342
343// For audit.Auditable interface.
344func (this *httpRequest) ExecutionTime() time.Duration {
345	return this.executionTime
346}
347
348// For audit.Auditable interface.
349func (this *httpRequest) EventResultCount() int {
350	return this.resultCount
351}
352
353// For audit.Auditable interface.
354func (this *httpRequest) EventResultSize() int {
355	return this.resultSize
356}
357
358// For audit.Auditable interface.
359func (this *httpRequest) EventErrorCount() int {
360	return this.errorCount
361}
362
363// For audit.Auditable interface.
364func (this *httpRequest) EventWarningCount() int {
365	return this.warningCount
366}
367
368// For audit.Auditable interface.
369func (this *httpRequest) EventStatus() string {
370	state := this.State()
371	if state == server.COMPLETED {
372		if this.errorCount == 0 {
373			state = server.SUCCESS
374		} else {
375			state = server.ERRORS
376		}
377	}
378
379	return string(state)
380}
381
382// For audit.Auditable interface.
383func (this *httpRequest) EventGenericFields() adt.GenericFields {
384	return adt.GetAuditBasicFields(this.req)
385}
386
387// for audit.Auditable interface.
388func (this *httpRequest) EventRemoteAddress() string {
389	return this.req.RemoteAddr
390}
391
// Names of the request parameters read in this file. Each constant is the
// parameter name as looked up in the request's form fields or JSON body keys.
const ( // Request argument names
	MAX_PARALLELISM   = "max_parallelism"
	SCAN_CAP          = "scan_cap"
	PIPELINE_CAP      = "pipeline_cap"
	PIPELINE_BATCH    = "pipeline_batch"
	READONLY          = "readonly"
	METRICS           = "metrics"
	NAMESPACE         = "namespace"
	TIMEOUT           = "timeout"
	ARGS              = "args"
	PREPARED          = "prepared"
	ENCODED_PLAN      = "encoded_plan"
	STATEMENT         = "statement"
	FORMAT            = "format"
	ENCODING          = "encoding"
	COMPRESSION       = "compression"
	SIGNATURE         = "signature"
	PRETTY            = "pretty"
	SCAN_CONSISTENCY  = "scan_consistency"
	SCAN_WAIT         = "scan_wait"
	SCAN_VECTOR       = "scan_vector"
	SCAN_VECTORS      = "scan_vectors"
	CREDS             = "creds"
	CLIENT_CONTEXT_ID = "client_context_id"
	PROFILE           = "profile"
	CONTROLS          = "controls"
	N1QL_FEAT_CTRL    = "n1ql_feat_ctrl"
	MAX_INDEX_API     = "max_index_api"
)
421
// _PARAMETERS lists the parameter names isValidParameter accepts (matched
// ignoring case). Names starting with '$' are accepted separately and are
// not listed here.
var _PARAMETERS = []string{
	STATEMENT,
	PREPARED,
	ENCODED_PLAN,
	CREDS,
	ARGS,
	TIMEOUT,
	SCAN_CONSISTENCY,
	SCAN_WAIT,
	SCAN_VECTOR,
	SCAN_VECTORS,
	MAX_PARALLELISM,
	SCAN_CAP,
	PIPELINE_CAP,
	PIPELINE_BATCH,
	READONLY,
	METRICS,
	NAMESPACE,
	FORMAT,
	ENCODING,
	COMPRESSION,
	SIGNATURE,
	PRETTY,
	CLIENT_CONTEXT_ID,
	PROFILE,
	CONTROLS,
	N1QL_FEAT_CTRL,
	MAX_INDEX_API,
}
451
452func isValidParameter(a string) bool {
453	a = strings.TrimSpace(a)
454	// Ignore empty (whitespace) parameters. They are harmless.
455	if a == "" {
456		return true
457	}
458	if strings.IndexRune(a, '$') == 0 {
459		return true
460	}
461	for _, p := range _PARAMETERS {
462		if strings.EqualFold(p, a) {
463			return true
464		}
465	}
466	return false
467}
468
469func getPrepared(a httpRequestArgs, phaseTime *time.Duration) (string, *plan.Prepared, errors.Error) {
470	prepared_field, err := a.getValue(PREPARED)
471	if err != nil || prepared_field == nil {
472		return "", nil, err
473	}
474
475	// Monitoring API: track prepared statement access
476	prepared, err := prepareds.GetPrepared(prepared_field, prepareds.OPT_TRACK|prepareds.OPT_REMOTE|prepareds.OPT_VERIFY, phaseTime)
477	if err != nil || prepared == nil {
478		return "", nil, err
479	}
480
481	prepared_name, ok := prepared_field.Actual().(string)
482	if !ok {
483		prepared_name = ""
484	}
485	return prepared_name, prepared, err
486}
487
488func getEncodedPlan(a httpRequestArgs) (string, errors.Error) {
489	return a.getString(ENCODED_PLAN, "")
490}
491
492func getCompression(a httpRequestArgs) (Compression, errors.Error) {
493	var compression Compression
494
495	compression_field, err := a.getString(COMPRESSION, "NONE")
496	if err == nil && compression_field != "" {
497		compression = newCompression(compression_field)
498		if compression == UNDEFINED_COMPRESSION {
499			err = errors.NewServiceErrorUnrecognizedValue(COMPRESSION, compression_field)
500		}
501	}
502	return compression, err
503}
504
505func getScanConfiguration(a httpRequestArgs) (*scanConfigImpl, errors.Error) {
506
507	scan_consistency_field, err := a.getString(SCAN_CONSISTENCY, "NOT_BOUNDED")
508	if err != nil {
509		return nil, err
510	}
511
512	scan_level := newScanConsistency(scan_consistency_field)
513	if scan_level == server.UNDEFINED_CONSISTENCY {
514		return nil, errors.NewServiceErrorUnrecognizedValue(SCAN_CONSISTENCY, scan_consistency_field)
515	}
516
517	scan_wait, err := a.getDuration(SCAN_WAIT)
518	if err != nil {
519		return nil, err
520	}
521
522	scan_vector, err := a.getScanVector()
523	if err != nil {
524		return nil, err
525	}
526
527	scan_vectors, err := a.getScanVectors()
528	if err != nil {
529		return nil, err
530	}
531
532	var scan_vector_source timestamp.ScanVectorSource
533	if scan_vector == nil {
534		if scan_vectors == nil {
535			if scan_level == server.AT_PLUS && scan_vector == nil && scan_vectors == nil {
536				return nil, errors.NewServiceErrorMissingValue(SCAN_VECTOR)
537			}
538			scan_vector_source = &ZeroScanVectorSource{}
539		} else {
540			defaultNamespace, err := a.getString(NAMESPACE, "default")
541			if err != nil {
542				return nil, err
543			}
544			scan_vector_source = newMultipleScanVectorSource(defaultNamespace, scan_vectors)
545		}
546	} else {
547		if scan_vectors == nil {
548			scan_vector_source = &singleScanVectorSource{scan_vector: scan_vector}
549		} else {
550			// Not both scan_vector and scan_vectors.
551			return nil, errors.NewServiceErrorMultipleValues("scan_vector and scan_vectors")
552		}
553	}
554
555	return &scanConfigImpl{
556		scan_level:         scan_level,
557		scan_wait:          scan_wait,
558		scan_vector_source: scan_vector_source,
559	}, nil
560}
561
562func getEncoding(a httpRequestArgs) (Encoding, errors.Error) {
563	var encoding Encoding
564
565	encoding_field, err := a.getString(ENCODING, "UTF-8")
566	if err == nil && encoding_field != "" {
567		encoding = newEncoding(encoding_field)
568		if encoding == UNDEFINED_ENCODING {
569			err = errors.NewServiceErrorUnrecognizedValue(ENCODING, encoding_field)
570		}
571	}
572	return encoding, err
573}
574
575func getFormat(a httpRequestArgs) (Format, errors.Error) {
576	var format Format
577
578	format_field, err := a.getString(FORMAT, "JSON")
579	if err == nil && format_field != "" {
580		format = newFormat(format_field)
581		if format == UNDEFINED_FORMAT {
582			err = errors.NewServiceErrorUnrecognizedValue(FORMAT, format_field)
583		}
584	}
585	return format, err
586}
587
588func getReadonly(a httpRequestArgs, isGet bool) (value.Tristate, errors.Error) {
589	readonly, err := a.getTristate(READONLY)
590	if err == nil && isGet {
591		switch readonly {
592		case value.NONE:
593			readonly = value.TRUE
594		case value.FALSE:
595			err = errors.NewServiceErrorReadonly(
596				fmt.Sprintf("%s=false cannot be used with HTTP GET method.", READONLY))
597		}
598	}
599	return readonly, err
600}
601
602func getCredentials(a httpRequestArgs, auths []string) (auth.Credentials, errors.Error) {
603	// Cred_data retrieves credentials from either the URL parameters or from the body of the JSON request.
604	cred_data, err := a.getCredentials()
605	if err != nil {
606		return nil, err
607	}
608
609	// Credentials can come from the cred_data, from the Basic authorization field
610	// in  the request, both, or neither. If from both, the credentials are combined.
611	// If neither, this function should return nil, nil.
612	var creds auth.Credentials = nil
613
614	if len(cred_data) > 0 {
615		// Credentials are in request parameters:
616		creds = auth.Credentials{}
617		for _, cred := range cred_data {
618			user, user_ok := cred["user"]
619			pass, pass_ok := cred["pass"]
620			if user_ok && pass_ok {
621				creds[user] = pass
622			} else {
623				return nil, errors.NewServiceErrorMissingValue("user or pass")
624			}
625		}
626	}
627
628	if len(auths) > 0 {
629		// Credentials are in http header:
630		curAuth := auths[0]
631		if strings.HasPrefix(curAuth, "Basic ") {
632			encoded_creds := strings.Split(curAuth, " ")[1]
633			decoded_creds, err := base64.StdEncoding.DecodeString(encoded_creds)
634			if err != nil {
635				return nil, errors.NewServiceErrorBadValue(go_errors.New("credentials not base64-encoded"), CREDS)
636			}
637			// Authorization header is in format "user:pass"
638			// per http://tools.ietf.org/html/rfc1945#section-10.2
639			u_details := strings.Split(string(decoded_creds), ":")
640			if creds == nil {
641				creds = auth.Credentials{}
642			}
643			switch len(u_details) {
644			case 0, 1:
645				// Authorization header format is incorrect
646				return nil, errors.NewServiceErrorBadValue(nil, CREDS)
647			case 2:
648				creds[u_details[0]] = u_details[1]
649			default:
650				// Support passwords like "local:xxx" or "admin:xxx"
651				creds[u_details[0]] = strings.Join(u_details[1:], ":")
652			}
653		}
654	}
655
656	// If we have credentials from neither source, creds will be uninitialized, i.e. nil.
657	return creds, nil
658}
659
660const MAX_CLIENTID = 64
661
662// Ensure that client context id is no more than 64 characters.
663// Also ensure that client context id does not contain characters that would
664// break json syntax.
665func getClientID(a httpRequestArgs) (string, errors.Error) {
666	client_id, err := a.getString(CLIENT_CONTEXT_ID, "")
667	if err != nil {
668		return client_id, err
669	}
670	if len(client_id) > MAX_CLIENTID {
671		id_trunc := make([]byte, MAX_CLIENTID)
672		copy(id_trunc[:], client_id)
673		client_id = string(id_trunc)
674	}
675	l := len(client_id)
676	for i := 0; i < l; i++ {
677		switch client_id[i] {
678		case '"', '\\':
679			return client_id, errors.NewServiceErrorClientID(client_id)
680		default:
681			continue
682		}
683	}
684	return client_id, nil
685}
686
687func getProfile(a httpRequestArgs) (server.Profile, errors.Error) {
688	profile, err := a.getString(PROFILE, "")
689	if err == nil && profile != "" {
690		prof, ok := server.ParseProfile(profile)
691		if ok {
692			return prof, nil
693		} else {
694			err = errors.NewServiceErrorUnrecognizedValue(PROFILE, profile)
695		}
696
697	}
698	return server.ProfUnset, err
699}
700
// acceptType is the media type an Accept header must start with to pass
// contentNegotiation.
const acceptType = "application/json"
// versionTag is the marker that introduces the version inside a media type.
const versionTag = "version="
// version is the default Content-Type value: acceptType followed by the
// current util.VERSION.
const version = acceptType + "; " + versionTag + util.VERSION
704
705func contentNegotiation(resp http.ResponseWriter, req *http.Request) errors.Error {
706	// set content type to current version
707	resp.Header().Set("Content-Type", version)
708	accept := req.Header["Accept"]
709	// if no media type specified, default to current version
710	if accept == nil || accept[0] == "*/*" {
711		return nil
712	}
713	desiredContent := accept[0]
714	// media type must be application/json at least
715	if !strings.HasPrefix(desiredContent, acceptType) {
716		return errors.NewServiceErrorMediaType(desiredContent)
717	}
718	versionIndex := strings.Index(desiredContent, versionTag)
719	// no version specified, default to current version
720	if versionIndex == -1 {
721		return nil
722	}
723	// check if requested version is supported
724	requestVersion := desiredContent[versionIndex+len(versionTag):]
725	if requestVersion >= util.MIN_VERSION && requestVersion <= util.VERSION {
726		resp.Header().Set("Content-Type", desiredContent)
727		return nil
728	}
729	return errors.NewServiceErrorMediaType(desiredContent)
730}
731
// httpRequestArgs is an interface for getting the arguments in a http request.
// It is implemented by urlArgs (url-encoded requests) and jsonArgs
// (json-encoded requests).
type httpRequestArgs interface {
	// getString returns the named parameter, or the second argument as a
	// default when the parameter is not supplied.
	getString(string, string) (string, errors.Error)
	// getTristate returns the named parameter as a tristate.
	getTristate(f string) (value.Tristate, errors.Error)
	// getValue returns the named parameter as a value.Value.
	getValue(field string) (value.Value, errors.Error)
	// getDuration returns the named parameter as a duration.
	getDuration(string) (time.Duration, errors.Error)
	// getNamedArgs returns the arguments whose names start with "$".
	getNamedArgs() (map[string]value.Value, errors.Error)
	// getPositionalArgs returns the contents of the ARGS parameter.
	getPositionalArgs() (value.Values, errors.Error)
	// getStatement returns the statement text of the request.
	getStatement() (string, errors.Error)
	// getCredentials returns the entries of the CREDS parameter.
	getCredentials() ([]map[string]string, errors.Error)
	// getScanVector returns the SCAN_VECTOR parameter.
	getScanVector() (timestamp.Vector, errors.Error)
	// getScanVectors returns the SCAN_VECTORS parameter.
	getScanVectors() (map[string]timestamp.Vector, errors.Error)
}
745
746// getRequestParams creates a httpRequestArgs implementation,
747// depending on the content type in the request
748func getRequestParams(req *http.Request) (httpRequestArgs, errors.Error) {
749
750	const (
751		URL_CONTENT  = "application/x-www-form-urlencoded"
752		JSON_CONTENT = "application/json"
753	)
754	content_types := req.Header["Content-Type"]
755	content_type := URL_CONTENT
756
757	if len(content_types) > 0 {
758		content_type = content_types[0]
759	}
760
761	if strings.HasPrefix(content_type, URL_CONTENT) {
762		return newUrlArgs(req)
763	}
764
765	if strings.HasPrefix(content_type, JSON_CONTENT) {
766		return newJsonArgs(req)
767	}
768
769	return newUrlArgs(req)
770}
771
// urlArgs is an implementation of httpRequestArgs that reads
// request arguments from a url-encoded http request.
// Arguments are looked up in req.Form; getStatement may also read req.Body.
type urlArgs struct {
	req *http.Request
}
777
778func newUrlArgs(req *http.Request) (*urlArgs, errors.Error) {
779	for arg, _ := range req.Form {
780		if !isValidParameter(arg) {
781			return nil, errors.NewServiceErrorUnrecognizedParameter(arg)
782		}
783	}
784	return &urlArgs{req: req}, nil
785}
786
787func (this *urlArgs) getStatement() (string, errors.Error) {
788	statement, err := this.formValue(STATEMENT)
789	if err != nil {
790		return "", err
791	}
792
793	if statement == "" && this.req.Method == "POST" {
794		bytes, err := ioutil.ReadAll(this.req.Body)
795		if err != nil {
796			return "", errors.NewServiceErrorBadValue(go_errors.New("unable to read body of request"), STATEMENT)
797		}
798
799		statement = string(bytes)
800	}
801
802	return statement, nil
803}
804
805// A named argument is an argument of the form: $<identifier>=json_value
806func (this *urlArgs) getNamedArgs() (map[string]value.Value, errors.Error) {
807	var args map[string]value.Value
808
809	for name, _ := range this.req.Form {
810		if !strings.HasPrefix(util.TrimSpace(name), "$") {
811			continue
812		}
813		arg, err := this.formValue(name)
814		if err != nil {
815			return args, err
816		}
817		if len(arg) == 0 {
818			//This is an error - there _has_ to be a value for a named argument
819			return args, errors.NewServiceErrorMissingValue(fmt.Sprintf("named argument %s", name))
820		}
821		args = addNamedArg(args, name, value.NewValue([]byte(arg)))
822	}
823	return args, nil
824}
825
826func getJsonDecoder(r io.Reader) (*json.Decoder, errors.Error) {
827	if r == nil {
828		return nil, errors.NewServiceErrorDecodeNil()
829	}
830	return json.NewDecoder(r), nil
831}
832
833// Positional args are of the form: args=json_list
834func (this *urlArgs) getPositionalArgs() (value.Values, errors.Error) {
835	var positionalArgs value.Values
836
837	args_field, err := this.formValue(ARGS)
838	if err != nil || args_field == "" {
839		return positionalArgs, err
840	}
841
842	var args []interface{}
843
844	decoder, err := getJsonDecoder(strings.NewReader(args_field))
845	if err != nil {
846		return positionalArgs, err
847	}
848	e := decoder.Decode(&args)
849	if e != nil {
850		return positionalArgs, errors.NewServiceErrorBadValue(go_errors.New("unable to parse args parameter as array"), ARGS)
851	}
852
853	positionalArgs = make([]value.Value, len(args))
854	// Put each element of args into positionalArgs
855	for i, arg := range args {
856		positionalArgs[i] = value.NewValue(arg)
857	}
858
859	return positionalArgs, nil
860}
861
862// Note: This function has no receiver, which makes it easier to test.
863// The "json" input parameter should be a parsed JSON object structure, the output of a decoder.
864func getScanVectorsFromJSON(json interface{}) (map[string]timestamp.Vector, errors.Error) {
865	bucketMap, ok := json.(map[string]interface{})
866	if !ok {
867		return nil, errors.NewServiceErrorTypeMismatch(SCAN_VECTORS, "map of strings to vectors")
868	}
869
870	out := make(map[string]timestamp.Vector)
871	for k, v := range bucketMap {
872		// Is it a sparse vector?
873		mapVector, ok := v.(map[string]interface{})
874		if ok {
875			entries, err := makeSparseVector(mapVector)
876			if err != nil {
877				return nil, err
878			}
879			out[k] = entries
880			continue
881		}
882		// Is it a full vector?
883		arrayVector, ok := v.([]interface{})
884		if ok {
885			entries, err := makeFullVector(arrayVector)
886			if err != nil {
887				return nil, err
888			}
889			out[k] = entries
890			continue
891		}
892		return nil, errors.NewServiceErrorTypeMismatch(k, "full or sparse vector")
893	}
894
895	return out, nil
896}
897
898// Note: This function has no receiver, which makes it easier to test.
899// The "json" input parameter should be a parsed JSON object structure, the output of a decoder.
900func getScanVectorFromJSON(json interface{}) (timestamp.Vector, errors.Error) {
901	// Is it a sparse vector?
902	mapVector, ok := json.(map[string]interface{})
903	if ok {
904		return makeSparseVector(mapVector)
905	}
906
907	// Is it a full vector?
908	arrayVector, ok := json.([]interface{})
909	if ok {
910		return makeFullVector(arrayVector)
911	}
912
913	return nil, errors.NewServiceErrorTypeMismatch(SCAN_VECTOR, "full or sparse vector")
914}
915
916func (this *urlArgs) getScanVector() (timestamp.Vector, errors.Error) {
917	scan_vector_data_field, err := this.formValue(SCAN_VECTOR)
918
919	if err != nil || scan_vector_data_field == "" {
920		return nil, err
921	}
922
923	var target interface{}
924	decoder, err := getJsonDecoder(strings.NewReader(scan_vector_data_field))
925	if err != nil {
926		return nil, err
927	}
928	e := decoder.Decode(&target)
929	if e != nil {
930		return nil, errors.NewServiceErrorBadValue(go_errors.New("unable to parse scan vector"), SCAN_VECTOR)
931	}
932
933	return getScanVectorFromJSON(target)
934}
935
936func (this *urlArgs) getScanVectors() (map[string]timestamp.Vector, errors.Error) {
937	scan_vectors_data_field, err := this.formValue(SCAN_VECTORS)
938
939	if err != nil || scan_vectors_data_field == "" {
940		return nil, err
941	}
942
943	var target interface{}
944	decoder, err := getJsonDecoder(strings.NewReader(scan_vectors_data_field))
945	if err != nil {
946		return nil, err
947	}
948	e := decoder.Decode(&target)
949	if e != nil {
950		return nil, errors.NewServiceErrorBadValue(go_errors.New("unable to parse scan vectors"), SCAN_VECTORS)
951	}
952
953	return getScanVectorsFromJSON(target)
954}
955
956func (this *urlArgs) getDuration(f string) (time.Duration, errors.Error) {
957	var timeout time.Duration
958
959	timeout_field, err := this.formValue(f)
960	if err == nil && timeout_field != "" {
961		timeout, err = newDuration(timeout_field)
962	}
963	return timeout, err
964}
965
966func (this *urlArgs) getString(f string, dflt string) (string, errors.Error) {
967	value := dflt
968
969	value_field, err := this.formValue(f)
970	if err == nil && value_field != "" {
971		value = value_field
972	}
973	return value, err
974}
975
976func (this *urlArgs) getTristate(f string) (value.Tristate, errors.Error) {
977	tristate_value := value.NONE
978	value_field, err := this.formValue(f)
979	if err != nil {
980		return tristate_value, err
981	}
982	if value_field == "" {
983		return tristate_value, nil
984	}
985	bool_value, e := strconv.ParseBool(value_field)
986	if e != nil {
987		return tristate_value, errors.NewServiceErrorBadValue(go_errors.New("unable to parse tristate as a boolean"), f)
988	}
989	tristate_value = value.ToTristate(bool_value)
990	return tristate_value, nil
991}
992
993func (this *urlArgs) getCredentials() ([]map[string]string, errors.Error) {
994	var creds_data []map[string]string
995
996	creds_field, err := this.formValue(CREDS)
997	if err == nil && creds_field != "" {
998		decoder, err := getJsonDecoder(strings.NewReader(creds_field))
999		if err != nil {
1000			return creds_data, err
1001		}
1002		e := decoder.Decode(&creds_data)
1003		if e != nil {
1004			err = errors.NewServiceErrorBadValue(go_errors.New("unable to parse creds"), CREDS)
1005		}
1006	}
1007	return creds_data, err
1008}
1009
1010func (this *urlArgs) getValue(field string) (value.Value, errors.Error) {
1011	var val value.Value
1012	value_field, err := this.getString(field, "")
1013	if err == nil && value_field != "" {
1014		val = value.NewValue([]byte(value_field))
1015	}
1016	return val, err
1017}
1018
1019// To handle cases where the inp req contains spaces before and after the
1020// query param add Trimspace().
1021func (this *urlArgs) getField(field string) []string {
1022	for name, value := range this.req.Form {
1023
1024		if strings.EqualFold(util.TrimSpace(field), util.TrimSpace(name)) {
1025			return value
1026		}
1027	}
1028	return nil
1029}
1030
1031func (this *urlArgs) formValue(field string) (string, errors.Error) {
1032	values := this.getField(field)
1033
1034	switch len(values) {
1035	case 0:
1036		return "", nil
1037	case 1:
1038		return util.TrimSpace(values[0]), nil
1039	default:
1040		return "", errors.NewServiceErrorMultipleValues(field)
1041	}
1042}
1043
// jsonArgs is an implementation of httpRequestArgs that reads
// request arguments from a json-encoded http request.
type jsonArgs struct {
	// args holds the request body decoded as a JSON object by newJsonArgs.
	args map[string]interface{}
	// req is the request the arguments were decoded from.
	req  *http.Request
}
1050
1051// create a jsonArgs structure from the given http request.
1052func newJsonArgs(req *http.Request) (*jsonArgs, errors.Error) {
1053	var p jsonArgs
1054	decoder, e := getJsonDecoder(req.Body)
1055	if e != nil {
1056		return nil, e
1057	}
1058	err := decoder.Decode(&p.args)
1059	if err != nil {
1060		return nil, errors.NewServiceErrorBadValue(go_errors.New("unable to parse JSON"), "JSON request body")
1061	}
1062	for arg, _ := range p.args {
1063		if !isValidParameter(arg) {
1064			return nil, errors.NewServiceErrorUnrecognizedParameter(arg)
1065		}
1066	}
1067	p.req = req
1068	return &p, nil
1069}
1070
1071func (this *jsonArgs) getField(field string) (interface{}, bool) {
1072	for name, value := range this.args {
1073		if strings.EqualFold(field, name) {
1074			return value, true
1075		}
1076	}
1077	return nil, false
1078}
1079
1080func (this *jsonArgs) getStatement() (string, errors.Error) {
1081	return this.getString(STATEMENT, "")
1082}
1083
1084func (this *jsonArgs) getNamedArgs() (map[string]value.Value, errors.Error) {
1085	var args map[string]value.Value
1086	for name, arg := range this.args {
1087		if !strings.HasPrefix(name, "$") {
1088			continue
1089		}
1090		args = addNamedArg(args, name, value.NewValue(arg))
1091	}
1092	return args, nil
1093}
1094
1095func (this *jsonArgs) getPositionalArgs() (value.Values, errors.Error) {
1096	var positionalArgs value.Values
1097
1098	args_field, in_request := this.getField(ARGS)
1099	if !in_request {
1100		return positionalArgs, nil
1101	}
1102
1103	args, type_ok := args_field.([]interface{})
1104	if !type_ok {
1105		return positionalArgs, errors.NewServiceErrorTypeMismatch(ARGS, "array")
1106	}
1107
1108	positionalArgs = make([]value.Value, len(args))
1109	// Put each element of args into positionalArgs
1110	for i, arg := range args {
1111		positionalArgs[i] = value.NewValue(arg)
1112	}
1113
1114	return positionalArgs, nil
1115}
1116
1117func (this *jsonArgs) getCredentials() ([]map[string]string, errors.Error) {
1118	creds_field, in_request := this.getField(CREDS)
1119	if !in_request {
1120		return nil, nil
1121	}
1122
1123	creds_items, arr_ok := creds_field.([]interface{})
1124	if arr_ok {
1125		creds_data := make([]map[string]string, len(creds_items))
1126		for i, item := range creds_items {
1127			map_item, map_ok := item.(map[string]interface{})
1128			if map_ok {
1129				map_new := make(map[string]string, len(map_item))
1130				for k, v := range map_item {
1131					vs, v_ok := v.(string)
1132					if v_ok {
1133						map_new[k] = vs
1134					} else {
1135						return nil, errors.NewServiceErrorTypeMismatch(CREDS,
1136							"array of { user, pass }")
1137					}
1138				}
1139				creds_data[i] = map_new
1140			} else {
1141				return nil, errors.NewServiceErrorTypeMismatch(CREDS, "array of { user, pass }")
1142			}
1143		}
1144		return creds_data, nil
1145	}
1146	return nil, errors.NewServiceErrorTypeMismatch(CREDS, "array of { user, pass }")
1147}
1148
1149func (this *jsonArgs) getScanVectors() (map[string]timestamp.Vector, errors.Error) {
1150	scan_vectors_data, in_request := this.getField(SCAN_VECTORS)
1151	if !in_request {
1152		return nil, nil
1153	}
1154	return getScanVectorsFromJSON(scan_vectors_data)
1155}
1156
1157func (this *jsonArgs) getScanVector() (timestamp.Vector, errors.Error) {
1158	scan_vector_data, in_request := this.getField(SCAN_VECTOR)
1159	if !in_request {
1160		return nil, nil
1161	}
1162	return getScanVectorFromJSON(scan_vector_data)
1163}
1164
1165func makeVectorEntry(index int, args interface{}) (*scanVectorEntry, errors.Error) {
1166	data, is_map := args.(map[string]interface{})
1167	if !is_map {
1168		return nil, errors.NewServiceErrorTypeMismatch(SCAN_VECTOR, "array or map of { number, string }")
1169	}
1170	value, has_value := data["value"]
1171	if !has_value {
1172		return nil, errors.NewServiceErrorTypeMismatch(SCAN_VECTOR, "array or map of { number, string }")
1173	}
1174	value_val, is_number := value.(float64)
1175	if !is_number {
1176		return nil, errors.NewServiceErrorTypeMismatch(SCAN_VECTOR, "array or map of { number, string }")
1177	}
1178	guard, has_guard := data["guard"]
1179	if !has_guard {
1180		return nil, errors.NewServiceErrorTypeMismatch(SCAN_VECTOR, "array or map of { number, string }")
1181	}
1182	guard_val, guard_ok := guard.(string)
1183	if !guard_ok {
1184		return nil, errors.NewServiceErrorTypeMismatch(SCAN_VECTOR, "array or map of { number, string }")
1185	}
1186	return &scanVectorEntry{
1187		position: uint32(index),
1188		value:    uint64(value_val),
1189		guard:    guard_val,
1190	}, nil
1191}
1192
1193func (this *jsonArgs) getDuration(f string) (time.Duration, errors.Error) {
1194	var timeout time.Duration
1195	t, err := this.getString(f, "0s")
1196	if err != nil {
1197		return timeout, err
1198	}
1199	return newDuration(t)
1200}
1201
1202func (this *jsonArgs) getTristate(f string) (value.Tristate, errors.Error) {
1203	value_tristate := value.NONE
1204	value_field, in_request := this.getField(f)
1205	if !in_request {
1206		return value_tristate, nil
1207	}
1208
1209	b, type_ok := value_field.(bool)
1210	if !type_ok {
1211		return value_tristate, errors.NewServiceErrorTypeMismatch(f, "boolean")
1212	}
1213
1214	value_tristate = value.ToTristate(b)
1215	return value_tristate, nil
1216}
1217
1218// helper function to get a string type argument
1219func (this *jsonArgs) getString(f string, dflt string) (string, errors.Error) {
1220	value_field, in_request := this.getField(f)
1221	if !in_request {
1222		return dflt, nil
1223	}
1224
1225	value, type_ok := value_field.(string)
1226	if !type_ok {
1227		return value, errors.NewServiceErrorTypeMismatch(f, "string")
1228	}
1229	return value, nil
1230}
1231
1232func (this *jsonArgs) getValue(f string) (value.Value, errors.Error) {
1233	var val value.Value
1234	value_field, in_request := this.getField(f)
1235	if !in_request {
1236		return val, nil
1237	}
1238
1239	val = value.NewValue(value_field)
1240	return val, nil
1241}
1242
// Encoding enumerates the encoding names recognized by newEncoding.
type Encoding int

const (
	// UTF8 is the encoding named "UTF-8".
	UTF8 Encoding = iota
	// UNDEFINED_ENCODING stands for every name that is not recognized.
	UNDEFINED_ENCODING
)

// newEncoding maps an encoding name to its Encoding. The name is upper-cased
// with strings.ToUpper before comparison; unrecognized names map to
// UNDEFINED_ENCODING.
func newEncoding(s string) Encoding {
	if strings.ToUpper(s) == "UTF-8" {
		return UTF8
	}
	return UNDEFINED_ENCODING
}

// String returns the name of the encoding, or "UNDEFINED_ENCODING" for any
// value other than UTF8.
func (e Encoding) String() string {
	if e == UTF8 {
		return "UTF-8"
	}
	return "UNDEFINED_ENCODING"
}
1269
// Format enumerates the format names recognized by newFormat.
type Format int

const (
	JSON Format = iota
	XML
	CSV
	TSV
	// UNDEFINED_FORMAT stands for every name that is not recognized.
	UNDEFINED_FORMAT
)

// newFormat maps a format name to its Format. The name is upper-cased with
// strings.ToUpper and compared against the String form of each defined
// format; unrecognized names map to UNDEFINED_FORMAT.
func newFormat(s string) Format {
	upper := strings.ToUpper(s)
	for f := JSON; f < UNDEFINED_FORMAT; f++ {
		if f.String() == upper {
			return f
		}
	}
	return UNDEFINED_FORMAT
}

// String returns the name of the format, or "UNDEFINED_FORMAT" for any value
// that is not one of JSON, XML, CSV or TSV.
func (f Format) String() string {
	switch f {
	case JSON:
		return "JSON"
	case XML:
		return "XML"
	case CSV:
		return "CSV"
	case TSV:
		return "TSV"
	}
	return "UNDEFINED_FORMAT"
}
1311
// Compression enumerates the compression names recognized by newCompression.
type Compression int

const (
	NONE Compression = iota
	ZIP
	RLE
	LZMA
	LZO
	// UNDEFINED_COMPRESSION stands for every name that is not recognized.
	UNDEFINED_COMPRESSION
)

// newCompression maps a compression name to its Compression. The name is
// upper-cased with strings.ToUpper before comparison; unrecognized names map
// to UNDEFINED_COMPRESSION.
func newCompression(s string) Compression {
	var c Compression
	switch strings.ToUpper(s) {
	case "NONE":
		c = NONE
	case "ZIP":
		c = ZIP
	case "RLE":
		c = RLE
	case "LZMA":
		c = LZMA
	case "LZO":
		c = LZO
	default:
		c = UNDEFINED_COMPRESSION
	}
	return c
}

// String returns the name of the compression, or "UNDEFINED_COMPRESSION" for
// any value that is not one of NONE, ZIP, RLE, LZMA or LZO.
func (c Compression) String() string {
	names := [...]string{NONE: "NONE", ZIP: "ZIP", RLE: "RLE", LZMA: "LZMA", LZO: "LZO"}
	if c < NONE || int(c) >= len(names) {
		return "UNDEFINED_COMPRESSION"
	}
	return names[c]
}
1358
// scanVectorEntry implements timestamp.Entry. It is a plain record of one
// scan vector element: its position, its sequence value and its guard string.
type scanVectorEntry struct {
	position uint32
	value    uint64
	guard    string
}

// Position returns the position stored in the entry.
func (this *scanVectorEntry) Position() uint32 { return this.position }

// Value returns the sequence value stored in the entry.
func (this *scanVectorEntry) Value() uint64 { return this.value }

// Guard returns the guard string stored in the entry.
func (this *scanVectorEntry) Guard() string { return this.guard }
1377
1378// scanVectorEntries implements timestamp.Vector
1379type scanVectorEntries struct {
1380	entries []timestamp.Entry
1381}
1382
1383func (this *scanVectorEntries) Entries() []timestamp.Entry {
1384	return this.entries
1385}
1386
1387// A scan vector entry is an array of length two: [number, string] holding a sequence number and a guard string.
1388func extractValues(arr []interface{}) (uint64, string, errors.Error) {
1389	if len(arr) != 2 {
1390		return 0, "", errors.NewServiceErrorScanVectorBadLength(arr)
1391	}
1392	sequenceNum, found := arr[0].(float64)
1393	if !found {
1394		return 0, "", errors.NewServiceErrorScanVectorBadSequenceNumber(arr[0])
1395	}
1396	if sequenceNum < 0.0 || sequenceNum > math.MaxUint64 || math.Floor(sequenceNum) != sequenceNum {
1397		return 0, "", errors.NewServiceErrorScanVectorBadSequenceNumber(arr[0])
1398	}
1399	uuid, found := arr[1].(string)
1400	if !found {
1401		return 0, "", errors.NewServiceErrorScanVectorBadUUID(arr[1])
1402	}
1403	return uint64(sequenceNum), uuid, nil
1404}
1405
1406func makeSparseVector(args map[string]interface{}) (*scanVectorEntries, errors.Error) {
1407	entries := make([]timestamp.Entry, len(args))
1408	i := 0
1409	for key, arg := range args {
1410		index, err := strconv.Atoi(key)
1411		if err != nil {
1412			return nil, errors.NewServiceErrorBadValue(go_errors.New("Key value is not integer: "+key), SCAN_VECTOR)
1413		}
1414		array, ok := arg.([]interface{})
1415		if !ok {
1416			return nil, errors.NewServiceErrorTypeMismatch("scan vector entry", "two-element array")
1417		}
1418		sequenceNum, uuid, error := extractValues(array)
1419		if err != nil {
1420			return nil, error
1421		}
1422		entries[i] = &scanVectorEntry{
1423			position: uint32(index),
1424			value:    sequenceNum,
1425			guard:    uuid,
1426		}
1427		i = i + 1
1428	}
1429	return &scanVectorEntries{
1430		entries: entries,
1431	}, nil
1432}
1433
1434func makeFullVector(args []interface{}) (*scanVectorEntries, errors.Error) {
1435	if len(args) != SCAN_VECTOR_SIZE {
1436		return nil, errors.NewServiceErrorTypeMismatch(SCAN_VECTOR,
1437			fmt.Sprintf("array of %d entries", SCAN_VECTOR_SIZE))
1438	}
1439	entries := make([]timestamp.Entry, len(args))
1440	for i, arg := range args {
1441		array, ok := arg.([]interface{})
1442		if !ok {
1443			return nil, errors.NewServiceErrorTypeMismatch("entry of a full scan vector",
1444				"array of length 2")
1445		}
1446		sequenceNum, uuid, err := extractValues(array)
1447		if err != nil {
1448			return nil, err
1449		}
1450		entries[i] = &scanVectorEntry{
1451			position: uint32(i),
1452			value:    sequenceNum,
1453			guard:    uuid,
1454		}
1455	}
1456	return &scanVectorEntries{
1457		entries: entries,
1458	}, nil
1459}
1460
// SCAN_VECTOR_SIZE is the exact number of entries makeFullVector requires a
// full scan vector to contain.
const SCAN_VECTOR_SIZE = 1024
1462
1463type scanConfigImpl struct {
1464	scan_level         server.ScanConsistency
1465	scan_wait          time.Duration
1466	scan_vector_source timestamp.ScanVectorSource
1467}
1468
1469func (this *scanConfigImpl) ScanConsistency() datastore.ScanConsistency {
1470	if this == nil {
1471		return datastore.UNBOUNDED
1472	}
1473	switch this.scan_level {
1474	case server.NOT_BOUNDED:
1475		return datastore.UNBOUNDED
1476	case server.REQUEST_PLUS, server.STATEMENT_PLUS:
1477		return datastore.SCAN_PLUS
1478	case server.AT_PLUS:
1479		return datastore.AT_PLUS
1480	default:
1481		return datastore.UNBOUNDED
1482	}
1483}
1484
1485func (this *scanConfigImpl) ScanWait() time.Duration {
1486	return this.scan_wait
1487}
1488
1489func (this *scanConfigImpl) ScanVectorSource() timestamp.ScanVectorSource {
1490	return this.scan_vector_source
1491}
1492
1493func newScanConsistency(s string) server.ScanConsistency {
1494	switch strings.ToUpper(s) {
1495	case "NOT_BOUNDED":
1496		return server.NOT_BOUNDED
1497	case "REQUEST_PLUS":
1498		return server.REQUEST_PLUS
1499	case "STATEMENT_PLUS":
1500		return server.STATEMENT_PLUS
1501	case "AT_PLUS":
1502		return server.AT_PLUS
1503	default:
1504		return server.UNDEFINED_CONSISTENCY
1505	}
1506}
1507
1508// addNamedArgs is used by getNamedArgs implementations to add a named argument
1509func addNamedArg(args map[string]value.Value, name string, arg value.Value) map[string]value.Value {
1510	if args == nil {
1511		args = make(map[string]value.Value)
1512	}
1513	name = util.TrimSpace(name)
1514	// The '$' is trimmed from the argument name when added to args:
1515	args[strings.TrimPrefix(name, "$")] = arg
1516	return args
1517}
1518
1519// helper function to create a time.Duration instance from a given string.
1520// There must be a unit - valid units are "ns", "us", "ms", "s", "m", "h"
1521func newDuration(s string) (duration time.Duration, err errors.Error) {
1522	// Error if given string has no unit
1523	last_char := s[len(s)-1]
1524	if last_char != 's' && last_char != 'm' && last_char != 'h' {
1525		err = errors.NewServiceErrorBadValue(nil,
1526			fmt.Sprintf("duration value %s: missing or incorrect unit "+
1527				"(valid units: ns, us, ms, s, m, h)", s))
1528	}
1529	if err == nil {
1530		d, e := time.ParseDuration(s)
1531		if e != nil {
1532			err = errors.NewServiceErrorBadValue(go_errors.New("Unable to parse duration: "+s), "duration")
1533		} else {
1534			duration = d
1535		}
1536	}
1537	return
1538}
1539