package secondaryindex

import (
	"fmt"
	"log"
	"strconv"
	"time"

	"github.com/couchbase/indexing/secondary/collatejson"
	c "github.com/couchbase/indexing/secondary/common"
	"github.com/couchbase/indexing/secondary/logging"
	qc "github.com/couchbase/indexing/secondary/queryport/client"
	nclient "github.com/couchbase/indexing/secondary/queryport/n1ql"
	tc "github.com/couchbase/indexing/secondary/tests/framework/common"
	"github.com/couchbase/query/datastore"
	"github.com/couchbase/query/errors"
	"github.com/couchbase/query/expression"
	"github.com/couchbase/query/expression/parser"
	"github.com/couchbase/query/value"
)

// N1QLCreateSecondaryIndex creates a secondary index on the given index
// fields through the N1QL GSI client. It does not wait for the index to
// become active; use WaitForIndexOnline for that.
func N1QLCreateSecondaryIndex(
	indexName, bucketName, server, whereExpr string, indexFields []string, isPrimary bool, with []byte,
	skipIfExists bool, indexActiveTimeoutSeconds int64) error {

	log.Printf("N1QLCreateSecondaryIndex :: server = %v", server)
	nc, err := nclient.NewGSIIndexer(server, "default", bucketName)
	if err != nil {
		return err
	}
	logging.SetLogLevel(logging.Error)
	requestId := "12345"
	rangeKey := make(expression.Expressions, 0, len(indexFields))
	for _, exprS := range indexFields {
		expr, err := parser.Parse(exprS)
		if err != nil {
			return err
		}
		rangeKey = append(rangeKey, expr)
	}

	_, err = nc.CreateIndex(requestId, indexName, nil, rangeKey, nil, nil)
	if err != nil {
		return err
	}
	return nil
}

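// N1QLRange scans the index over the [low, high] range with the given
// inclusion through the N1QL datastore Scan API and returns the results
// keyed by primary key.
//
// Illustrative usage from a test (index name, server address and key values
// are hypothetical):
//
//	results, err := N1QLRange("idx_age", "default", kvaddress,
//		[]interface{}{25}, []interface{}{40}, 3, false, 10000,
//		c.SessionConsistency, nil)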
func N1QLRange(indexName, bucketName, server string, low, high []interface{}, inclusion uint32,
	distinct bool, limit int64, consistency c.Consistency, vector *qc.TsConsistency) (tc.ScanResponse, error) {

	client, err := nclient.NewGSIIndexer(server, "default", bucketName)
	if err != nil {
		return nil, err
	}
	logging.SetLogLevel(logging.Error)
	tctx := &testContext{}
	conn, err := datastore.NewSizedIndexConnection(100000, tctx)
	if err != nil {
		log.Fatalf("error creating SizedIndexConnection: %v\n", err)
	}

	requestid := getrequestid()
	index, err := client.IndexByName(indexName)
	if err != nil {
		return nil, err
	}

	var err1 error
	index, err1 = WaitForIndexOnline(client, indexName, index)
	if err1 != nil {
		return nil, err1
	}

	var start time.Time
	go func() {
		l, h := skey2qkey(low), skey2qkey(high)
		rng := datastore.Range{Low: l, High: h, Inclusion: datastore.Inclusion(inclusion)}
		span := &datastore.Span{Seek: nil, Range: rng}
		cons := getConsistency(consistency)
		start = time.Now()
		index.Scan(requestid, span, false, limit, cons, nil, conn)
	}()

	results, err2 := getresultsfromchannel(conn.EntryChannel(), index.IsPrimary(), tctx)
	elapsed := time.Since(start)
	tc.LogPerfStat("Range", elapsed)
	return results, err2
}

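// N1QLLookup performs an equality lookup on the index for the given key
// values, implemented as a range scan with low == high and both bounds
// inclusive.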
func N1QLLookup(indexName, bucketName, server string, values []interface{},
	distinct bool, limit int64, consistency c.Consistency, vector *qc.TsConsistency) (tc.ScanResponse, error) {

	client, err := nclient.NewGSIIndexer(server, "default", bucketName)
	if err != nil {
		return nil, err
	}
	logging.SetLogLevel(logging.Error)
	tctx := &testContext{}
	conn, err := datastore.NewSizedIndexConnection(100000, tctx)
	if err != nil {
		log.Fatalf("error creating SizedIndexConnection: %v\n", err)
	}

	requestid := getrequestid()
	index, err := client.IndexByName(indexName)
	if err != nil {
		return nil, err
	}

	var err1 error
	index, err1 = WaitForIndexOnline(client, indexName, index)
	if err1 != nil {
		return nil, err1
	}

	var start time.Time
	go func() {
		l, h := skey2qkey(values), skey2qkey(values)
		rng := datastore.Range{Low: l, High: h, Inclusion: datastore.BOTH}
		span := &datastore.Span{Seek: nil, Range: rng}
		cons := getConsistency(consistency)
		start = time.Now()
		index.Scan(requestid, span, false, limit, cons, nil, conn)
	}()

	results, err2 := getresultsfromchannel(conn.EntryChannel(), index.IsPrimary(), tctx)
	elapsed := time.Since(start)
	tc.LogPerfStat("Lookup", elapsed)
	return results, err2
}

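// N1QLScanAll scans the complete index with an unbounded range.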
func N1QLScanAll(indexName, bucketName, server string, limit int64,
	consistency c.Consistency, vector *qc.TsConsistency) (tc.ScanResponse, error) {

	client, err := nclient.NewGSIIndexer(server, "default", bucketName)
	if err != nil {
		return nil, err
	}
	logging.SetLogLevel(logging.Error)
	tctx := &testContext{}
	conn, err := datastore.NewSizedIndexConnection(100000, tctx)
	if err != nil {
		log.Fatalf("error creating SizedIndexConnection: %v\n", err)
	}

	requestid := getrequestid()
	index, err := client.IndexByName(indexName)
	if err != nil {
		return nil, err
	}

	var err1 error
	index, err1 = WaitForIndexOnline(client, indexName, index)
	if err1 != nil {
		return nil, err1
	}

	var start time.Time
	go func() {
		rng := datastore.Range{Low: nil, High: nil, Inclusion: datastore.BOTH}
		span := &datastore.Span{Seek: nil, Range: rng}
		cons := getConsistency(consistency)
		start = time.Now()
		index.Scan(requestid, span, false, limit, cons, nil, conn)
	}()

	results, err2 := getresultsfromchannel(conn.EntryChannel(), index.IsPrimary(), tctx)
	elapsed := time.Since(start)
	tc.LogPerfStat("ScanAll", elapsed)
	return results, err2
}

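// N1QLScans runs a multi-span scan through the datastore Index2 Scan2 API;
// the GSI scans are converted to datastore Spans2 before the call.
//
// Illustrative usage (filter bounds and server address are hypothetical;
// field names follow the qc.Scan and qc.CompositeElementFilter usage below):
//
//	filter := []*qc.CompositeElementFilter{{Low: "A", High: "M", Inclusion: 3}}
//	scans := qc.Scans{&qc.Scan{Filter: filter}}
//	results, err := N1QLScans("idx_name", "default", kvaddress, scans,
//		false, false, nil, 0, 10000, c.SessionConsistency, nil)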
func N1QLScans(indexName, bucketName, server string, scans qc.Scans, reverse, distinct bool,
	projection *qc.IndexProjection, offset, limit int64,
	consistency c.Consistency, vector *qc.TsConsistency) (tc.ScanResponse, error) {

	client, err := nclient.NewGSIIndexer(server, "default", bucketName)
	if err != nil {
		return nil, err
	}
	logging.SetLogLevel(logging.Error)
	tctx := &testContext{}
	conn, err := datastore.NewSizedIndexConnection(100000, tctx)
	if err != nil {
		log.Fatalf("error creating SizedIndexConnection: %v\n", err)
	}
	requestid := getrequestid()
	index, err := client.IndexByName(indexName)
	if err != nil {
		return nil, err
	}

	var err1 error
	index, err1 = WaitForIndexOnline(client, indexName, index)
	if err1 != nil {
		return nil, err1
	}

	index2, useScan2 := index.(datastore.Index2)

	var start time.Time
	go func() {
		spans2 := make(datastore.Spans2, len(scans))
		for i, scan := range scans {
			spans2[i] = &datastore.Span2{}
			if len(scan.Seek) != 0 {
				spans2[i].Seek = skey2qkey(scan.Seek)
			}
			spans2[i].Ranges = filtertoranges2(scan.Filter)
		}

		proj := projectionton1ql(projection)
		cons := getConsistency(consistency)
		ordered := true
		if useScan2 {
			start = time.Now()
			// TODO: pass the vector instead of nil.
			// Currently go tests do not pass timestamp vector
			index2.Scan2(requestid, spans2, reverse, distinct, ordered, proj,
				offset, limit, cons, nil, conn)
		} else {
			log.Fatalf("Indexer does not support Index2 API. Cannot call Scan2 method.")
		}
	}()

	results, err2 := getresultsfromchannel(conn.EntryChannel(), index.IsPrimary(), tctx)
	elapsed := time.Since(start)
	tc.LogPerfStat("MultiScan", elapsed)
	return results, err2
}

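// N1QLMultiScanCount returns the number of rows matching the given scans
// using the CountIndex2 API; when distinct is true it counts distinct
// entries via CountDistinct instead of Count2.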
func N1QLMultiScanCount(indexName, bucketName, server string, scans qc.Scans, distinct bool,
	consistency c.Consistency, vector *qc.TsConsistency) (int64, error) {
	var count int64
	client, err := nclient.NewGSIIndexer(server, "default", bucketName)
	if err != nil {
		return 0, err
	}
	logging.SetLogLevel(logging.Error)

	requestid := getrequestid()
	index, err := client.IndexByName(indexName)
	if err != nil {
		return 0, err
	}

	var err1 error
	index, err1 = WaitForIndexOnline(client, indexName, index)
	if err1 != nil {
		return 0, err1
	}

	index2, useScan2 := index.(datastore.CountIndex2)

	var start time.Time
	spans2 := make(datastore.Spans2, len(scans))
	for i, scan := range scans {
		spans2[i] = &datastore.Span2{}
		if len(scan.Seek) != 0 {
			spans2[i].Seek = skey2qkey(scan.Seek)
		}
		spans2[i].Ranges = filtertoranges2(scan.Filter)
	}

	cons := getConsistency(consistency)
	if useScan2 {
		start = time.Now()
		if distinct {
			// TODO: pass the vector instead of nil.
			// Currently go tests do not pass timestamp vector
			count, err = index2.CountDistinct(requestid, spans2, cons, nil)
		} else {
			count, err = index2.Count2(requestid, spans2, cons, nil)
		}
	} else {
		log.Fatalf("Indexer does not support CountIndex2 interface. Cannot call Count2 method.")
	}

	elapsed := time.Since(start)
	tc.LogPerfStat("MultiScanCount", elapsed)
	return count, err
}

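// N1QLScan3 runs a multi-span scan through the Index3 Scan3 API, optionally
// with a projection and group/aggregate specification. When groupAggr is
// non-nil the aggregated rows are returned in the GroupAggrScanResponse,
// otherwise the regular ScanResponse is populated.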
func N1QLScan3(indexName, bucketName, server string, scans qc.Scans, reverse, distinct bool,
	projection *qc.IndexProjection, offset, limit int64, groupAggr *qc.GroupAggr,
	consistency c.Consistency, vector *qc.TsConsistency) (tc.ScanResponse, tc.GroupAggrScanResponse, error) {

	client, err := nclient.NewGSIIndexer(server, "default", bucketName)
	if err != nil {
		return nil, nil, err
	}
	logging.SetLogLevel(logging.Error)

	tctx := &testContext{}
	conn, err := datastore.NewSizedIndexConnection(100000, tctx)
	if err != nil {
		log.Fatalf("error creating SizedIndexConnection: %v\n", err)
	}
	requestid := getrequestid()
	index, err := client.IndexByName(indexName)
	if err != nil {
		return nil, nil, err
	}

	var err1 error
	index, err1 = WaitForIndexOnline(client, indexName, index)
	if err1 != nil {
		return nil, nil, err1
	}

	index3, useScan3 := index.(datastore.Index3)

	var start time.Time
	go func() {
		spans2 := make(datastore.Spans2, len(scans))
		for i, scan := range scans {
			spans2[i] = &datastore.Span2{}
			if len(scan.Seek) != 0 {
				spans2[i].Seek = skey2qkey(scan.Seek)
			}
			spans2[i].Ranges = filtertoranges2(scan.Filter)
		}

		proj := projectionton1ql(projection)
		groupAggregates := groupAggrton1ql(groupAggr)

		cons := getConsistency(consistency)

		if useScan3 {
			start = time.Now()
			// TODO: pass the vector instead of nil.
			// Currently go tests do not pass timestamp vector
			// TODO: passing nil for IndexKeyOrders
			index3.Scan3(requestid, spans2, reverse, distinct, proj,
				offset, limit, groupAggregates, nil, cons, nil, conn)
		} else {
			log.Fatalf("Indexer does not support Index3 API. Cannot call Scan3 method.")
		}
	}()

	var results tc.ScanResponse
	var garesults tc.GroupAggrScanResponse
	var err2 error
	if groupAggr != nil {
		garesults = resultsforaggrgates(conn.EntryChannel())
	} else {
		results, err2 = getresultsfromchannel(conn.EntryChannel(), index.IsPrimary(), tctx)
	}

	elapsed := time.Since(start)
	tc.LogPerfStat("MultiScan", elapsed)
	return results, garesults, err2
}

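// filtertoranges2 converts GSI composite element filters into datastore
// Ranges2 for use with the Scan2/Scan3 APIs.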
func filtertoranges2(filters []*qc.CompositeElementFilter) datastore.Ranges2 {
	if len(filters) == 0 {
		return nil
	}
	ranges2 := make(datastore.Ranges2, len(filters))
	for i, cef := range filters {
		ranges2[i] = &datastore.Range2{}
		ranges2[i].Low = interfaceton1qlvalue(cef.Low)
		ranges2[i].High = interfaceton1qlvalue(cef.High)
		ranges2[i].Inclusion = datastore.Inclusion(cef.Inclusion)
	}

	return ranges2
}

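// groupAggrton1ql translates a GSI GroupAggr specification into the
// datastore IndexGroupAggregates form expected by Scan3.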
func groupAggrton1ql(groupAggs *qc.GroupAggr) *datastore.IndexGroupAggregates {
	if groupAggs == nil {
		return nil
	}

	// Group keys
	var groups datastore.IndexGroupKeys
	if groupAggs.Group != nil {
		groups = make(datastore.IndexGroupKeys, len(groupAggs.Group))
		for i, grp := range groupAggs.Group {
			expr, _ := parser.Parse(grp.Expr)
			g := &datastore.IndexGroupKey{
				EntryKeyId: int(grp.EntryKeyId),
				KeyPos:     int(grp.KeyPos),
				Expr:       expr,
			}
			groups[i] = g
		}
	}

	// Aggregates
	var aggregates datastore.IndexAggregates
	if groupAggs.Aggrs != nil {
		aggregates = make(datastore.IndexAggregates, len(groupAggs.Aggrs))
		for i, aggr := range groupAggs.Aggrs {
			expr, _ := parser.Parse(aggr.Expr)
			a := &datastore.IndexAggregate{
				Operation:  gsiaggrtypeton1ql(aggr.AggrFunc),
				EntryKeyId: int(aggr.EntryKeyId),
				KeyPos:     int(aggr.KeyPos),
				Expr:       expr,
				Distinct:   aggr.Distinct,
			}
			aggregates[i] = a
		}
	}

	var dependsOnIndexKeys []int
	if groupAggs.DependsOnIndexKeys != nil {
		dependsOnIndexKeys = make([]int, len(groupAggs.DependsOnIndexKeys))
		for i, ikey := range groupAggs.DependsOnIndexKeys {
			dependsOnIndexKeys[i] = int(ikey)
		}
	}

	ga := &datastore.IndexGroupAggregates{
		Name:               groupAggs.Name,
		Group:              groups,
		Aggregates:         aggregates,
		DependsOnIndexKeys: dependsOnIndexKeys,
		IndexKeyNames:      groupAggs.IndexKeyNames,
	}

	return ga
}

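// projectionton1ql converts a GSI IndexProjection into its datastore
// equivalent, converting the entry key positions to int.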
func projectionton1ql(projection *qc.IndexProjection) *datastore.IndexProjection {
	if projection == nil {
		return nil
	}

	entrykeys := make([]int, 0, len(projection.EntryKeys))
	for _, ek := range projection.EntryKeys {
		entrykeys = append(entrykeys, int(ek))
	}

	n1qlProj := &datastore.IndexProjection{
		EntryKeys:  entrykeys,
		PrimaryKey: projection.PrimaryKey,
	}

	return n1qlProj
}

func getrequestid() string {
	uuid, _ := c.NewUUID()
	return strconv.Itoa(int(uuid.Uint64()))
}

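// getConsistency maps the GSI consistency level to a datastore scan
// consistency: session consistency becomes SCAN_PLUS, anything else is
// treated as an unbounded scan.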
func getConsistency(consistency c.Consistency) datastore.ScanConsistency {
	var cons datastore.ScanConsistency
	if consistency == c.SessionConsistency {
		cons = datastore.SCAN_PLUS
	} else {
		cons = datastore.UNBOUNDED
	}
	return cons
}

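// getresultsfromchannel drains the index connection's entry channel into a
// ScanResponse keyed by primary key. Secondary entry keys are converted back
// to SecondaryKey values; primary index entries carry no secondary key.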
func getresultsfromchannel(ch datastore.EntryChannel, isprimary bool, tctx *testContext) (tc.ScanResponse, error) {

	scanResults := make(tc.ScanResponse)
	var err error
	for entry := range ch {
		if isprimary {
			scanResults[entry.PrimaryKey] = nil
		} else {
			scanResults[entry.PrimaryKey] = values2SKey(entry.EntryKey)
		}
	}

	if tctx.err != nil {
		err = tctx.err
	}

	return scanResults, err
}

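// resultsforaggrgates collects grouped/aggregated scan rows from the entry
// channel, logging each row as it is received.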
func resultsforaggrgates(ch datastore.EntryChannel) tc.GroupAggrScanResponse {
	scanResults := make(tc.GroupAggrScanResponse, 0)
	for entry := range ch {
		log.Printf("Scanresult Row  %v : %v ", entry.EntryKey, entry.PrimaryKey)
		scanResults = append(scanResults, values2SKey(entry.EntryKey))
	}
	return scanResults
}

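// interfaceton1qlvalue converts a GSI filter endpoint into a N1QL value:
// the collatejson missing literal maps to MISSING, the unbounded sentinels
// map to nil (open-ended range), and everything else is wrapped as-is.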
func interfaceton1qlvalue(key interface{}) value.Value {
	if s, ok := key.(string); ok && collatejson.MissingLiteral.Equal(s) {
		return value.NewMissingValue()
	} else {
		if key == c.MinUnbounded || key == c.MaxUnbounded {
			return nil
		}
		return value.NewValue(key)
	}
}

func skey2qkey(skey c.SecondaryKey) value.Values {
	qkey := make(value.Values, 0, len(skey))
	for _, x := range skey {
		qkey = append(qkey, value.NewValue(x))
	}
	return qkey
}

func values2SKey(vals value.Values) c.SecondaryKey {
	if len(vals) == 0 {
		return nil
	}
	skey := make(c.SecondaryKey, 0, len(vals))
	for _, val := range vals {
		skey = append(skey, val.ActualForIndex())
	}
	return skey
}

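// gsiaggrtypeton1ql maps a GSI aggregate function type to the corresponding
// datastore aggregate type, defaulting to AGG_COUNT for unknown values.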
func gsiaggrtypeton1ql(gsiaggr c.AggrFuncType) datastore.AggregateType {
	switch gsiaggr {
	case c.AGG_MIN:
		return datastore.AGG_MIN
	case c.AGG_MAX:
		return datastore.AGG_MAX
	case c.AGG_SUM:
		return datastore.AGG_SUM
	case c.AGG_COUNT:
		return datastore.AGG_COUNT
	case c.AGG_COUNTN:
		return datastore.AGG_COUNTN
	}
	return datastore.AGG_COUNT
}

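// testContext is the scan context handed to NewSizedIndexConnection; it
// records scan errors reported by the indexer so callers can return them.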
type testContext struct {
	err error
}

func (ctxt *testContext) GetScanCap() int64 {
	return 512 // Default index scan request size
}

func (ctxt *testContext) Error(err errors.Error) {
	fmt.Printf("Scan error: %v\n", err)
	ctxt.err = err.Cause()
}

func (ctxt *testContext) Warning(wrn errors.Error) {
	fmt.Printf("scan warning: %v\n", wrn)
}

func (ctxt *testContext) Fatal(fatal errors.Error) {
	fmt.Printf("scan fatal: %v\n", fatal)
}

func (ctxt *testContext) MaxParallelism() int {
	return 1
}

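// WaitForIndexOnline polls the indexer, refreshing its metadata once per
// second for up to 30 seconds, until the named index reports the ONLINE
// state, and returns the refreshed index handle.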
func WaitForIndexOnline(n1qlclient datastore.Indexer, indexName string, index datastore.Index) (datastore.Index, error) {

	var err error
	for i := 0; i < 30; i++ {
		if s, _, _ := index.State(); s == datastore.ONLINE {
			return index, nil
		}

		time.Sleep(1 * time.Second)
		if err := n1qlclient.Refresh(); err != nil {
			return nil, err
		}

		index, err = n1qlclient.IndexByName(indexName)
		if err != nil {
			return nil, err
		}
	}

	return nil, fmt.Errorf("index %v did not come online within 30 seconds", indexName)
}
587