1// Copyright (c) 2014 Couchbase, Inc.
2// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
3// except in compliance with the License. You may obtain a copy of the License at
4//   http://www.apache.org/licenses/LICENSE-2.0
5// Unless required by applicable law or agreed to in writing, software distributed under the
6// License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
7// either express or implied. See the License for the specific language governing permissions
8// and limitations under the License.
9package client
10
11import (
12	"bytes"
13	"errors"
14	"fmt"
15	"github.com/couchbase/indexing/secondary/collatejson"
16	"github.com/couchbase/indexing/secondary/common"
17	"github.com/couchbase/indexing/secondary/logging"
18	"github.com/couchbase/query/value"
19	"math"
20	"reflect"
21	"strings"
22	//"runtime"
23	"encoding/json"
24	"github.com/couchbase/query/expression"
25	"github.com/couchbase/query/expression/parser"
26	qvalue "github.com/couchbase/query/value"
27	"os"
28	"sort"
29	"sync"
30	"sync/atomic"
31	"time"
32)
33
34//--------------------------
35// constant
36//--------------------------
37
38var (
39	NotMyPartition = "Not my partition"
40)
41
42//--------------------------
43// request broker
44//--------------------------
45
46type RequestBroker struct {
47	// callback
48	scan    ScanRequestHandler
49	count   CountRequestHandler
50	factory ResponseHandlerFactory
51	sender  ResponseSender
52	timer   ResponseTimer
53
54	// initialization
55	requestId   string
56	size        int64
57	concurrency int
58
59	// scatter/gather
60	queues   []*Queue
61	notifych chan bool
62	closed   int32
63	killch   chan bool
64	bGather  bool
65	errMap   map[common.PartitionId]map[uint64]error
66	partial  int32
67	mutex    sync.Mutex
68
69	//backfill
70	backfills []*os.File
71
72	// scan
73	defn           *common.IndexDefn
74	limit          int64
75	offset         int64
76	sorted         bool
77	pushdownLimit  int64
78	pushdownOffset int64
79	pushdownSorted bool
80	scans          Scans
81	grpAggr        *GroupAggr
82	projections    *IndexProjection
83	indexOrder     *IndexKeyOrder
84	projDesc       []bool
85	distinct       bool
86
87	// stats
88	sendCount    int64
89	receiveCount int64
90	numIndexers  int64
91}
92
93type doneStatus struct {
94	err     error
95	partial bool
96}
97
98const (
99	NoPick = -1
100	Done   = -2
101)
102
103const (
104	MetaIdPos = -1
105)
106
107//
108// New Request Broker
109//
110func NewRequestBroker(requestId string, size int64, concurrency int) *RequestBroker {
111
112	return &RequestBroker{
113		requestId:      requestId,
114		size:           size,
115		concurrency:    concurrency,
116		sorted:         true,
117		pushdownSorted: true,
118		limit:          math.MaxInt64,
119		pushdownLimit:  math.MaxInt64,
120		errMap:         make(map[common.PartitionId]map[uint64]error),
121	}
122}
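
// exampleBrokerSetup is a hedged sketch (not part of the production scan path) of
// how a caller might wire up a RequestBroker before issuing a request.  The sender
// below is an illustrative stub that only bumps the send counter; the real GSI
// client installs callbacks that stream rows back to the N1QL layer.
func exampleBrokerSetup() *RequestBroker {
	broker := NewRequestBroker("example-request-id", 256, 1)
	broker.SetLimit(100)
	broker.SetOffset(10)
	broker.SetSorted(true)
	broker.SetDistinct(false)

	broker.SetResponseSender(func(pkey []byte, mskey []value.Value, uskey common.SecondaryKey) bool {
		broker.IncrementSendCount()
		return true // returning false terminates the scan early
	})

	return broker
}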
123
124//
125// Set ResponseHandlerFactory
126//
127func (b *RequestBroker) SetResponseHandlerFactory(factory ResponseHandlerFactory) {
128
129	b.factory = factory
130}
131
132//
133// Set ScanRequestHandler
134//
135func (b *RequestBroker) SetScanRequestHandler(handler ScanRequestHandler) {
136
137	b.scan = handler
138}
139
140//
141// Set CountRequestHandler
142//
143func (b *RequestBroker) SetCountRequestHandler(handler CountRequestHandler) {
144
145	b.count = handler
146}
147
148//
149// Set ResponseSender
150//
151func (b *RequestBroker) SetResponseSender(sender ResponseSender) {
152
153	b.sender = sender
154}
155
156//
157// Set ResponseTimer
158//
159func (b *RequestBroker) SetResponseTimer(timer ResponseTimer) {
160
161	b.timer = timer
162}
163
164//
165// Set Limit
166//
167func (b *RequestBroker) SetLimit(limit int64) {
168
169	b.limit = limit
170	b.pushdownLimit = limit
171}
172
173//
174// Set offset
175//
176func (b *RequestBroker) SetOffset(offset int64) {
177
178	b.offset = offset
179	b.pushdownOffset = offset
180}
181
182//
183// Set Distinct
184//
185func (b *RequestBroker) SetDistinct(distinct bool) {
186
187	b.distinct = distinct
188}
189
190//
191// Set sorted
192//
193func (b *RequestBroker) SetSorted(sorted bool) {
194
195	b.sorted = sorted
196	b.pushdownSorted = sorted
197}
198
199//
200// Get Limit
201//
202func (b *RequestBroker) GetLimit() int64 {
203
204	return b.pushdownLimit
205}
206
207//
208// Get offset
209//
210func (b *RequestBroker) GetOffset() int64 {
211
212	return b.pushdownOffset
213}
214
215//
216// Get Sort
217//
218func (b *RequestBroker) GetSorted() bool {
219
220	return b.pushdownSorted
221}
222
223//
224// Set Scans
225//
226func (b *RequestBroker) SetScans(scans Scans) {
227
228	b.scans = scans
229}
230
231//
232// Set GroupAggr
233//
234func (b *RequestBroker) SetGroupAggr(grpAggr *GroupAggr) {
235
236	b.grpAggr = grpAggr
237}
238
239//
240// Set Projection
241//
242func (b *RequestBroker) SetProjection(projection *IndexProjection) {
243
244	b.projections = projection
245}
246
247//
248// Set Index Order
249//
250func (b *RequestBroker) SetIndexOrder(indexOrder *IndexKeyOrder) {
251
252	b.indexOrder = indexOrder
253}
254
255//
256// Close the broker on error
257//
258func (b *RequestBroker) Error(err error, instId uint64, partitions []common.PartitionId) {
259
260	b.mutex.Lock()
261	defer b.mutex.Unlock()
262
263	skip := partitions
264	if strings.HasPrefix(err.Error(), NotMyPartition) {
265		if offset := strings.Index(err.Error(), ":"); offset != -1 {
266			content := err.Error()[offset+1:]
267			missing := make(map[common.IndexInstId][]common.PartitionId)
268			if err1 := json.Unmarshal([]byte(content), &missing); err1 == nil {
269				if _, ok := missing[common.IndexInstId(instId)]; ok {
270					skip = missing[common.IndexInstId(instId)]
271					logging.Warnf("scan err : NotMyPartition instId %v partition %v", instId, skip)
272				}
273			} else {
				logging.Errorf("fail to unmarshal NotMyPartition error.  Err:%v", err)
275			}
276		}
277	}
278
279	for _, partition := range skip {
280		if _, ok := b.errMap[partition]; !ok {
281			b.errMap[partition] = make(map[uint64]error)
282		}
283		logging.Debugf("request broker: scan error instId %v partition %v error %v", instId, partition, err)
284		b.errMap[partition][instId] = err
285	}
286
287	b.close()
288}
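
// notMyPartitionErrorExample is an illustrative sketch of the error payload that
// Error() above knows how to parse: the NotMyPartition prefix, a ':' separator,
// and a JSON map from instance id to the partitions that are no longer hosted.
// The instance and partition ids used here are arbitrary.
func notMyPartitionErrorExample() error {
	missing := map[common.IndexInstId][]common.PartitionId{
		common.IndexInstId(100): {common.PartitionId(1), common.PartitionId(3)},
	}
	buf, err := json.Marshal(missing)
	if err != nil {
		return err
	}
	return fmt.Errorf("%v:%v", NotMyPartition, string(buf))
}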
289
290func (b *RequestBroker) Partial(partial bool) {
291	partial = b.IsPartial() || partial
292	if partial {
293		atomic.StoreInt32(&b.partial, 1)
294	} else {
295		atomic.StoreInt32(&b.partial, 0)
296	}
297}
298
299//
300// Close the broker when scan is done
301//
302func (b *RequestBroker) done() {
303	b.close()
304}
305
306//
307// Close the request broker
308//
309func (b *RequestBroker) close() {
310
311	if atomic.SwapInt32(&b.closed, 1) == 0 {
312		close(b.killch)
313
314		for _, queue := range b.queues {
315			queue.Close()
316		}
317	}
318}
319
320//
321// Is the request broker closed
322//
323func (b *RequestBroker) isClose() bool {
324
325	return atomic.LoadInt32(&b.closed) == 1
326}
327
328func (b *RequestBroker) IncrementReceiveCount(count int) {
329
330	atomic.AddInt64(&b.receiveCount, int64(count))
331}
332
333func (b *RequestBroker) IncrementSendCount() {
334
335	atomic.AddInt64(&b.sendCount, 1)
336}
337
338func (b *RequestBroker) SetNumIndexers(num int) {
339
340	atomic.StoreInt64(&b.numIndexers, int64(num))
341}
342
343func (b *RequestBroker) NumIndexers() int64 {
344
345	return atomic.LoadInt64(&b.numIndexers)
346}
347
348func (c *RequestBroker) Len(id ResponseHandlerId) int64 {
349
350	if c.useGather() {
351		return c.queues[int(id)].Len()
352	}
353	return -1
354}
355
356func (c *RequestBroker) Cap(id ResponseHandlerId) int64 {
357
358	if c.useGather() {
359		return c.queues[int(id)].Cap()
360	}
361	return -1
362}
363
364func (c *RequestBroker) ReceiveCount() int64 {
365	return atomic.LoadInt64(&c.receiveCount)
366}
367
368func (c *RequestBroker) SendCount() int64 {
369	return atomic.LoadInt64(&c.sendCount)
370}
371
372func (c *RequestBroker) GetError() map[common.PartitionId]map[uint64]error {
373	c.mutex.Lock()
374	defer c.mutex.Unlock()
375
376	if len(c.errMap) == 0 {
377		return nil
378	}
379
380	result := make(map[common.PartitionId]map[uint64]error)
381	for partnId, instErrMap := range c.errMap {
382		result[partnId] = make(map[uint64]error)
383		for instId, err := range instErrMap {
384			result[partnId][instId] = err
385		}
386	}
387
388	return result
389}
390
391func (c *RequestBroker) IsPartial() bool {
392	if atomic.LoadInt32(&c.partial) == 1 {
393		return true
394	}
395	return false
396}
397
398func (c *RequestBroker) useGather() bool {
399
400	//return len(c.queues) > 0
401	return c.bGather
402}
403
404func (c *RequestBroker) AddBackfill(backfill *os.File) {
405
406	c.mutex.Lock()
407	defer c.mutex.Unlock()
408
409	c.backfills = append(c.backfills, backfill)
410}
411
412func (c *RequestBroker) GetBackfills() []*os.File {
413
414	c.mutex.Lock()
415	defer c.mutex.Unlock()
416
417	return c.backfills
418}
419
420func (b *RequestBroker) reset() {
421
422	// scatter/gather
423	b.queues = nil
424	b.notifych = nil
425	b.closed = 0
426	b.killch = make(chan bool, 1)
427	b.bGather = false
428	b.errMap = make(map[common.PartitionId]map[uint64]error)
429	b.partial = 0 // false
430
431	// backfill
432	b.backfills = nil
433
434	// stats
435	b.sendCount = 0
436	b.receiveCount = 0
437	b.numIndexers = 0
438
439	// scans
440	b.defn = nil
441	b.pushdownLimit = b.limit
442	b.pushdownOffset = b.offset
443	b.pushdownSorted = b.sorted
444	b.projDesc = nil
445}
446
447//--------------------------
448// scatter/gather
449//--------------------------
450
451//
452// Scatter requests over multiple connections
453//
454func (c *RequestBroker) scatter(clientMaker scanClientMaker, index *common.IndexDefn,
455	scanports []string, targetInstId []uint64, rollback []int64, partition [][]common.PartitionId,
456	numPartition uint32, settings *ClientSettings) (count int64,
457	err map[common.PartitionId]map[uint64]error, partial bool, refresh bool) {
458
459	defer func() {
		logging.Debugf("scatter: requestId %v items received %v items processed %v", c.requestId, c.ReceiveCount(), c.SendCount())
461	}()
462
463	c.reset()
464	c.SetNumIndexers(len(partition))
465	c.defn = index
466
467	concurrency := c.concurrency
468	if concurrency < 1 {
469		concurrency = int(settings.MaxConcurrency())
470	}
471
472	var ok bool
473	var client []*GsiScanClient
474	partition = c.filterPartitions(index, partition, numPartition)
475	ok, client, targetInstId, rollback, partition = c.makeClients(clientMaker, concurrency, index, numPartition, scanports, targetInstId, rollback, partition)
476	if !ok {
477		return 0, nil, false, true
478	}
479
480	c.analyzeOrderBy(partition, numPartition, index)
481	c.analyzeProjection(partition, numPartition, index)
482	c.changePushdownParams(partition, numPartition, index)
483
484	if len(partition) == len(client) {
485		for i, partitions := range partition {
486			logging.Verbosef("scatter: requestId %v queryport %v partition %v", c.requestId, client[i].queryport, partitions)
487		}
488	}
489
490	logging.Verbosef("scatter: requestId %v limit %v offset %v sorted %v pushdown limit %v pushdown offset %v pushdown sorted %v isAggr %v",
491		c.requestId, c.limit, c.offset, c.sorted, c.pushdownLimit, c.pushdownOffset, c.pushdownSorted, c.grpAggr != nil)
492
493	if c.projections != nil {
494		logging.Debugf("scatter: requestId %v projection %v Desc %v", c.requestId, c.projections.EntryKeys, c.projDesc)
495	}
496
497	if c.scan != nil {
498		err, partial = c.scatterScan2(client, index, targetInstId, rollback, partition, numPartition, settings)
499		return 0, err, partial, false
500	} else if c.count != nil {
501		count, err, partial := c.scatterCount(client, index, targetInstId, rollback, partition, numPartition)
502		return count, err, partial, false
503	}
504
	e := fmt.Errorf("Internal error: Failed to process request for index %v:%v.  Unknown request handler.", index.Bucket, index.Name)
506	return 0, c.makeErrorMap(targetInstId, partition, e), false, false
507}
508
509func (c *RequestBroker) makeErrorMap(targetInstIds []uint64, partitions [][]common.PartitionId, err error) map[common.PartitionId]map[uint64]error {
510
511	errMap := make(map[common.PartitionId]map[uint64]error)
512
513	for i, partnList := range partitions {
514		for _, partition := range partnList {
515
516			if _, ok := errMap[partition]; !ok {
517				errMap[partition] = make(map[uint64]error)
518			}
519			errMap[partition][targetInstIds[i]] = err
520		}
521	}
522
523	return errMap
524}
525
526//
// Make the scan clients that are used in scans
528//
529func (c *RequestBroker) makeClients(maker scanClientMaker, max_concurrency int, index *common.IndexDefn, numPartition uint32,
530	scanports []string, targetInstIds []uint64, timestamps []int64, allPartitions [][]common.PartitionId) (bool, []*GsiScanClient,
531	[]uint64, []int64, [][]common.PartitionId) {
532
533	var newClient []*GsiScanClient
534	var newInstId []uint64
535	var newTS []int64
536	var newPartition [][]common.PartitionId
537	var newScanport []string
538
539	for i, partitions := range allPartitions {
540		if len(partitions) != 0 {
541			client := maker(scanports[i])
542			if client == nil {
543				return false, nil, nil, nil, nil
544			}
545
546			newClient = append(newClient, client)
547			newScanport = append(newScanport, scanports[i])
548			newInstId = append(newInstId, targetInstIds[i])
549			newTS = append(newTS, timestamps[i])
550			newPartition = append(newPartition, partitions)
551		}
552	}
553
554	if c.isPartitionedAggregate(index) {
555		newClient, newInstId, newTS, newPartition = c.splitClients(maker, max_concurrency, newScanport, newClient, newInstId, newTS, newPartition)
556	}
557
558	return true, newClient, newInstId, newTS, newPartition
559}
560
561//
// Split the clients until the maximum parallelism is reached
563//
564func (c *RequestBroker) splitClients(maker scanClientMaker, max_concurrency int, scanports []string, clients []*GsiScanClient, instIds []uint64,
565	timestamps []int64, allPartitions [][]common.PartitionId) ([]*GsiScanClient, []uint64, []int64, [][]common.PartitionId) {
566
567	if len(clients) >= max_concurrency {
568		return clients, instIds, timestamps, allPartitions
569	}
570
571	max := 0
572	pos := -1
573	for i, partitions := range allPartitions {
574		if len(partitions) > max {
575			max = len(partitions)
576			pos = i
577		}
578	}
579
580	// split the clients with the most partitions
581	if pos != -1 && len(allPartitions[pos]) > 1 {
582
		half := len(allPartitions[pos]) / 2

		client := maker(scanports[pos])
		clients = append(clients, client)
		scanports = append(scanports, scanports[pos])
		instIds = append(instIds, instIds[pos])
		timestamps = append(timestamps, timestamps[pos])
		allPartitions = append(allPartitions, allPartitions[pos][half:])

		allPartitions[pos] = allPartitions[pos][0:half]
593
594		return c.splitClients(maker, max_concurrency, scanports, clients, instIds, timestamps, allPartitions)
595	}
596
597	return clients, instIds, timestamps, allPartitions
598}
599
600//
601// Scatter scan requests over multiple connections
602//
603func (c *RequestBroker) scatterScan(client []*GsiScanClient, index *common.IndexDefn, targetInstId []uint64, rollback []int64, partition [][]common.PartitionId,
604	numPartition uint32, settings *ClientSettings) (err error, partial bool) {
605
606	c.notifych = make(chan bool, 1)
607	donech_gather := make(chan bool, 1)
608
609	if len(partition) > 1 {
610		c.bGather = true
611	}
612
613	if c.useGather() {
614		c.queues = make([]*Queue, len(client))
615		size := queueSize(int(c.size), len(client), c.sorted, settings)
616		for i := 0; i < len(client); i++ {
617			c.queues[i] = NewQueue(int64(size), c.notifych)
618		}
619
620		if c.sorted {
621			go c.gather(donech_gather)
622		} else {
623			go c.forward(donech_gather)
624		}
625	}
626
627	donech_scatter := make([]chan *doneStatus, len(client))
	for i := range client {
629		donech_scatter[i] = make(chan *doneStatus, 1)
630		go c.scanSingleNode(ResponseHandlerId(i), client[i], index, targetInstId[i], rollback[i], partition[i], numPartition, donech_scatter[i])
631	}
632
	for i := range client {
634		status := <-donech_scatter[i]
635		partial = partial || status.partial
636		if status.err != nil {
637			err = errors.New(fmt.Sprintf("%v %v", err, status.err))
638		}
639	}
640
641	if c.useGather() && err == nil {
642		<-donech_gather
643	}
644
645	return
646}
647
648func (c *RequestBroker) scatterScan2(client []*GsiScanClient, index *common.IndexDefn, targetInstId []uint64, rollback []int64,
649	partition [][]common.PartitionId, numPartition uint32, settings *ClientSettings) (errMap map[common.PartitionId]map[uint64]error, partial bool) {
650
651	c.notifych = make(chan bool, 1)
652	donech_gather := make(chan bool, 1)
653
654	if len(partition) > 1 {
655		c.bGather = true
656	}
657
658	if c.useGather() {
659		c.queues = make([]*Queue, len(client))
660		size := queueSize(int(c.size), len(client), c.sorted, settings)
661		for i := 0; i < len(client); i++ {
662			c.queues[i] = NewQueue(int64(size), c.notifych)
663		}
664	}
665
666	donech_scatter := make([]chan *doneStatus, len(client))
	for i := range client {
668		donech_scatter[i] = make(chan *doneStatus, 1)
669		go c.scanSingleNode(ResponseHandlerId(i), client[i], index, targetInstId[i], rollback[i], partition[i], numPartition, donech_scatter[i])
670	}
671
672	if c.useGather() {
		// Gather is done when either
		// 1) the scan is finished (the done() method is called), or
		// 2) there is an error (the Error() method is called).
		// The gather routine could exit before all the scatter routines have exited.
677		if c.sorted {
678			c.gather(donech_gather)
679		} else {
680			c.forward(donech_gather)
681		}
682
683		errMap = c.GetError()
684	}
685
	// Wait for all scatter routines to finish
	for i := range client {
688		<-donech_scatter[i]
689	}
690
691	errMap = c.GetError()
692	partial = c.IsPartial()
693
694	return
695}
696
697//
698// Scatter count requests over multiple connections
699//
700func (c *RequestBroker) scatterCount(client []*GsiScanClient, index *common.IndexDefn, targetInstId []uint64, rollback []int64,
701	partition [][]common.PartitionId, numPartition uint32) (count int64, err map[common.PartitionId]map[uint64]error, partial bool) {
702
703	donech := make([]chan *doneStatus, len(client))
	for i := range client {
705		donech[i] = make(chan *doneStatus, 1)
706		go c.countSingleNode(ResponseHandlerId(i), client[i], index, targetInstId[i], rollback[i], partition[i], numPartition, donech[i], &count)
707	}
708
	for i := range client {
710		status := <-donech[i]
711		partial = partial || status.partial
712	}
713
714	err = c.GetError()
715	return
716}
717
718func (c *RequestBroker) sort(rows []Row, sorted []int) bool {
719
720	size := len(c.queues)
721
722	for i := 0; i < size; i++ {
723		sorted[i] = i
724	}
725
726	for i := 0; i < size-1; i++ {
727		if !c.queues[sorted[i]].Peek(&rows[sorted[i]]) {
728			return false
729		}
730
731		for j := i + 1; j < size; j++ {
732			if !c.queues[sorted[j]].Peek(&rows[sorted[j]]) {
733				return false
734			}
735
736			if !c.defn.IsPrimary {
737
738				if rows[sorted[i]].last && !rows[sorted[j]].last ||
739					(!rows[sorted[i]].last && !rows[sorted[j]].last &&
740						c.compareKey(rows[sorted[i]].value, rows[sorted[j]].value) > 0) {
741					tmp := sorted[i]
742					sorted[i] = sorted[j]
743					sorted[j] = tmp
744				}
745
746			} else {
747
748				if rows[sorted[i]].last && !rows[sorted[j]].last ||
749					(!rows[sorted[i]].last && !rows[sorted[j]].last &&
750						c.comparePrimaryKey(rows[sorted[i]].pkey, rows[sorted[j]].pkey) > 0) {
751					tmp := sorted[i]
752					sorted[i] = sorted[j]
753					sorted[j] = tmp
754				}
755			}
756		}
757	}
758
759	return true
760}
761
762//
763// Gather results from multiple connections
// rows - buffer of rows from each scatter goroutine
765// sorted - sorted order of the rows
766//
767func (c *RequestBroker) pick(rows []Row, sorted []int) int {
768
769	size := len(c.queues)
770
771	if !c.queues[sorted[0]].Peek(&rows[sorted[0]]) {
772		return NoPick
773	}
774
775	pos := 0
776	for i := 1; i < size; i++ {
777		if !c.queues[sorted[i]].Peek(&rows[sorted[i]]) {
778			return NoPick
779		}
780
781		if !c.defn.IsPrimary {
782
783			// last value always sorted last
784			if rows[sorted[pos]].last && !rows[sorted[i]].last ||
785				(!rows[sorted[pos]].last && !rows[sorted[i]].last &&
786					c.compareKey(rows[sorted[pos]].value, rows[sorted[i]].value) > 0) {
787
788				tmp := sorted[pos]
789				sorted[pos] = sorted[i]
790				sorted[i] = tmp
791				pos = i
792
793			} else {
794				break
795			}
796
797		} else {
798
799			// last value always sorted last
800			if rows[sorted[pos]].last && !rows[sorted[i]].last ||
801				(!rows[sorted[pos]].last && !rows[sorted[i]].last &&
802					c.comparePrimaryKey(rows[sorted[pos]].pkey, rows[sorted[i]].pkey) > 0) {
803
804				tmp := sorted[pos]
805				sorted[pos] = sorted[i]
806				sorted[i] = tmp
807				pos = i
808
809			} else {
810				break
811			}
812		}
813	}
814
815	for i := 0; i < size; i++ {
816		if !rows[sorted[i]].last {
817			return sorted[0]
818		}
819	}
820
821	return Done
822}
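
// gatherPickExample is an illustrative sketch (not used by the scan path) of the
// k-way merge performed by pick(): two per-indexer queues each hold one row, and
// pick returns the queue whose head sorts first under compareKey.  The queue size,
// keys, and document ids are arbitrary.
func gatherPickExample() int {
	notify := make(chan bool, 4)
	broker := &RequestBroker{
		defn:     &common.IndexDefn{IsPrimary: false},
		queues:   []*Queue{NewQueue(4, notify), NewQueue(4, notify)},
		notifych: notify,
	}

	r0 := Row{pkey: []byte("doc-2"), value: []value.Value{value.NewValue("b")}}
	r1 := Row{pkey: []byte("doc-1"), value: []value.Value{value.NewValue("a")}}
	broker.queues[0].Enqueue(&r0)
	broker.queues[1].Enqueue(&r1)

	rows := make([]Row, 2)
	sorted := []int{0, 1}
	return broker.pick(rows, sorted) // expected: 1, since "a" sorts before "b"
}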
823
824//
825// Gather results from multiple connections with sorting.
826// The gather routine must follow CBQ processing order:
827// 1) filter (where)
828// 2) group-by
829// 3) aggregate
830// 4) projection
831// 5) order-by
832// 6) offset
833// 7) limit
// The last 3 steps are performed by the gatherer.
835//
836// This routine is called only if cbq requires sorting:
837// 1) It is not an aggregate query
838// 2) Order-by is provided
839//
// If sorting is required, then the sort key will be in the projection
// list, since cbq will put all keys referenced in the query into the
// projection list for a non-aggregate query.  For an aggregate query,
// only aggregate and group keys will be in the projection.
844//
845func (c *RequestBroker) gather(donech chan bool) {
846
847	size := len(c.queues)
848	rows := make([]Row, size)
849	sorted := make([]int, size)
850
851	defer close(donech)
852
853	// initial sort
854	isSorted := false
855	for !isSorted {
856		if c.isClose() {
857			return
858		}
859
860		isSorted = c.sort(rows, sorted)
861
862		if !isSorted {
863			select {
864			case <-c.notifych:
865				continue
866			case <-c.killch:
867				return
868			}
869		}
870	}
871
872	var curOffset int64 = 0
873	var curLimit int64 = 0
874
875	for {
876		var id int
877
878		if c.isClose() {
879			return
880		}
881
882		id = c.pick(rows, sorted)
883		if id == Done {
884			return
885		}
886
887		if id == NoPick {
888			select {
889			case <-c.notifych:
890				continue
891			case <-c.killch:
892				return
893			}
894		}
895
896		if c.queues[id].Dequeue(&rows[id]) {
897
898			// skip offset
899			if curOffset < c.offset {
900				curOffset++
901				continue
902			}
903
904			curLimit++
905			c.Partial(true)
906			if !c.sender(rows[id].pkey, rows[id].value, rows[id].skey) {
907				c.done()
908				return
909			}
910
911			// reaching limit
912			if curLimit >= c.limit {
913				c.done()
914				return
915			}
916		}
917	}
918}
919
920//
921// Gather results from multiple connections without sorting
922//
923func (c *RequestBroker) forward(donech chan bool) {
924
925	size := len(c.queues)
926	rows := make([]Row, size)
927	sorted := make([]int, size)
928
929	defer close(donech)
930
931	// initial sort (only the first row from each indexer)
	// This is just to make sure that we have received at
	// least one row before streaming responses back.
934	isSorted := false
935	for !isSorted {
936		if c.isClose() {
937			return
938		}
939
940		isSorted = c.sort(rows, sorted)
941
942		if !isSorted {
943			select {
944			case <-c.notifych:
945				continue
946			case <-c.killch:
947				return
948			}
949		}
950	}
951
952	var curOffset int64 = 0
953	var curLimit int64 = 0
954
955	for {
956		if c.isClose() {
957			return
958		}
959
960		count := 0
961		found := false
962		for i := 0; i < size; i++ {
963			if c.queues[i].Peek(&rows[i]) {
964
965				if rows[i].last {
966					count++
967					continue
968				}
969
970				found = true
971
972				if c.queues[i].Dequeue(&rows[i]) {
973
974					// skip offset
975					if curOffset < c.offset {
976						curOffset++
977						continue
978					}
979
980					curLimit++
981					c.Partial(true)
982					if !c.sender(rows[i].pkey, rows[i].value, rows[i].skey) {
983						c.done()
984						return
985					}
986
987					// reaching limit
988					if curLimit >= c.limit {
989						c.done()
990						return
991					}
992				}
993			}
994		}
995
996		if count == size {
997			return
998		}
999
1000		if !found {
1001			select {
1002			case <-c.notifych:
1003				continue
1004			case <-c.killch:
1005				return
1006			}
1007		}
1008	}
1009}
1010
// This function compares two sets of secondary key values.
// Returns -int, 0 or +int depending on whether key1
// sorts less than, equal to, or greater than key2.
//
// Each sec key is an array of Value, ordered based on
1016// how they are defined in the index.  In addition, if
1017// the key is in order-by, cbq will put it in the
1018// projection list (even if the user does not ask for it).
1019//
1020func (c *RequestBroker) compareKey(key1, key2 []value.Value) int {
1021
1022	ln := len(key1)
1023	if len(key2) < ln {
1024		ln = len(key2)
1025	}
1026
1027	for i := 0; i < ln; i++ {
1028
1029		if r := key1[i].Collate(key2[i]); r != 0 {
1030
1031			// default: ascending
1032			if i >= len(c.projDesc) {
1033				return r
1034			}
1035
			// ascending
1037			if !c.projDesc[i] {
1038				return r
1039			}
1040
1041			// descending
1042			return 0 - r
1043		}
1044	}
1045
1046	return len(key1) - len(key2)
1047}
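
// compareKeyExample is a small illustrative sketch (not used by the scan path) of
// how compareKey honours per-key sort direction: with projDesc = [false, true],
// the second column is treated as descending, so the larger value sorts first.
// The key values are arbitrary.
func compareKeyExample() bool {
	b := &RequestBroker{projDesc: []bool{false, true}}
	k1 := []value.Value{value.NewValue("a"), value.NewValue(2)}
	k2 := []value.Value{value.NewValue("a"), value.NewValue(1)}
	// Column 0 ties; column 1 is descending, so k1 (with 2) sorts before k2 (with 1).
	return b.compareKey(k1, k2) < 0 // expected: true
}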
1048
// This function compares two primary keys.
// Returns -int, 0 or +int depending on whether key1
1051// sorts less than, equal to, or greater than key2.
1052func (c *RequestBroker) comparePrimaryKey(k1 []byte, k2 []byte) int {
1053
1054	return bytes.Compare(k1, k2)
1055}
1056
1057//
1058// This function makes a scan request through a single connection.
1059//
1060func (c *RequestBroker) scanSingleNode(id ResponseHandlerId, client *GsiScanClient, index *common.IndexDefn, instId uint64,
1061	rollback int64, partition []common.PartitionId, numPartition uint32, donech chan *doneStatus) {
1062
1063	if len(partition) == 0 {
1064		if c.useGather() {
1065			var r Row
1066			r.last = true
1067			c.queues[int(id)].Enqueue(&r)
1068		}
1069		donech <- &doneStatus{err: nil, partial: false}
1070		return
1071	}
1072
1073	begin := time.Now()
1074	err, partial := c.scan(client, index, rollback, partition, c.factory(id, instId, partition))
1075	if err != nil {
1076		// If there is any error, then stop the broker.
1077		// This will force other go-routine to terminate.
1078
1079		c.Partial(partial)
1080		c.Error(err, instId, partition)
1081
1082	} else {
1083		// update timer if there is no error
1084		elapsed := float64(time.Since(begin))
1085		if c.timer != nil {
1086			for _, partitionId := range partition {
1087				c.timer(instId, partitionId, elapsed)
1088			}
1089		}
1090	}
1091
1092	donech <- &doneStatus{err: err, partial: partial}
1093}
1094
1095//
1096// This function makes a count request through a single connection.
1097//
1098func (c *RequestBroker) countSingleNode(id ResponseHandlerId, client *GsiScanClient, index *common.IndexDefn, instId uint64, rollback int64,
1099	partition []common.PartitionId, numPartition uint32, donech chan *doneStatus, count *int64) {
1100
1101	if len(partition) == 0 {
1102		donech <- &doneStatus{err: nil, partial: false}
1103		return
1104	}
1105
1106	cnt, err, partial := c.count(client, index, rollback, partition)
1107	if err != nil {
1108		// If there is any error, then stop the broker.
1109		// This will force other go-routine to terminate.
1110		c.Partial(partial)
1111		c.Error(err, instId, partition)
1112	}
1113
1114	if err == nil && !partial {
1115		atomic.AddInt64(count, cnt)
1116	}
1117
1118	donech <- &doneStatus{err: err, partial: partial}
1119}
1120
1121//
1122// When a response is received from a connection, the response will first be passed to the caller so the caller
1123// has a chance to handle the rows first (e.g. backfill).    The caller will then forward the rows back to the
1124// broker for additional processing (e.g. gather).
1125//
1126func (c *RequestBroker) SendEntries(id ResponseHandlerId, pkeys [][]byte, skeys []common.SecondaryKey) bool {
1127
1128	// StreamEndResponse has nil pkeys and nil skeys
1129	if len(pkeys) == 0 && len(skeys) == 0 {
1130		if c.useGather() {
1131			var r Row
1132			r.last = true
1133			c.queues[int(id)].Enqueue(&r)
1134		}
1135		return false
1136	}
1137
1138	if c.isClose() {
1139		return false
1140	}
1141
1142	for i, skey := range skeys {
1143
1144		if c.useGather() {
1145			var vals []value.Value
1146			if c.sorted {
1147				vals = make([]value.Value, len(skey))
1148				for j := 0; j < len(skey); j++ {
1149					if s, ok := skey[j].(string); ok && collatejson.MissingLiteral.Equal(s) {
1150						vals[j] = value.NewMissingValue()
1151					} else {
1152						vals[j] = value.NewValue(skey[j])
1153					}
1154				}
1155			}
1156
1157			var r Row
1158			r.pkey = pkeys[i]
1159			r.value = vals
1160			r.skey = skey
1161			c.queues[int(id)].Enqueue(&r)
1162		} else {
1163
1164			c.Partial(true)
1165			if !c.sender(pkeys[i], nil, skey) {
1166				c.done()
1167				return false
1168			}
1169		}
1170	}
1171
1172	if c.useGather() {
1173		c.queues[int(id)].NotifyEnq()
1174	}
1175
1176	return !c.isClose()
1177}
1178
1179//--------------------------
1180// default request broker
1181//--------------------------
1182
1183type bypassResponseReader struct {
1184	pkey []byte
1185	skey common.SecondaryKey
1186}
1187
1188func (d *bypassResponseReader) GetEntries() ([]common.SecondaryKey, [][]byte, error) {
1189	return []common.SecondaryKey{d.skey}, [][]byte{d.pkey}, nil
1190}
1191
1192func (d *bypassResponseReader) Error() error {
1193	return nil
1194}
1195
1196func makeDefaultRequestBroker(cb ResponseHandler) *RequestBroker {
1197
1198	broker := NewRequestBroker("", 256, -1)
1199
1200	factory := func(id ResponseHandlerId, instId uint64, partitions []common.PartitionId) ResponseHandler {
1201		return makeDefaultResponseHandler(id, broker, instId, partitions)
1202	}
1203
1204	sender := func(pkey []byte, mskey []value.Value, uskey common.SecondaryKey) bool {
1205		broker.IncrementSendCount()
1206		if cb != nil {
1207			var reader bypassResponseReader
1208			reader.pkey = pkey
1209			reader.skey = uskey
1210			return cb(&reader)
1211		}
1212		return true
1213	}
1214
1215	broker.SetResponseHandlerFactory(factory)
1216	broker.SetResponseSender(sender)
1217
1218	return broker
1219}
1220
1221func makeDefaultResponseHandler(id ResponseHandlerId, broker *RequestBroker, instId uint64, partitions []common.PartitionId) ResponseHandler {
1222
1223	handler := func(resp ResponseReader) bool {
1224		err := resp.Error()
1225		if err != nil {
1226			logging.Errorf("defaultResponseHandler: %v", err)
1227			broker.Error(err, instId, partitions)
1228			return false
1229		}
1230		skeys, pkeys, err := resp.GetEntries()
1231		if err != nil {
1232			logging.Errorf("defaultResponseHandler: %v", err)
1233			broker.Error(err, instId, partitions)
1234			return false
1235		}
1236		if len(pkeys) != 0 || len(skeys) != 0 {
1237			if len(pkeys) != 0 {
1238				broker.IncrementReceiveCount(len(pkeys))
1239			} else {
1240				broker.IncrementReceiveCount(len(skeys))
1241			}
1242		}
1243		return broker.SendEntries(id, pkeys, skeys)
1244	}
1245
1246	return handler
1247}
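
// defaultBrokerUsageExample is a hedged sketch of how the default broker adapts a
// plain ResponseHandler callback: every row the broker emits is surfaced to the
// callback through a bypassResponseReader.  The handler body here is illustrative
// only and simply logs what it receives.
func defaultBrokerUsageExample() *RequestBroker {
	handler := func(resp ResponseReader) bool {
		skeys, pkeys, err := resp.GetEntries()
		if err != nil {
			logging.Errorf("defaultBrokerUsageExample: %v", err)
			return false
		}
		logging.Debugf("defaultBrokerUsageExample: %v skeys %v pkeys", len(skeys), len(pkeys))
		return true
	}
	return makeDefaultRequestBroker(handler)
}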
1248
1249//--------------------------
1250// Partition Elimination
1251//--------------------------
1252
1253//
// Filter partitions based on the index partition key
1255//
1256func (c *RequestBroker) filterPartitions(index *common.IndexDefn, partitions [][]common.PartitionId, numPartition uint32) [][]common.PartitionId {
1257
1258	if numPartition == 1 || (len(partitions) == 1 && len(partitions[0]) == 1) {
1259		return partitions
1260	}
1261
1262	partitionKeyPos := partitionKeyPos(index)
1263	if len(partitionKeyPos) == 0 {
1264		return partitions
1265	}
1266
1267	partitionKeyValues := partitionKeyValues(c.requestId, partitionKeyPos, c.scans)
1268	if len(partitionKeyValues) == 0 {
1269		return partitions
1270	}
1271
1272	filter := partitionKeyHash(partitionKeyValues, c.scans, numPartition, index.HashScheme)
1273	if len(filter) == 0 {
1274		return partitions
1275	}
1276
1277	return filterPartitionIds(partitions, filter)
1278}
1279
1280//
// Find the position of each partition key in the index key list.
// If any partition key cannot be found in the index key list,
// this function returns nil.
1284//
1285func partitionKeyPos(defn *common.IndexDefn) []int {
1286
1287	if defn.PartitionScheme == common.SINGLE {
1288		return nil
1289	}
1290
1291	var pos []int
1292	secExprs := make(expression.Expressions, 0, len(defn.SecExprs))
1293	for _, key := range defn.SecExprs {
1294		expr, err := parser.Parse(key)
1295		if err != nil {
			logging.Errorf("Fail to parse secondary key %v", logging.TagUD(key))
1297			return nil
1298		}
1299		secExprs = append(secExprs, expr)
1300	}
1301
1302	partnExprs := make(expression.Expressions, 0, len(defn.PartitionKeys))
1303	for _, key := range defn.PartitionKeys {
1304		expr, err := parser.Parse(key)
1305		if err != nil {
			logging.Errorf("Fail to parse partition key %v", logging.TagUD(key))
1307			return nil
1308		}
1309		partnExprs = append(partnExprs, expr)
1310	}
1311
1312	if !defn.IsPrimary {
1313		for _, partnExpr := range partnExprs {
1314			for i, secExpr := range secExprs {
1315
1316				if partnExpr.EquivalentTo(secExpr) {
1317					pos = append(pos, i)
1318					break
1319				}
1320			}
1321		}
1322	} else {
1323		id := expression.NewField(expression.NewMeta(), expression.NewFieldName("id", false))
1324		idself := expression.NewField(expression.NewMeta(expression.NewIdentifier("self")), expression.NewFieldName("id", false))
1325
		// for primary index, it can be partitioned on an expression based on metaId.  For scan, n1ql only pushes down if the
		// span is metaId (not an expr on metaId).   So if the partition key is not metaId, then there is no partition elimination.
1328		if len(partnExprs) == 1 && (partnExprs[0].EquivalentTo(id) || partnExprs[0].EquivalentTo(idself)) {
1329			pos = append(pos, MetaIdPos)
1330		}
1331	}
1332
1333	if len(pos) != len(defn.PartitionKeys) {
1334		return nil
1335	}
1336
1337	return pos
1338}
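
// partitionKeyPosExample is an illustrative sketch: for an index on (`name`, `age`)
// partitioned on `age`, the partition key resolves to position 1 in the index key
// list.  The IndexDefn literal is a minimal stand-in, not a real definition; its
// partition scheme is simply left non-SINGLE so the check above does not short-circuit.
func partitionKeyPosExample() []int {
	defn := &common.IndexDefn{
		SecExprs:      []string{"`name`", "`age`"},
		PartitionKeys: []string{"`age`"},
	}
	return partitionKeyPos(defn) // expected: []int{1}
}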
1339
1340//
// Extract the partition key values from each scan (AND-predicate from the where clause).
// Scans is an OR-list of AND-predicates.
// Each scan (AND-predicate) has a list of filters, with each filter being an operator on an index key.
// The filters in a scan are sorted based on index order.
// For partition elimination to happen, the filters must contain all the partition keys.
// If any scan does not have all the partition keys, then the request needs to be scatter-gather.
1347//
1348func partitionKeyValues(requestId string, partnKeyPos []int, scans Scans) [][]interface{} {
1349
1350	partnKeyValues := make([][]interface{}, len(scans))
1351
1352	for scanPos, scan := range scans {
1353		if scan != nil {
1354			if len(scan.Filter) > 0 {
1355
1356				for i, filter := range scan.Filter {
1357					logging.Debugf("scatter: requestId %v filter[%v] low %v high %v", requestId, i, filter.Low, filter.High)
1358				}
1359
1360				for _, pos := range partnKeyPos {
1361
1362					if pos != MetaIdPos && pos >= len(scan.Filter) {
1363						partnKeyValues[scanPos] = nil
1364						break
1365					}
1366
1367					if pos != MetaIdPos && reflect.DeepEqual(scan.Filter[pos].Low, scan.Filter[pos].High) {
1368						partnKeyValues[scanPos] = append(partnKeyValues[scanPos], qvalue.NewValue(scan.Filter[pos].Low))
1369
1370					} else if pos == MetaIdPos &&
1371						len(scan.Filter) == 1 &&
1372						reflect.DeepEqual(scan.Filter[0].Low, scan.Filter[0].High) {
						// n1ql only pushes down a span on the primary key for metaId();
						// it will not push down an expr on metaId()
1375						partnKeyValues[scanPos] = append(partnKeyValues[scanPos], qvalue.NewValue(scan.Filter[0].Low))
1376
1377					} else {
1378						partnKeyValues[scanPos] = nil
1379						break
1380					}
1381				}
1382
1383			} else if len(scan.Seek) > 0 {
1384
1385				for i, seek := range scan.Seek {
1386					logging.Debugf("scatter: requestId %v seek[%v] value %v", requestId, i, seek)
1387				}
1388
1389				for _, pos := range partnKeyPos {
1390
1391					if pos != MetaIdPos && pos >= len(scan.Seek) {
1392						partnKeyValues[scanPos] = nil
1393						break
1394					}
1395
1396					if pos != MetaIdPos {
1397						partnKeyValues[scanPos] = append(partnKeyValues[scanPos], qvalue.NewValue(scan.Seek[pos]))
1398
1399					} else if pos == MetaIdPos &&
1400						len(scan.Filter) == 1 &&
1401						reflect.DeepEqual(scan.Filter[0].Low, scan.Filter[0].High) {
						// n1ql only pushes down a span on the primary key for metaId();
						// it will not push down an expr on metaId()
1404						partnKeyValues[scanPos] = append(partnKeyValues[scanPos], qvalue.NewValue(scan.Seek[0]))
1405
1406					} else {
1407						partnKeyValues[scanPos] = nil
1408						break
1409					}
1410				}
1411			}
1412		}
1413	}
1414
1415	return partnKeyValues
1416}
1417
1418//
// Generate a list of partitionIds from the partition key values of each scan
1420//
1421func partitionKeyHash(partnKeyValues [][]interface{}, scans Scans, numPartition uint32, hashScheme common.HashScheme) map[common.PartitionId]bool {
1422
1423	if len(partnKeyValues) != len(scans) {
1424		return nil
1425	}
1426
1427	for i, scan := range scans {
1428		if scan != nil && len(partnKeyValues[i]) == 0 {
1429			return nil
1430		}
1431	}
1432
1433	result := make(map[common.PartitionId]bool)
1434	for _, values := range partnKeyValues {
1435
1436		v, e := qvalue.NewValue(values).MarshalJSON()
1437		if e != nil {
1438			return nil
1439		}
1440
1441		partnId := common.HashKeyPartition(v, int(numPartition), hashScheme)
1442		result[partnId] = true
1443	}
1444
1445	return result
1446}
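
// partitionKeyHashExample is a sketch (with arbitrary values) of how equality
// predicates are mapped to partition ids: each scan's partition key values are
// marshalled to JSON and hashed to a single partition.  The zero-value hash scheme
// and the nil scan entries are illustrative shortcuts, not what a real request carries.
func partitionKeyHashExample(numPartition uint32) map[common.PartitionId]bool {
	partnKeyValues := [][]interface{}{
		{"blue"},
		{"red"},
	}
	scans := make(Scans, len(partnKeyValues))

	var scheme common.HashScheme
	return partitionKeyHash(partnKeyValues, scans, numPartition, scheme)
}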
1447
1448//
// Given the indexer-partitionId map, filter out the partitionIds that are not used in the scans
1450//
1451func filterPartitionIds(allPartitions [][]common.PartitionId, filter map[common.PartitionId]bool) [][]common.PartitionId {
1452
1453	result := make([][]common.PartitionId, len(allPartitions))
1454	for i, partitions := range allPartitions {
1455		for _, partition := range partitions {
1456			if filter[partition] {
1457				result[i] = append(result[i], partition)
1458			}
1459		}
1460	}
1461
1462	return result
1463}
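
// filterPartitionIdsExample is a small sketch of partition elimination: with two
// indexer nodes hosting partitions {1,2} and {3,4}, and a scan whose partition key
// values hash only to partitions 2 and 3, the per-node lists shrink to {2} and {3}.
// The node layout and partition ids are arbitrary.
func filterPartitionIdsExample() [][]common.PartitionId {
	allPartitions := [][]common.PartitionId{
		{common.PartitionId(1), common.PartitionId(2)},
		{common.PartitionId(3), common.PartitionId(4)},
	}
	filter := map[common.PartitionId]bool{
		common.PartitionId(2): true,
		common.PartitionId(3): true,
	}
	return filterPartitionIds(allPartitions, filter) // expected: [[2] [3]]
}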
1464
1465//--------------------------
1466// API2 Push Down
1467//--------------------------
1468
1469func (c *RequestBroker) changePushdownParams(partitions [][]common.PartitionId, numPartition uint32, index *common.IndexDefn) {
1470	c.changeLimit(partitions, numPartition, index)
1471	c.changeOffset(partitions, numPartition, index)
1472	c.changeSorted(partitions, numPartition, index)
1473}
1474
1475//
// For an aggregate query, cbq-engine will push down limit only if full aggregate results are needed.  Like a range query, the limit
// parameter will be rewritten before pushing down to the indexer.   The limit will be applied by the gatherer when results are returned
// from all indexers.
//
// For pre-aggregate results, cbq-engine will not push down limit, since limit can only apply after aggregation.
1481//
1482func (c *RequestBroker) changeLimit(partitions [][]common.PartitionId, numPartition uint32, index *common.IndexDefn) {
1483
1484	c.pushdownLimit = c.limit
1485
1486	// non-partition index
1487	if index.PartitionScheme == common.SINGLE {
1488		return
1489	}
1490
1491	// there is no limit
1492	if c.limit == math.MaxInt64 {
1493		return
1494	}
1495
1496	// there is only a single indexer involved in the scan
1497	if numPartition == 1 || len(partitions) == 1 {
1498		return
1499	}
1500
1501	// We have multiple partitions and there is a limit clause.  We need to change the limit only if
1502	// there is also offset. If there is no offset, then limit does not have to change.
1503	if c.offset == 0 {
1504		return
1505	}
1506
1507	/*
		// We do not have to change the limit if there is exactly one partition for each scan (AND-predicate).
1509		partitionKeyPos := partitionKeyPos(index)
1510		if len(partitionKeyPos) != 0 {
1511			pushDownLimitAsIs := true
1512			partitionKeyValues := partitionKeyValues(c.requestId, partitionKeyPos, c.scans)
1513			if len(c.scans) == len(partitionKeyValues) {
1514				for i, scan := range c.scans {
1515					if scan != nil && len(partitionKeyValues[i]) == 0 {
1516						pushDownLimitAsIs = false
1517						break
1518					}
1519				}
1520
1521				// At this point, each scan is going to be mapped to one partition.
				// 1) the qualifying rows for each scan will only be coming from a single indexer
				// 2) for each scan, the qualifying rows will be sorted
				// In this case, it is possible to push down the limit to each indexer,
				// without worrying about missing qualifying rows.
1526				if pushDownLimitAsIs {
1527					return
1528				}
1529			}
1530		}
1531	*/
1532
1533	// if there are multiple partitions AND there is offset, limit is offset + limit.
1534	// offset will be set back to 0 in changeOffset()
1535	c.pushdownLimit = c.offset + c.limit
1536
1537}
1538
1539//
// For an aggregate query, cbq-engine will push down offset only if full aggregate results are needed.  Like a range query, the offset
// parameter will be rewritten before pushing down to the indexer.   The offset will be applied by the gatherer when results are returned
// from all indexers.
//
// For pre-aggregate results, cbq-engine will not push down offset, since offset can only apply after aggregation.
1545//
1546func (c *RequestBroker) changeOffset(partitions [][]common.PartitionId, numPartition uint32, index *common.IndexDefn) {
1547
1548	c.pushdownOffset = c.offset
1549
1550	// non-partition index
1551	if index.PartitionScheme == common.SINGLE {
1552		return
1553	}
1554
1555	// there is no offset
1556	if c.offset == 0 {
1557		return
1558	}
1559
1560	// there is only a single indexer involved in the scan
1561	if numPartition == 1 || len(partitions) == 1 {
1562		return
1563	}
1564
1565	/*
		// We do not have to change the offset if there is exactly one partition for each scan (AND-predicate).
1567		partitionKeyPos := partitionKeyPos(index)
1568		if len(partitionKeyPos) != 0 {
1569			pushDownOffsetAsIs := true
1570			partitionKeyValues := partitionKeyValues(c.requestId, partitionKeyPos, c.scans)
1571			if len(c.scans) == len(partitionKeyValues) {
1572				for i, scan := range c.scans {
1573					if scan != nil && len(partitionKeyValues[i]) == 0 {
1574						pushDownOffsetAsIs = false
1575						break
1576					}
1577				}
1578
1579				// At this point, each scan is going to be mapped to one partition.
				// 1) the qualifying rows for each scan will only be coming from a single indexer
				// 2) for each scan, the qualifying rows will be sorted
				// In this case, it is possible to push down the offset to each indexer,
				// without worrying about missing qualifying rows.
1584				if pushDownOffsetAsIs {
1585					return
1586				}
1587			}
1588		}
1589	*/
1590
	// if there are multiple partitions, change the offset to 0
1592	c.pushdownOffset = 0
1593
1594}
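
// changePushdownParamsExample is a hedged sketch of the limit/offset rewrite for a
// scan that fans out to two partitioned-index nodes: LIMIT 100 OFFSET 10 is pushed
// down as LIMIT 110 OFFSET 0, and the real offset and limit are re-applied by the
// gatherer.  The IndexDefn literal is a minimal stand-in whose partition scheme is
// left non-SINGLE purely so the rewrite is not short-circuited.
func changePushdownParamsExample() (int64, int64) {
	broker := NewRequestBroker("example-request-id", 256, 1)
	broker.SetLimit(100)
	broker.SetOffset(10)
	broker.SetSorted(true)

	defn := &common.IndexDefn{}
	partitions := [][]common.PartitionId{
		{common.PartitionId(1)},
		{common.PartitionId(2)},
	}
	broker.changePushdownParams(partitions, 2, defn)

	return broker.GetLimit(), broker.GetOffset() // expected: 110, 0
}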
1595
1596//
// Determine if the results from each indexer need to be sorted based on index key order.
1598//
1599func (c *RequestBroker) changeSorted(partitions [][]common.PartitionId, numPartition uint32, index *common.IndexDefn) {
1600
1601	c.pushdownSorted = c.sorted
1602
1603	if c.distinct {
1604		c.pushdownSorted = true
1605	}
1606
	// If it is an aggregate query, we need to sort using index order.
	// The indexer depends on results being sorted across partitions co-located on the same node.
1609	if c.grpAggr != nil && (len(c.grpAggr.Group) != 0 || len(c.grpAggr.Aggrs) != 0) {
1610		c.pushdownSorted = true
1611	}
1612
1613}
1614
1615//--------------------------
1616// API3 push down
1617//--------------------------
1618
1619func (c *RequestBroker) isPartitionedAggregate(index *common.IndexDefn) bool {
1620
1621	// non-partition index
1622	if index.PartitionScheme == common.SINGLE {
1623		return false
1624	}
1625
	// aggregate query
1627	if c.grpAggr != nil {
1628		return true
1629	}
1630
1631	return false
1632}
1633
1634//
1635// For aggregate query, n1ql will expect full aggregate results for partitioned index if
1636// 1) group-by keys matches the partition keys (order is not important)
1637// 2) group keys are leading index keys
1638// 3) partition keys are leading index keys
1639//
1640func (c *RequestBroker) isPartialAggregate(partitions [][]common.PartitionId, numPartition uint32, index *common.IndexDefn) bool {
1641
1642	// non-partition index
1643	if index.PartitionScheme == common.SINGLE {
1644		return false
1645	}
1646
	// aggregate query
1648	if c.grpAggr != nil && len(c.grpAggr.Group) != 0 {
1649
1650		// check if partition keys are leading index keys
1651		positions := partitionKeyPos(index)
1652		if len(positions) != len(index.PartitionKeys) {
1653			return true
1654		}
1655		for i, pos := range positions {
1656			if i != pos {
1657				return true
1658			}
1659		}
1660
1661		// check if group keys are leading index keys
1662		for i, group := range c.grpAggr.Group {
1663			if int32(i) != group.KeyPos {
1664				// if group keys do not follow index key order
1665				return true
1666			}
1667		}
1668
		// both partition keys and group keys are in index order.
		// check if group keys and partition keys are the same length (order is not important)
1671		if len(index.PartitionKeys) != len(c.grpAggr.Group) {
1672			return true
1673		}
1674	}
1675
1676	return false
1677}
1678
1679//
// We cannot sort if it is a pre-aggregate result, so set sorted to false.  Otherwise, the result
// is sorted if there is an order-by clause.
//
// Note that cbq-engine will not push down order-by unless order-by matches the leading index keys.
// If it is a partitioned index, order-by will not be pushed down unless it is a full aggregate result.
1685//
1686// THIS FUNCTION IS NOT CALLED BECAUSE CBQ IS ALREADY HANDLING IT
1687//
1688func (c *RequestBroker) analyzeGroupBy(partitions [][]common.PartitionId, numPartition uint32, index *common.IndexDefn) {
1689
1690	// non-partition index
1691	if index.PartitionScheme == common.SINGLE {
1692		return
1693	}
1694
1695	// there is only a single indexer involved in the scan
1696	if numPartition == 1 || len(partitions) == 1 {
1697		return
1698	}
1699
1700	if c.isPartialAggregate(partitions, numPartition, index) {
1701		c.sorted = false
1702		return
1703	}
1704}
1705
1706//
// If sorted, then analyze the index order to see if we need to add index keys to the projection list.
// If it is a non-covering index, cbq-engine will not add order-by keys to the projection list, so the GSI client
// will have to add the keys for sorting the scattered results.
1710//
1711func (c *RequestBroker) analyzeOrderBy(partitions [][]common.PartitionId, numPartition uint32, index *common.IndexDefn) {
1712
1713	// non-partition index
1714	if index.PartitionScheme == common.SINGLE {
1715		return
1716	}
1717
1718	// there is only a single indexer involved in the scan
1719	if numPartition == 1 || len(partitions) == 1 {
1720		return
1721	}
1722
1723	// no need to sort
1724	if !c.sorted {
1725		return
1726	}
1727
1728	if c.indexOrder != nil {
1729
1730		projection := make(map[int64]bool)
1731		if c.projections != nil && len(c.projections.EntryKeys) != 0 {
1732			// Entry key can be an index key, group key expression, or aggregate expression.
1733			for _, position := range c.projections.EntryKeys {
1734				projection[position] = true
1735			}
1736		}
1737
1738		// If the order-by key is not in the projection, then add it.
1739		// Cbq-engine pushes order-by only if the order-by keys match the leading index keys.
1740		// So order-by key must be an index key.
1741		for _, order := range c.indexOrder.KeyPos {
1742			if !projection[int64(order)] {
1743				c.projections.EntryKeys = append(c.projections.EntryKeys, int64(order))
1744				projection[int64(order)] = true
1745			}
1746		}
1747	}
1748}
1749
1750//
1751// If sorted, then analyze the projection to find out the sort order (asc/desc).
1752//
1753// Cbq-engine pushes order-by only if the order-by keys match the leading index keys.  If there is an expression in the order-by clause,
1754// cbq will not push down order-by even if the expression is based on the index key.
1755//
1756// For aggregate query, the order-by keys need to match the group-key as well.  So if the group-key is based on an expression on an index key,
1757// this means the order-by key also needs to have the same expression.  In this case, cbq will not push down order-by (violates rule above).
// Cbq also will not allow more order-by keys than group-keys.
1759//
1760func (c *RequestBroker) analyzeProjection(partitions [][]common.PartitionId, numPartition uint32, index *common.IndexDefn) {
1761
1762	// non-partition index
1763	if index.PartitionScheme == common.SINGLE {
1764		return
1765	}
1766
1767	// there is only a single indexer involved in the scan
1768	if numPartition == 1 || len(partitions) == 1 {
1769		return
1770	}
1771
1772	// no need to sort
1773	if !c.sorted {
1774		return
1775	}
1776
1777	// 1) If there is order-by, the projection list should not be empty
1778	//    since cbq will add order-by keys to projection list.
1779	// 2) order-by is pushed down only if order-by keys match leading index keys
1780	//
1781	if c.projections != nil && len(c.projections.EntryKeys) != 0 {
1782
1783		pos := make([]int, len(c.projections.EntryKeys))
1784		for i, position := range c.projections.EntryKeys {
1785			pos[i] = int(position)
1786		}
1787		sort.Ints(pos)
1788
1789		c.projDesc = make([]bool, len(c.projections.EntryKeys))
1790		for i, position := range pos {
1791			// EntryKey could be index key, group key expression, aggregate expression.
1792			// For expression,  entryKey would be an integer greater than len(index.SecKeys)
1793			if position >= 0 && position < len(index.Desc) {
				// The results coming from the indexer are in index order.  If
				// an index key is not in the projection list, it will be
				// skipped in the returned result.
1797				c.projDesc[i] = index.Desc[position]
1798			}
1799		}
1800	}
1801}
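
// analyzeProjectionExample is an illustrative sketch of how the gatherer learns the
// sort direction of each projected column: for an index whose second key is DESC and
// a projection of entry keys 0 and 1, projDesc comes out as [false, true].  As above,
// the IndexDefn literal is a minimal stand-in with a non-SINGLE partition scheme.
func analyzeProjectionExample() []bool {
	broker := NewRequestBroker("example-request-id", 256, 1)
	broker.SetSorted(true)
	broker.SetProjection(&IndexProjection{EntryKeys: []int64{0, 1}})

	defn := &common.IndexDefn{Desc: []bool{false, true}}
	partitions := [][]common.PartitionId{
		{common.PartitionId(1)},
		{common.PartitionId(2)},
	}
	broker.analyzeProjection(partitions, 2, defn)

	return broker.projDesc // expected: [false true]
}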
1802
1803//--------------------------
1804// utilities
1805//--------------------------
1806
1807func queueSize(size int, partition int, sorted bool, settings *ClientSettings) int {
1808
1809	queueSize := int(settings.ScanQueueSize())
1810
1811	if queueSize == 0 {
1812		queueSize = size
1813	}
1814
1815	return queueSize
1816
1817	//FIXME
1818	/*
1819		numCpu := runtime.NumCPU()
1820
1821		if numCpu >= partition || !sorted {
1822			return queueSize
1823		}
1824
1825		ratio := partition / numCpu
1826		return ratio * queueSize
1827	*/
1828}
1829