Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 88 additions & 16 deletions channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package pool
import (
"errors"
"fmt"
log "github.com/sirupsen/logrus"
"sync"
"time"
//"reflect"
Expand Down Expand Up @@ -47,13 +46,16 @@ type channelPool struct {
maxActive int
openingConns int
connReqs []chan connReq
openerCh chan struct{}
}

type idleConn struct {
conn interface{}
t time.Time
}

var connectionRequestQueueSize = 1000000

// NewChannelPool 初始化连接
func NewChannelPool(poolConfig *Config) (Pool, error) {
if !(poolConfig.InitialCap <= poolConfig.MaxIdle && poolConfig.MaxCap >= poolConfig.MaxIdle && poolConfig.InitialCap >= 0) {
Expand All @@ -73,12 +75,15 @@ func NewChannelPool(poolConfig *Config) (Pool, error) {
idleTimeout: poolConfig.IdleTimeout,
maxActive: poolConfig.MaxCap,
openingConns: poolConfig.InitialCap,
openerCh: make(chan struct{}, connectionRequestQueueSize),
}

if poolConfig.Ping != nil {
c.ping = poolConfig.Ping
}

go c.connectionOpener()

for i := 0; i < poolConfig.InitialCap; i++ {
conn, err := c.factory()
if err != nil {
Expand All @@ -99,6 +104,36 @@ func (c *channelPool) getConns() chan *idleConn {
return conns
}

// connectionOpener separate goroutine for opening new connection
func (c *channelPool) connectionOpener() {
for {
select {
case _, ok := <-c.openerCh:
if !ok {
return
}
c.openNewConnection()
}
}
}

// openNewConnection Open one new connection
func (c *channelPool) openNewConnection() {
conn, err := c.factory()
if err != nil {
c.mu.Lock()
c.openingConns--
c.maybeOpenNewConnections()
c.mu.Unlock()

// put nil connection into pool to wake up pending channel fetch
c.Put(nil)
return
}

c.Put(conn)
}

// Get 从pool中取一个连接
func (c *channelPool) Get() (interface{}, error) {
conns := c.getConns()
Expand Down Expand Up @@ -129,7 +164,6 @@ func (c *channelPool) Get() (interface{}, error) {
return wrapConn.conn, nil
default:
c.mu.Lock()
log.Debugf("openConn %v %v", c.openingConns, c.maxActive)
if c.openingConns >= c.maxActive {
req := make(chan connReq, 1)
c.connReqs = append(c.connReqs, req)
Expand All @@ -138,6 +172,9 @@ func (c *channelPool) Get() (interface{}, error) {
if !ok {
return nil, ErrMaxActiveConnReached
}
if ret.idleConn.conn == nil {
return nil, errors.New("failed to create a new connection")
}
if timeout := c.idleTimeout; timeout > 0 {
if ret.idleConn.t.Add(timeout).Before(time.Now()) {
//丢弃并关闭该连接
Expand All @@ -151,27 +188,26 @@ func (c *channelPool) Get() (interface{}, error) {
c.mu.Unlock()
return nil, ErrClosed
}

// c.factory 耗时较长,采用乐观策略,先增加,失败后再减少
c.openingConns++
c.mu.Unlock()
conn, err := c.factory()
if err != nil {
c.mu.Lock()
c.openingConns--
c.mu.Unlock()
return nil, err
}
c.openingConns++
c.mu.Unlock()
return conn, nil
}
}
}

// Put 将连接放回pool中
func (c *channelPool) Put(conn interface{}) error {
if conn == nil {
return errors.New("connection is nil. rejecting")
}

c.mu.Lock()

if c.conns == nil {
if c.conns == nil && conn != nil {
c.mu.Unlock()
return c.Close(conn)
}
Expand All @@ -180,12 +216,17 @@ func (c *channelPool) Put(conn interface{}) error {
req := c.connReqs[0]
copy(c.connReqs, c.connReqs[1:])
c.connReqs = c.connReqs[:l-1]
req <- connReq{
idleConn: &idleConn{conn: conn, t: time.Now()},
if conn == nil {
req <- connReq{idleConn: nil}
//return errors.New("connection is nil. rejecting")
} else {
req <- connReq{
idleConn: &idleConn{conn: conn, t: time.Now()},
}
}
c.mu.Unlock()
return nil
} else {
} else if conn != nil {
select {
case c.conns <- &idleConn{conn: conn, t: time.Now()}:
c.mu.Unlock()
Expand All @@ -196,20 +237,46 @@ func (c *channelPool) Put(conn interface{}) error {
return c.Close(conn)
}
}

c.mu.Unlock()
return errors.New("connection is nil, rejecting")
}

// maybeOpenNewConnections 如果有请求在,并且池里的连接上限未达到时,开启新的连接
// Assumes c.mu is locked
func (c *channelPool) maybeOpenNewConnections() {
numRequest := len(c.connReqs)

if c.maxActive > 0 {
numCanOpen := c.maxActive - c.openingConns
if numRequest > numCanOpen {
numRequest = numCanOpen
}
}
for numRequest > 0 {
c.openingConns++
numRequest--
c.openerCh <- struct{}{}
}
}

// Close 关闭单条连接
func (c *channelPool) Close(conn interface{}) error {
if conn == nil {
return errors.New("connection is nil. rejecting")
}
c.mu.Lock()
defer c.mu.Unlock()
if c.close == nil {
return nil
}

var err error
err = c.close(conn)

c.mu.Lock()
c.openingConns--
return c.close(conn)
c.maybeOpenNewConnections()
c.mu.Unlock()
return err
}

// Ping 检查单条连接是否有效
Expand All @@ -229,13 +296,18 @@ func (c *channelPool) Release() {
c.ping = nil
closeFun := c.close
c.close = nil
openerCh := c.openerCh
c.openerCh = nil
c.mu.Unlock()

if conns == nil {
return
}

// close channels
close(conns)
close(openerCh)

for wrapConn := range conns {
//log.Printf("Type %v\n",reflect.TypeOf(wrapConn.conn))
closeFun(wrapConn.conn)
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@ module github.com/silenceper/pool

go 1.13

require github.com/sirupsen/logrus v1.4.2
//require github.com/sirupsen/logrus v1.4.2