diff --git a/constants.go b/constants.go new file mode 100644 index 0000000..154913e --- /dev/null +++ b/constants.go @@ -0,0 +1,19 @@ +package redis_pool + +const ( + // Maximum number of idle connections in the pool. + DefaultMaxIdle = 1 + + // Maximum number of connections allocated by the pool at a given time. + // When zero, there is no limit on the number of connections in the pool. + DefaultMaxActive = 1 + + // Close connections after remaining idle for this duration. If the value + // is zero, then idle connections are not closed. Applications should set + // the timeout to a value less than the server's timeout. + DefaultIdleTimeout = 0 + + // If Wait is true and the pool is at the MaxActive limit, then Get() waits + // for a connection to be returned to the pool before returning. + DefaultWait = false +) diff --git a/pool.go b/pool.go index 1f23286..23a6c8e 100644 --- a/pool.go +++ b/pool.go @@ -1,12 +1,11 @@ -package overflow_redis_pool +package redis_pool import ( "bytes" - "container/list" - "context" "crypto/rand" "crypto/sha1" "errors" + "fmt" "io" "strconv" "sync" @@ -18,14 +17,16 @@ import ( var nowFunc = time.Now // for testing -// ErrPoolExhausted is returned from a pool connection method (Do, Send, -// Receive, Flush, Err) when the maximum number of database connections in the -// pool has been reached. -var ErrPoolExhausted = errors.New("redigo: connection pool exhausted") +var ( + // ErrPoolExhausted is returned from a pool connection method (Do, Send, + // Receive, Flush, Err) when the maximum number of database connections in the + // pool has been reached. + ErrPoolFull = errors.New("Redis pool: Pool is reached maximum capacity.") +) var ( - errPoolClosed = errors.New("redigo: connection pool closed") - errConnClosed = errors.New("redigo: connection closed") + errPoolClosed = errors.New("Redis pool: connection pool closed") + errConnClosed = errors.New("Redis pool: connection closed") ) // Pool maintains a pool of connections. The application calls the Get method @@ -103,22 +104,24 @@ var ( // } // type Pool interface { + Start() error + Stop() + Get() redis.Conn - ActiveCount() int - Close() error + Put(conn redis.Conn) + + Capacity() int + Available() int } type pool struct { - ctx context.Context - handler PoolHandler - // mu protects fields defined below. - mu sync.Mutex - cond *sync.Cond - closed bool - active int + ph PoolHandler - // Stack of idleConn with most recently used at the front. - idle list.List + connections map[redis.Conn]*pooledConnection + connectionQueueChan chan *pooledConnection + + stopChan chan struct{} + stopWg sync.WaitGroup } type idleConn struct { @@ -129,171 +132,164 @@ type idleConn struct { // NewPool creates a new pool. // // Deprecated: Initialize the Pool directory as shown in the example. -func NewPool(ctx context.Context, handler PoolHandler) Pool { +func New(ph PoolHandler) Pool { p := &pool{ - ctx: ctx, - handler: handler, + ph: ph, } return p } -// Get gets a connection. The application must close the returned connection. -// This method always returns a valid connection so that applications can defer -// error handling to the first use of the connection. If there is an error -// getting an underlying connection, then the connection Err, Do, Send, Flush -// and Receive methods return that error. -func (p *pool) Get() redis.Conn { - c, err := p.get() - if err != nil { - return errorConnection{err} +func (p *pool) Start() error { + if nil == p.ph { + panic("GRPC Pool: pool handler must be specified.") } - return &pooledConnection{p: p, c: c} -} + p.ph.Validate() -// ActiveCount returns the number of active connections in the pool. -func (p *pool) ActiveCount() int { - p.mu.Lock() - active := p.active - p.mu.Unlock() - return active -} + if p.stopChan != nil { + return fmt.Errorf("GRPC Pool: pool is already running. Stop it before starting it again") + } + p.stopChan = make(chan struct{}) + p.connections = make(map[redis.Conn]*pooledConnection, p.ph.GetMaxCapacity()) + p.connectionQueueChan = make(chan *pooledConnection, p.ph.GetMaxCapacity()) -// Close releases the resources used by the pool. -func (p *pool) Close() error { - p.mu.Lock() - idle := p.idle - p.idle.Init() - p.closed = true - p.active -= idle.Len() - if p.cond != nil { - p.cond.Broadcast() - } - p.mu.Unlock() - for e := idle.Front(); e != nil; e = e.Next() { - e.Value.(idleConn).c.Close() + if 0 < p.ph.GetMaxIdle() { + for i := 0; i < p.ph.GetMaxIdle(); i++ { + if err := p.createConnection(); nil != err { + p.Stop() + return err + } + } } + return nil } -// release decrements the active count and signals waiters. The caller must -// hold p.mu during the call. -func (p *pool) release() { - p.active -= 1 - if p.cond != nil { - p.cond.Signal() +func (p *pool) Stop() { + if p.stopChan == nil { + panic("GRPC Pool: pool must be started before stopping it") } + close(p.connectionQueueChan) + + for k, _ := range p.connections { + p.destroyConnection(k) + } + + p.stopWg.Wait() + p.stopChan = nil } -// get prunes stale connections and returns a connection from the idle list or -// creates a new connection. -func (p *pool) get() (redis.Conn, error) { - p.mu.Lock() +func (p *pool) createConnection() error { + maxCapacity := p.ph.GetMaxCapacity() + currentConnectionCount := len(p.connections) + if currentConnectionCount >= maxCapacity { + return ErrPoolFull + } - // Prune stale connections. + conn, err := p.ph.Dial() + if err != nil { + return err + } + pc := &pooledConnection{ + p: p, + c: conn, + idleTime: time.Now(), + inUse: false, + } + p.connections[conn] = pc + p.connectionQueueChan <- pc + p.stopWg.Add(1) - if timeout := p.handler.GetIdleTimeout(); timeout > 0 { - for i, n := 0, p.idle.Len(); i < n; i++ { - e := p.idle.Back() - if e == nil { - break - } - ic := e.Value.(idleConn) - if ic.t.Add(timeout).After(nowFunc()) { - break - } - p.idle.Remove(e) - p.release() - p.mu.Unlock() - ic.c.Close() - p.mu.Lock() + return nil +} + +func (p *pool) destroyConnection(conn redis.Conn) error { + var err error + pc, ok := p.connections[conn] + if !ok { + return nil + } + err = pc.c.Close() + if nil != err { + return err + } + delete(p.connections, conn) + p.stopWg.Done() + + return nil +} + +func (p *pool) get() (*pooledConnection, error) { + var pc *pooledConnection + var err error + + avail := len(p.connectionQueueChan) + if 0 == avail { + if err = p.createConnection(); nil != err { + return nil, err } } for { - - // Get idle connection. - - for i, n := 0, p.idle.Len(); i < n; i++ { - e := p.idle.Front() - if e == nil { - break - } - ic := e.Value.(idleConn) - p.idle.Remove(e) - test := p.handler.TestOnBorrow - p.mu.Unlock() - if test == nil || test(ic.c, ic.t) == nil { - return ic.c, nil - } - ic.c.Close() - p.mu.Lock() - p.release() + select { + case pc = <-p.connectionQueueChan: + // All good + default: } - - // Check for pool closed before dialing a new connection. - - if p.closed { - p.mu.Unlock() - return nil, errors.New("redigo: get on closed pool") + idleTimeout := p.ph.GetIdleTimeout() + if 1 < len(p.connectionQueueChan) && 0 < idleTimeout && pc.idleTime.Add(idleTimeout).Before(time.Now()) { + go p.destroyConnection(pc.c) + continue } + break + } - // Dial new connection if under limit. + return pc, nil +} - if p.handler.GetMaxActive() == 0 || p.active < p.handler.GetMaxActive() { - dial := p.handler.Dial - p.active += 1 - p.mu.Unlock() - c, err := dial() - if err != nil { - p.mu.Lock() - p.release() - p.mu.Unlock() - c = nil - } - return c, err - } +func (p *pool) Get() redis.Conn { + var err error + var pc *pooledConnection + if pc, err = p.get(); nil != err { + return errorConnection{err} + } - if !p.handler.IsWait() { - p.mu.Unlock() - return nil, ErrPoolExhausted - } + pc.inUse = true - if p.cond == nil { - p.cond = sync.NewCond(&p.mu) - } - p.cond.Wait() + return pc +} + +func (p *pool) Put(conn redis.Conn) { + pc, ok := p.connections[conn.(*pooledConnection).c] + if !ok { + return + } + + pc.idleTime = time.Now() + pc.inUse = false + + select { + case p.connectionQueueChan <- pc: + // All good + default: + // channel is full + return } } -func (p *pool) put(c redis.Conn, forceClose bool) error { - err := c.Err() - p.mu.Lock() - if !p.closed && err == nil && !forceClose { - p.idle.PushFront(idleConn{t: nowFunc(), c: c}) - if p.idle.Len() > p.handler.GetMaxIdle() { - c = p.idle.Remove(p.idle.Back()).(idleConn).c - } else { - c = nil - } - } +func (p *pool) Capacity() int { + return cap(p.connectionQueueChan) +} - if c == nil { - if p.cond != nil { - p.cond.Signal() - } - p.mu.Unlock() - return nil - } - - p.release() - p.mu.Unlock() - return c.Close() +func (p *pool) Available() int { + return len(p.connectionQueueChan) } type pooledConnection struct { - p *pool - c redis.Conn - state int + p *pool + c redis.Conn + idleTime time.Time + inUse bool + state int } var ( @@ -347,7 +343,9 @@ func (pc *pooledConnection) Close() error { } } c.Do("") - pc.p.put(c, pc.state != 0) + + pc.p.Put(pc) + return nil } diff --git a/pool_handler.go b/pool_handler.go index 9743dad..5529668 100644 --- a/pool_handler.go +++ b/pool_handler.go @@ -1,4 +1,4 @@ -package overflow_redis_pool +package redis_pool import ( "time" @@ -21,12 +21,12 @@ type PoolHandler interface { // closed. TestOnBorrow(c redis.Conn, t time.Time) error - // Maximum number of idle connections in the pool. - GetMaxIdle() int - // Maximum number of connections allocated by the pool at a given time. // When zero, there is no limit on the number of connections in the pool. - GetMaxActive() int + GetMaxCapacity() int + + // Maximum number of idle connections in the pool. + GetMaxIdle() int // Close connections after remaining idle for this duration. If the value // is zero, then idle connections are not closed. Applications should set @@ -36,4 +36,6 @@ type PoolHandler interface { // If Wait is true and the pool is at the MaxActive limit, then Get() waits // for a connection to be returned to the pool before returning. IsWait() bool + + Validate() } diff --git a/pool_handlers.go b/pool_handlers.go index 94ec40c..feac814 100644 --- a/pool_handlers.go +++ b/pool_handlers.go @@ -1,18 +1,19 @@ -package overflow_redis_pool +package redis_pool import ( + "fmt" "time" "github.com/garyburd/redigo/redis" ) type PoolHandlers struct { - // Maximum number of idle connections in the pool. - MaxIdle int - // Maximum number of connections allocated by the pool at a given time. // When zero, there is no limit on the number of connections in the pool. - MaxActive int + MaxCapacity int + + // Maximum number of idle connections in the pool. + MaxIdle int // Close connections after remaining idle for this duration. If the value // is zero, then idle connections are not closed. Applications should set @@ -24,26 +25,35 @@ type PoolHandlers struct { Wait bool } -func (h *PoolHandlers) Dial() (redis.Conn, error) { - return nil, nil +func (ph *PoolHandlers) Dial() (redis.Conn, error) { + return nil, fmt.Errorf("Redis Pool: Dial method is not implemented") } -func (h *PoolHandlers) TestOnBorrow(c redis.Conn, t time.Time) error { +func (ph *PoolHandlers) TestOnBorrow(c redis.Conn, t time.Time) error { return nil } -func (h *PoolHandlers) GetMaxIdle() int { - return h.MaxIdle +func (ph *PoolHandlers) GetMaxCapacity() int { + return ph.MaxCapacity +} +func (ph *PoolHandlers) GetMaxIdle() int { + return ph.MaxIdle +} +func (ph *PoolHandlers) GetIdleTimeout() time.Duration { + return ph.IdleTimeout +} +func (ph *PoolHandlers) IsWait() bool { + return ph.Wait } -func (h *PoolHandlers) GetMaxActive() int { - return h.MaxActive -} - -func (h *PoolHandlers) GetIdleTimeout() time.Duration { - return h.IdleTimeout -} - -func (h *PoolHandlers) IsWait() bool { - return h.Wait +func (ph *PoolHandlers) Validate() { + if ph.MaxIdle <= 0 { + ph.MaxIdle = DefaultMaxIdle + } + if ph.MaxActive <= 0 { + ph.MaxActive = DefaultMaxActive + } + if ph.IdleTimeout <= 0 { + ph.IdleTimeout = DefaultIdleTimeout + } }