1package client 2 3import "errors" 4import "fmt" 5import "net" 6import "time" 7import "sync/atomic" 8 9import "github.com/couchbase/indexing/secondary/logging" 10import "github.com/couchbase/indexing/secondary/transport" 11import "github.com/couchbase/indexing/secondary/security" 12import protobuf "github.com/couchbase/indexing/secondary/protobuf/query" 13import gometrics "github.com/rcrowley/go-metrics" 14 15const ( 16 CONN_RELEASE_INTERVAL = 5 // Seconds. Don't change as long as go-metrics/ewma is being used. 17 NUM_CONN_RELEASE_INTERVALS = 60 // Don't change as long as go-metrics/ewma is being used. 18 CONN_COUNT_LOG_INTERVAL = 60 // Seconds. 19) 20 21// ErrorClosedPool 22var ErrorClosedPool = errors.New("queryport.closedPool") 23 24// ErrorNoPool 25var ErrorNoPool = errors.New("queryport.errorNoPool") 26 27// ErrorPoolTimeout 28var ErrorPoolTimeout = errors.New("queryport.connPoolTimeout") 29 30type connectionPool struct { 31 host string 32 mkConn func(host string) (*connection, error) 33 connections chan *connection 34 createsem chan bool 35 // config params 36 maxPayload int 37 timeout time.Duration 38 availTimeout time.Duration 39 logPrefix string 40 curActConns int32 41 minPoolSizeWM int32 42 freeConns int32 43 relConnBatchSize int32 44 stopCh chan bool 45 ewma gometrics.EWMA 46} 47 48type connection struct { 49 conn net.Conn 50 pkt *transport.TransportPacket 51} 52 53func newConnectionPool( 54 host string, 55 poolSize, poolOverflow, maxPayload int, 56 timeout, availTimeout time.Duration, 57 minPoolSizeWM int32, relConnBatchSize int32) *connectionPool { 58 59 cp := &connectionPool{ 60 host: host, 61 connections: make(chan *connection, poolSize), 62 createsem: make(chan bool, poolSize+poolOverflow), 63 maxPayload: maxPayload, 64 timeout: timeout, 65 availTimeout: availTimeout, 66 logPrefix: fmt.Sprintf("[Queryport-connpool:%v]", host), 67 minPoolSizeWM: minPoolSizeWM, 68 relConnBatchSize: relConnBatchSize, 69 stopCh: make(chan bool, 1), 70 } 71 cp.mkConn = cp.defaultMkConn 72 cp.ewma = gometrics.NewEWMA5() 73 logging.Infof("%v started poolsize %v overflow %v low WM %v relConn batch size %v ...\n", 74 cp.logPrefix, poolSize, poolOverflow, minPoolSizeWM, relConnBatchSize) 75 go cp.releaseConnsRoutine() 76 return cp 77} 78 79// ConnPoolTimeout is notified whenever connections are acquired from a pool. 80var ConnPoolCallback func(host string, source string, start time.Time, err error) 81 82func (cp *connectionPool) defaultMkConn(host string) (*connection, error) { 83 logging.Infof("%v open new connection ...\n", cp.logPrefix) 84 conn, err := security.MakeConn(host) 85 if err != nil { 86 return nil, err 87 } 88 flags := transport.TransportFlag(0).SetProtobuf() 89 pkt := transport.NewTransportPacket(cp.maxPayload, flags) 90 pkt.SetEncoder(transport.EncodingProtobuf, protobuf.ProtobufEncode) 91 pkt.SetDecoder(transport.EncodingProtobuf, protobuf.ProtobufDecode) 92 return &connection{conn, pkt}, nil 93} 94 95func (cp *connectionPool) Close() (err error) { 96 defer func() { 97 if r := recover(); r != nil { 98 logging.Verbosef("%v Close() crashed: %v\n", cp.logPrefix, r) 99 logging.Verbosef("%s", logging.StackTrace()) 100 } 101 }() 102 cp.stopCh <- true 103 close(cp.connections) 104 for connectn := range cp.connections { 105 connectn.conn.Close() 106 } 107 logging.Infof("%v ... stopped\n", cp.logPrefix) 108 return 109} 110 111func (cp *connectionPool) GetWithTimeout(d time.Duration) (connectn *connection, err error) { 112 if cp == nil { 113 return nil, ErrorNoPool 114 } 115 116 path, ok := "", false 117 118 if ConnPoolCallback != nil { 119 defer func(path *string, start time.Time) { 120 ConnPoolCallback(cp.host, *path, start, err) 121 }(&path, time.Now()) 122 } 123 124 path = "short-circuit" 125 126 // short-circuit available connetions. 127 select { 128 case connectn, ok = <-cp.connections: 129 if !ok { 130 return nil, ErrorClosedPool 131 } 132 logging.Debugf("%v new connection from pool\n", cp.logPrefix) 133 atomic.AddInt32(&cp.freeConns, -1) 134 atomic.AddInt32(&cp.curActConns, 1) 135 return connectn, nil 136 default: 137 } 138 139 t := time.NewTimer(cp.availTimeout * time.Millisecond) 140 defer t.Stop() 141 142 // Try to grab an available connection within 1ms 143 select { 144 case connectn, ok = <-cp.connections: 145 path = "avail1" 146 if !ok { 147 return nil, ErrorClosedPool 148 } 149 logging.Debugf("%v new connection (avail1) from pool\n", cp.logPrefix) 150 atomic.AddInt32(&cp.freeConns, -1) 151 atomic.AddInt32(&cp.curActConns, 1) 152 return connectn, nil 153 154 case <-t.C: 155 // No connection came around in time, let's see 156 // whether we can get one or build a new one first. 157 t.Reset(d) // Reuse the timer for the full timeout. 158 select { 159 case connectn, ok = <-cp.connections: 160 path = "avail2" 161 if !ok { 162 return nil, ErrorClosedPool 163 } 164 logging.Debugf("%v new connection (avail2) from pool\n", cp.logPrefix) 165 atomic.AddInt32(&cp.freeConns, -1) 166 atomic.AddInt32(&cp.curActConns, 1) 167 return connectn, nil 168 169 case cp.createsem <- true: 170 path = "create" 171 // Build a connection if we can't get a real one. 172 // This can potentially be an overflow connection, or 173 // a pooled connection. 174 connectn, err := cp.mkConn(cp.host) 175 if err != nil { 176 // On error, release our create hold 177 <-cp.createsem 178 } else { 179 atomic.AddInt32(&cp.curActConns, 1) 180 } 181 logging.Debugf("%v new connection (create) from pool\n", cp.logPrefix) 182 return connectn, err 183 184 case <-t.C: 185 return nil, ErrorPoolTimeout 186 } 187 } 188} 189 190func (cp *connectionPool) Renew(conn *connection) (*connection, error) { 191 192 newConn, err := cp.mkConn(cp.host) 193 if err == nil { 194 logging.Infof("%v closing unhealthy connection %q\n", cp.logPrefix, conn.conn.LocalAddr()) 195 conn.conn.Close() 196 conn = newConn 197 } 198 199 return conn, err 200} 201 202func (cp *connectionPool) Get() (*connection, error) { 203 return cp.GetWithTimeout(cp.timeout * time.Millisecond) 204} 205 206func (cp *connectionPool) Return(connectn *connection, healthy bool) { 207 defer atomic.AddInt32(&cp.curActConns, -1) 208 if connectn == nil || connectn.conn == nil { 209 return 210 } 211 212 laddr := connectn.conn.LocalAddr() 213 if cp == nil { 214 logging.Infof("%v pool closed\n", cp.logPrefix, laddr) 215 connectn.conn.Close() 216 } 217 218 if healthy { 219 defer func() { 220 if recover() != nil { 221 // This happens when the pool has already been 222 // closed and we're trying to return a 223 // connection to it anyway. Just close the 224 // connection. 225 connectn.conn.Close() 226 } 227 }() 228 229 select { 230 case cp.connections <- connectn: 231 logging.Debugf("%v connection %q reclaimed to pool\n", cp.logPrefix, laddr) 232 atomic.AddInt32(&cp.freeConns, 1) 233 default: 234 logging.Debugf("%v closing overflow connection %q poolSize=%v\n", cp.logPrefix, laddr, len(cp.connections)) 235 <-cp.createsem 236 connectn.conn.Close() 237 } 238 239 } else { 240 logging.Infof("%v closing unhealthy connection %q\n", cp.logPrefix, laddr) 241 <-cp.createsem 242 connectn.conn.Close() 243 } 244} 245 246func max(a, b int32) int32 { 247 if a > b { 248 return a 249 } 250 return b 251} 252 253func (cp *connectionPool) numConnsToRetain() (int32, bool) { 254 avg := cp.ewma.Rate() 255 act := atomic.LoadInt32(&cp.curActConns) 256 num := max(act, int32(avg)) 257 num = max(cp.minPoolSizeWM, num) 258 fc := atomic.LoadInt32(&cp.freeConns) 259 totalConns := act + fc 260 if totalConns-cp.relConnBatchSize >= num { 261 // Don't release more than relConnBatchSize number of connections 262 // in 1 iteration 263 logging.Debugf("%v releasinng connections ...", cp.logPrefix) 264 return totalConns - cp.relConnBatchSize, true 265 } 266 return totalConns, false 267} 268 269func (cp *connectionPool) releaseConns(numRetConns int32) { 270 for { 271 fc := atomic.LoadInt32(&cp.freeConns) 272 act := atomic.LoadInt32(&cp.curActConns) 273 totalConns := act + fc 274 if totalConns > numRetConns && fc > 0 { 275 select { 276 case conn, ok := <-cp.connections: 277 if !ok { 278 return 279 } 280 atomic.AddInt32(&cp.freeConns, -1) 281 conn.conn.Close() 282 default: 283 break 284 } 285 } else { 286 break 287 } 288 } 289} 290 291func (cp *connectionPool) releaseConnsRoutine() { 292 i := 0 293 j := 0 294 for { 295 time.Sleep(time.Second) 296 select { 297 case <-cp.stopCh: 298 logging.Infof("%v Stopping releaseConnsRoutine", cp.logPrefix) 299 return 300 301 default: 302 // ewma.Update happens every second 303 act := atomic.LoadInt32(&cp.curActConns) 304 cp.ewma.Update(int64(act)) 305 306 // ewma.Tick() and ewma.Rate() is called every 5 seconds. 307 if i == CONN_RELEASE_INTERVAL-1 { 308 cp.ewma.Tick() 309 numRetConns, needToFreeConns := cp.numConnsToRetain() 310 if needToFreeConns { 311 cp.releaseConns(numRetConns) 312 } 313 } 314 315 // Log active and free connection count history every minute. 316 fc := atomic.LoadInt32(&cp.freeConns) 317 if j == CONN_COUNT_LOG_INTERVAL-1 { 318 logging.Infof("%v active conns %v, free conns %v", cp.logPrefix, act, fc) 319 } 320 321 i = (i + 1) % CONN_RELEASE_INTERVAL 322 j = (j + 1) % CONN_COUNT_LOG_INTERVAL 323 } 324 } 325} 326