1package client
2
3import "errors"
4import "fmt"
5import "net"
6import "time"
7import "sync/atomic"
8
9import "github.com/couchbase/indexing/secondary/logging"
10import "github.com/couchbase/indexing/secondary/transport"
11import protobuf "github.com/couchbase/indexing/secondary/protobuf/query"
12import gometrics "github.com/rcrowley/go-metrics"
13
const (
	// Seconds between connection-release passes. Don't change as long as go-metrics/ewma is being used.
	CONN_RELEASE_INTERVAL = 5
	// Number of release intervals tracked. Don't change as long as go-metrics/ewma is being used.
	NUM_CONN_RELEASE_INTERVALS = 60
	// Seconds between connection-count log lines.
	CONN_COUNT_LOG_INTERVAL = 60
)

// ErrorClosedPool is returned when a connection is requested from a pool
// that has already been closed.
var ErrorClosedPool = errors.New("queryport.closedPool")

// ErrorNoPool is returned when the pool reference is nil.
var ErrorNoPool = errors.New("queryport.errorNoPool")

// ErrorPoolTimeout is returned when no connection could be acquired or
// created within the caller's timeout.
var ErrorPoolTimeout = errors.New("queryport.connPoolTimeout")
28
// connectionPool maintains a pool of reusable connections to a single
// queryport host. Connections are handed out via Get/GetWithTimeout and
// given back via Return; a background routine (releaseConnsRoutine) trims
// surplus free connections based on an EWMA of the active-connection count.
type connectionPool struct {
	host        string
	mkConn      func(host string) (*connection, error) // connection factory; defaults to defaultMkConn
	connections chan *connection                       // buffered channel holding free connections
	createsem   chan bool                              // semaphore bounding total connections (pool + overflow)
	// config params
	maxPayload       int           // maximum transport packet payload size
	timeout          time.Duration // overall Get timeout; scaled by time.Millisecond in Get
	availTimeout     time.Duration // wait for a free connection before creating one; scaled by time.Millisecond
	logPrefix        string
	curActConns      int32 // connections currently checked out (accessed atomically)
	minPoolSizeWM    int32 // low watermark: never trim total connections below this
	freeConns        int32 // connections sitting free in the pool (accessed atomically)
	relConnBatchSize int32 // max connections released per trim pass
	stopCh           chan bool      // signals releaseConnsRoutine to exit
	ewma             gometrics.EWMA // moving average of curActConns; drives trimming
}
46
// connection pairs a raw network connection with the transport packet
// codec used to frame protobuf messages on it.
type connection struct {
	conn net.Conn
	pkt  *transport.TransportPacket
}
51
52func newConnectionPool(
53	host string,
54	poolSize, poolOverflow, maxPayload int,
55	timeout, availTimeout time.Duration,
56	minPoolSizeWM int32, relConnBatchSize int32) *connectionPool {
57
58	cp := &connectionPool{
59		host:             host,
60		connections:      make(chan *connection, poolSize),
61		createsem:        make(chan bool, poolSize+poolOverflow),
62		maxPayload:       maxPayload,
63		timeout:          timeout,
64		availTimeout:     availTimeout,
65		logPrefix:        fmt.Sprintf("[Queryport-connpool:%v]", host),
66		minPoolSizeWM:    minPoolSizeWM,
67		relConnBatchSize: relConnBatchSize,
68		stopCh:           make(chan bool, 1),
69	}
70	cp.mkConn = cp.defaultMkConn
71	cp.ewma = gometrics.NewEWMA5()
72	logging.Infof("%v started poolsize %v overflow %v low WM %v relConn batch size %v ...\n",
73		cp.logPrefix, poolSize, poolOverflow, minPoolSizeWM, relConnBatchSize)
74	go cp.releaseConnsRoutine()
75	return cp
76}
77
// ConnPoolCallback, when non-nil, is invoked after every attempt to acquire
// a connection from a pool, with the host, the acquisition path taken
// ("short-circuit", "avail1", "avail2" or "create"), the start time of the
// attempt, and any error.
var ConnPoolCallback func(host string, source string, start time.Time, err error)
80
81func (cp *connectionPool) defaultMkConn(host string) (*connection, error) {
82	logging.Infof("%v open new connection ...\n", cp.logPrefix)
83	conn, err := net.Dial("tcp", host)
84	if err != nil {
85		return nil, err
86	}
87	flags := transport.TransportFlag(0).SetProtobuf()
88	pkt := transport.NewTransportPacket(cp.maxPayload, flags)
89	pkt.SetEncoder(transport.EncodingProtobuf, protobuf.ProtobufEncode)
90	pkt.SetDecoder(transport.EncodingProtobuf, protobuf.ProtobufDecode)
91	return &connection{conn, pkt}, nil
92}
93
94func (cp *connectionPool) Close() (err error) {
95	defer func() {
96		if r := recover(); r != nil {
97			logging.Verbosef("%v Close() crashed: %v\n", cp.logPrefix, r)
98			logging.Verbosef("%s", logging.StackTrace())
99		}
100	}()
101	cp.stopCh <- true
102	close(cp.connections)
103	for connectn := range cp.connections {
104		connectn.conn.Close()
105	}
106	logging.Infof("%v ... stopped\n", cp.logPrefix)
107	return
108}
109
// GetWithTimeout returns a pooled connection, waiting at most d for one to
// become available or be created. Acquisition proceeds in stages:
//  1. "short-circuit": non-blocking receive from the free pool;
//  2. "avail1": receive with a grace period of availTimeout milliseconds;
//  3. within the remaining time d, either "avail2" (a freed connection) or
//     "create" (a new connection, bounded by the createsem semaphore —
//     possibly an overflow connection).
// Returns ErrorNoPool on a nil pool, ErrorClosedPool once the pool has been
// closed, and ErrorPoolTimeout when d expires.
func (cp *connectionPool) GetWithTimeout(d time.Duration) (connectn *connection, err error) {
	if cp == nil {
		return nil, ErrorNoPool
	}

	path, ok := "", false

	// Report the path taken, start time and outcome (via the named
	// results) to the registered acquisition callback, if any.
	if ConnPoolCallback != nil {
		defer func(path *string, start time.Time) {
			ConnPoolCallback(cp.host, *path, start, err)
		}(&path, time.Now())
	}

	path = "short-circuit"

	// short-circuit available connections.
	select {
	case connectn, ok = <-cp.connections:
		if !ok {
			return nil, ErrorClosedPool
		}
		logging.Debugf("%v new connection from pool\n", cp.logPrefix)
		atomic.AddInt32(&cp.freeConns, -1)
		atomic.AddInt32(&cp.curActConns, 1)
		return connectn, nil
	default:
	}

	// NOTE(review): availTimeout (a time.Duration) is scaled again by
	// time.Millisecond — the pool convention is that these config values
	// are millisecond counts (Get does the same with cp.timeout).
	t := time.NewTimer(cp.availTimeout * time.Millisecond)
	defer t.Stop()

	// Try to grab an available connection within availTimeout milliseconds.
	select {
	case connectn, ok = <-cp.connections:
		path = "avail1"
		if !ok {
			return nil, ErrorClosedPool
		}
		logging.Debugf("%v new connection (avail1) from pool\n", cp.logPrefix)
		atomic.AddInt32(&cp.freeConns, -1)
		atomic.AddInt32(&cp.curActConns, 1)
		return connectn, nil

	case <-t.C:
		// No connection came around in time, let's see
		// whether we can get one or build a new one first.
		t.Reset(d) // Reuse the timer for the full timeout.
		select {
		case connectn, ok = <-cp.connections:
			path = "avail2"
			if !ok {
				return nil, ErrorClosedPool
			}
			logging.Debugf("%v new connection (avail2) from pool\n", cp.logPrefix)
			atomic.AddInt32(&cp.freeConns, -1)
			atomic.AddInt32(&cp.curActConns, 1)
			return connectn, nil

		case cp.createsem <- true:
			path = "create"
			// Build a connection if we can't get a real one.
			// This can potentially be an overflow connection, or
			// a pooled connection.
			connectn, err := cp.mkConn(cp.host)
			if err != nil {
				// On error, release our create hold
				<-cp.createsem
			} else {
				atomic.AddInt32(&cp.curActConns, 1)
			}
			logging.Debugf("%v new connection (create) from pool\n", cp.logPrefix)
			return connectn, err

		case <-t.C:
			return nil, ErrorPoolTimeout
		}
	}
}
188
189func (cp *connectionPool) Get() (*connection, error) {
190	return cp.GetWithTimeout(cp.timeout * time.Millisecond)
191}
192
193func (cp *connectionPool) Return(connectn *connection, healthy bool) {
194	defer atomic.AddInt32(&cp.curActConns, -1)
195	if connectn.conn == nil {
196		return
197	}
198
199	laddr := connectn.conn.LocalAddr()
200	if cp == nil {
201		logging.Infof("%v pool closed\n", cp.logPrefix, laddr)
202		connectn.conn.Close()
203	}
204
205	if healthy {
206		defer func() {
207			if recover() != nil {
208				// This happens when the pool has already been
209				// closed and we're trying to return a
210				// connection to it anyway.  Just close the
211				// connection.
212				connectn.conn.Close()
213			}
214		}()
215
216		select {
217		case cp.connections <- connectn:
218			logging.Debugf("%v connection %q reclaimed to pool\n", cp.logPrefix, laddr)
219			atomic.AddInt32(&cp.freeConns, 1)
220		default:
221			logging.Debugf("%v closing overflow connection %q poolSize=%v\n", cp.logPrefix, laddr, len(cp.connections))
222			<-cp.createsem
223			connectn.conn.Close()
224		}
225
226	} else {
227		logging.Infof("%v closing unhealthy connection %q\n", cp.logPrefix, laddr)
228		<-cp.createsem
229		connectn.conn.Close()
230	}
231}
232
// max returns the larger of two int32 values.
func max(a, b int32) int32 {
	if b > a {
		return b
	}
	return a
}
239
240func (cp *connectionPool) numConnsToRetain() (int32, bool) {
241	avg := cp.ewma.Rate()
242	act := atomic.LoadInt32(&cp.curActConns)
243	num := max(act, int32(avg))
244	num = max(cp.minPoolSizeWM, num)
245	fc := atomic.LoadInt32(&cp.freeConns)
246	totalConns := act + fc
247	if totalConns-cp.relConnBatchSize >= num {
248		// Don't release more than relConnBatchSize number of connections
249		// in 1 iteration
250		logging.Debugf("%v releasinng connections ...", cp.logPrefix)
251		return totalConns - cp.relConnBatchSize, true
252	}
253	return totalConns, false
254}
255
256func (cp *connectionPool) releaseConns(numRetConns int32) {
257	for {
258		fc := atomic.LoadInt32(&cp.freeConns)
259		act := atomic.LoadInt32(&cp.curActConns)
260		totalConns := act + fc
261		if totalConns > numRetConns && fc > 0 {
262			select {
263			case conn, ok := <-cp.connections:
264				if !ok {
265					return
266				}
267				atomic.AddInt32(&cp.freeConns, -1)
268				conn.conn.Close()
269			default:
270				break
271			}
272		} else {
273			break
274		}
275	}
276}
277
278func (cp *connectionPool) releaseConnsRoutine() {
279	i := 0
280	j := 0
281	for {
282		time.Sleep(time.Second)
283		select {
284		case <-cp.stopCh:
285			logging.Infof("%v Stopping releaseConnsRoutine", cp.logPrefix)
286			return
287
288		default:
289			// ewma.Update happens every second
290			act := atomic.LoadInt32(&cp.curActConns)
291			cp.ewma.Update(int64(act))
292
293			// ewma.Tick() and ewma.Rate() is called every 5 seconds.
294			if i == CONN_RELEASE_INTERVAL-1 {
295				cp.ewma.Tick()
296				numRetConns, needToFreeConns := cp.numConnsToRetain()
297				if needToFreeConns {
298					cp.releaseConns(numRetConns)
299				}
300			}
301
302			// Log active and free connection count history every minute.
303			fc := atomic.LoadInt32(&cp.freeConns)
304			if j == CONN_COUNT_LOG_INTERVAL-1 {
305				logging.Infof("%v active conns %v, free conns %v", cp.logPrefix, act, fc)
306			}
307
308			i = (i + 1) % CONN_RELEASE_INTERVAL
309			j = (j + 1) % CONN_COUNT_LOG_INTERVAL
310		}
311	}
312}
313