1package dataport
2
3import "testing"
4import "time"
5import "fmt"
6
7import "github.com/couchbase/indexing/secondary/logging"
8import c "github.com/couchbase/indexing/secondary/common"
9import protobuf "github.com/couchbase/indexing/secondary/protobuf/data"
10
11func TestTimeout(t *testing.T) {
12	logging.SetLogLevel(logging.Silent)
13
14	raddr := "localhost:8888"
15	maxBuckets, maxvbuckets, mutChanSize := 2, 4, 100
16
17	// start server
18	appch := make(chan interface{}, mutChanSize)
19	prefix := "indexer.dataport."
20	dconfig := c.SystemConfig.SectionConfig(prefix, true /*trim*/)
21	daemon, err := NewServer(raddr, maxvbuckets, dconfig, appch)
22	if err != nil {
23		t.Fatal(err)
24	}
25
26	// start endpoint
27	config := c.SystemConfig.SectionConfig("projector.dataport.", true /*trim*/)
28	endp, err := NewRouterEndpoint("clust", "topic", raddr, maxvbuckets, config)
29	if err != nil {
30		t.Fatal(err)
31	}
32
33	vbmaps := makeVbmaps(maxvbuckets, maxBuckets) // vbmaps
34
35	// send StreamBegin
36	for _, vbmap := range vbmaps {
37		for i := 0; i < len(vbmap.Vbuckets); i++ { // for N vbuckets
38			vbno, vbuuid := vbmap.Vbuckets[i], vbmap.Vbuuids[i]
39			kv := c.NewKeyVersions(uint64(0), []byte("Bourne"), 1)
40			kv.AddStreamBegin()
41			dkv := &c.DataportKeyVersions{
42				Bucket: vbmap.Bucket, Vbno: vbno, Vbuuid: vbuuid, Kv: kv,
43			}
44			if err := endp.Send(dkv); err != nil {
45				t.Fatal(err)
46			}
47		}
48	}
49
50	go func() { // this routine will keep one connection alive
51		for i := 0; ; i++ {
52			vbmap := vbmaps[0] // keep sending sync for first vbucket alone
53			idx := i % len(vbmap.Vbuckets)
54			vbno, vbuuid := vbmap.Vbuckets[idx], vbmap.Vbuuids[idx]
55			// send sync messages
56			kv := c.NewKeyVersions(10, nil, 1)
57			kv.AddSync()
58			dkv := &c.DataportKeyVersions{
59				Bucket: vbmap.Bucket, Vbno: vbno, Vbuuid: vbuuid, Kv: kv,
60			}
61			if endp.Send(dkv); err != nil {
62				t.Fatal(err)
63			}
64			<-time.After(
65				time.Duration(dconfig["tcpReadDeadline"].Int()) * time.Millisecond)
66		}
67	}()
68
69	wait := true
70	for wait {
71		select {
72		case msg := <-appch:
73			switch ce := msg.(type) {
74			case []*protobuf.VbKeyVersions:
75			case ConnectionError:
76				ref := maxvbuckets
77				t.Logf("%T %v \n", ce, ce)
78				if len(ce) != 2 {
79					t.Fatal("mismatch in ConnectionError")
80				}
81				refBuckets := map[string]bool{"default0": true, "default1": true}
82				for bucket, vbnos := range ce {
83					delete(refBuckets, bucket)
84					if len(vbnos) != ref {
85						t.Fatalf("mismatch in ConnectionError %v %v", vbnos, ref)
86					}
87				}
88				if len(refBuckets) > 0 {
89					t.Fatalf("mismatch in ConnectionError %v", refBuckets)
90				}
91				wait = false
92
93			default:
94				t.Fatalf("expected connection error %T", msg)
95			}
96		}
97	}
98
99	<-time.After(100 * time.Millisecond)
100	endp.Close()
101
102	<-time.After(100 * time.Millisecond)
103	daemon.Close()
104}
105
106func TestLoopback(t *testing.T) {
107	logging.SetLogLevel(logging.Silent)
108
109	raddr := "localhost:8888"
110	maxBuckets, maxvbuckets, mutChanSize := 2, 32, 100
111
112	// start server
113	appch := make(chan interface{}, mutChanSize)
114	prefix := "indexer.dataport."
115	config := c.SystemConfig.SectionConfig(prefix, true /*trim*/)
116	daemon, err := NewServer(raddr, maxvbuckets, config, appch)
117	if err != nil {
118		t.Fatal(err)
119	}
120
121	// start endpoint
122	config = c.SystemConfig.SectionConfig("projector.dataport.", true /*trim*/)
123	endp, err := NewRouterEndpoint("clust", "topic", raddr, maxvbuckets, config)
124	if err != nil {
125		t.Fatal(err)
126	}
127
128	vbmaps := makeVbmaps(maxvbuckets, maxBuckets) // vbmaps
129
130	// send StreamBegin
131	for _, vbmap := range vbmaps {
132		for i := 0; i < len(vbmap.Vbuckets); i++ { // for N vbuckets
133			vbno, vbuuid := vbmap.Vbuckets[i], vbmap.Vbuuids[i]
134			kv := c.NewKeyVersions(uint64(0), []byte("Bourne"), 1)
135			kv.AddStreamBegin()
136			dkv := &c.DataportKeyVersions{
137				Bucket: vbmap.Bucket, Vbno: vbno, Vbuuid: vbuuid, Kv: kv,
138			}
139			if err := endp.Send(dkv); err != nil {
140				t.Fatal(err)
141			}
142		}
143	}
144
145	count, seqno := 200, 1
146	for i := 1; i <= count; i += 2 {
147		nVbs, nMuts, nIndexes := maxvbuckets, 5, 5
148		dkvs := dataKeyVersions("default0", seqno, nVbs, nMuts, nIndexes)
149		dkvs = append(dkvs, dataKeyVersions("default1", seqno, nVbs, nMuts, nIndexes)...)
150		for _, dkv := range dkvs {
151			if err := endp.Send(dkv); err != nil {
152				t.Fatal(err)
153			}
154		}
155		seqno += nMuts
156
157		// gather
158		pvbs := make([]*protobuf.VbKeyVersions, 0)
159	loop:
160		for {
161			select {
162			case msg := <-appch:
163				//t.Logf("%T %v\n", msg, msg)
164				if pvbsSub, ok := msg.([]*protobuf.VbKeyVersions); !ok {
165					t.Fatalf("unexpected type in loopback %T", msg)
166				} else {
167					pvbs = append(pvbs, pvbsSub...)
168				}
169			case <-time.After(10 * time.Millisecond):
170				break loop
171			}
172		}
173		commands := make(map[byte]int)
174		for _, vb := range protobuf2VbKeyVersions(pvbs) {
175			for _, kv := range vb.Kvs {
176				for _, cmd := range kv.Commands {
177					if _, ok := commands[byte(cmd)]; !ok {
178						commands[byte(cmd)] = 0
179					}
180					commands[byte(cmd)]++
181				}
182			}
183		}
184		if StreamBegins, ok := commands[c.StreamBegin]; ok && StreamBegins != 64 {
185			t.Fatalf("unexpected response %v", StreamBegins)
186		}
187		if commands[c.Upsert] != 1600 {
188			t.Fatalf("unexpected response %v", commands[c.Upsert])
189		}
190	}
191
192	endp.Close()
193	daemon.Close()
194}
195
196func BenchmarkLoopback(b *testing.B) {
197	logging.SetLogLevel(logging.Silent)
198
199	raddr := "localhost:8888"
200	maxBuckets, maxvbuckets, mutChanSize := 2, 32, 100
201
202	// start server
203	appch := make(chan interface{}, mutChanSize)
204	prefix := "indexer.dataport."
205	config := c.SystemConfig.SectionConfig(prefix, true /*trim*/)
206	daemon, err := NewServer(raddr, maxvbuckets, config, appch)
207	if err != nil {
208		b.Fatal(err)
209	}
210
211	// start endpoint
212	config = c.SystemConfig.SectionConfig("projector.dataport.", true /*trim*/)
213	endp, err := NewRouterEndpoint("clust", "topic", raddr, maxvbuckets, config)
214	if err != nil {
215		b.Fatal(err)
216	}
217
218	vbmaps := makeVbmaps(maxvbuckets, maxBuckets)
219
220	// send StreamBegin
221	for _, vbmap := range vbmaps {
222		for i := 0; i < len(vbmap.Vbuckets); i++ { // for N vbuckets
223			vbno, vbuuid := vbmap.Vbuckets[i], vbmap.Vbuuids[i]
224			kv := c.NewKeyVersions(uint64(0), []byte("Bourne"), 1)
225			kv.AddStreamBegin()
226			dkv := &c.DataportKeyVersions{
227				Bucket: vbmap.Bucket, Vbno: vbno, Vbuuid: vbuuid, Kv: kv,
228			}
229			if err := endp.Send(dkv); err != nil {
230				b.Fatal(err)
231			}
232		}
233	}
234
235	go func() {
236		nVbs, nMuts, nIndexes, seqno := maxvbuckets, 5, 5, 1
237		for {
238			dkvs := dataKeyVersions("default0", seqno, nVbs, nMuts, nIndexes)
239			dkvs = append(dkvs, dataKeyVersions("default1", seqno, nVbs, nMuts, nIndexes)...)
240			for _, dkv := range dkvs {
241				endp.Send(dkv)
242			}
243			seqno += nMuts
244		}
245	}()
246
247	b.ResetTimer()
248	for i := 0; i < b.N; i++ {
249		select {
250		case msg := <-appch:
251			if _, ok := msg.([]*protobuf.VbKeyVersions); !ok {
252				b.Fatalf("unexpected type in loopback %T", msg)
253			}
254		}
255	}
256
257	endp.Close()
258	daemon.Close()
259}
260
261func makeVbmaps(maxvbuckets int, maxBuckets int) []*c.VbConnectionMap {
262	vbmaps := make([]*c.VbConnectionMap, 0, maxBuckets)
263	for i := 0; i < maxBuckets; i++ {
264		vbmap := &c.VbConnectionMap{
265			Bucket:   fmt.Sprintf("default%v", i),
266			Vbuckets: make([]uint16, 0, maxvbuckets),
267			Vbuuids:  make([]uint64, 0, maxvbuckets),
268		}
269		for i := 0; i < maxvbuckets; i++ {
270			vbmap.Vbuckets = append(vbmap.Vbuckets, uint16(i))
271			vbmap.Vbuuids = append(vbmap.Vbuuids, uint64(i*10))
272		}
273		vbmaps = append(vbmaps, vbmap)
274	}
275	return vbmaps
276}
277
278func dataKeyVersions(bucket string, seqno, nVbs, nMuts, nIndexes int) []*c.DataportKeyVersions {
279	dkvs := make([]*c.DataportKeyVersions, 0)
280	for i := 0; i < nVbs; i++ { // for N vbuckets
281		vbno, vbuuid := uint16(i), uint64(i*10)
282		for j := 0; j < nMuts; j++ {
283			kv := c.NewKeyVersions(uint64(seqno+j), []byte("Bourne"), nIndexes)
284			for k := 0; k < nIndexes; k++ {
285				key := fmt.Sprintf("bangalore%v", k)
286				oldkey := fmt.Sprintf("varanasi%v", k)
287				kv.AddUpsert(uint64(k), []byte(key), []byte(oldkey))
288			}
289			dkv := &c.DataportKeyVersions{
290				Bucket: bucket, Vbno: vbno, Vbuuid: vbuuid, Kv: kv,
291			}
292			dkvs = append(dkvs, dkv)
293		}
294	}
295	return dkvs
296}
297