1//  Copyright (c) 2014 Couchbase, Inc.
2//  Licensed under the Apache License, Version 2.0 (the "License");
3//  you may not use this file except in compliance with the
4//  License. You may obtain a copy of the License at
5//    http://www.apache.org/licenses/LICENSE-2.0
6//  Unless required by applicable law or agreed to in writing,
7//  software distributed under the License is distributed on an "AS
8//  IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
9//  express or implied. See the License for the specific language
10//  governing permissions and limitations under the License.
11
12package cbgt
13
14import (
15	"bytes"
16	"fmt"
17	"io"
18	"net/http"
19	"reflect"
20	"testing"
21)
22
23type ErrorOnlyFeed struct {
24	name string
25}
26
27func (t *ErrorOnlyFeed) Name() string {
28	return t.name
29}
30
31func (t *ErrorOnlyFeed) IndexName() string {
32	return t.name
33}
34
35func (t *ErrorOnlyFeed) Start() error {
36	return fmt.Errorf("ErrorOnlyFeed Start() invoked")
37}
38
39func (t *ErrorOnlyFeed) Close() error {
40	return fmt.Errorf("ErrorOnlyFeed Close() invoked")
41}
42
43func (t *ErrorOnlyFeed) Dests() map[string]Dest {
44	return nil
45}
46
47func (t *ErrorOnlyFeed) Stats(w io.Writer) error {
48	return fmt.Errorf("ErrorOnlyFeed Stats() invoked")
49}
50
51func TestParsePartitionsToVBucketIds(t *testing.T) {
52	v, err := ParsePartitionsToVBucketIds(nil)
53	if err != nil || v == nil || len(v) != 0 {
54		t.Errorf("expected empty")
55	}
56	v, err = ParsePartitionsToVBucketIds(map[string]Dest{})
57	if err != nil || v == nil || len(v) != 0 {
58		t.Errorf("expected empty")
59	}
60	v, err = ParsePartitionsToVBucketIds(map[string]Dest{"123": nil})
61	if err != nil || v == nil || len(v) != 1 {
62		t.Errorf("expected one entry")
63	}
64	if v[0] != uint16(123) {
65		t.Errorf("expected 123")
66	}
67	v, err = ParsePartitionsToVBucketIds(map[string]Dest{"!bad": nil})
68	if err == nil || v != nil {
69		t.Errorf("expected error")
70	}
71}
72
73func TestDataSourcePartitions(t *testing.T) {
74	a, err := DataSourcePartitions("a fake source type",
75		"sourceName", "sourceUUID", "sourceParams", "serverURL", nil)
76	if err == nil || a != nil {
77		t.Errorf("expected fake data source type to error")
78	}
79
80	a, err = DataSourcePartitions("couchbase",
81		"sourceName", "sourceUUID", "sourceParams", "serverURL", nil)
82	if err == nil || a != nil {
83		t.Errorf("expected couchbase source type to error on bad server url")
84	}
85
86	a, err = DataSourcePartitions("couchbase-dcp",
87		"sourceName", "sourceUUID", "sourceParams", "serverURL", nil)
88	if err == nil || a != nil {
89		t.Errorf("expected couchbase source type to error on bad server url")
90	}
91
92	a, err = DataSourcePartitions("couchbase-tap",
93		"sourceName", "sourceUUID", "sourceParams", "serverURL", nil)
94	if err == nil || a != nil {
95		t.Errorf("expected couchbase source type to error on bad server url")
96	}
97
98	a, err = DataSourcePartitions("nil",
99		"sourceName", "sourceUUID", "sourceParams", "serverURL", nil)
100	if err != nil || a != nil {
101		t.Errorf("expected nil source type to work, but have no partitions")
102	}
103
104	a, err = DataSourcePartitions("primary",
105		"sourceName", "sourceUUID", "sourceParams", "serverURL", nil)
106	if err == nil || a != nil {
107		t.Errorf("expected dest source type to error on non-json server params")
108	}
109
110	a, err = DataSourcePartitions("primary",
111		"sourceName", "sourceUUID", "", "serverURL", nil)
112	if err != nil || a == nil {
113		t.Errorf("expected dest source type to ok on empty server params")
114	}
115
116	a, err = DataSourcePartitions("primary",
117		"sourceName", "sourceUUID", "{}", "serverURL", nil)
118	if err != nil || a == nil {
119		t.Errorf("expected dest source type to ok on empty JSON server params")
120	}
121}
122
123func TestNilFeedStart(t *testing.T) {
124	f := NewNILFeed("aaa", "bbb", nil)
125	if f.Name() != "aaa" {
126		t.Errorf("expected aaa name")
127	}
128	if f.IndexName() != "bbb" {
129		t.Errorf("expected bbb index name")
130	}
131	if f.Start() != nil {
132		t.Errorf("expected NILFeed.Start() to work")
133	}
134	if f.Dests() != nil {
135		t.Errorf("expected nil dests")
136	}
137	w := bytes.NewBuffer(nil)
138	if f.Stats(w) != nil {
139		t.Errorf("expected no err on nil feed stats")
140	}
141	if w.String() != "{}" {
142		t.Errorf("expected json stats")
143	}
144	if f.Close() != nil {
145		t.Errorf("expected nil dests")
146	}
147}
148
149func TestPrimaryFeed(t *testing.T) {
150	df := NewPrimaryFeed("aaa", "bbb",
151		BasicPartitionFunc, map[string]Dest{})
152	if df.Name() != "aaa" {
153		t.Errorf("expected aaa name")
154	}
155	if df.IndexName() != "bbb" {
156		t.Errorf("expected bbb index name")
157	}
158	if df.Start() != nil {
159		t.Errorf("expected PrimaryFeed start to work")
160	}
161
162	buf := make([]byte, 0, 100)
163	err := df.Stats(bytes.NewBuffer(buf))
164	if err != nil {
165		t.Errorf("expected PrimaryFeed stats to work")
166	}
167
168	key := []byte("k")
169	seq := uint64(123)
170	val := []byte("v")
171
172	if df.DataUpdate("unknown-partition", key, seq, val,
173		0, DEST_EXTRAS_TYPE_NIL, nil) == nil {
174		t.Errorf("expected err on bad partition")
175	}
176	if df.DataDelete("unknown-partition", key, seq,
177		0, DEST_EXTRAS_TYPE_NIL, nil) == nil {
178		t.Errorf("expected err on bad partition")
179	}
180	if df.SnapshotStart("unknown-partition", seq, seq) == nil {
181		t.Errorf("expected err on bad partition")
182	}
183	if df.OpaqueSet("unknown-partition", val) == nil {
184		t.Errorf("expected err on bad partition")
185	}
186	_, _, err = df.OpaqueGet("unknown-partition")
187	if err == nil {
188		t.Errorf("expected err on bad partition")
189	}
190	if df.Rollback("unknown-partition", seq) == nil {
191		t.Errorf("expected err on bad partition")
192	}
193	if df.ConsistencyWait("unknown-partition", "unknown-partition-UUID",
194		"level", seq, nil) == nil {
195		t.Errorf("expected err on bad partition")
196	}
197	df2 := NewPrimaryFeed("", "", BasicPartitionFunc, map[string]Dest{
198		"some-partition": &TestDest{},
199	})
200	if df2.ConsistencyWait("some-partition", "some-partition-UUID",
201		"level", seq, nil) != nil {
202		t.Errorf("expected no err on some partition to TestDest")
203	}
204	_, err = df.Count(nil, nil)
205	if err == nil {
206		t.Errorf("expected err on counting a primary feed")
207	}
208	if df.Query(nil, nil, nil, nil) == nil {
209		t.Errorf("expected err on querying a primary feed")
210	}
211}
212
213func TestCBAuthParams(t *testing.T) {
214	p := &CBAuthParams{
215		AuthUser:     "au",
216		AuthPassword: "ap",
217	}
218	a, b, c := p.GetCredentials()
219	if a != "au" || b != "ap" || c != "au" {
220		t.Errorf("wrong creds")
221	}
222
223	p2 := &CBAuthParamsSasl{
224		CBAuthParams{
225			AuthUser:     "au",
226			AuthPassword: "ap",
227
228			AuthSaslUser:     "asu",
229			AuthSaslPassword: "asp",
230		},
231	}
232	a, b, c = p2.GetCredentials()
233	if a != "au" || b != "ap" || c != "au" {
234		t.Errorf("wrong creds")
235	}
236	a, b = p2.GetSaslCredentials()
237	if a != "asu" || b != "asp" {
238		t.Errorf("wrong sasl creds")
239	}
240}
241
242func TestVBucketIdToPartitionDest(t *testing.T) {
243	var dests map[string]Dest
244
245	pf_hi := func(partition string, key []byte, dests map[string]Dest) (Dest, error) {
246		if string(key) != "hi" {
247			t.Errorf("expected hi")
248		}
249		return nil, nil
250	}
251	partition, dest, err := VBucketIdToPartitionDest(pf_hi, dests, 0, []byte("hi"))
252	if err != nil || dest != nil || partition != "0" {
253		t.Errorf("expected no err, got: %v", err)
254	}
255
256	pf_bye := func(partition string, key []byte, dests map[string]Dest) (Dest, error) {
257		if string(key) != "bye" {
258			t.Errorf("expected bye")
259		}
260		return nil, nil
261	}
262	partition, dest, err = VBucketIdToPartitionDest(pf_bye, dests, 1025, []byte("bye"))
263	if err != nil || dest != nil || partition != "1025" {
264		t.Errorf("expected no err, got: %v", err)
265	}
266
267	pf_err := func(partition string, key []byte, dests map[string]Dest) (Dest, error) {
268		return nil, fmt.Errorf("whoa_err")
269	}
270	partition, dest, err = VBucketIdToPartitionDest(pf_err, dests, 123, nil)
271	if err == nil {
272		t.Errorf("expected err")
273	}
274	if partition != "" {
275		t.Errorf("expected empty string parition on err")
276	}
277	if dest != nil {
278		t.Errorf("expected nil dst on err")
279	}
280}
281
282func TestTAPFeedBasics(t *testing.T) {
283	df, err := NewTAPFeed("aaa", "bbb",
284		"url", "poolName", "bucketName", "bucketUUID", "",
285		BasicPartitionFunc, map[string]Dest{}, false)
286	if err != nil {
287		t.Errorf("expected NewTAPFeed to work")
288	}
289	if df.Name() != "aaa" {
290		t.Errorf("expected aaa name")
291	}
292	if df.IndexName() != "bbb" {
293		t.Errorf("expected bbb index name")
294	}
295	if df.Dests() == nil {
296		t.Errorf("expected some dests")
297	}
298	err = df.Stats(bytes.NewBuffer(nil))
299	if err != nil {
300		t.Errorf("expected stats to work")
301	}
302}
303
304func TestDCPFeedBasics(t *testing.T) {
305	df, err := NewDCPFeed("aaa", "bbb",
306		"url", "poolName", "bucketName", "bucketUUID", "",
307		BasicPartitionFunc, map[string]Dest{}, false, nil)
308	if err != nil {
309		t.Errorf("expected NewDCPFeed to work")
310	}
311	if df.Name() != "aaa" {
312		t.Errorf("expected aaa name")
313	}
314	if df.IndexName() != "bbb" {
315		t.Errorf("expected bbb index name")
316	}
317	if df.Dests() == nil {
318		t.Errorf("expected some dests")
319	}
320	err = df.Stats(bytes.NewBuffer(nil))
321	if err != nil {
322		t.Errorf("expected stats to work")
323	}
324}
325
326func TestCouchbaseParseSourceName(t *testing.T) {
327	s, p, b := CouchbaseParseSourceName("s", "p", "b")
328	if s != "s" ||
329		p != "p" ||
330		b != "b" {
331		t.Errorf("expected s, p, b")
332	}
333
334	badURL := "http://a/badURL"
335	s, p, b = CouchbaseParseSourceName("s", "p", badURL)
336	if s != "s" ||
337		p != "p" ||
338		b != badURL {
339		t.Errorf("expected s, p, badURL: %s, got: %s %s %s",
340			badURL, s, p, b)
341	}
342
343	badURL = "http://a:8091"
344	s, p, b = CouchbaseParseSourceName("s", "p", badURL)
345	if s != "s" ||
346		p != "p" ||
347		b != badURL {
348		t.Errorf("expected s, p, badURL: %s, got: %s %s %s",
349			badURL, s, p, b)
350	}
351
352	badURL = "http://a:8091/pools"
353	s, p, b = CouchbaseParseSourceName("s", "p", badURL)
354	if s != "s" ||
355		p != "p" ||
356		b != badURL {
357		t.Errorf("expected s, p, badURL: %s, got: %s %s %s",
358			badURL, s, p, b)
359	}
360
361	badURL = "http://a:8091/pools/default"
362	s, p, b = CouchbaseParseSourceName("s", "p", badURL)
363	if s != "s" ||
364		p != "p" ||
365		b != badURL {
366		t.Errorf("expected s, p, badURL: %s, got: %s %s %s",
367			badURL, s, p, b)
368	}
369
370	badURL = "http://a:8091/pools/default/buckets"
371	s, p, b = CouchbaseParseSourceName("s", "p", badURL)
372	if s != "s" ||
373		p != "p" ||
374		b != badURL {
375		t.Errorf("expected s, p, badURL: %s, got: %s %s %s",
376			badURL, s, p, b)
377	}
378
379	badURL = "http://a:8091/pools/default/buckets/"
380	s, p, b = CouchbaseParseSourceName("s", "p", badURL)
381	if s != "s" ||
382		p != "p" ||
383		b != badURL {
384		t.Errorf("expected s, p, badURL: %s, got: %s %s %s",
385			badURL, s, p, b)
386	}
387
388	badURL = "http://a:8091/pools//buckets/theBucket"
389	s, p, b = CouchbaseParseSourceName("s", "p", badURL)
390	if s != "s" ||
391		p != "p" ||
392		b != badURL {
393		t.Errorf("expected s, p, badURL: %s, got: %s %s %s",
394			badURL, s, p, b)
395	}
396
397	bu := "http://a:8091/pools/myPool/buckets/theBucket"
398	s, p, b = CouchbaseParseSourceName("s", "p", bu)
399	if s != "http://a:8091" ||
400		p != "myPool" ||
401		b != "theBucket" {
402		t.Errorf("expected theBucket, got: %s %s %s", s, p, b)
403	}
404
405	bu = "https://a:8091/pools/myPool/buckets/theBucket"
406	s, p, b = CouchbaseParseSourceName("s", "p", bu)
407	if s != "https://a:8091" ||
408		p != "myPool" ||
409		b != "theBucket" {
410		t.Errorf("expected theBucket, got: %s %s %s", s, p, b)
411	}
412
413	bu = "https://%"
414	s, p, b = CouchbaseParseSourceName("s", "p", bu)
415	if s != "s" ||
416		p != "p" ||
417		b != bu {
418		t.Errorf("expected spbu, got: %s %s %s", s, p, b)
419	}
420}
421
422func TestCouchbasePartitions(t *testing.T) {
423	tests := []struct {
424		sourceType, sourceName, sourceUUID, sourceParams, serverIn string
425
426		expErr        bool
427		expPartitions []string
428	}{
429		{"couchbase", "", "", "", "bar", true, nil},
430		{"couchbase", "", "", "{}", "bar", true, nil},
431		{"couchbase", "", "", `{"authUser": "default"}`, "bar", true, nil},
432		{"couchbase", "", "", `{"authUser": "baz"}`, "bar", true, nil},
433
434		{"couchbase", "foo", "", "", "bar", true, nil},
435		{"couchbase", "foo", "", "{}", "bar", true, nil},
436		{"couchbase", "foo", "", `{"authUser": "default"}`, "bar", true, nil},
437		{"couchbase", "foo", "", `{"authUser": "baz"}`, "bar", true, nil},
438
439		{"couchbase", "http://255.255.255.255:8091/pools/default/buckets/default",
440			"", "{}", "baz", true, nil},
441		{"couchbase", "http://255.255.255.255:8091/pools/default/buckets/default",
442			"", `{"authUser": "default"}`, "baz", true, nil},
443		{"couchbase", "http://255.255.255.255:8091/pools/default/buckets/default",
444			"", `{"authUser": "baz"}`, "baz", true, nil},
445		{"couchbase", "http://255.255.255.255:8091/pools/default/buckets/default",
446			"", `{"authUser": "default"}`, "default", true, nil},
447		{"couchbase", "http://255.255.255.255:8091/pools/default/buckets/default",
448			"", `{"authUser": "baz"}`, "default", true, nil},
449
450		{"couchbase", "http://255.255.255.255:8091/pools/default/buckets/default",
451			"", `{"authUser": "default"}`,
452			"http://255.255.255.255:8091/pools/default/buckets/default", true, nil},
453
454		{"couchbase", "http://255.255.255.255:8091/pools/default/buckets/notDefault",
455			"", "{}", "baz", true, nil},
456		{"couchbase", "http://255.255.255.255:8091/pools/default/buckets/notDefault",
457			"", `{"authUser": "default"}`, "baz", true, nil},
458		{"couchbase", "http://255.255.255.255:8091/pools/default/buckets/notDefault",
459			"", `{"authUser": "baz"}`, "baz", true, nil},
460		{"couchbase", "http://255.255.255.255:8091/pools/default/buckets/notDefault",
461			"", `{"authUser": "default"}`, "default", true, nil},
462		{"couchbase", "http://255.255.255.255:8091/pools/default/buckets/notDefault",
463			"", `{"authUser": "baz"}`, "default", true, nil},
464	}
465
466	for _, test := range tests {
467		partitions, err := CouchbasePartitions(test.sourceType, test.sourceName,
468			test.sourceUUID, test.sourceParams, test.serverIn, nil)
469		if (test.expErr && err == nil) ||
470			(!test.expErr && err != nil) {
471			t.Errorf("test err != expErr,"+
472				" err: %v, test: %#v", err, test)
473		}
474
475		if !reflect.DeepEqual(partitions, test.expPartitions) {
476			t.Errorf("test partitions != expPartitions,"+
477				" partitions: %v, test: %#v", partitions, test)
478		}
479	}
480}
481
482func TestDataSourcePrepParams(t *testing.T) {
483	a, err := DataSourcePrepParams("a fake source type",
484		"sourceName", "sourceUUID", "sourceParams", "serverURL", nil)
485	if err == nil || a != "" {
486		t.Errorf("expected fake data source type to error")
487	}
488
489	a, err = DataSourcePrepParams("primary",
490		"sourceName", "sourceUUID", "", "serverURL", nil)
491	if err != nil || a != "" {
492		t.Errorf("expected empty data source params to ok")
493	}
494
495	a, err = DataSourcePrepParams("primary",
496		"sourceName", "sourceUUID", "{}", "serverURL", nil)
497	if err != nil || a != "{}" {
498		t.Errorf("expected {} data source params to ok")
499	}
500
501	saw_testFeedPartitions := 0
502	saw_testFeedPartitionSeqs := 0
503
504	testFeedPartitions := func(sourceType,
505		sourceName, sourceUUID, sourceParams,
506		serverIn string, options map[string]string,
507	) (
508		partitions []string, err error,
509	) {
510		saw_testFeedPartitions++
511		return nil, nil
512	}
513
514	testFeedPartitionSeqs := func(sourceType, sourceName, sourceUUID,
515		sourceParams, serverIn string, options map[string]string,
516	) (
517		map[string]UUIDSeq, error,
518	) {
519		saw_testFeedPartitionSeqs++
520		return nil, nil
521	}
522
523	RegisterFeedType("testFeed", &FeedType{
524		Partitions:    testFeedPartitions,
525		PartitionSeqs: testFeedPartitionSeqs,
526	})
527
528	sourceParams := `{"foo":"hoo","markPartitionSeqs":"currentPartitionSeqs"}`
529	a, err = DataSourcePrepParams("testFeed",
530		"sourceName", "sourceUUID", sourceParams, "serverURL", nil)
531	if err != nil {
532		t.Errorf("expected no err")
533	}
534	if a == sourceParams {
535		t.Errorf("expected transformed data source params")
536	}
537	if saw_testFeedPartitions != 1 {
538		t.Errorf("expected 1 saw_testFeedPartitions call, got: %d",
539			saw_testFeedPartitions)
540	}
541	if saw_testFeedPartitionSeqs != 1 {
542		t.Errorf("expected 1 saw_testFeedPartitionSeqs call, got: %d",
543			saw_testFeedPartitionSeqs)
544	}
545}
546
547func TestCouchbaseSourceVBucketLookUp(t *testing.T) {
548	req, _ := http.NewRequest("POST", "/api/index/idx/pindexLookup -uAdministrator:pwd", nil)
549	inDef := &IndexDef{Name: "idx", SourceName: "default", SourceType: "couchbase"}
550	tests := []struct {
551		documentID, serverIn string
552		inDef                *IndexDef
553		request              *http.Request
554		expErr               bool
555		expVBucketID         string
556	}{
557		{"test", "http://255.255.255.255:8091/pools/default/buckets/default", inDef, req, true, ""},
558	}
559	for _, test := range tests {
560		vBucketID, err := CouchbaseSourceVBucketLookUp(test.documentID,
561			test.serverIn, test.inDef, test.request)
562		if (test.expErr && err == nil) ||
563			(!test.expErr && err != nil) {
564			t.Errorf("test err != expErr,"+
565				" err: %v, test: %#v", err, test)
566		}
567
568		if !reflect.DeepEqual(vBucketID, test.expVBucketID) {
569			t.Errorf("test partitions != expPartitions,"+
570				" vBucketID: %v, test: %#v", vBucketID, test.expVBucketID)
571		}
572	}
573}
574