refactoring

This commit is contained in:
crusader 2017-11-09 16:40:55 +09:00
parent 8f60f3b463
commit 497fde3617
4 changed files with 206 additions and 177 deletions

19
constants.go Normal file
View File

@ -0,0 +1,19 @@
package redis_pool
const (
// Maximum number of idle connections in the pool.
DefaultMaxIdle = 1
// Maximum number of connections allocated by the pool at a given time.
// When zero, there is no limit on the number of connections in the pool.
DefaultMaxActive = 1
// Close connections after remaining idle for this duration. If the value
// is zero, then idle connections are not closed. Applications should set
// the timeout to a value less than the server's timeout.
DefaultIdleTimeout = 0
// If Wait is true and the pool is at the MaxActive limit, then Get() waits
// for a connection to be returned to the pool before returning.
DefaultWait = false
)

280
pool.go
View File

@ -1,12 +1,11 @@
package overflow_redis_pool package redis_pool
import ( import (
"bytes" "bytes"
"container/list"
"context"
"crypto/rand" "crypto/rand"
"crypto/sha1" "crypto/sha1"
"errors" "errors"
"fmt"
"io" "io"
"strconv" "strconv"
"sync" "sync"
@ -18,14 +17,16 @@ import (
var nowFunc = time.Now // for testing var nowFunc = time.Now // for testing
var (
// ErrPoolExhausted is returned from a pool connection method (Do, Send, // ErrPoolExhausted is returned from a pool connection method (Do, Send,
// Receive, Flush, Err) when the maximum number of database connections in the // Receive, Flush, Err) when the maximum number of database connections in the
// pool has been reached. // pool has been reached.
var ErrPoolExhausted = errors.New("redigo: connection pool exhausted") ErrPoolFull = errors.New("Redis pool: Pool is reached maximum capacity.")
)
var ( var (
errPoolClosed = errors.New("redigo: connection pool closed") errPoolClosed = errors.New("Redis pool: connection pool closed")
errConnClosed = errors.New("redigo: connection closed") errConnClosed = errors.New("Redis pool: connection closed")
) )
// Pool maintains a pool of connections. The application calls the Get method // Pool maintains a pool of connections. The application calls the Get method
@ -103,22 +104,24 @@ var (
// } // }
// //
type Pool interface { type Pool interface {
Start() error
Stop()
Get() redis.Conn Get() redis.Conn
ActiveCount() int Put(conn redis.Conn)
Close() error
Capacity() int
Available() int
} }
type pool struct { type pool struct {
ctx context.Context ph PoolHandler
handler PoolHandler
// mu protects fields defined below.
mu sync.Mutex
cond *sync.Cond
closed bool
active int
// Stack of idleConn with most recently used at the front. connections map[redis.Conn]*pooledConnection
idle list.List connectionQueueChan chan *pooledConnection
stopChan chan struct{}
stopWg sync.WaitGroup
} }
type idleConn struct { type idleConn struct {
@ -129,170 +132,163 @@ type idleConn struct {
// NewPool creates a new pool. // NewPool creates a new pool.
// //
// Deprecated: Initialize the Pool directory as shown in the example. // Deprecated: Initialize the Pool directory as shown in the example.
func NewPool(ctx context.Context, handler PoolHandler) Pool { func New(ph PoolHandler) Pool {
p := &pool{ p := &pool{
ctx: ctx, ph: ph,
handler: handler,
} }
return p return p
} }
// Get gets a connection. The application must close the returned connection. func (p *pool) Start() error {
// This method always returns a valid connection so that applications can defer if nil == p.ph {
// error handling to the first use of the connection. If there is an error panic("GRPC Pool: pool handler must be specified.")
// getting an underlying connection, then the connection Err, Do, Send, Flush }
// and Receive methods return that error. p.ph.Validate()
func (p *pool) Get() redis.Conn {
c, err := p.get() if p.stopChan != nil {
if err != nil { return fmt.Errorf("GRPC Pool: pool is already running. Stop it before starting it again")
return errorConnection{err} }
p.stopChan = make(chan struct{})
p.connections = make(map[redis.Conn]*pooledConnection, p.ph.GetMaxCapacity())
p.connectionQueueChan = make(chan *pooledConnection, p.ph.GetMaxCapacity())
if 0 < p.ph.GetMaxIdle() {
for i := 0; i < p.ph.GetMaxIdle(); i++ {
if err := p.createConnection(); nil != err {
p.Stop()
return err
}
} }
return &pooledConnection{p: p, c: c}
} }
// ActiveCount returns the number of active connections in the pool.
func (p *pool) ActiveCount() int {
p.mu.Lock()
active := p.active
p.mu.Unlock()
return active
}
// Close releases the resources used by the pool.
func (p *pool) Close() error {
p.mu.Lock()
idle := p.idle
p.idle.Init()
p.closed = true
p.active -= idle.Len()
if p.cond != nil {
p.cond.Broadcast()
}
p.mu.Unlock()
for e := idle.Front(); e != nil; e = e.Next() {
e.Value.(idleConn).c.Close()
}
return nil return nil
} }
// release decrements the active count and signals waiters. The caller must func (p *pool) Stop() {
// hold p.mu during the call. if p.stopChan == nil {
func (p *pool) release() { panic("GRPC Pool: pool must be started before stopping it")
p.active -= 1
if p.cond != nil {
p.cond.Signal()
} }
close(p.connectionQueueChan)
for k, _ := range p.connections {
p.destroyConnection(k)
} }
// get prunes stale connections and returns a connection from the idle list or p.stopWg.Wait()
// creates a new connection. p.stopChan = nil
func (p *pool) get() (redis.Conn, error) {
p.mu.Lock()
// Prune stale connections.
if timeout := p.handler.GetIdleTimeout(); timeout > 0 {
for i, n := 0, p.idle.Len(); i < n; i++ {
e := p.idle.Back()
if e == nil {
break
} }
ic := e.Value.(idleConn)
if ic.t.Add(timeout).After(nowFunc()) { func (p *pool) createConnection() error {
break maxCapacity := p.ph.GetMaxCapacity()
currentConnectionCount := len(p.connections)
if currentConnectionCount >= maxCapacity {
return ErrPoolFull
} }
p.idle.Remove(e)
p.release() conn, err := p.ph.Dial()
p.mu.Unlock() if err != nil {
ic.c.Close() return err
p.mu.Lock() }
pc := &pooledConnection{
p: p,
c: conn,
idleTime: time.Now(),
inUse: false,
}
p.connections[conn] = pc
p.connectionQueueChan <- pc
p.stopWg.Add(1)
return nil
}
func (p *pool) destroyConnection(conn redis.Conn) error {
var err error
pc, ok := p.connections[conn]
if !ok {
return nil
}
err = pc.c.Close()
if nil != err {
return err
}
delete(p.connections, conn)
p.stopWg.Done()
return nil
}
func (p *pool) get() (*pooledConnection, error) {
var pc *pooledConnection
var err error
avail := len(p.connectionQueueChan)
if 0 == avail {
if err = p.createConnection(); nil != err {
return nil, err
} }
} }
for { for {
select {
// Get idle connection. case pc = <-p.connectionQueueChan:
// All good
for i, n := 0, p.idle.Len(); i < n; i++ { default:
e := p.idle.Front() }
if e == nil { idleTimeout := p.ph.GetIdleTimeout()
if 1 < len(p.connectionQueueChan) && 0 < idleTimeout && pc.idleTime.Add(idleTimeout).Before(time.Now()) {
go p.destroyConnection(pc.c)
continue
}
break break
} }
ic := e.Value.(idleConn)
p.idle.Remove(e) return pc, nil
test := p.handler.TestOnBorrow
p.mu.Unlock()
if test == nil || test(ic.c, ic.t) == nil {
return ic.c, nil
}
ic.c.Close()
p.mu.Lock()
p.release()
} }
// Check for pool closed before dialing a new connection. func (p *pool) Get() redis.Conn {
var err error
if p.closed { var pc *pooledConnection
p.mu.Unlock() if pc, err = p.get(); nil != err {
return nil, errors.New("redigo: get on closed pool") return errorConnection{err}
} }
// Dial new connection if under limit. pc.inUse = true
if p.handler.GetMaxActive() == 0 || p.active < p.handler.GetMaxActive() { return pc
dial := p.handler.Dial
p.active += 1
p.mu.Unlock()
c, err := dial()
if err != nil {
p.mu.Lock()
p.release()
p.mu.Unlock()
c = nil
}
return c, err
} }
if !p.handler.IsWait() { func (p *pool) Put(conn redis.Conn) {
p.mu.Unlock() pc, ok := p.connections[conn.(*pooledConnection).c]
return nil, ErrPoolExhausted if !ok {
return
} }
if p.cond == nil { pc.idleTime = time.Now()
p.cond = sync.NewCond(&p.mu) pc.inUse = false
}
p.cond.Wait() select {
case p.connectionQueueChan <- pc:
// All good
default:
// channel is full
return
} }
} }
func (p *pool) put(c redis.Conn, forceClose bool) error { func (p *pool) Capacity() int {
err := c.Err() return cap(p.connectionQueueChan)
p.mu.Lock()
if !p.closed && err == nil && !forceClose {
p.idle.PushFront(idleConn{t: nowFunc(), c: c})
if p.idle.Len() > p.handler.GetMaxIdle() {
c = p.idle.Remove(p.idle.Back()).(idleConn).c
} else {
c = nil
}
} }
if c == nil { func (p *pool) Available() int {
if p.cond != nil { return len(p.connectionQueueChan)
p.cond.Signal()
}
p.mu.Unlock()
return nil
}
p.release()
p.mu.Unlock()
return c.Close()
} }
type pooledConnection struct { type pooledConnection struct {
p *pool p *pool
c redis.Conn c redis.Conn
idleTime time.Time
inUse bool
state int state int
} }
@ -347,7 +343,9 @@ func (pc *pooledConnection) Close() error {
} }
} }
c.Do("") c.Do("")
pc.p.put(c, pc.state != 0)
pc.p.Put(pc)
return nil return nil
} }

View File

@ -1,4 +1,4 @@
package overflow_redis_pool package redis_pool
import ( import (
"time" "time"
@ -21,12 +21,12 @@ type PoolHandler interface {
// closed. // closed.
TestOnBorrow(c redis.Conn, t time.Time) error TestOnBorrow(c redis.Conn, t time.Time) error
// Maximum number of idle connections in the pool.
GetMaxIdle() int
// Maximum number of connections allocated by the pool at a given time. // Maximum number of connections allocated by the pool at a given time.
// When zero, there is no limit on the number of connections in the pool. // When zero, there is no limit on the number of connections in the pool.
GetMaxActive() int GetMaxCapacity() int
// Maximum number of idle connections in the pool.
GetMaxIdle() int
// Close connections after remaining idle for this duration. If the value // Close connections after remaining idle for this duration. If the value
// is zero, then idle connections are not closed. Applications should set // is zero, then idle connections are not closed. Applications should set
@ -36,4 +36,6 @@ type PoolHandler interface {
// If Wait is true and the pool is at the MaxActive limit, then Get() waits // If Wait is true and the pool is at the MaxActive limit, then Get() waits
// for a connection to be returned to the pool before returning. // for a connection to be returned to the pool before returning.
IsWait() bool IsWait() bool
Validate()
} }

View File

@ -1,18 +1,19 @@
package overflow_redis_pool package redis_pool
import ( import (
"fmt"
"time" "time"
"github.com/garyburd/redigo/redis" "github.com/garyburd/redigo/redis"
) )
type PoolHandlers struct { type PoolHandlers struct {
// Maximum number of idle connections in the pool.
MaxIdle int
// Maximum number of connections allocated by the pool at a given time. // Maximum number of connections allocated by the pool at a given time.
// When zero, there is no limit on the number of connections in the pool. // When zero, there is no limit on the number of connections in the pool.
MaxActive int MaxCapacity int
// Maximum number of idle connections in the pool.
MaxIdle int
// Close connections after remaining idle for this duration. If the value // Close connections after remaining idle for this duration. If the value
// is zero, then idle connections are not closed. Applications should set // is zero, then idle connections are not closed. Applications should set
@ -24,26 +25,35 @@ type PoolHandlers struct {
Wait bool Wait bool
} }
func (h *PoolHandlers) Dial() (redis.Conn, error) { func (ph *PoolHandlers) Dial() (redis.Conn, error) {
return nil, nil return nil, fmt.Errorf("Redis Pool: Dial method is not implemented")
} }
func (h *PoolHandlers) TestOnBorrow(c redis.Conn, t time.Time) error { func (ph *PoolHandlers) TestOnBorrow(c redis.Conn, t time.Time) error {
return nil return nil
} }
func (h *PoolHandlers) GetMaxIdle() int { func (ph *PoolHandlers) GetMaxCapacity() int {
return h.MaxIdle return ph.MaxCapacity
}
func (ph *PoolHandlers) GetMaxIdle() int {
return ph.MaxIdle
}
func (ph *PoolHandlers) GetIdleTimeout() time.Duration {
return ph.IdleTimeout
}
func (ph *PoolHandlers) IsWait() bool {
return ph.Wait
} }
func (h *PoolHandlers) GetMaxActive() int { func (ph *PoolHandlers) Validate() {
return h.MaxActive if ph.MaxIdle <= 0 {
ph.MaxIdle = DefaultMaxIdle
} }
if ph.MaxActive <= 0 {
func (h *PoolHandlers) GetIdleTimeout() time.Duration { ph.MaxActive = DefaultMaxActive
return h.IdleTimeout }
if ph.IdleTimeout <= 0 {
ph.IdleTimeout = DefaultIdleTimeout
} }
func (h *PoolHandlers) IsWait() bool {
return h.Wait
} }