1// Package queryport provides a simple library to spawn a queryport and access
2// queryport via passive client API.
3//
4// ---> Request                 ---> Request
5//      <--- Response                <--- Response
6//      <--- Response                <--- Response
7//      ...                     ---> EndStreamRequest
8//      <--- StreamEndResponse       <--- Response (residue)
9//                                   <--- StreamEndResponse
10
11package client
12
13import "errors"
14import "fmt"
15import "io"
16import "net"
17import "time"
18import json "github.com/couchbase/indexing/secondary/common/json"
19import "sync/atomic"
20
21import "github.com/couchbase/indexing/secondary/logging"
22import "github.com/couchbase/indexing/secondary/common"
23import protobuf "github.com/couchbase/indexing/secondary/protobuf/query"
24import "github.com/couchbase/indexing/secondary/transport"
25import "github.com/golang/protobuf/proto"
26
// GsiScanClient for scan operations.
//
// A client is bound to one queryport address and owns the connection pool
// used to reach it. The configuration fields below are filled in once by
// NewGsiScanClient from the supplied common.Config.
type GsiScanClient struct {
	// queryport is the address handed to NewGsiScanClient; pool is the
	// connection pool created for that address.
	queryport string
	pool      *connectionPool
	// config params
	maxPayload         int // TODO: what if it exceeds ?
	readDeadline       time.Duration
	writeDeadline      time.Duration
	poolSize           int
	poolOverflow       int
	cpTimeout          time.Duration
	cpAvailWaitTimeout time.Duration
	logPrefix          string
	minPoolSizeWM      int32
	relConnBatchSize   int32

	// serverVersion holds the version returned by Helo. The methods in this
	// file access it through sync/atomic, and NeedSessionConsVector reports
	// true while it is zero.
	serverVersion uint32
}
45
46func NewGsiScanClient(queryport string, config common.Config) (*GsiScanClient, error) {
47	t := time.Duration(config["connPoolAvailWaitTimeout"].Int())
48	c := &GsiScanClient{
49		queryport:          queryport,
50		maxPayload:         config["maxPayload"].Int(),
51		readDeadline:       time.Duration(config["readDeadline"].Int()),
52		writeDeadline:      time.Duration(config["writeDeadline"].Int()),
53		poolSize:           config["settings.poolSize"].Int(),
54		poolOverflow:       config["settings.poolOverflow"].Int(),
55		cpTimeout:          time.Duration(config["connPoolTimeout"].Int()),
56		cpAvailWaitTimeout: t,
57		logPrefix:          fmt.Sprintf("[GsiScanClient:%q]", queryport),
58		minPoolSizeWM:      int32(config["settings.minPoolSizeWM"].Int()),
59		relConnBatchSize:   int32(config["settings.relConnBatchSize"].Int()),
60	}
61	c.pool = newConnectionPool(
62		queryport, c.poolSize, c.poolOverflow, c.maxPayload, c.cpTimeout,
63		c.cpAvailWaitTimeout, c.minPoolSizeWM, c.relConnBatchSize)
64	logging.Infof("%v started ...\n", c.logPrefix)
65
66	if version, err := c.Helo(); err == nil || err == io.EOF {
67		atomic.StoreUint32(&c.serverVersion, version)
68	} else {
69		c.pool.Close()
70		return nil, fmt.Errorf("%s: unable to obtain server version. Error = %v", queryport, err)
71	}
72
73	return c, nil
74}
75
76func (c *GsiScanClient) RefreshServerVersion() {
77	// refresh the version ONLY IF there is no error, so we absolutely
78	// know we have right version.
79	if version, err := c.Helo(); err == nil {
80		if version != atomic.LoadUint32(&c.serverVersion) {
81			atomic.StoreUint32(&c.serverVersion, version)
82		}
83	}
84}
85
86func (c *GsiScanClient) NeedSessionConsVector() bool {
87	return atomic.LoadUint32(&c.serverVersion) == 0
88}
89
90func (c *GsiScanClient) Helo() (uint32, error) {
91	req := &protobuf.HeloRequest{
92		Version: proto.Uint32(uint32(protobuf.ProtobufVersion())),
93	}
94
95	resp, err := c.doRequestResponse(req, "")
96	if err != nil {
97		return 0, err
98	}
99	heloResp := resp.(*protobuf.HeloResponse)
100	return heloResp.GetVersion(), nil
101}
102
103// LookupStatistics for a single secondary-key.
104func (c *GsiScanClient) LookupStatistics(
105	defnID uint64, value common.SecondaryKey) (common.IndexStatistics, error) {
106
107	// serialize lookup value.
108	val, err := json.Marshal(value)
109	if err != nil {
110		return nil, err
111	}
112	req := &protobuf.StatisticsRequest{
113		DefnID: proto.Uint64(defnID),
114		Span:   &protobuf.Span{Equals: [][]byte{val}},
115	}
116	resp, err := c.doRequestResponse(req, "")
117	if err != nil {
118		return nil, err
119	}
120	statResp := resp.(*protobuf.StatisticsResponse)
121	if statResp.GetErr() != nil {
122		err = errors.New(statResp.GetErr().GetError())
123		return nil, err
124	}
125	return statResp.GetStats(), nil
126}
127
128// RangeStatistics for index range.
129func (c *GsiScanClient) RangeStatistics(
130	defnID uint64, low, high common.SecondaryKey,
131	inclusion Inclusion) (common.IndexStatistics, error) {
132
133	// serialize low and high values.
134	l, err := json.Marshal(low)
135	if err != nil {
136		return nil, err
137	}
138	h, err := json.Marshal(high)
139	if err != nil {
140		return nil, err
141	}
142
143	req := &protobuf.StatisticsRequest{
144		DefnID: proto.Uint64(defnID),
145		Span: &protobuf.Span{
146			Range: &protobuf.Range{
147				Low: l, High: h, Inclusion: proto.Uint32(uint32(inclusion)),
148			},
149		},
150	}
151	resp, err := c.doRequestResponse(req, "")
152	if err != nil {
153		return nil, err
154	}
155	statResp := resp.(*protobuf.StatisticsResponse)
156	if statResp.GetErr() != nil {
157		err = errors.New(statResp.GetErr().GetError())
158		return nil, err
159	}
160	return statResp.GetStats(), nil
161}
162
163// Lookup scan index between low and high.
164func (c *GsiScanClient) Lookup(
165	defnID uint64, requestId string, values []common.SecondaryKey,
166	distinct bool, limit int64,
167	cons common.Consistency, vector *TsConsistency,
168	callb ResponseHandler,
169	rollbackTime int64,
170	partitions []common.PartitionId) (error, bool) {
171
172	// serialize lookup value.
173	equals := make([][]byte, 0, len(values))
174	for _, value := range values {
175		val, err := json.Marshal(value)
176		if err != nil {
177			return err, false
178		}
179		equals = append(equals, val)
180	}
181
182	connectn, err := c.pool.Get()
183	if err != nil {
184		return err, false
185	}
186	healthy := true
187	closeStream := false
188	conn, pkt := connectn.conn, connectn.pkt
189	defer func() {
190		go func() {
191			if closeStream {
192				_, healthy = c.closeStream(conn, pkt, requestId)
193			}
194			c.pool.Return(connectn, healthy)
195		}()
196	}()
197
198	partnIds := make([]uint64, len(partitions))
199	for i, partnId := range partitions {
200		partnIds[i] = uint64(partnId)
201	}
202
203	req := &protobuf.ScanRequest{
204		DefnID:       proto.Uint64(defnID),
205		RequestId:    proto.String(requestId),
206		Span:         &protobuf.Span{Equals: equals},
207		Distinct:     proto.Bool(distinct),
208		Limit:        proto.Int64(limit),
209		Cons:         proto.Uint32(uint32(cons)),
210		RollbackTime: proto.Int64(rollbackTime),
211		PartitionIds: partnIds,
212		Sorted:       proto.Bool(true),
213	}
214	if vector != nil {
215		req.Vector = protobuf.NewTsConsistency(
216			vector.Vbnos, vector.Seqnos, vector.Vbuuids, vector.Crc64)
217	}
218
219	// ---> protobuf.ScanRequest
220	if err := c.sendRequest(conn, pkt, req); err != nil {
221		fmsg := "%v Lookup(%v) request transport failed `%v`\n"
222		logging.Errorf(fmsg, c.logPrefix, requestId, err)
223		healthy = false
224		return err, false
225	}
226
227	cont, partial := true, false
228	for cont {
229		// <--- protobuf.ResponseStream
230		cont, healthy, err, closeStream = c.streamResponse(conn, pkt, callb, requestId)
231		if err != nil { // if err, cont should have been set to false
232			fmsg := "%v Lookup(%s) response failed `%v`\n"
233			logging.Errorf(fmsg, c.logPrefix, requestId, err)
234		} else { // partially succeeded
235			partial = true
236		}
237	}
238	return err, partial
239}
240
241// Range scan index between low and high.
242func (c *GsiScanClient) Range(
243	defnID uint64, requestId string, low, high common.SecondaryKey, inclusion Inclusion,
244	distinct bool, limit int64, cons common.Consistency, vector *TsConsistency,
245	callb ResponseHandler, rollbackTime int64, partitions []common.PartitionId) (error, bool) {
246
247	// serialize low and high values.
248	l, err := json.Marshal(low)
249	if err != nil {
250		return err, false
251	}
252	h, err := json.Marshal(high)
253	if err != nil {
254		return err, false
255	}
256
257	connectn, err := c.pool.Get()
258	if err != nil {
259		return err, false
260	}
261	healthy := true
262	closeStream := false
263	conn, pkt := connectn.conn, connectn.pkt
264	defer func() {
265		go func() {
266			if closeStream {
267				_, healthy = c.closeStream(conn, pkt, requestId)
268			}
269			c.pool.Return(connectn, healthy)
270		}()
271	}()
272
273	partnIds := make([]uint64, len(partitions))
274	for i, partnId := range partitions {
275		partnIds[i] = uint64(partnId)
276	}
277
278	req := &protobuf.ScanRequest{
279		DefnID:    proto.Uint64(defnID),
280		RequestId: proto.String(requestId),
281		Span: &protobuf.Span{
282			Range: &protobuf.Range{
283				Low: l, High: h, Inclusion: proto.Uint32(uint32(inclusion)),
284			},
285		},
286		Distinct:     proto.Bool(distinct),
287		Limit:        proto.Int64(limit),
288		Cons:         proto.Uint32(uint32(cons)),
289		RollbackTime: proto.Int64(rollbackTime),
290		PartitionIds: partnIds,
291		Sorted:       proto.Bool(true),
292	}
293	if vector != nil {
294		req.Vector = protobuf.NewTsConsistency(
295			vector.Vbnos, vector.Seqnos, vector.Vbuuids, vector.Crc64)
296	}
297	// ---> protobuf.ScanRequest
298	if err := c.sendRequest(conn, pkt, req); err != nil {
299		fmsg := "%v Range(%v) request transport failed `%v`\n"
300		logging.Errorf(fmsg, c.logPrefix, requestId, err)
301		healthy = false
302		return err, false
303	}
304
305	cont, partial := true, false
306	for cont {
307		// <--- protobuf.ResponseStream
308		cont, healthy, err, closeStream = c.streamResponse(conn, pkt, callb, requestId)
309		if err != nil { // if err, cont should have been set to false
310			fmsg := "%v Range(%v) response failed `%v`\n"
311			logging.Errorf(fmsg, c.logPrefix, requestId, err)
312		} else { // partial succeeded
313			partial = true
314		}
315	}
316	return err, partial
317}
318
319// Range scan index between low and high.
320func (c *GsiScanClient) RangePrimary(
321	defnID uint64, requestId string, low, high []byte, inclusion Inclusion,
322	distinct bool, limit int64, cons common.Consistency, vector *TsConsistency,
323	callb ResponseHandler, rollbackTime int64, partitions []common.PartitionId) (error, bool) {
324
325	connectn, err := c.pool.Get()
326	if err != nil {
327		return err, false
328	}
329	healthy := true
330	closeStream := false
331	conn, pkt := connectn.conn, connectn.pkt
332	defer func() {
333		go func() {
334			if closeStream {
335				_, healthy = c.closeStream(conn, pkt, requestId)
336			}
337			c.pool.Return(connectn, healthy)
338		}()
339	}()
340
341	partnIds := make([]uint64, len(partitions))
342	for i, partnId := range partitions {
343		partnIds[i] = uint64(partnId)
344	}
345
346	req := &protobuf.ScanRequest{
347		DefnID:    proto.Uint64(defnID),
348		RequestId: proto.String(requestId),
349		Span: &protobuf.Span{
350			Range: &protobuf.Range{
351				Low: low, High: high,
352				Inclusion: proto.Uint32(uint32(inclusion)),
353			},
354		},
355		Distinct:     proto.Bool(distinct),
356		Limit:        proto.Int64(limit),
357		Cons:         proto.Uint32(uint32(cons)),
358		RollbackTime: proto.Int64(rollbackTime),
359		PartitionIds: partnIds,
360		Sorted:       proto.Bool(true),
361	}
362	if vector != nil {
363		req.Vector = protobuf.NewTsConsistency(
364			vector.Vbnos, vector.Seqnos, vector.Vbuuids, vector.Crc64)
365	}
366	// ---> protobuf.ScanRequest
367	if err := c.sendRequest(conn, pkt, req); err != nil {
368		fmsg := "%v RangePrimary(%v) request transport failed `%v`\n"
369		logging.Errorf(fmsg, c.logPrefix, requestId, err)
370		healthy = false
371		return err, false
372	}
373
374	cont, partial := true, false
375	for cont {
376		// <--- protobuf.ResponseStream
377		cont, healthy, err, closeStream = c.streamResponse(conn, pkt, callb, requestId)
378		if err != nil { // if err, cont should have been set to false
379			fmsg := "%v RangePrimary(%v) response failed `%v`\n"
380			logging.Errorf(fmsg, c.logPrefix, requestId, err)
381		} else {
382			partial = true
383		}
384	}
385	return err, partial
386}
387
388// ScanAll for full table scan.
389func (c *GsiScanClient) ScanAll(
390	defnID uint64, requestId string, limit int64,
391	cons common.Consistency, vector *TsConsistency,
392	callb ResponseHandler, rollbackTime int64, partitions []common.PartitionId) (error, bool) {
393
394	connectn, err := c.pool.Get()
395	if err != nil {
396		return err, false
397	}
398	healthy := true
399	closeStream := false
400	conn, pkt := connectn.conn, connectn.pkt
401	defer func() {
402		go func() {
403			if closeStream {
404				_, healthy = c.closeStream(conn, pkt, requestId)
405			}
406			c.pool.Return(connectn, healthy)
407		}()
408	}()
409
410	partnIds := make([]uint64, len(partitions))
411	for i, partnId := range partitions {
412		partnIds[i] = uint64(partnId)
413	}
414
415	req := &protobuf.ScanAllRequest{
416		DefnID:       proto.Uint64(defnID),
417		RequestId:    proto.String(requestId),
418		Limit:        proto.Int64(limit),
419		Cons:         proto.Uint32(uint32(cons)),
420		RollbackTime: proto.Int64(rollbackTime),
421		PartitionIds: partnIds,
422	}
423	if vector != nil {
424		req.Vector = protobuf.NewTsConsistency(
425			vector.Vbnos, vector.Seqnos, vector.Vbuuids, vector.Crc64)
426	}
427	if err := c.sendRequest(conn, pkt, req); err != nil {
428		fmsg := "%v ScanAll(%v) request transport failed `%v`\n"
429		logging.Errorf(fmsg, c.logPrefix, requestId, err)
430		healthy = false
431		return err, false
432	}
433
434	cont, partial := true, false
435	for cont {
436		// <--- protobuf.ResponseStream
437		cont, healthy, err, closeStream = c.streamResponse(conn, pkt, callb, requestId)
438		if err != nil { // if err, cont should have been set to false
439			fmsg := "%v ScanAll(%v) response failed `%v`\n"
440			logging.Errorf(fmsg, c.logPrefix, requestId, err)
441		} else {
442			partial = true
443		}
444	}
445	return err, partial
446}
447
448func (c *GsiScanClient) MultiScan(
449	defnID uint64, requestId string, scans Scans,
450	reverse, distinct bool, projection *IndexProjection, offset, limit int64,
451	cons common.Consistency, vector *TsConsistency,
452	callb ResponseHandler, rollbackTime int64, partitions []common.PartitionId) (error, bool) {
453
454	// serialize scans
455	protoScans := make([]*protobuf.Scan, len(scans))
456	for i, scan := range scans {
457		if scan != nil {
458			var equals [][]byte
459			var filters []*protobuf.CompositeElementFilter
460
461			// If Seek is there, then do not marshall Range
462			if len(scan.Seek) > 0 {
463				equals = make([][]byte, len(scan.Seek))
464				for i, seek := range scan.Seek {
465					s, err := json.Marshal(seek)
466					if err != nil {
467						return err, false
468					}
469					equals[i] = s
470				}
471			} else {
472				filters = make([]*protobuf.CompositeElementFilter, len(scan.Filter))
473				if scan.Filter != nil {
474					for j, f := range scan.Filter {
475						var l, h []byte
476						var err error
477						if f.Low != common.MinUnbounded { // Do not encode if unbounded
478							l, err = json.Marshal(f.Low)
479							if err != nil {
480								return err, false
481							}
482						}
483
484						if f.High != common.MaxUnbounded { // Do not encode if unbounded
485							h, err = json.Marshal(f.High)
486							if err != nil {
487								return err, false
488							}
489						}
490
491						fl := &protobuf.CompositeElementFilter{
492							Low: l, High: h, Inclusion: proto.Uint32(uint32(f.Inclusion)),
493						}
494
495						filters[j] = fl
496					}
497				}
498			}
499			s := &protobuf.Scan{
500				Filters: filters,
501				Equals:  equals,
502			}
503			protoScans[i] = s
504		}
505	}
506
507	//IndexProjection
508	var protoProjection *protobuf.IndexProjection
509	if projection != nil {
510		protoProjection = &protobuf.IndexProjection{
511			EntryKeys:  projection.EntryKeys,
512			PrimaryKey: proto.Bool(projection.PrimaryKey),
513		}
514	}
515
516	connectn, err := c.pool.Get()
517	if err != nil {
518		return err, false
519	}
520	healthy := true
521	closeStream := false
522	conn, pkt := connectn.conn, connectn.pkt
523	defer func() {
524		go func() {
525			if closeStream {
526				_, healthy = c.closeStream(conn, pkt, requestId)
527			}
528			c.pool.Return(connectn, healthy)
529		}()
530	}()
531
532	partnIds := make([]uint64, len(partitions))
533	for i, partnId := range partitions {
534		partnIds[i] = uint64(partnId)
535	}
536
537	req := &protobuf.ScanRequest{
538		DefnID: proto.Uint64(defnID),
539		Span: &protobuf.Span{
540			Range: nil,
541		},
542		RequestId:       proto.String(requestId),
543		Distinct:        proto.Bool(distinct),
544		Limit:           proto.Int64(limit),
545		Cons:            proto.Uint32(uint32(cons)),
546		Scans:           protoScans,
547		Indexprojection: protoProjection,
548		Reverse:         proto.Bool(reverse),
549		Offset:          proto.Int64(offset),
550		RollbackTime:    proto.Int64(rollbackTime),
551		PartitionIds:    partnIds,
552		Sorted:          proto.Bool(true),
553	}
554	if vector != nil {
555		req.Vector = protobuf.NewTsConsistency(
556			vector.Vbnos, vector.Seqnos, vector.Vbuuids, vector.Crc64)
557	}
558	// ---> protobuf.ScanRequest
559	if err := c.sendRequest(conn, pkt, req); err != nil {
560		fmsg := "%v Range(%v) request transport failed `%v`\n"
561		logging.Errorf(fmsg, c.logPrefix, requestId, err)
562		healthy = false
563		return err, false
564	}
565
566	cont, partial := true, false
567	for cont {
568		// <--- protobuf.ResponseStream
569		cont, healthy, err, closeStream = c.streamResponse(conn, pkt, callb, requestId)
570		if err != nil { // if err, cont should have been set to false
571			fmsg := "%v Scans(%v) response failed `%v`\n"
572			logging.Errorf(fmsg, c.logPrefix, requestId, err)
573		} else { // partial succeeded
574			partial = true
575		}
576	}
577	return err, partial
578}
579
580func (c *GsiScanClient) MultiScanPrimary(
581	defnID uint64, requestId string, scans Scans,
582	reverse, distinct bool, projection *IndexProjection, offset, limit int64,
583	cons common.Consistency, vector *TsConsistency,
584	callb ResponseHandler, rollbackTime int64, partitions []common.PartitionId) (error, bool) {
585
586	var what string
587	// serialize scans
588	protoScans := make([]*protobuf.Scan, 0)
589	for _, scan := range scans {
590		if scan != nil {
591			var equals [][]byte
592			var filters []*protobuf.CompositeElementFilter
593
594			// If Seek is there, then ignore Range
595			if len(scan.Seek) > 0 {
596				var k []byte
597				key := scan.Seek[0]
598				if k, what = curePrimaryKey(key); what == "after" {
599					continue
600				}
601				equals = [][]byte{k}
602			} else {
603				filters = make([]*protobuf.CompositeElementFilter, 0)
604				skip := false
605				if scan.Filter != nil {
606					for _, f := range scan.Filter {
607						var l, h []byte
608						if f.Low != common.MinUnbounded { // Ignore if unbounded
609							if l, what = curePrimaryKey(f.Low); what == "after" {
610								skip = true
611								break
612							}
613						}
614						if f.High != common.MaxUnbounded { // Ignore if unbounded
615							if h, what = curePrimaryKey(f.High); what == "before" {
616								skip = true
617								break
618							}
619						}
620
621						fl := &protobuf.CompositeElementFilter{
622							Low: l, High: h, Inclusion: proto.Uint32(uint32(f.Inclusion)),
623						}
624
625						filters = append(filters, fl)
626					}
627					if skip {
628						continue
629					}
630				}
631			}
632			s := &protobuf.Scan{
633				Filters: filters,
634				Equals:  equals,
635			}
636			protoScans = append(protoScans, s)
637		}
638	}
639
640	if len(protoScans) == 0 {
641		return nil, true
642	}
643
644	//IndexProjection
645	var protoProjection *protobuf.IndexProjection
646	if projection != nil {
647		protoProjection = &protobuf.IndexProjection{
648			EntryKeys:  projection.EntryKeys,
649			PrimaryKey: proto.Bool(projection.PrimaryKey),
650		}
651	}
652
653	connectn, err := c.pool.Get()
654	if err != nil {
655		return err, false
656	}
657	healthy := true
658	closeStream := false
659	conn, pkt := connectn.conn, connectn.pkt
660	defer func() {
661		go func() {
662			if closeStream {
663				_, healthy = c.closeStream(conn, pkt, requestId)
664			}
665			c.pool.Return(connectn, healthy)
666		}()
667	}()
668
669	partnIds := make([]uint64, len(partitions))
670	for i, partnId := range partitions {
671		partnIds[i] = uint64(partnId)
672	}
673
674	req := &protobuf.ScanRequest{
675		DefnID: proto.Uint64(defnID),
676		Span: &protobuf.Span{
677			Range: nil,
678		},
679		RequestId:       proto.String(requestId),
680		Distinct:        proto.Bool(distinct),
681		Limit:           proto.Int64(limit),
682		Cons:            proto.Uint32(uint32(cons)),
683		Scans:           protoScans,
684		Indexprojection: protoProjection,
685		Reverse:         proto.Bool(reverse),
686		Offset:          proto.Int64(offset),
687		RollbackTime:    proto.Int64(rollbackTime),
688		PartitionIds:    partnIds,
689		Sorted:          proto.Bool(true),
690	}
691	if vector != nil {
692		req.Vector = protobuf.NewTsConsistency(
693			vector.Vbnos, vector.Seqnos, vector.Vbuuids, vector.Crc64)
694	}
695	// ---> protobuf.ScanRequest
696	if err := c.sendRequest(conn, pkt, req); err != nil {
697		fmsg := "%v Range(%v) request transport failed `%v`\n"
698		logging.Errorf(fmsg, c.logPrefix, requestId, err)
699		healthy = false
700		return err, false
701	}
702
703	cont, partial := true, false
704	for cont {
705		// <--- protobuf.ResponseStream
706		cont, healthy, err, closeStream = c.streamResponse(conn, pkt, callb, requestId)
707		if err != nil { // if err, cont should have been set to false
708			fmsg := "%v Scans(%v) response failed `%v`\n"
709			logging.Errorf(fmsg, c.logPrefix, requestId, err)
710		} else { // partial succeeded
711			partial = true
712		}
713	}
714	return err, partial
715}
716
717// CountLookup to count number entries for given set of keys.
718func (c *GsiScanClient) CountLookup(
719	defnID uint64, requestId string, values []common.SecondaryKey,
720	cons common.Consistency, vector *TsConsistency, rollbackTime int64, partitions []common.PartitionId) (int64, error) {
721
722	// serialize match value.
723	equals := make([][]byte, 0, len(values))
724	for _, value := range values {
725		val, err := json.Marshal(value)
726		if err != nil {
727			return 0, err
728		}
729		equals = append(equals, val)
730	}
731
732	partnIds := make([]uint64, len(partitions))
733	for i, partnId := range partitions {
734		partnIds[i] = uint64(partnId)
735	}
736
737	req := &protobuf.CountRequest{
738		DefnID:       proto.Uint64(defnID),
739		RequestId:    proto.String(requestId),
740		Span:         &protobuf.Span{Equals: equals},
741		Cons:         proto.Uint32(uint32(cons)),
742		RollbackTime: proto.Int64(rollbackTime),
743		PartitionIds: partnIds,
744	}
745	if vector != nil {
746		req.Vector = protobuf.NewTsConsistency(
747			vector.Vbnos, vector.Seqnos, vector.Vbuuids, vector.Crc64)
748	}
749	resp, err := c.doRequestResponse(req, requestId)
750	if err != nil {
751		return 0, err
752	}
753	countResp := resp.(*protobuf.CountResponse)
754	if countResp.GetErr() != nil {
755		err = errors.New(countResp.GetErr().GetError())
756		return 0, err
757	}
758	return countResp.GetCount(), nil
759}
760
761// CountLookup to count number entries for given set of keys for primary index
762func (c *GsiScanClient) CountLookupPrimary(
763	defnID uint64, requestId string, values [][]byte,
764	cons common.Consistency, vector *TsConsistency, rollbackTime int64, partitions []common.PartitionId) (int64, error) {
765
766	partnIds := make([]uint64, len(partitions))
767	for i, partnId := range partitions {
768		partnIds[i] = uint64(partnId)
769	}
770
771	req := &protobuf.CountRequest{
772		DefnID:       proto.Uint64(defnID),
773		RequestId:    proto.String(requestId),
774		Span:         &protobuf.Span{Equals: values},
775		Cons:         proto.Uint32(uint32(cons)),
776		RollbackTime: proto.Int64(rollbackTime),
777		PartitionIds: partnIds,
778	}
779	if vector != nil {
780		req.Vector = protobuf.NewTsConsistency(
781			vector.Vbnos, vector.Seqnos, vector.Vbuuids, vector.Crc64)
782	}
783	resp, err := c.doRequestResponse(req, requestId)
784	if err != nil {
785		return 0, err
786	}
787	countResp := resp.(*protobuf.CountResponse)
788	if countResp.GetErr() != nil {
789		err = errors.New(countResp.GetErr().GetError())
790		return 0, err
791	}
792	return countResp.GetCount(), nil
793}
794
795// CountRange to count number entries in the given range.
796func (c *GsiScanClient) CountRange(
797	defnID uint64, requestId string, low, high common.SecondaryKey, inclusion Inclusion,
798	cons common.Consistency, vector *TsConsistency, rollbackTime int64, partitions []common.PartitionId) (int64, error) {
799
800	// serialize low and high values.
801	l, err := json.Marshal(low)
802	if err != nil {
803		return 0, err
804	}
805	h, err := json.Marshal(high)
806	if err != nil {
807		return 0, err
808	}
809
810	partnIds := make([]uint64, len(partitions))
811	for i, partnId := range partitions {
812		partnIds[i] = uint64(partnId)
813	}
814
815	req := &protobuf.CountRequest{
816		DefnID:    proto.Uint64(defnID),
817		RequestId: proto.String(requestId),
818		Span: &protobuf.Span{
819			Range: &protobuf.Range{
820				Low: l, High: h, Inclusion: proto.Uint32(uint32(inclusion)),
821			},
822		},
823		Cons:         proto.Uint32(uint32(cons)),
824		RollbackTime: proto.Int64(rollbackTime),
825		PartitionIds: partnIds,
826	}
827	if vector != nil {
828		req.Vector = protobuf.NewTsConsistency(
829			vector.Vbnos, vector.Seqnos, vector.Vbuuids, vector.Crc64)
830	}
831
832	resp, err := c.doRequestResponse(req, requestId)
833	if err != nil {
834		return 0, err
835	}
836	countResp := resp.(*protobuf.CountResponse)
837	if countResp.GetErr() != nil {
838		err = errors.New(countResp.GetErr().GetError())
839		return 0, err
840	}
841	return countResp.GetCount(), nil
842}
843
844// CountRange to count number entries in the given range for primary index
845func (c *GsiScanClient) CountRangePrimary(
846	defnID uint64, requestId string, low, high []byte, inclusion Inclusion,
847	cons common.Consistency, vector *TsConsistency, rollbackTime int64, partitions []common.PartitionId) (int64, error) {
848
849	partnIds := make([]uint64, len(partitions))
850	for i, partnId := range partitions {
851		partnIds[i] = uint64(partnId)
852	}
853
854	req := &protobuf.CountRequest{
855		DefnID:    proto.Uint64(defnID),
856		RequestId: proto.String(requestId),
857		Span: &protobuf.Span{
858			Range: &protobuf.Range{
859				Low: low, High: high, Inclusion: proto.Uint32(uint32(inclusion)),
860			},
861		},
862		Cons:         proto.Uint32(uint32(cons)),
863		RollbackTime: proto.Int64(rollbackTime),
864		PartitionIds: partnIds,
865	}
866	if vector != nil {
867		req.Vector = protobuf.NewTsConsistency(
868			vector.Vbnos, vector.Seqnos, vector.Vbuuids, vector.Crc64)
869	}
870
871	resp, err := c.doRequestResponse(req, requestId)
872	if err != nil {
873		return 0, err
874	}
875	countResp := resp.(*protobuf.CountResponse)
876	if countResp.GetErr() != nil {
877		err = errors.New(countResp.GetErr().GetError())
878		return 0, err
879	}
880	return countResp.GetCount(), nil
881}
882
883func (c *GsiScanClient) MultiScanCount(
884	defnID uint64, requestId string, scans Scans, distinct bool,
885	cons common.Consistency, vector *TsConsistency, rollbackTime int64, partitions []common.PartitionId) (int64, error) {
886
887	// serialize scans
888	protoScans := make([]*protobuf.Scan, len(scans))
889	for i, scan := range scans {
890		if scan != nil {
891			var equals [][]byte
892			var filters []*protobuf.CompositeElementFilter
893
894			// If Seek is there, then do not marshall Range
895			if len(scan.Seek) > 0 {
896				equals = make([][]byte, len(scan.Seek))
897				for i, seek := range scan.Seek {
898					s, err := json.Marshal(seek)
899					if err != nil {
900						return 0, err
901					}
902					equals[i] = s
903				}
904			} else {
905				filters = make([]*protobuf.CompositeElementFilter, len(scan.Filter))
906				if scan.Filter != nil {
907					for j, f := range scan.Filter {
908						var l, h []byte
909						var err error
910						if f.Low != common.MinUnbounded { // Do not encode if unbounded
911							l, err = json.Marshal(f.Low)
912							if err != nil {
913								return 0, err
914							}
915						}
916						if f.High != common.MaxUnbounded { // Do not encode if unbounded
917							h, err = json.Marshal(f.High)
918							if err != nil {
919								return 0, err
920							}
921						}
922
923						fl := &protobuf.CompositeElementFilter{
924							Low: l, High: h, Inclusion: proto.Uint32(uint32(f.Inclusion)),
925						}
926
927						filters[j] = fl
928					}
929				}
930			}
931			s := &protobuf.Scan{
932				Filters: filters,
933				Equals:  equals,
934			}
935			protoScans[i] = s
936		}
937	}
938
939	partnIds := make([]uint64, len(partitions))
940	for i, partnId := range partitions {
941		partnIds[i] = uint64(partnId)
942	}
943
944	req := &protobuf.CountRequest{
945		DefnID:    proto.Uint64(defnID),
946		RequestId: proto.String(requestId),
947		Span: &protobuf.Span{
948			Range: nil,
949		},
950		Distinct:     proto.Bool(distinct),
951		Scans:        protoScans,
952		Cons:         proto.Uint32(uint32(cons)),
953		RollbackTime: proto.Int64(rollbackTime),
954		PartitionIds: partnIds,
955	}
956
957	if vector != nil {
958		req.Vector = protobuf.NewTsConsistency(
959			vector.Vbnos, vector.Seqnos, vector.Vbuuids, vector.Crc64)
960	}
961
962	resp, err := c.doRequestResponse(req, requestId)
963	if err != nil {
964		return 0, err
965	}
966	countResp := resp.(*protobuf.CountResponse)
967	if countResp.GetErr() != nil {
968		err = errors.New(countResp.GetErr().GetError())
969		return 0, err
970	}
971	return countResp.GetCount(), nil
972}
973
974func (c *GsiScanClient) MultiScanCountPrimary(
975	defnID uint64, requestId string, scans Scans, distinct bool,
976	cons common.Consistency, vector *TsConsistency, rollbackTime int64, partitions []common.PartitionId) (int64, error) {
977
978	var what string
979	// serialize scans
980	protoScans := make([]*protobuf.Scan, 0)
981	for _, scan := range scans {
982		if scan != nil {
983			var equals [][]byte
984			var filters []*protobuf.CompositeElementFilter
985
986			// If Seek is there, then ignore Range
987			if len(scan.Seek) > 0 {
988				var k []byte
989				key := scan.Seek[0]
990				if k, what = curePrimaryKey(key); what == "after" {
991					continue
992				}
993				equals = [][]byte{k}
994			} else {
995				filters = make([]*protobuf.CompositeElementFilter, 0)
996				skip := false
997				if scan.Filter != nil {
998					for _, f := range scan.Filter {
999						var l, h []byte
1000						if f.Low != common.MinUnbounded { // Ignore if unbounded
1001							if l, what = curePrimaryKey(f.Low); what == "after" {
1002								skip = true
1003								break
1004							}
1005						}
1006						if f.High != common.MaxUnbounded { // Ignore if unbounded
1007							if h, what = curePrimaryKey(f.High); what == "before" {
1008								skip = true
1009								break
1010							}
1011						}
1012
1013						fl := &protobuf.CompositeElementFilter{
1014							Low: l, High: h, Inclusion: proto.Uint32(uint32(f.Inclusion)),
1015						}
1016
1017						filters = append(filters, fl)
1018					}
1019
1020					if skip {
1021						continue
1022					}
1023				}
1024			}
1025			s := &protobuf.Scan{
1026				Filters: filters,
1027				Equals:  equals,
1028			}
1029			protoScans = append(protoScans, s)
1030		}
1031	}
1032
1033	if len(protoScans) == 0 {
1034		return 0, nil
1035	}
1036
1037	partnIds := make([]uint64, len(partitions))
1038	for i, partnId := range partitions {
1039		partnIds[i] = uint64(partnId)
1040	}
1041
1042	req := &protobuf.CountRequest{
1043		DefnID:    proto.Uint64(defnID),
1044		RequestId: proto.String(requestId),
1045		Span: &protobuf.Span{
1046			Range: nil,
1047		},
1048		Distinct:     proto.Bool(distinct),
1049		Scans:        protoScans,
1050		Cons:         proto.Uint32(uint32(cons)),
1051		RollbackTime: proto.Int64(rollbackTime),
1052		PartitionIds: partnIds,
1053	}
1054
1055	if vector != nil {
1056		req.Vector = protobuf.NewTsConsistency(
1057			vector.Vbnos, vector.Seqnos, vector.Vbuuids, vector.Crc64)
1058	}
1059
1060	resp, err := c.doRequestResponse(req, requestId)
1061	if err != nil {
1062		return 0, err
1063	}
1064	countResp := resp.(*protobuf.CountResponse)
1065	if countResp.GetErr() != nil {
1066		err = errors.New(countResp.GetErr().GetError())
1067		return 0, err
1068	}
1069	return countResp.GetCount(), nil
1070}
1071
1072func (c *GsiScanClient) Scan3(
1073	defnID uint64, requestId string, scans Scans,
1074	reverse, distinct bool, projection *IndexProjection, offset, limit int64,
1075	groupAggr *GroupAggr, sorted bool,
1076	cons common.Consistency, vector *TsConsistency,
1077	callb ResponseHandler, rollbackTime int64, partitions []common.PartitionId) (error, bool) {
1078
1079	// serialize scans
1080	protoScans := make([]*protobuf.Scan, len(scans))
1081	for i, scan := range scans {
1082		if scan != nil {
1083			var equals [][]byte
1084			var filters []*protobuf.CompositeElementFilter
1085
1086			// If Seek is there, then do not marshall Range
1087			if len(scan.Seek) > 0 {
1088				equals = make([][]byte, len(scan.Seek))
1089				for i, seek := range scan.Seek {
1090					s, err := json.Marshal(seek)
1091					if err != nil {
1092						return err, false
1093					}
1094					equals[i] = s
1095				}
1096			} else {
1097				filters = make([]*protobuf.CompositeElementFilter, len(scan.Filter))
1098				if scan.Filter != nil {
1099					for j, f := range scan.Filter {
1100						var l, h []byte
1101						var err error
1102						if f.Low != common.MinUnbounded { // Do not encode if unbounded
1103							l, err = json.Marshal(f.Low)
1104							if err != nil {
1105								return err, false
1106							}
1107						}
1108						if f.High != common.MaxUnbounded { // Do not encode if unbounded
1109							h, err = json.Marshal(f.High)
1110							if err != nil {
1111								return err, false
1112							}
1113						}
1114
1115						fl := &protobuf.CompositeElementFilter{
1116							Low: l, High: h, Inclusion: proto.Uint32(uint32(f.Inclusion)),
1117						}
1118
1119						filters[j] = fl
1120					}
1121				}
1122			}
1123			s := &protobuf.Scan{
1124				Filters: filters,
1125				Equals:  equals,
1126			}
1127			protoScans[i] = s
1128		}
1129	}
1130
1131	//IndexProjection
1132	var protoProjection *protobuf.IndexProjection
1133	if projection != nil {
1134		protoProjection = &protobuf.IndexProjection{
1135			EntryKeys:  projection.EntryKeys,
1136			PrimaryKey: proto.Bool(projection.PrimaryKey),
1137		}
1138	}
1139
1140	// Groups and Aggregates
1141	var protoGroupAggr *protobuf.GroupAggr
1142	if groupAggr != nil {
1143		// GroupKeys
1144		protoGroupKeys := make([]*protobuf.GroupKey, len(groupAggr.Group))
1145		for i, grp := range groupAggr.Group {
1146			gk := &protobuf.GroupKey{
1147				EntryKeyId: proto.Int32(grp.EntryKeyId),
1148				KeyPos:     proto.Int32(grp.KeyPos),
1149				Expr:       []byte(grp.Expr),
1150			}
1151			protoGroupKeys[i] = gk
1152		}
1153		// Aggregates
1154		protoAggregates := make([]*protobuf.Aggregate, len(groupAggr.Aggrs))
1155		for i, aggr := range groupAggr.Aggrs {
1156			ag := &protobuf.Aggregate{
1157				AggrFunc:   proto.Uint32(uint32(aggr.AggrFunc)),
1158				EntryKeyId: proto.Int32(aggr.EntryKeyId),
1159				KeyPos:     proto.Int32(aggr.KeyPos),
1160				Expr:       []byte(aggr.Expr),
1161				Distinct:   proto.Bool(aggr.Distinct),
1162			}
1163			protoAggregates[i] = ag
1164		}
1165		protoIndexKeyNames := make([][]byte, len(groupAggr.IndexKeyNames))
1166		for i, keyName := range groupAggr.IndexKeyNames {
1167			protoIndexKeyNames[i] = []byte(keyName)
1168		}
1169		protoGroupAggr = &protobuf.GroupAggr{
1170			Name:               []byte(groupAggr.Name),
1171			GroupKeys:          protoGroupKeys,
1172			Aggrs:              protoAggregates,
1173			DependsOnIndexKeys: groupAggr.DependsOnIndexKeys,
1174			IndexKeyNames:      protoIndexKeyNames,
1175			AllowPartialAggr:   proto.Bool(groupAggr.AllowPartialAggr),
1176			OnePerPrimaryKey:   proto.Bool(groupAggr.OnePerPrimaryKey),
1177		}
1178	}
1179
1180	connectn, err := c.pool.Get()
1181	if err != nil {
1182		return err, false
1183	}
1184	healthy := true
1185	closeStream := false
1186	conn, pkt := connectn.conn, connectn.pkt
1187	defer func() {
1188		go func() {
1189			if closeStream {
1190				_, healthy = c.closeStream(conn, pkt, requestId)
1191			}
1192			c.pool.Return(connectn, healthy)
1193		}()
1194	}()
1195
1196	partnIds := make([]uint64, len(partitions))
1197	for i, partnId := range partitions {
1198		partnIds[i] = uint64(partnId)
1199	}
1200
1201	req := &protobuf.ScanRequest{
1202		DefnID: proto.Uint64(defnID),
1203		Span: &protobuf.Span{
1204			Range: nil,
1205		},
1206		RequestId:       proto.String(requestId),
1207		Distinct:        proto.Bool(distinct),
1208		Limit:           proto.Int64(limit),
1209		Cons:            proto.Uint32(uint32(cons)),
1210		Scans:           protoScans,
1211		Indexprojection: protoProjection,
1212		Reverse:         proto.Bool(reverse),
1213		Offset:          proto.Int64(offset),
1214		RollbackTime:    proto.Int64(rollbackTime),
1215		PartitionIds:    partnIds,
1216		GroupAggr:       protoGroupAggr,
1217		Sorted:          proto.Bool(sorted),
1218	}
1219	if vector != nil {
1220		req.Vector = protobuf.NewTsConsistency(
1221			vector.Vbnos, vector.Seqnos, vector.Vbuuids, vector.Crc64)
1222	}
1223	// ---> protobuf.ScanRequest
1224	if err := c.sendRequest(conn, pkt, req); err != nil {
1225		fmsg := "%v Range(%v) request transport failed `%v`\n"
1226		logging.Errorf(fmsg, c.logPrefix, requestId, err)
1227		healthy = false
1228		return err, false
1229	}
1230
1231	cont, partial := true, false
1232	for cont {
1233		// <--- protobuf.ResponseStream
1234		cont, healthy, err, closeStream = c.streamResponse(conn, pkt, callb, requestId)
1235		if err != nil { // if err, cont should have been set to false
1236			fmsg := "%v Scans(%v) response failed `%v`\n"
1237			logging.Errorf(fmsg, c.logPrefix, requestId, err)
1238		} else { // partial succeeded
1239			partial = true
1240		}
1241	}
1242	return err, partial
1243}
1244
1245func (c *GsiScanClient) Scan3Primary(
1246	defnID uint64, requestId string, scans Scans,
1247	reverse, distinct bool, projection *IndexProjection, offset, limit int64,
1248	groupAggr *GroupAggr, sorted bool,
1249	cons common.Consistency, vector *TsConsistency,
1250	callb ResponseHandler, rollbackTime int64, partitions []common.PartitionId) (error, bool) {
1251
1252	var what string
1253	// serialize scans
1254	protoScans := make([]*protobuf.Scan, 0)
1255	for _, scan := range scans {
1256		if scan != nil {
1257			var equals [][]byte
1258			var filters []*protobuf.CompositeElementFilter
1259
1260			// If Seek is there, then ignore Range
1261			if len(scan.Seek) > 0 {
1262				var k []byte
1263				key := scan.Seek[0]
1264				if k, what = curePrimaryKey(key); what == "after" {
1265					continue
1266				}
1267				equals = [][]byte{k}
1268			} else {
1269				filters = make([]*protobuf.CompositeElementFilter, 0)
1270				skip := false
1271				if scan.Filter != nil {
1272					for _, f := range scan.Filter {
1273						var l, h []byte
1274						if f.Low != common.MinUnbounded { // Ignore if unbounded
1275							if l, what = curePrimaryKey(f.Low); what == "after" {
1276								skip = true
1277								break
1278							}
1279						}
1280						if f.High != common.MaxUnbounded { // Ignore if unbounded
1281							if h, what = curePrimaryKey(f.High); what == "before" {
1282								skip = true
1283								break
1284							}
1285						}
1286
1287						fl := &protobuf.CompositeElementFilter{
1288							Low: l, High: h, Inclusion: proto.Uint32(uint32(f.Inclusion)),
1289						}
1290
1291						filters = append(filters, fl)
1292					}
1293					if skip {
1294						continue
1295					}
1296				}
1297			}
1298			s := &protobuf.Scan{
1299				Filters: filters,
1300				Equals:  equals,
1301			}
1302			protoScans = append(protoScans, s)
1303		}
1304	}
1305
1306	if len(protoScans) == 0 {
1307		protoScans = append(protoScans, getEmptySpanForPrimary())
1308	}
1309
1310	//IndexProjection
1311	var protoProjection *protobuf.IndexProjection
1312	if projection != nil {
1313		protoProjection = &protobuf.IndexProjection{
1314			EntryKeys:  projection.EntryKeys,
1315			PrimaryKey: proto.Bool(projection.PrimaryKey),
1316		}
1317	}
1318
1319	// Groups and Aggregates
1320	var protoGroupAggr *protobuf.GroupAggr
1321	if groupAggr != nil {
1322		// GroupKeys
1323		protoGroupKeys := make([]*protobuf.GroupKey, len(groupAggr.Group))
1324		for i, grp := range groupAggr.Group {
1325			gk := &protobuf.GroupKey{
1326				EntryKeyId: proto.Int32(grp.EntryKeyId),
1327				KeyPos:     proto.Int32(grp.KeyPos),
1328				Expr:       []byte(grp.Expr),
1329			}
1330			protoGroupKeys[i] = gk
1331		}
1332		// Aggregates
1333		protoAggregates := make([]*protobuf.Aggregate, len(groupAggr.Aggrs))
1334		for i, aggr := range groupAggr.Aggrs {
1335			ag := &protobuf.Aggregate{
1336				AggrFunc:   proto.Uint32(uint32(aggr.AggrFunc)),
1337				EntryKeyId: proto.Int32(aggr.EntryKeyId),
1338				KeyPos:     proto.Int32(aggr.KeyPos),
1339				Expr:       []byte(aggr.Expr),
1340				Distinct:   proto.Bool(aggr.Distinct),
1341			}
1342			protoAggregates[i] = ag
1343		}
1344		protoIndexKeyNames := make([][]byte, len(groupAggr.IndexKeyNames))
1345		for i, keyName := range groupAggr.IndexKeyNames {
1346			protoIndexKeyNames[i] = []byte(keyName)
1347		}
1348		protoGroupAggr = &protobuf.GroupAggr{
1349			Name:               []byte(groupAggr.Name),
1350			GroupKeys:          protoGroupKeys,
1351			Aggrs:              protoAggregates,
1352			DependsOnIndexKeys: groupAggr.DependsOnIndexKeys,
1353			IndexKeyNames:      protoIndexKeyNames,
1354		}
1355	}
1356
1357	connectn, err := c.pool.Get()
1358	if err != nil {
1359		return err, false
1360	}
1361	healthy := true
1362	closeStream := false
1363	conn, pkt := connectn.conn, connectn.pkt
1364	defer func() {
1365		go func() {
1366			if closeStream {
1367				_, healthy = c.closeStream(conn, pkt, requestId)
1368			}
1369			c.pool.Return(connectn, healthy)
1370		}()
1371	}()
1372
1373	partnIds := make([]uint64, len(partitions))
1374	for i, partnId := range partitions {
1375		partnIds[i] = uint64(partnId)
1376	}
1377
1378	req := &protobuf.ScanRequest{
1379		DefnID: proto.Uint64(defnID),
1380		Span: &protobuf.Span{
1381			Range: nil,
1382		},
1383		RequestId:       proto.String(requestId),
1384		Distinct:        proto.Bool(distinct),
1385		Limit:           proto.Int64(limit),
1386		Cons:            proto.Uint32(uint32(cons)),
1387		Scans:           protoScans,
1388		Indexprojection: protoProjection,
1389		Reverse:         proto.Bool(reverse),
1390		Offset:          proto.Int64(offset),
1391		RollbackTime:    proto.Int64(rollbackTime),
1392		PartitionIds:    partnIds,
1393		GroupAggr:       protoGroupAggr,
1394		Sorted:          proto.Bool(sorted),
1395	}
1396	if vector != nil {
1397		req.Vector = protobuf.NewTsConsistency(
1398			vector.Vbnos, vector.Seqnos, vector.Vbuuids, vector.Crc64)
1399	}
1400	// ---> protobuf.ScanRequest
1401	if err := c.sendRequest(conn, pkt, req); err != nil {
1402		fmsg := "%v Range(%v) request transport failed `%v`\n"
1403		logging.Errorf(fmsg, c.logPrefix, requestId, err)
1404		healthy = false
1405		return err, false
1406	}
1407
1408	cont, partial := true, false
1409	for cont {
1410		// <--- protobuf.ResponseStream
1411		cont, healthy, err, closeStream = c.streamResponse(conn, pkt, callb, requestId)
1412		if err != nil { // if err, cont should have been set to false
1413			fmsg := "%v Scans(%v) response failed `%v`\n"
1414			logging.Errorf(fmsg, c.logPrefix, requestId, err)
1415		} else { // partial succeeded
1416			partial = true
1417		}
1418	}
1419	return err, partial
1420}
1421
1422func (c *GsiScanClient) Close() error {
1423	return c.pool.Close()
1424}
1425
1426func (c *GsiScanClient) doRequestResponse(
1427	req interface{}, requestId string) (interface{}, error) {
1428
1429	connectn, err := c.pool.Get()
1430	if err != nil {
1431		return nil, err
1432	}
1433	healthy := true
1434	defer func() { c.pool.Return(connectn, healthy) }()
1435
1436	conn, pkt := connectn.conn, connectn.pkt
1437
1438	// ---> protobuf.*Request
1439	if err := c.sendRequest(conn, pkt, req); err != nil {
1440		fmsg := "%v %T(%v) request transport failed `%v`\n"
1441		arg1 := logging.TagUD(req)
1442		logging.Errorf(fmsg, c.logPrefix, arg1, requestId, err)
1443		healthy = false
1444		return nil, err
1445	}
1446
1447	laddr := conn.LocalAddr()
1448	c.trySetDeadline(conn, c.readDeadline)
1449	// <--- protobuf.*Response
1450	resp, err := pkt.Receive(conn)
1451	if err != nil {
1452		fmsg := "%v req(%v) connection %v response %T transport failed `%v`\n"
1453		arg1 := logging.TagUD(req)
1454		logging.Errorf(fmsg, c.logPrefix, requestId, laddr, arg1, err)
1455		healthy = false
1456		return nil, err
1457	}
1458
1459	c.trySetDeadline(conn, c.readDeadline)
1460	// <--- protobuf.StreamEndResponse (skipped) TODO: knock this off.
1461	if endResp, err := pkt.Receive(conn); err != nil {
1462		fmsg := "%v req(%v) connection %v response %T transport failed `%v`\n"
1463		arg1 := logging.TagUD(req)
1464		logging.Errorf(fmsg, c.logPrefix, requestId, laddr, arg1, err)
1465		healthy = false
1466		return nil, err
1467	} else if endResp != nil {
1468		healthy = false
1469		return nil, ErrorProtocol
1470	}
1471	return resp, nil
1472}
1473
1474func (c *GsiScanClient) sendRequest(
1475	conn net.Conn, pkt *transport.TransportPacket, req interface{}) (err error) {
1476
1477	c.trySetDeadline(conn, c.writeDeadline)
1478	return pkt.Send(conn, req)
1479}
1480
1481func (c *GsiScanClient) streamResponse(
1482	conn net.Conn,
1483	pkt *transport.TransportPacket,
1484	callb ResponseHandler, requestId string) (cont bool, healthy bool, err error, closeStream bool) {
1485
1486	var resp interface{}
1487	var finish bool
1488
1489	closeStream = false
1490	laddr := conn.LocalAddr()
1491	c.trySetDeadline(conn, c.readDeadline)
1492	if resp, err = pkt.Receive(conn); err != nil {
1493		//resp := &protobuf.ResponseStream{
1494		//    Err: &protobuf.Error{Error: proto.String(err.Error())},
1495		//}
1496		//callb(resp) // callback with error
1497		cont, healthy = false, false
1498		if err == io.EOF {
1499			fmsg := "%v req(%v) connection %q closed `%v` \n"
1500			logging.Errorf(fmsg, c.logPrefix, requestId, laddr, err)
1501		} else {
1502			fmsg := "%v req(%v) connection %q response transport failed `%v`\n"
1503			logging.Errorf(fmsg, c.logPrefix, requestId, laddr, err)
1504		}
1505
1506	} else if resp == nil {
1507		finish = true
1508		fmsg := "%v req(%v) connection %q received StreamEndResponse"
1509		logging.Tracef(fmsg, c.logPrefix, requestId, laddr)
1510		callb(&protobuf.StreamEndResponse{}) // callback most likely return true
1511		cont, healthy = false, true
1512
1513	} else {
1514		streamResp := resp.(*protobuf.ResponseStream)
1515		if err = streamResp.Error(); err == nil {
1516			cont = callb(streamResp)
1517		}
1518		healthy = true
1519	}
1520
1521	if cont == false && healthy == true && finish == false {
1522		closeStream = true
1523	}
1524	return
1525}
1526
1527func (c *GsiScanClient) closeStream(
1528	conn net.Conn, pkt *transport.TransportPacket,
1529	requestId string) (err error, healthy bool) {
1530
1531	var resp interface{}
1532	laddr := conn.LocalAddr()
1533	healthy = true
1534	// request server to end the stream.
1535	err = c.sendRequest(conn, pkt, &protobuf.EndStreamRequest{})
1536	if err != nil {
1537		fmsg := "%v closeStream(%v) request transport failed `%v`\n"
1538		logging.Errorf(fmsg, c.logPrefix, requestId, err)
1539		healthy = false
1540		return
1541	}
1542	fmsg := "%v req(%v) connection %q transmitted protobuf.EndStreamRequest"
1543	logging.Tracef(fmsg, c.logPrefix, requestId, laddr)
1544
1545	// flush the connection until stream has ended.
1546	for true {
1547		c.trySetDeadline(conn, c.readDeadline)
1548		resp, err = pkt.Receive(conn)
1549		if err != nil {
1550			healthy = false
1551			if err == io.EOF {
1552				fmsg := "%v req(%v) connection %q closed `%v`\n"
1553				logging.Errorf(fmsg, c.logPrefix, requestId, laddr, err)
1554				return
1555			}
1556			fmsg := "%v req(%v) connection %q response transport failed `%v`\n"
1557			logging.Errorf(fmsg, c.logPrefix, requestId, laddr, err)
1558			return
1559
1560		} else if resp == nil { // End of stream marker
1561			return
1562		}
1563	}
1564	return
1565}
1566
1567func (c *GsiScanClient) trySetDeadline(conn net.Conn, deadline time.Duration) {
1568	if deadline > time.Duration(0) {
1569		timeoutMs := deadline * time.Millisecond
1570		conn.SetReadDeadline(time.Now().Add(timeoutMs))
1571	}
1572}
1573
1574func getEmptySpanForPrimary() *protobuf.Scan {
1575	fl := &protobuf.CompositeElementFilter{
1576		Low: []byte(""), High: []byte(""), Inclusion: proto.Uint32(uint32(0)),
1577	}
1578	return &protobuf.Scan{Filters: []*protobuf.CompositeElementFilter{fl}}
1579}
1580