diff --git a/internal/commandinfo.go b/internal/commandinfo.go deleted file mode 100644 index ace0421..0000000 --- a/internal/commandinfo.go +++ /dev/null @@ -1,54 +0,0 @@ -// Copyright 2014 Gary Burd -// -// Licensed under the Apache License, Version 2.0 (the "License"): you may -// not use this file except in compliance with the License. You may obtain -// a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -// License for the specific language governing permissions and limitations -// under the License. - -package internal // import "git.loafle.net/commons_go/redis_pool/internal" - -import ( - "strings" -) - -const ( - WatchState = 1 << iota - MultiState - SubscribeState - MonitorState -) - -type CommandInfo struct { - Set, Clear int -} - -var commandInfos = map[string]CommandInfo{ - "WATCH": {Set: WatchState}, - "UNWATCH": {Clear: WatchState}, - "MULTI": {Set: MultiState}, - "EXEC": {Clear: WatchState | MultiState}, - "DISCARD": {Clear: WatchState | MultiState}, - "PSUBSCRIBE": {Set: SubscribeState}, - "SUBSCRIBE": {Set: SubscribeState}, - "MONITOR": {Set: MonitorState}, -} - -func init() { - for n, ci := range commandInfos { - commandInfos[strings.ToLower(n)] = ci - } -} - -func LookupCommandInfo(commandName string) CommandInfo { - if ci, ok := commandInfos[commandName]; ok { - return ci - } - return commandInfos[strings.ToUpper(commandName)] -} diff --git a/pool.go b/pool.go index 23a6c8e..207a868 100644 --- a/pool.go +++ b/pool.go @@ -1,108 +1,14 @@ package redis_pool import ( - "bytes" - "crypto/rand" - "crypto/sha1" - "errors" "fmt" - "io" - "strconv" "sync" - "time" - "git.loafle.net/commons_go/redis_pool/internal" + "git.loafle.net/commons_go/logging" + "github.com/garyburd/redigo/redis" ) -var nowFunc = time.Now // for testing - -var ( - // ErrPoolExhausted is returned from a pool connection method (Do, Send, - // Receive, Flush, Err) when the maximum number of database connections in the - // pool has been reached. - ErrPoolFull = errors.New("Redis pool: Pool is reached maximum capacity.") -) - -var ( - errPoolClosed = errors.New("Redis pool: connection pool closed") - errConnClosed = errors.New("Redis pool: connection closed") -) - -// Pool maintains a pool of connections. The application calls the Get method -// to get a connection from the pool and the connection's Close method to -// return the connection's resources to the pool. -// -// The following example shows how to use a pool in a web application. The -// application creates a pool at application startup and makes it available to -// request handlers using a package level variable. The pool configuration used -// here is an example, not a recommendation. -// -// func newPool(addr string) *redis.Pool { -// return &redis.Pool{ -// MaxIdle: 3, -// IdleTimeout: 240 * time.Second, -// Dial: func () (redis.Conn, error) { return redis.Dial("tcp", addr) }, -// } -// } -// -// var ( -// pool *redis.Pool -// redisServer = flag.String("redisServer", ":6379", "") -// ) -// -// func main() { -// flag.Parse() -// pool = newPool(*redisServer) -// ... -// } -// -// A request handler gets a connection from the pool and closes the connection -// when the handler is done: -// -// func serveHome(w http.ResponseWriter, r *http.Request) { -// conn := pool.Get() -// defer conn.Close() -// ... -// } -// -// Use the Dial function to authenticate connections with the AUTH command or -// select a database with the SELECT command: -// -// pool := &redis.Pool{ -// // Other pool configuration not shown in this example. -// Dial: func () (redis.Conn, error) { -// c, err := redis.Dial("tcp", server) -// if err != nil { -// return nil, err -// } -// if _, err := c.Do("AUTH", password); err != nil { -// c.Close() -// return nil, err -// } -// if _, err := c.Do("SELECT", db); err != nil { -// c.Close() -// return nil, err -// } -// return c, nil -// } -// } -// -// Use the TestOnBorrow function to check the health of an idle connection -// before the connection is returned to the application. This example PINGs -// connections that have been idle more than a minute: -// -// pool := &redis.Pool{ -// // Other pool configuration not shown in this example. -// TestOnBorrow: func(c redis.Conn, t time.Time) error { -// if time.Since(t) < time.Minute { -// return nil -// } -// _, err := c.Do("PING") -// return err -// }, -// } -// type Pool interface { Start() error Stop() @@ -117,267 +23,73 @@ type Pool interface { type pool struct { ph PoolHandler - connections map[redis.Conn]*pooledConnection - connectionQueueChan chan *pooledConnection + rp *redis.Pool stopChan chan struct{} stopWg sync.WaitGroup } -type idleConn struct { - c redis.Conn - t time.Time -} - -// NewPool creates a new pool. -// -// Deprecated: Initialize the Pool directory as shown in the example. func New(ph PoolHandler) Pool { p := &pool{ ph: ph, } + + p.rp = &redis.Pool{ + MaxIdle: ph.GetMaxIdle(), + MaxActive: ph.GetMaxCapacity(), + IdleTimeout: ph.GetIdleTimeout(), + Wait: ph.IsWait(), + Dial: ph.Dial, + } + return p } func (p *pool) Start() error { + if p.stopChan != nil { + panic("Redis Pool: pool is already running. Stop it before starting it again") + } + if nil == p.ph { - panic("GRPC Pool: pool handler must be specified.") + panic("Redis Pool: pool handler must be specified.") } p.ph.Validate() - if p.stopChan != nil { - return fmt.Errorf("GRPC Pool: pool is already running. Stop it before starting it again") - } p.stopChan = make(chan struct{}) - p.connections = make(map[redis.Conn]*pooledConnection, p.ph.GetMaxCapacity()) - p.connectionQueueChan = make(chan *pooledConnection, p.ph.GetMaxCapacity()) - - if 0 < p.ph.GetMaxIdle() { - for i := 0; i < p.ph.GetMaxIdle(); i++ { - if err := p.createConnection(); nil != err { - p.Stop() - return err - } - } - } return nil } func (p *pool) Stop() { if p.stopChan == nil { - panic("GRPC Pool: pool must be started before stopping it") - } - close(p.connectionQueueChan) - - for k, _ := range p.connections { - p.destroyConnection(k) + panic("Redis Pool: pool must be started before stopping it") } - p.stopWg.Wait() + if err := p.rp.Close(); nil != err { + logging.Logger().Error(fmt.Sprintf("Redis Pool: redis pool stop error %v", err)) + } + + close(p.stopChan) p.stopChan = nil } -func (p *pool) createConnection() error { - maxCapacity := p.ph.GetMaxCapacity() - currentConnectionCount := len(p.connections) - if currentConnectionCount >= maxCapacity { - return ErrPoolFull - } - - conn, err := p.ph.Dial() - if err != nil { - return err - } - pc := &pooledConnection{ - p: p, - c: conn, - idleTime: time.Now(), - inUse: false, - } - p.connections[conn] = pc - p.connectionQueueChan <- pc - p.stopWg.Add(1) - - return nil -} - -func (p *pool) destroyConnection(conn redis.Conn) error { - var err error - pc, ok := p.connections[conn] - if !ok { - return nil - } - err = pc.c.Close() - if nil != err { - return err - } - delete(p.connections, conn) - p.stopWg.Done() - - return nil -} - -func (p *pool) get() (*pooledConnection, error) { - var pc *pooledConnection - var err error - - avail := len(p.connectionQueueChan) - if 0 == avail { - if err = p.createConnection(); nil != err { - return nil, err - } - } - - for { - select { - case pc = <-p.connectionQueueChan: - // All good - default: - } - idleTimeout := p.ph.GetIdleTimeout() - if 1 < len(p.connectionQueueChan) && 0 < idleTimeout && pc.idleTime.Add(idleTimeout).Before(time.Now()) { - go p.destroyConnection(pc.c) - continue - } - break - } - - return pc, nil -} - func (p *pool) Get() redis.Conn { - var err error - var pc *pooledConnection - if pc, err = p.get(); nil != err { - return errorConnection{err} - } - - pc.inUse = true - - return pc + return p.rp.Get() } func (p *pool) Put(conn redis.Conn) { - pc, ok := p.connections[conn.(*pooledConnection).c] - if !ok { + if nil == conn { return } - - pc.idleTime = time.Now() - pc.inUse = false - - select { - case p.connectionQueueChan <- pc: - // All good - default: - // channel is full - return + if err := conn.Close(); nil != err { + logging.Logger().Error(fmt.Sprintf("Redis Pool: redis connection close error %v", err)) } } func (p *pool) Capacity() int { - return cap(p.connectionQueueChan) + return p.rp.ActiveCount() } func (p *pool) Available() int { - return len(p.connectionQueueChan) + return p.rp.IdleCount() } - -type pooledConnection struct { - p *pool - c redis.Conn - idleTime time.Time - inUse bool - state int -} - -var ( - sentinel []byte - sentinelOnce sync.Once -) - -func initSentinel() { - p := make([]byte, 64) - if _, err := rand.Read(p); err == nil { - sentinel = p - } else { - h := sha1.New() - io.WriteString(h, "Oops, rand failed. Use time instead.") - io.WriteString(h, strconv.FormatInt(time.Now().UnixNano(), 10)) - sentinel = h.Sum(nil) - } -} - -func (pc *pooledConnection) Close() error { - c := pc.c - if _, ok := c.(errorConnection); ok { - return nil - } - pc.c = errorConnection{errConnClosed} - - if pc.state&internal.MultiState != 0 { - c.Send("DISCARD") - pc.state &^= (internal.MultiState | internal.WatchState) - } else if pc.state&internal.WatchState != 0 { - c.Send("UNWATCH") - pc.state &^= internal.WatchState - } - if pc.state&internal.SubscribeState != 0 { - c.Send("UNSUBSCRIBE") - c.Send("PUNSUBSCRIBE") - // To detect the end of the message stream, ask the server to echo - // a sentinel value and read until we see that value. - sentinelOnce.Do(initSentinel) - c.Send("ECHO", sentinel) - c.Flush() - for { - p, err := c.Receive() - if err != nil { - break - } - if p, ok := p.([]byte); ok && bytes.Equal(p, sentinel) { - pc.state &^= internal.SubscribeState - break - } - } - } - c.Do("") - - pc.p.Put(pc) - - return nil -} - -func (pc *pooledConnection) Err() error { - return pc.c.Err() -} - -func (pc *pooledConnection) Do(commandName string, args ...interface{}) (reply interface{}, err error) { - ci := internal.LookupCommandInfo(commandName) - pc.state = (pc.state | ci.Set) &^ ci.Clear - return pc.c.Do(commandName, args...) -} - -func (pc *pooledConnection) Send(commandName string, args ...interface{}) error { - ci := internal.LookupCommandInfo(commandName) - pc.state = (pc.state | ci.Set) &^ ci.Clear - return pc.c.Send(commandName, args...) -} - -func (pc *pooledConnection) Flush() error { - return pc.c.Flush() -} - -func (pc *pooledConnection) Receive() (reply interface{}, err error) { - return pc.c.Receive() -} - -type errorConnection struct{ err error } - -func (ec errorConnection) Do(string, ...interface{}) (interface{}, error) { return nil, ec.err } -func (ec errorConnection) Send(string, ...interface{}) error { return ec.err } -func (ec errorConnection) Err() error { return ec.err } -func (ec errorConnection) Close() error { return ec.err } -func (ec errorConnection) Flush() error { return ec.err } -func (ec errorConnection) Receive() (interface{}, error) { return nil, ec.err }