1package couchbase
2
3import (
4	"errors"
5	"io"
6	"testing"
7	"time"
8
9	"github.com/couchbase/gomemcached"
10	"github.com/couchbase/gomemcached/client"
11)
12
// testT is a stand-in connection used by the tests in this file. Every read
// and write fails with io.EOF, and it records whether Close has been called.
type testT struct {
	closed bool
}

// Read never yields data: it always reports zero bytes and io.EOF.
func (testT) Read([]byte) (int, error) {
	return 0, io.EOF
}

// Write never accepts data: it always reports zero bytes and io.EOF.
func (testT) Write([]byte) (int, error) {
	return 0, io.EOF
}

// errAlreadyClosed is returned by testT.Close on every call after the first.
var errAlreadyClosed = errors.New("already closed")

// Close succeeds exactly once. The first call marks the connection closed
// and returns nil; any later call returns errAlreadyClosed.
func (conn *testT) Close() error {
	if !conn.closed {
		conn.closed = true
		return nil
	}
	return errAlreadyClosed
}
34
35func testMkConn(h string, ah AuthHandler) (*memcached.Client, error) {
36	return memcached.Wrap(&testT{})
37}
38
39func TestConnPool(t *testing.T) {
40	cp := newConnectionPool("h", &basicAuth{}, 3, 6)
41	cp.mkConn = testMkConn
42
43	seenClients := map[*memcached.Client]bool{}
44
45	// build some connections
46
47	for i := 0; i < 5; i++ {
48		sc, err := cp.Get()
49		if err != nil {
50			t.Fatalf("Error getting connection from pool: %v", err)
51		}
52		seenClients[sc] = true
53	}
54
55	if len(cp.connections) != 0 {
56		t.Errorf("Expected 0 connections after gets, got %v",
57			len(cp.connections))
58	}
59
60	// return them
61	for k := range seenClients {
62		cp.Return(k)
63	}
64
65	if len(cp.connections) != 3 {
66		t.Errorf("Expected 3 connections after returning them, got %v",
67			len(cp.connections))
68	}
69
70	// Try again.
71	matched := 0
72	grabbed := []*memcached.Client{}
73	for i := 0; i < 5; i++ {
74		sc, err := cp.Get()
75		if err != nil {
76			t.Fatalf("Error getting connection from pool: %v", err)
77		}
78		if seenClients[sc] {
79			matched++
80		}
81		grabbed = append(grabbed, sc)
82	}
83
84	if matched != 3 {
85		t.Errorf("Expected to match 3 conns, matched %v", matched)
86	}
87
88	for _, c := range grabbed {
89		cp.Return(c)
90	}
91
92	// Connect write error.
93	sc, err := cp.Get()
94	if err != nil {
95		t.Fatalf("Error getting a connection: %v", err)
96	}
97	err = sc.Transmit(&gomemcached.MCRequest{})
98	if err == nil {
99		t.Fatalf("Expected error sending a request")
100	}
101	if sc.IsHealthy() {
102		t.Fatalf("Expected unhealthy connection")
103	}
104	cp.Return(sc)
105
106	if len(cp.connections) != 2 {
107		t.Errorf("Expected to have 2 conns, have %v", len(cp.connections))
108	}
109
110	err = cp.Close()
111	if err != nil {
112		t.Errorf("Expected clean close, got %v", err)
113	}
114
115	err = cp.Close()
116	if err == nil {
117		t.Errorf("Expected error on second pool close")
118	}
119}
120
121func TestConnPoolSoonAvailable(t *testing.T) {
122	defer func(d time.Duration) { ConnPoolAvailWaitTime = d }(ConnPoolAvailWaitTime)
123	defer func() { ConnPoolCallback = nil }()
124
125	m := map[string]int{}
126	timings := []time.Duration{}
127	ConnPoolCallback = func(host string, source string, start time.Time, err error) {
128		m[source] = m[source] + 1
129		timings = append(timings, time.Since(start))
130	}
131
132	cp := newConnectionPool("h", &basicAuth{}, 3, 4)
133	cp.mkConn = testMkConn
134
135	seenClients := map[*memcached.Client]bool{}
136
137	// build some connections
138
139	var aClient *memcached.Client
140	for {
141		sc, err := cp.GetWithTimeout(time.Millisecond)
142		if err == ErrTimeout {
143			break
144		}
145		if err != nil {
146			t.Fatalf("Error getting connection from pool: %v", err)
147		}
148		aClient = sc
149		seenClients[sc] = true
150	}
151
152	time.AfterFunc(time.Millisecond, func() { cp.Return(aClient) })
153
154	ConnPoolAvailWaitTime = time.Second
155
156	sc, err := cp.Get()
157	if err != nil || sc != aClient {
158		t.Errorf("Expected a successful connection, got %v/%v", sc, err)
159	}
160
161	// Try again, but let's close it while we're stuck in secondary wait
162	time.AfterFunc(time.Millisecond, func() { cp.Close() })
163
164	sc, err = cp.Get()
165	if err != errClosedPool {
166		t.Errorf("Expected a closed pool, got %v/%v", sc, err)
167	}
168
169	t.Logf("Callback report: %v, timings: %v", m, timings)
170}
171
172func TestConnPoolClosedFull(t *testing.T) {
173	cp := newConnectionPool("h", &basicAuth{}, 3, 4)
174	cp.mkConn = testMkConn
175
176	seenClients := map[*memcached.Client]bool{}
177
178	// build some connections
179
180	for {
181		sc, err := cp.GetWithTimeout(time.Millisecond)
182		if err == ErrTimeout {
183			break
184		}
185		if err != nil {
186			t.Fatalf("Error getting connection from pool: %v", err)
187		}
188		seenClients[sc] = true
189	}
190
191	time.AfterFunc(2*time.Millisecond, func() { cp.Close() })
192
193	sc, err := cp.Get()
194	if err != errClosedPool {
195		t.Errorf("Expected closed pool error after closed, got %v/%v", sc, err)
196	}
197}
198
199func TestConnPoolWaitFull(t *testing.T) {
200	cp := newConnectionPool("h", &basicAuth{}, 3, 4)
201	cp.mkConn = testMkConn
202
203	seenClients := map[*memcached.Client]bool{}
204
205	// build some connections
206
207	var aClient *memcached.Client
208	for {
209		sc, err := cp.GetWithTimeout(time.Millisecond)
210		if err == ErrTimeout {
211			break
212		}
213		if err != nil {
214			t.Fatalf("Error getting connection from pool: %v", err)
215		}
216		aClient = sc
217		seenClients[sc] = true
218	}
219
220	time.AfterFunc(2*time.Millisecond, func() { cp.Return(aClient) })
221
222	sc, err := cp.Get()
223	if err != nil || sc != aClient {
224		t.Errorf("Expected a successful connection, got %v/%v", sc, err)
225	}
226}
227
228func TestConnPoolWaitFailFull(t *testing.T) {
229	cp := newConnectionPool("h", &basicAuth{}, 3, 4)
230	cp.mkConn = testMkConn
231
232	seenClients := map[*memcached.Client]bool{}
233
234	// build some connections
235
236	var aClient *memcached.Client
237	for {
238		sc, err := cp.GetWithTimeout(time.Millisecond)
239		if err == ErrTimeout {
240			break
241		}
242		if err != nil {
243			t.Fatalf("Error getting connection from pool: %v", err)
244		}
245		aClient = sc
246		seenClients[sc] = true
247	}
248
249	// causes failure
250	aClient.Transmit(&gomemcached.MCRequest{})
251	time.AfterFunc(2*time.Millisecond, func() { cp.Return(aClient) })
252
253	sc, err := cp.Get()
254	if err != nil || sc == aClient {
255		t.Errorf("Expected a new successful connection, got %v/%v", sc, err)
256	}
257}
258
259func TestConnPoolWaitDoubleFailFull(t *testing.T) {
260	cp := newConnectionPool("h", &basicAuth{}, 3, 4)
261	cp.mkConn = testMkConn
262
263	seenClients := map[*memcached.Client]bool{}
264
265	// build some connections
266
267	var aClient *memcached.Client
268	for {
269		sc, err := cp.GetWithTimeout(time.Millisecond)
270		if err == ErrTimeout {
271			break
272		}
273		if err != nil {
274			t.Fatalf("Error getting connection from pool: %v", err)
275		}
276		aClient = sc
277		seenClients[sc] = true
278	}
279
280	cp.mkConn = func(h string, ah AuthHandler) (*memcached.Client, error) {
281		return nil, io.EOF
282	}
283
284	// causes failure
285	aClient.Transmit(&gomemcached.MCRequest{})
286	time.AfterFunc(2*time.Millisecond, func() { cp.Return(aClient) })
287
288	sc, err := cp.Get()
289	if err != io.EOF {
290		t.Errorf("Expected to fail getting a new connection, got %v/%v", sc, err)
291	}
292}
293
294func TestConnPoolNil(t *testing.T) {
295	var cp *connectionPool
296	c, err := cp.Get()
297	if err == nil {
298		t.Errorf("Expected an error getting from nil, got %v", c)
299	}
300
301	// This just shouldn't error.
302	cp.Return(c)
303}
304
305func TestConnPoolClosed(t *testing.T) {
306	cp := newConnectionPool("h", &basicAuth{}, 3, 6)
307	cp.mkConn = testMkConn
308	c, err := cp.Get()
309	if err != nil {
310		t.Fatal(err)
311	}
312	cp.Close()
313
314	// This should cause the connection to be closed
315	cp.Return(c)
316	if err = c.Close(); err != errAlreadyClosed {
317		t.Errorf("Expected to close connection, wasn't closed (%v)", err)
318	}
319
320	sc, err := cp.Get()
321	if err != errClosedPool {
322		t.Errorf("Expected closed pool error after closed, got %v/%v", sc, err)
323	}
324}
325
326func TestConnPoolCloseWrongPool(t *testing.T) {
327	cp := newConnectionPool("h", &basicAuth{}, 3, 6)
328	cp.mkConn = testMkConn
329	c, err := cp.Get()
330	if err != nil {
331		t.Fatal(err)
332	}
333	cp.Close()
334
335	// Return to a different pool.  Should still be OK.
336	cp = newConnectionPool("h", &basicAuth{}, 3, 6)
337	cp.mkConn = testMkConn
338	c, err = cp.Get()
339	if err != nil {
340		t.Fatal(err)
341	}
342	cp.Close()
343
344	cp.Return(c)
345	if err = c.Close(); err != errAlreadyClosed {
346		t.Errorf("Expected to close connection, wasn't closed (%v)", err)
347	}
348}
349
350func TestConnPoolCloseNil(t *testing.T) {
351	cp := newConnectionPool("h", &basicAuth{}, 3, 6)
352	cp.mkConn = testMkConn
353	c, err := cp.Get()
354	if err != nil {
355		t.Fatal(err)
356	}
357	cp.Close()
358
359	cp = nil
360	cp.Return(c)
361	if err = c.Close(); err != errAlreadyClosed {
362		t.Errorf("Expected to close connection, wasn't closed (%v)", err)
363	}
364}
365
366func TestConnPoolStartTapFeed(t *testing.T) {
367	var cp *connectionPool
368	args := memcached.DefaultTapArguments()
369	tf, err := cp.StartTapFeed(&args)
370	if err != errNoPool {
371		t.Errorf("Expected no pool error with no pool, got %v/%v", tf, err)
372	}
373
374	cp = newConnectionPool("h", &basicAuth{}, 3, 6)
375	cp.mkConn = testMkConn
376
377	tf, err = cp.StartTapFeed(&args)
378	if err != io.EOF {
379		t.Errorf("Expected to fail a tap feed with EOF, got %v/%v", tf, err)
380	}
381
382	cp.Close()
383	tf, err = cp.StartTapFeed(&args)
384	if err != errClosedPool {
385		t.Errorf("Expected a closed pool, got %v/%v", tf, err)
386	}
387}
388
389func BenchmarkBestCaseCPGet(b *testing.B) {
390	cp := newConnectionPool("h", &basicAuth{}, 3, 6)
391	cp.mkConn = testMkConn
392
393	for i := 0; i < b.N; i++ {
394		c, err := cp.Get()
395		if err != nil {
396			b.Fatalf("Error getting from pool: %v", err)
397		}
398		cp.Return(c)
399	}
400}
401