diff --git a/channel.go b/channel.go index 761ffce..50acca4 100644 --- a/channel.go +++ b/channel.go @@ -3,7 +3,6 @@ package pool import ( "errors" "fmt" - log "github.com/sirupsen/logrus" "sync" "time" //"reflect" @@ -47,6 +46,7 @@ type channelPool struct { maxActive int openingConns int connReqs []chan connReq + openerCh chan struct{} } type idleConn struct { @@ -54,6 +54,8 @@ type idleConn struct { t time.Time } +var connectionRequestQueueSize = 1000000 + // NewChannelPool 初始化连接 func NewChannelPool(poolConfig *Config) (Pool, error) { if !(poolConfig.InitialCap <= poolConfig.MaxIdle && poolConfig.MaxCap >= poolConfig.MaxIdle && poolConfig.InitialCap >= 0) { @@ -73,12 +75,15 @@ func NewChannelPool(poolConfig *Config) (Pool, error) { idleTimeout: poolConfig.IdleTimeout, maxActive: poolConfig.MaxCap, openingConns: poolConfig.InitialCap, + openerCh: make(chan struct{}, connectionRequestQueueSize), } if poolConfig.Ping != nil { c.ping = poolConfig.Ping } + go c.connectionOpener() + for i := 0; i < poolConfig.InitialCap; i++ { conn, err := c.factory() if err != nil { @@ -99,6 +104,36 @@ func (c *channelPool) getConns() chan *idleConn { return conns } +// connectionOpener separate goroutine for opening new connection +func (c *channelPool) connectionOpener() { + for { + select { + case _, ok := <-c.openerCh: + if !ok { + return + } + c.openNewConnection() + } + } +} + +// openNewConnection Open one new connection +func (c *channelPool) openNewConnection() { + conn, err := c.factory() + if err != nil { + c.mu.Lock() + c.openingConns-- + c.maybeOpenNewConnections() + c.mu.Unlock() + + // put nil connection into pool to wake up pending channel fetch + c.Put(nil) + return + } + + c.Put(conn) +} + // Get 从pool中取一个连接 func (c *channelPool) Get() (interface{}, error) { conns := c.getConns() @@ -129,7 +164,6 @@ func (c *channelPool) Get() (interface{}, error) { return wrapConn.conn, nil default: c.mu.Lock() - log.Debugf("openConn %v %v", c.openingConns, c.maxActive) if c.openingConns >= c.maxActive { req := make(chan connReq, 1) c.connReqs = append(c.connReqs, req) @@ -138,6 +172,9 @@ func (c *channelPool) Get() (interface{}, error) { if !ok { return nil, ErrMaxActiveConnReached } + if ret.idleConn.conn == nil { + return nil, errors.New("failed to create a new connection") + } if timeout := c.idleTimeout; timeout > 0 { if ret.idleConn.t.Add(timeout).Before(time.Now()) { //丢弃并关闭该连接 @@ -151,13 +188,17 @@ func (c *channelPool) Get() (interface{}, error) { c.mu.Unlock() return nil, ErrClosed } + + // c.factory 耗时较长,采用乐观策略,先增加,失败后再减少 + c.openingConns++ + c.mu.Unlock() conn, err := c.factory() if err != nil { + c.mu.Lock() + c.openingConns-- c.mu.Unlock() return nil, err } - c.openingConns++ - c.mu.Unlock() return conn, nil } } @@ -165,13 +206,8 @@ func (c *channelPool) Get() (interface{}, error) { // Put 将连接放回pool中 func (c *channelPool) Put(conn interface{}) error { - if conn == nil { - return errors.New("connection is nil. rejecting") - } - c.mu.Lock() - - if c.conns == nil { + if c.conns == nil && conn != nil { c.mu.Unlock() return c.Close(conn) } @@ -180,12 +216,17 @@ func (c *channelPool) Put(conn interface{}) error { req := c.connReqs[0] copy(c.connReqs, c.connReqs[1:]) c.connReqs = c.connReqs[:l-1] - req <- connReq{ - idleConn: &idleConn{conn: conn, t: time.Now()}, + if conn == nil { + req <- connReq{idleConn: nil} + //return errors.New("connection is nil. rejecting") + } else { + req <- connReq{ + idleConn: &idleConn{conn: conn, t: time.Now()}, + } } c.mu.Unlock() return nil - } else { + } else if conn != nil { select { case c.conns <- &idleConn{conn: conn, t: time.Now()}: c.mu.Unlock() @@ -196,6 +237,27 @@ func (c *channelPool) Put(conn interface{}) error { return c.Close(conn) } } + + c.mu.Unlock() + return errors.New("connection is nil, rejecting") +} + +// maybeOpenNewConnections 如果有请求在,并且池里的连接上限未达到时,开启新的连接 +// Assumes c.mu is locked +func (c *channelPool) maybeOpenNewConnections() { + numRequest := len(c.connReqs) + + if c.maxActive > 0 { + numCanOpen := c.maxActive - c.openingConns + if numRequest > numCanOpen { + numRequest = numCanOpen + } + } + for numRequest > 0 { + c.openingConns++ + numRequest-- + c.openerCh <- struct{}{} + } } // Close 关闭单条连接 @@ -203,13 +265,18 @@ func (c *channelPool) Close(conn interface{}) error { if conn == nil { return errors.New("connection is nil. rejecting") } - c.mu.Lock() - defer c.mu.Unlock() if c.close == nil { return nil } + + var err error + err = c.close(conn) + + c.mu.Lock() c.openingConns-- - return c.close(conn) + c.maybeOpenNewConnections() + c.mu.Unlock() + return err } // Ping 检查单条连接是否有效 @@ -229,13 +296,18 @@ func (c *channelPool) Release() { c.ping = nil closeFun := c.close c.close = nil + openerCh := c.openerCh + c.openerCh = nil c.mu.Unlock() if conns == nil { return } + // close channels close(conns) + close(openerCh) + for wrapConn := range conns { //log.Printf("Type %v\n",reflect.TypeOf(wrapConn.conn)) closeFun(wrapConn.conn) diff --git a/go.mod b/go.mod index 49ecc4e..2a6c80e 100644 --- a/go.mod +++ b/go.mod @@ -2,4 +2,4 @@ module github.com/silenceper/pool go 1.13 -require github.com/sirupsen/logrus v1.4.2 +//require github.com/sirupsen/logrus v1.4.2