package client

import (
	"errors"
	"fmt"
	"net"
	"sync/atomic"
	"time"

	"github.com/couchbase/indexing/secondary/logging"
	protobuf "github.com/couchbase/indexing/secondary/protobuf/query"
	"github.com/couchbase/indexing/secondary/security"
	"github.com/couchbase/indexing/secondary/transport"
	gometrics "github.com/rcrowley/go-metrics"
)

const (
	CONN_RELEASE_INTERVAL      = 5  // Seconds. Don't change as long as go-metrics/ewma is being used.
	NUM_CONN_RELEASE_INTERVALS = 60 // Don't change as long as go-metrics/ewma is being used.
	CONN_COUNT_LOG_INTERVAL    = 60 // Seconds.
)

// ErrorClosedPool is returned when a connection is requested from a closed pool.
var ErrorClosedPool = errors.New("queryport.closedPool")

// ErrorNoPool is returned when the pool handle itself is nil.
var ErrorNoPool = errors.New("queryport.errorNoPool")

// ErrorPoolTimeout is returned when no connection becomes available in time.
var ErrorPoolTimeout = errors.New("queryport.connPoolTimeout")

// connectionPool manages a bounded set of connections to a single
// queryport host, with an overflow allowance and an EWMA-driven
// background routine that releases surplus idle connections.
type connectionPool struct {
	host        string
	mkConn      func(host string) (*connection, error)
	connections chan *connection
	createsem   chan bool
	// config params
	maxPayload       int
	timeout          time.Duration
	availTimeout     time.Duration
	logPrefix        string
	curActConns      int32
	minPoolSizeWM    int32
	freeConns        int32
	relConnBatchSize int32
	stopCh           chan bool
	ewma             gometrics.EWMA
}

// connection pairs a network connection with its transport packet codec.
type connection struct {
	conn net.Conn
	pkt  *transport.TransportPacket
}

// newConnectionPool creates a pool holding up to poolSize idle
// connections to the given host, with up to poolOverflow additional
// connections created on demand, and starts the background routine
// that releases surplus connections.
func newConnectionPool(
	host string,
	poolSize, poolOverflow, maxPayload int,
	timeout, availTimeout time.Duration,
	minPoolSizeWM int32, relConnBatchSize int32) *connectionPool {

	cp := &connectionPool{
		host:             host,
		connections:      make(chan *connection, poolSize),
		createsem:        make(chan bool, poolSize+poolOverflow),
		maxPayload:       maxPayload,
		timeout:          timeout,
		availTimeout:     availTimeout,
		logPrefix:        fmt.Sprintf("[Queryport-connpool:%v]", host),
		minPoolSizeWM:    minPoolSizeWM,
		relConnBatchSize: relConnBatchSize,
		stopCh:           make(chan bool, 1),
	}
	cp.mkConn = cp.defaultMkConn
	cp.ewma = gometrics.NewEWMA5()
	logging.Infof("%v started poolsize %v overflow %v low WM %v relConn batch size %v ...\n",
		cp.logPrefix, poolSize, poolOverflow, minPoolSizeWM, relConnBatchSize)
	go cp.releaseConnsRoutine()
	return cp
}
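
// Example usage, as a minimal sketch: the host, pool sizes, payload limit
// and timeout values below are hypothetical, not defaults taken from this
// package. Note that timeout and availTimeout, though typed time.Duration,
// are interpreted as plain millisecond counts (see Get and GetWithTimeout).
//
//	pool := newConnectionPool(
//		"127.0.0.1:9101", // queryport host:port (hypothetical)
//		10, 5,            // poolSize, poolOverflow
//		1024*1024,        // maxPayload in bytes
//		120000, 1,        // timeout, availTimeout in milliseconds
//		2, 2)             // minPoolSizeWM, relConnBatchSize
//	conn, err := pool.Get()
//	if err != nil {
//		// handle ErrorClosedPool / ErrorPoolTimeout / dial errors
//	} else {
//		// ... exchange protobuf packets via conn.pkt over conn.conn ...
//		pool.Return(conn, true)
//	}
//	pool.Close()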

// ConnPoolCallback, if set, is invoked whenever a connection is acquired
// from the pool, recording the acquisition path, start time and outcome.
var ConnPoolCallback func(host string, source string, start time.Time, err error)

func (cp *connectionPool) defaultMkConn(host string) (*connection, error) {
	logging.Infof("%v open new connection ...\n", cp.logPrefix)
	conn, err := security.MakeConn(host)
	if err != nil {
		return nil, err
	}
	flags := transport.TransportFlag(0).SetProtobuf()
	pkt := transport.NewTransportPacket(cp.maxPayload, flags)
	pkt.SetEncoder(transport.EncodingProtobuf, protobuf.ProtobufEncode)
	pkt.SetDecoder(transport.EncodingProtobuf, protobuf.ProtobufDecode)
	return &connection{conn, pkt}, nil
}

// Close stops the release routine and closes the pool along with any
// pooled connections. A panic from closing an already closed pool is
// recovered and logged.
func (cp *connectionPool) Close() (err error) {
	defer func() {
		if r := recover(); r != nil {
			logging.Verbosef("%v Close() crashed: %v\n", cp.logPrefix, r)
			logging.Verbosef("%s", logging.StackTrace())
		}
	}()
	cp.stopCh <- true
	close(cp.connections)
	for connectn := range cp.connections {
		connectn.conn.Close()
	}
	logging.Infof("%v ... stopped\n", cp.logPrefix)
	return
}

// GetWithTimeout returns a pooled connection if one is immediately free,
// waiting up to availTimeout milliseconds; otherwise it waits up to d for
// either a free connection or an open slot to create a new one.
func (cp *connectionPool) GetWithTimeout(d time.Duration) (connectn *connection, err error) {
	if cp == nil {
		return nil, ErrorNoPool
	}

	path, ok := "", false

	if ConnPoolCallback != nil {
		defer func(path *string, start time.Time) {
			ConnPoolCallback(cp.host, *path, start, err)
		}(&path, time.Now())
	}

	path = "short-circuit"

	// short-circuit available connections.
	select {
	case connectn, ok = <-cp.connections:
		if !ok {
			return nil, ErrorClosedPool
		}
		logging.Debugf("%v new connection from pool\n", cp.logPrefix)
		atomic.AddInt32(&cp.freeConns, -1)
		atomic.AddInt32(&cp.curActConns, 1)
		return connectn, nil
	default:
	}

	t := time.NewTimer(cp.availTimeout * time.Millisecond)
	defer t.Stop()

	// Try to grab an available connection within availTimeout milliseconds.
	select {
	case connectn, ok = <-cp.connections:
		path = "avail1"
		if !ok {
			return nil, ErrorClosedPool
		}
		logging.Debugf("%v new connection (avail1) from pool\n", cp.logPrefix)
		atomic.AddInt32(&cp.freeConns, -1)
		atomic.AddInt32(&cp.curActConns, 1)
		return connectn, nil

	case <-t.C:
		// No connection came around in time, let's see
		// whether we can get one or build a new one first.
		t.Reset(d) // Reuse the timer for the full timeout.
		select {
		case connectn, ok = <-cp.connections:
			path = "avail2"
			if !ok {
				return nil, ErrorClosedPool
			}
			logging.Debugf("%v new connection (avail2) from pool\n", cp.logPrefix)
			atomic.AddInt32(&cp.freeConns, -1)
			atomic.AddInt32(&cp.curActConns, 1)
			return connectn, nil

		case cp.createsem <- true:
			path = "create"
			// Build a connection if we can't get a real one.
			// This can potentially be an overflow connection, or
			// a pooled connection.
			connectn, err := cp.mkConn(cp.host)
			if err != nil {
				// On error, release our create hold
				<-cp.createsem
			} else {
				atomic.AddInt32(&cp.curActConns, 1)
			}
			logging.Debugf("%v new connection (create) from pool\n", cp.logPrefix)
			return connectn, err

		case <-t.C:
			return nil, ErrorPoolTimeout
		}
	}
}
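
// Example, as a sketch: a caller that needs a tighter budget than the
// pool's configured timeout can pass its own duration. The 500ms figure
// here is hypothetical.
//
//	conn, err := pool.GetWithTimeout(500 * time.Millisecond)
//	switch err {
//	case nil:
//		defer pool.Return(conn, true)
//	case ErrorPoolTimeout:
//		// every pooled connection is busy and the overflow budget
//		// (poolSize+poolOverflow) is exhausted
//	}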

// Renew replaces a connection the caller believes is unhealthy with a
// freshly dialed one, closing the old connection on success. On failure
// it returns the original connection along with the error.
func (cp *connectionPool) Renew(conn *connection) (*connection, error) {

	newConn, err := cp.mkConn(cp.host)
	if err == nil {
		logging.Infof("%v closing unhealthy connection %q\n", cp.logPrefix, conn.conn.LocalAddr())
		conn.conn.Close()
		conn = newConn
	}

	return conn, err
}
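
// A typical retry pattern, as a sketch (doRequest is a hypothetical
// caller-side helper): when a request fails on what looks like a dead
// connection, renew it and retry once; if the renewal itself fails,
// hand the connection back as unhealthy so the pool closes it.
//
//	if err := doRequest(conn); err != nil {
//		if conn, err = pool.Renew(conn); err != nil {
//			pool.Return(conn, false)
//			return err
//		}
//		err = doRequest(conn)
//	}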

// Get returns a connection using the pool's configured timeout. Note
// that cp.timeout, though typed time.Duration, holds a plain millisecond
// count, hence the multiplication here.
func (cp *connectionPool) Get() (*connection, error) {
	return cp.GetWithTimeout(cp.timeout * time.Millisecond)
}

// Return puts a connection back into the pool if it is healthy, and
// closes it otherwise. Unhealthy and overflow connections release their
// slot on the create semaphore when closed.
func (cp *connectionPool) Return(connectn *connection, healthy bool) {
	if cp == nil {
		// Dereferencing a nil pool would panic; just close the connection.
		if connectn != nil && connectn.conn != nil {
			logging.Infof("[Queryport-connpool] pool is nil, closing connection %q\n",
				connectn.conn.LocalAddr())
			connectn.conn.Close()
		}
		return
	}
	defer atomic.AddInt32(&cp.curActConns, -1)
	if connectn == nil || connectn.conn == nil {
		return
	}

	laddr := connectn.conn.LocalAddr()
	if healthy {
		defer func() {
			if recover() != nil {
				// This happens when the pool has already been
				// closed and we're trying to return a
				// connection to it anyway.  Just close the
				// connection.
				connectn.conn.Close()
			}
		}()

		select {
		case cp.connections <- connectn:
			logging.Debugf("%v connection %q reclaimed to pool\n", cp.logPrefix, laddr)
			atomic.AddInt32(&cp.freeConns, 1)
		default:
			logging.Debugf("%v closing overflow connection %q poolSize=%v\n", cp.logPrefix, laddr, len(cp.connections))
			<-cp.createsem
			connectn.conn.Close()
		}

	} else {
		logging.Infof("%v closing unhealthy connection %q\n", cp.logPrefix, laddr)
		<-cp.createsem
		connectn.conn.Close()
	}
}
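
// Callers signal health through the second argument, as sketched below
// (sendAndReceive is hypothetical): any I/O failure should mark the
// connection unhealthy so it is closed rather than recycled.
//
//	conn, err := pool.Get()
//	if err != nil {
//		return err
//	}
//	err = sendAndReceive(conn)
//	pool.Return(conn, err == nil)
//	return err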

func max(a, b int32) int32 {
	if a > b {
		return a
	}
	return b
}

// numConnsToRetain computes how many connections to keep, based on the
// current active count, the five-minute EWMA of active connections, and
// the low watermark. The second return value reports whether a batch of
// connections should be released.
func (cp *connectionPool) numConnsToRetain() (int32, bool) {
	avg := cp.ewma.Rate()
	act := atomic.LoadInt32(&cp.curActConns)
	num := max(act, int32(avg))
	num = max(cp.minPoolSizeWM, num)
	fc := atomic.LoadInt32(&cp.freeConns)
	totalConns := act + fc
	if totalConns-cp.relConnBatchSize >= num {
		// Don't release more than relConnBatchSize number of connections
		// in 1 iteration
		logging.Debugf("%v releasing connections ...", cp.logPrefix)
		return totalConns - cp.relConnBatchSize, true
	}
	return totalConns, false
}
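
// Worked example with hypothetical numbers: if curActConns=10, freeConns=20,
// the EWMA rate is 12.3 and minPoolSizeWM=8, then num = max(max(10, 12), 8)
// = 12 and totalConns = 30. With relConnBatchSize=4, 30-4 = 26 >= 12, so the
// pool retains 26 connections and releases a batch of 4 this interval.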

// releaseConns closes free connections until the total count drops to
// numRetConns, or no free connection is immediately available.
func (cp *connectionPool) releaseConns(numRetConns int32) {
	for {
		fc := atomic.LoadInt32(&cp.freeConns)
		act := atomic.LoadInt32(&cp.curActConns)
		totalConns := act + fc
		if totalConns <= numRetConns || fc <= 0 {
			return
		}
		select {
		case conn, ok := <-cp.connections:
			if !ok {
				return
			}
			atomic.AddInt32(&cp.freeConns, -1)
			conn.conn.Close()
		default:
			// A bare "break" here would only exit the select, not the
			// loop, and a stale freeConns count could then make this
			// spin; return instead when no free connection is ready.
			return
		}
	}
}

// releaseConnsRoutine updates the EWMA of active connections every second,
// releases surplus connections every CONN_RELEASE_INTERVAL seconds, and
// logs connection counts every CONN_COUNT_LOG_INTERVAL seconds, until
// stopped via stopCh.
func (cp *connectionPool) releaseConnsRoutine() {
	i := 0
	j := 0
	for {
		time.Sleep(time.Second)
		select {
		case <-cp.stopCh:
			logging.Infof("%v Stopping releaseConnsRoutine", cp.logPrefix)
			return

		default:
			// ewma.Update happens every second
			act := atomic.LoadInt32(&cp.curActConns)
			cp.ewma.Update(int64(act))

			// ewma.Tick() and ewma.Rate() are called every 5 seconds.
			if i == CONN_RELEASE_INTERVAL-1 {
				cp.ewma.Tick()
				numRetConns, needToFreeConns := cp.numConnsToRetain()
				if needToFreeConns {
					cp.releaseConns(numRetConns)
				}
			}

			// Log active and free connection count history every minute.
			fc := atomic.LoadInt32(&cp.freeConns)
			if j == CONN_COUNT_LOG_INTERVAL-1 {
				logging.Infof("%v active conns %v, free conns %v", cp.logPrefix, act, fc)
			}

			i = (i + 1) % CONN_RELEASE_INTERVAL
			j = (j + 1) % CONN_COUNT_LOG_INTERVAL
		}
	}
}