1package client
2
3import (
4	"errors"
5	"fmt"
6	"math/rand"
7	"net"
8	"strings"
9	"testing"
10	"time"
11
12	"github.com/couchbase/indexing/secondary/transport"
13)
14
15func testMkConn(h string) (*connection, error) {
16	flags := transport.TransportFlag(0).SetProtobuf()
17	pkt := transport.NewTransportPacket(1024*1024, flags)
18	conn, err := net.Dial("tcp", h)
19	if err != nil {
20		fmt.Printf("Error %v during connection\n", err)
21	}
22	return &connection{conn, pkt}, err
23}
24
// testServer is a minimal TCP server for the tests in this file. It only
// accepts connections; it never reads from or writes to them.
type testServer struct {
	ln net.Listener // set by initServer once Listen succeeds; nil before that
}

// initServer listens on the port named in h (a "host:port" string) on all
// interfaces and accepts connections until Accept fails. It blocks for the
// lifetime of the server, so tests run it in its own goroutine.
//
// Shutdown protocol: the caller sends on stopCh and then closes ts.ln. The
// resulting Accept error is then treated as a clean stop and nil is returned.
// An Accept error without a pending stop signal is returned as an error.
//
// An address without a port yields an error instead of a panic, and ts.ln is
// only assigned when Listen succeeded.
func (ts *testServer) initServer(h string, stopCh chan bool) error {
	sep := strings.LastIndex(h, ":")
	if sep < 0 {
		msg := fmt.Sprintf("Error invalid address %q (missing port) during Listen", h)
		return errors.New(msg)
	}

	ln, err := net.Listen("tcp", ":"+h[sep+1:])
	if err != nil {
		msg := fmt.Sprintf("Error %v during Listen", err)
		return errors.New(msg)
	}
	ts.ln = ln

	// Keep the server-side sockets so they can be closed when the server
	// stops; dropping them would leak descriptors.
	var accepted []net.Conn
	defer func() {
		for _, c := range accepted {
			c.Close()
		}
	}()

	for {
		c, err := ln.Accept()
		if err != nil {
			select {
			case <-stopCh:
				// Listener was closed on purpose.
				return nil

			default:
				msg := fmt.Sprintf("Error %v during Accept", err)
				return errors.New(msg)
			}
		}
		accepted = append(accepted, c)
	}
}
51
52func TestConnPoolBasicSanity(t *testing.T) {
53	var err error
54	var sc *connection
55
56	readDeadline := time.Duration(30)
57	writeDeadline := time.Duration(40)
58
59	ts := &testServer{}
60	tsStopCh := make(chan bool, 1)
61
62	host := "127.0.0.1:15151"
63	go ts.initServer(host, tsStopCh)
64	time.Sleep(1 * time.Second)
65
66	cp := newConnectionPool(host, 3, 6, 1024*1024, readDeadline, writeDeadline, 3, 1)
67	cp.mkConn = testMkConn
68
69	seenClients := map[*connection]bool{}
70
71	// build some connections
72	for i := 0; i < 5; i++ {
73		sc, err = cp.Get()
74		if err != nil {
75			t.Fatalf("Error getting connection from pool: %v", err)
76		}
77		seenClients[sc] = true
78	}
79
80	// return them
81	for k := range seenClients {
82		cp.Return(k, true)
83	}
84
85	err = cp.Close()
86	if err != nil {
87		t.Errorf("Expected clean close, got %v", err)
88	}
89
90	cp.Close()
91	time.Sleep(2 * time.Second)
92
93	tsStopCh <- true
94	ts.ln.Close()
95	time.Sleep(1 * time.Second)
96}
97
98func TestConnRelease(t *testing.T) {
99	readDeadline := time.Duration(30)
100	writeDeadline := time.Duration(40)
101
102	ts := &testServer{}
103	tsStopCh := make(chan bool, 1)
104
105	host := "127.0.0.1:15151"
106	go ts.initServer(host, tsStopCh)
107	time.Sleep(1 * time.Second)
108
109	cp := newConnectionPool(host, 500, 10, 1024*1024, readDeadline, writeDeadline, 40, 10)
110	cp.mkConn = testMkConn
111
112	seenClients := map[*connection]bool{}
113
114	// Get 240 Connections.
115
116	for i := 0; i < 240; i++ {
117		sc, err := cp.Get()
118		if err != nil {
119			t.Fatalf("Error getting connection from pool: %v", err)
120		}
121		seenClients[sc] = true
122	}
123
124	// Return 220 of them
125	j := 0
126	for k := range seenClients {
127		cp.Return(k, true)
128		delete(seenClients, k)
129		j++
130		if j >= 220 {
131			break
132		}
133	}
134
135	// time.Sleep(time.Millisecond)
136	if cp.freeConns != 220 {
137		t.Errorf("Warning! cp.freeConns is not 220, its %d", cp.freeConns)
138	}
139
140	fmt.Println("Waiting for connections to get released")
141	time.Sleep(CONN_RELEASE_INTERVAL * 2 * time.Second)
142	if cp.freeConns != 200 {
143		t.Errorf("Warning! cp.freeConns is not 200, its %d", cp.freeConns)
144	}
145
146	fmt.Println("Waiting for more connections to get released")
147	time.Sleep(CONN_RELEASE_INTERVAL * 2 * time.Second)
148	if cp.freeConns != 180 {
149		t.Errorf("Warning! cp.freeConns is not 180, its %d", cp.freeConns)
150	}
151
152	fmt.Println("Waiting for further more connections to get released")
153	time.Sleep(CONN_RELEASE_INTERVAL * 2 * time.Second)
154	if cp.freeConns != 160 {
155		t.Errorf("Warning! cp.freeConns is not 160, its %d", cp.freeConns)
156	}
157
158	for l := range seenClients {
159		cp.Return(l, true)
160		delete(seenClients, l)
161	}
162
163	cp.Close()
164	time.Sleep(2 * time.Second)
165
166	tsStopCh <- true
167	ts.ln.Close()
168	time.Sleep(1 * time.Second)
169}
170
171func TestLongevity(t *testing.T) {
172	readDeadline := time.Duration(30)
173	writeDeadline := time.Duration(40)
174
175	ts := &testServer{}
176	tsStopCh := make(chan bool, 1)
177
178	host := "127.0.0.1:15151"
179	go ts.initServer(host, tsStopCh)
180	time.Sleep(1 * time.Second)
181
182	cp := newConnectionPool(host, 500, 10, 1024*1024, readDeadline, writeDeadline, 40, 10)
183	cp.mkConn = testMkConn
184
185	// Get 240 Connections.
186
187	seenClients := map[*connection]bool{}
188
189	for i := 0; i < 200; i++ {
190		sc, err := cp.Get()
191		if err != nil {
192			t.Fatalf("Error getting connection from pool: %v", err)
193		}
194		seenClients[sc] = true
195	}
196
197	for i := 0; i < 30; i++ {
198		time.Sleep(time.Second)
199		num := rand.Intn(5)
200		j := 0
201		if i%2 == 0 {
202			fmt.Printf("Releasing %d conns.\n", num)
203			for k := range seenClients {
204				cp.Return(k, true)
205				delete(seenClients, k)
206				j++
207				if j >= num {
208					break
209				}
210			}
211		} else {
212			fmt.Printf("Getting %d conns.\n", num)
213			for k := 0; k < num; k++ {
214				sc, err := cp.Get()
215				if err != nil {
216					t.Fatalf("Error getting connection from pool: %v", err)
217				}
218				seenClients[sc] = true
219			}
220		}
221
222		// Use some safe number to verify.
223		if cp.freeConns > 20 {
224			t.Errorf("Warning! cp.freeConns is greater than 20, its %d", cp.freeConns)
225		}
226	}
227
228	for l := range seenClients {
229		cp.Return(l, true)
230		delete(seenClients, l)
231	}
232
233	cp.Close()
234	time.Sleep(2 * time.Second)
235
236	tsStopCh <- true
237	ts.ln.Close()
238	time.Sleep(1 * time.Second)
239}
240
241func startAllocatorRoutine(cp *connectionPool, ch chan *connection, stopCh chan bool) {
242	for {
243		time.Sleep(500 * time.Millisecond)
244		var num int
245
246		select {
247		case <-stopCh:
248			fmt.Println("Retuning from startAllocatorRoutine")
249			return
250
251		default:
252			if cp.curActConns < 200 {
253				num = rand.Intn(25)
254			} else {
255				num = rand.Intn(5)
256			}
257			fmt.Println("Allocating", num, "Connections")
258			for i := 0; i < num; i++ {
259				conn, err := cp.Get()
260				if err != nil {
261					errmsg := fmt.Sprintf("ERROR %v: CONNECTION GET FAILED", err)
262					panic(errmsg)
263				}
264				ch <- conn
265			}
266			if cp.curActConns > 250 {
267				errmsg := fmt.Sprintf("ERROR: TOO MANY ACTIVE CONNS %v", cp.curActConns)
268				panic(errmsg)
269			}
270		}
271	}
272}
273
274func startDeallocatorRoutine(cp *connectionPool, ch chan *connection, stopCh chan bool) {
275	for {
276		time.Sleep(500 * time.Millisecond)
277
278		select {
279		case <-stopCh:
280			fmt.Println("Retuning from startDeallocatorRoutine")
281			return
282
283		default:
284			num := rand.Intn(5)
285			fmt.Println("Returning", num, "Connections")
286			for i := 0; i < num; i++ {
287				conn := <-ch
288				cp.Return(conn, true)
289			}
290		}
291	}
292}
293
294func TestSustainedHighConns(t *testing.T) {
295	readDeadline := time.Duration(30)
296	writeDeadline := time.Duration(40)
297
298	ts := &testServer{}
299	tsStopCh := make(chan bool, 1)
300
301	host := "127.0.0.1:15151"
302	go ts.initServer(host, tsStopCh)
303	time.Sleep(1 * time.Second)
304
305	cp := newConnectionPool(host, 500, 10, 1024*1024, readDeadline, writeDeadline, 40, 10)
306	cp.mkConn = testMkConn
307
308	ch := make(chan *connection, 1000)
309
310	stopCh := make(chan bool, 2)
311
312	go startAllocatorRoutine(cp, ch, stopCh)
313	go startDeallocatorRoutine(cp, ch, stopCh)
314
315	for i := 0; i < 100; i++ {
316		time.Sleep(500 * time.Millisecond)
317		fmt.Println("cp.curActConns =", cp.curActConns)
318	}
319
320	stopCh <- true
321	stopCh <- true
322
323	time.Sleep(5 * time.Second)
324	close(ch)
325	for connectn := range ch {
326		cp.Return(connectn, true)
327	}
328
329	cp.Close()
330	time.Sleep(2 * time.Second)
331
332	tsStopCh <- true
333	ts.ln.Close()
334	time.Sleep(1 * time.Second)
335}
336
337func TestLowWM(t *testing.T) {
338	readDeadline := time.Duration(30)
339	writeDeadline := time.Duration(40)
340
341	ts := &testServer{}
342	tsStopCh := make(chan bool, 1)
343
344	host := "127.0.0.1:15151"
345	go ts.initServer(host, tsStopCh)
346	time.Sleep(1 * time.Second)
347
348	cp := newConnectionPool(host, 20, 5, 1024*1024, readDeadline, writeDeadline, 10, 2)
349	cp.mkConn = testMkConn
350
351	seenClients := map[*connection]bool{}
352
353	for i := 0; i < 12; i++ {
354		sc, err := cp.Get()
355		if err != nil {
356			t.Fatalf("Error getting connection from pool: %v", err)
357		}
358		seenClients[sc] = true
359	}
360
361	time.Sleep(CONN_RELEASE_INTERVAL * time.Second)
362	for k := range seenClients {
363		cp.Return(k, true)
364	}
365
366	if cp.freeConns != 12 {
367		t.Errorf("Expected value fo freeConns = 12, actual = %v", cp.freeConns)
368	}
369
370	// Wait 5 mins. Make sure that freeConns never get below 10.
371	for i := 0; i < 24; i++ {
372		time.Sleep(CONN_RELEASE_INTERVAL * time.Second)
373		if cp.freeConns < 10 {
374			msg := fmt.Sprintf("freeConns (%v) went below low WM", cp.freeConns)
375			panic(msg)
376		}
377	}
378
379	for l := range seenClients {
380		cp.Return(l, true)
381		delete(seenClients, l)
382	}
383
384	cp.Close()
385	time.Sleep(2 * time.Second)
386
387	tsStopCh <- true
388	ts.ln.Close()
389	time.Sleep(1 * time.Second)
390}
391
392func TestTotalConns(t *testing.T) {
393	readDeadline := time.Duration(30)
394	writeDeadline := time.Duration(40)
395
396	ts := &testServer{}
397	tsStopCh := make(chan bool, 1)
398
399	host := "127.0.0.1:15151"
400	go ts.initServer(host, tsStopCh)
401	time.Sleep(1 * time.Second)
402
403	cp := newConnectionPool(host, 120, 5, 1024*1024, readDeadline, writeDeadline, 10, 10)
404	cp.mkConn = testMkConn
405
406	seenClients := map[*connection]bool{}
407
408	// Get 100 connections.
409
410	for i := 0; i < 100; i++ {
411		sc, err := cp.Get()
412		if err != nil {
413			t.Fatalf("Error getting connection from pool: %v", err)
414		}
415		seenClients[sc] = true
416	}
417
418	// Return 20 of them
419	i := 0
420	for k := range seenClients {
421		cp.Return(k, true)
422		i++
423		if i >= 20 {
424			break
425		}
426	}
427
428	if cp.freeConns != 20 {
429		t.Errorf("Expected value for freeConns = 20, actual = %v", cp.freeConns)
430	}
431
432	if cp.curActConns != 80 {
433		t.Errorf("Expected value fo curActConns = 80, actual = %v", cp.curActConns)
434	}
435
436	// Sleep for an interval. Avg will be 80. Expect 10 conns getting freed.
437	time.Sleep(CONN_RELEASE_INTERVAL * time.Second)
438
439	if cp.freeConns != 10 {
440		t.Errorf("Expected value for freeConns = 10, actual = %v", cp.freeConns)
441	}
442
443	// Release 20 more conns.
444	j := 0
445	for k := range seenClients {
446		cp.Return(k, true)
447		j++
448		if j >= 20 {
449			break
450		}
451	}
452
453	if cp.freeConns != 30 {
454		t.Errorf("Expected value for freeConns = 30, actual = %v", cp.freeConns)
455	}
456
457	if cp.curActConns != 60 {
458		t.Errorf("Expected value fo curActConns = 60, actual = %v", cp.curActConns)
459	}
460
461	// Sleep for an interval. Avg will be 80. Expect 10 conns getting freed.
462	time.Sleep(CONN_RELEASE_INTERVAL * time.Second)
463
464	if cp.freeConns != 20 {
465		t.Errorf("Expected value for freeConns = 20, actual = %v", cp.freeConns)
466	}
467
468	cp.Close()
469	time.Sleep(2 * time.Second)
470
471	tsStopCh <- true
472	ts.ln.Close()
473	time.Sleep(1 * time.Second)
474}
475
476func TestUpdateTickRate(t *testing.T) {
477	readDeadline := time.Duration(30)
478	writeDeadline := time.Duration(40)
479
480	ts := &testServer{}
481	tsStopCh := make(chan bool, 1)
482
483	host := "127.0.0.1:15151"
484	go ts.initServer(host, tsStopCh)
485	time.Sleep(1 * time.Second)
486
487	cp := newConnectionPool(host, 40, 5, 1024*1024, readDeadline, writeDeadline, 2, 2)
488	cp.mkConn = testMkConn
489
490	seenClients := map[*connection]bool{}
491
492	// Allocate 20 conns per seconds for 10 seconds. Return all connections after 1 second.
493
494	for i := 0; i < 10; i++ {
495		for j := 0; j < 20; j++ {
496			sc, err := cp.Get()
497			if err != nil {
498				t.Fatalf("Error getting connection from pool: %v", err)
499			}
500			seenClients[sc] = true
501		}
502		time.Sleep(1 * time.Second)
503		for k := range seenClients {
504			cp.Return(k, true)
505			delete(seenClients, k)
506		}
507	}
508
509	// Make sure that numConnsToRetain returns false and 20.
510	numRetConns, needToFreeConns := cp.numConnsToRetain()
511	if needToFreeConns != false {
512		t.Errorf("needToFreeConns was expected to be false. But it is not")
513		fmt.Printf("freeConns = %v, curActConns = %v, rate = %v\n", cp.freeConns, cp.curActConns, cp.ewma.Rate())
514	}
515
516	if numRetConns != 20 {
517		t.Errorf("numRetConns was expected to be 20, Actual = %v", numRetConns)
518		fmt.Printf("freeConns = %v, curActConns = %v, rate = %v\n", cp.freeConns, cp.curActConns, cp.ewma.Rate())
519	}
520
521	// Allocate 10 conns per seconds for 10 seconds. Return all connections after 1 second.
522	for i := 0; i < 10; i++ {
523		for j := 0; j < 10; j++ {
524			sc, err := cp.Get()
525			if err != nil {
526				t.Fatalf("Error getting connection from pool: %v", err)
527			}
528			seenClients[sc] = true
529		}
530		time.Sleep(1 * time.Second)
531		for k := range seenClients {
532			cp.Return(k, true)
533			delete(seenClients, k)
534		}
535	}
536
537	// Make sure that numConnsToRetain still returns false and 20.
538	numRetConns, needToFreeConns = cp.numConnsToRetain()
539	if needToFreeConns != false {
540		t.Errorf("needToFreeConns was expected to be false. But it is not")
541		fmt.Printf("freeConns = %v, curActConns = %v, rate = %v\n", cp.freeConns, cp.curActConns, cp.ewma.Rate())
542	}
543
544	if numRetConns != 20 {
545		t.Errorf("numRetConns was expected to be 20, Actual = %v", numRetConns)
546		fmt.Printf("freeConns = %v, curActConns = %v, rate = %v\n", cp.freeConns, cp.curActConns, cp.ewma.Rate())
547	}
548
549	cp.Close()
550	time.Sleep(2 * time.Second)
551
552	tsStopCh <- true
553	ts.ln.Close()
554	time.Sleep(1 * time.Second)
555}
556