package redis_pool

import (
	"bytes"
	"crypto/rand"
	"crypto/sha1"
	"errors"
	"fmt"
	"io"
	"strconv"
	"sync"
	"time"

	"git.loafle.net/commons_go/redis_pool/internal"
	"github.com/garyburd/redigo/redis"
)

var nowFunc = time.Now // for testing

var (
	// ErrPoolFull is returned from a pool connection method (Do, Send,
	// Receive, Flush, Err) when the maximum number of database connections in
	// the pool has been reached.
	ErrPoolFull = errors.New("Redis pool: Pool is reached maximum capacity.")
)

var (
	// errPoolClosed indicates that the pool itself is closed.
	errPoolClosed = errors.New("Redis pool: connection pool closed")
	// errConnClosed is the error reported by a pooled connection that has
	// already been closed.
	errConnClosed = errors.New("Redis pool: connection closed")
)

// Pool maintains a pool of connections. The application calls the Get method
// to get a connection from the pool and the connection's Close method to
// return the connection's resources to the pool.
//
// The following example shows how to use a pool in a web application. The
// application creates a pool at application startup and makes it available to
// request handlers using a package level variable. The pool configuration used
// here is an example, not a recommendation.
//
//	func newPool(addr string) *redis.Pool {
//	    return &redis.Pool{
//	        MaxIdle: 3,
//	        IdleTimeout: 240 * time.Second,
//	        Dial: func () (redis.Conn, error) { return redis.Dial("tcp", addr) },
//	    }
//	}
//
//	var (
//	    pool *redis.Pool
//	    redisServer = flag.String("redisServer", ":6379", "")
//	)
//
//	func main() {
//	    flag.Parse()
//	    pool = newPool(*redisServer)
//	    ...
//	}
//
// A request handler gets a connection from the pool and closes the connection
// when the handler is done:
//
//	func serveHome(w http.ResponseWriter, r *http.Request) {
//	    conn := pool.Get()
//	    defer conn.Close()
//	    ...
//	}
//
// Use the Dial function to authenticate connections with the AUTH command or
// select a database with the SELECT command:
//
//	pool := &redis.Pool{
//	    // Other pool configuration not shown in this example.
// Dial: func () (redis.Conn, error) { // c, err := redis.Dial("tcp", server) // if err != nil { // return nil, err // } // if _, err := c.Do("AUTH", password); err != nil { // c.Close() // return nil, err // } // if _, err := c.Do("SELECT", db); err != nil { // c.Close() // return nil, err // } // return c, nil // } // } // // Use the TestOnBorrow function to check the health of an idle connection // before the connection is returned to the application. This example PINGs // connections that have been idle more than a minute: // // pool := &redis.Pool{ // // Other pool configuration not shown in this example. // TestOnBorrow: func(c redis.Conn, t time.Time) error { // if time.Since(t) < time.Minute { // return nil // } // _, err := c.Do("PING") // return err // }, // } // type Pool interface { Start() error Stop() Get() redis.Conn Put(conn redis.Conn) Capacity() int Available() int } type pool struct { ph PoolHandler connections map[redis.Conn]*pooledConnection connectionQueueChan chan *pooledConnection stopChan chan struct{} stopWg sync.WaitGroup } type idleConn struct { c redis.Conn t time.Time } // NewPool creates a new pool. // // Deprecated: Initialize the Pool directory as shown in the example. func New(ph PoolHandler) Pool { p := &pool{ ph: ph, } return p } func (p *pool) Start() error { if nil == p.ph { panic("GRPC Pool: pool handler must be specified.") } p.ph.Validate() if p.stopChan != nil { return fmt.Errorf("GRPC Pool: pool is already running. 
Stop it before starting it again") } p.stopChan = make(chan struct{}) p.connections = make(map[redis.Conn]*pooledConnection, p.ph.GetMaxCapacity()) p.connectionQueueChan = make(chan *pooledConnection, p.ph.GetMaxCapacity()) if 0 < p.ph.GetMaxIdle() { for i := 0; i < p.ph.GetMaxIdle(); i++ { if err := p.createConnection(); nil != err { p.Stop() return err } } } return nil } func (p *pool) Stop() { if p.stopChan == nil { panic("GRPC Pool: pool must be started before stopping it") } close(p.connectionQueueChan) for k, _ := range p.connections { p.destroyConnection(k) } p.stopWg.Wait() p.stopChan = nil } func (p *pool) createConnection() error { maxCapacity := p.ph.GetMaxCapacity() currentConnectionCount := len(p.connections) if currentConnectionCount >= maxCapacity { return ErrPoolFull } conn, err := p.ph.Dial() if err != nil { return err } pc := &pooledConnection{ p: p, c: conn, idleTime: time.Now(), inUse: false, } p.connections[conn] = pc p.connectionQueueChan <- pc p.stopWg.Add(1) return nil } func (p *pool) destroyConnection(conn redis.Conn) error { var err error pc, ok := p.connections[conn] if !ok { return nil } err = pc.c.Close() if nil != err { return err } delete(p.connections, conn) p.stopWg.Done() return nil } func (p *pool) get() (*pooledConnection, error) { var pc *pooledConnection var err error avail := len(p.connectionQueueChan) if 0 == avail { if err = p.createConnection(); nil != err { return nil, err } } for { select { case pc = <-p.connectionQueueChan: // All good default: } idleTimeout := p.ph.GetIdleTimeout() if 1 < len(p.connectionQueueChan) && 0 < idleTimeout && pc.idleTime.Add(idleTimeout).Before(time.Now()) { go p.destroyConnection(pc.c) continue } break } return pc, nil } func (p *pool) Get() redis.Conn { var err error var pc *pooledConnection if pc, err = p.get(); nil != err { return errorConnection{err} } pc.inUse = true return pc } func (p *pool) Put(conn redis.Conn) { pc, ok := p.connections[conn.(*pooledConnection).c] if !ok { return } 
pc.idleTime = time.Now() pc.inUse = false select { case p.connectionQueueChan <- pc: // All good default: // channel is full return } } func (p *pool) Capacity() int { return cap(p.connectionQueueChan) } func (p *pool) Available() int { return len(p.connectionQueueChan) } type pooledConnection struct { p *pool c redis.Conn idleTime time.Time inUse bool state int } var ( sentinel []byte sentinelOnce sync.Once ) func initSentinel() { p := make([]byte, 64) if _, err := rand.Read(p); err == nil { sentinel = p } else { h := sha1.New() io.WriteString(h, "Oops, rand failed. Use time instead.") io.WriteString(h, strconv.FormatInt(time.Now().UnixNano(), 10)) sentinel = h.Sum(nil) } } func (pc *pooledConnection) Close() error { c := pc.c if _, ok := c.(errorConnection); ok { return nil } pc.c = errorConnection{errConnClosed} if pc.state&internal.MultiState != 0 { c.Send("DISCARD") pc.state &^= (internal.MultiState | internal.WatchState) } else if pc.state&internal.WatchState != 0 { c.Send("UNWATCH") pc.state &^= internal.WatchState } if pc.state&internal.SubscribeState != 0 { c.Send("UNSUBSCRIBE") c.Send("PUNSUBSCRIBE") // To detect the end of the message stream, ask the server to echo // a sentinel value and read until we see that value. sentinelOnce.Do(initSentinel) c.Send("ECHO", sentinel) c.Flush() for { p, err := c.Receive() if err != nil { break } if p, ok := p.([]byte); ok && bytes.Equal(p, sentinel) { pc.state &^= internal.SubscribeState break } } } c.Do("") pc.p.Put(pc) return nil } func (pc *pooledConnection) Err() error { return pc.c.Err() } func (pc *pooledConnection) Do(commandName string, args ...interface{}) (reply interface{}, err error) { ci := internal.LookupCommandInfo(commandName) pc.state = (pc.state | ci.Set) &^ ci.Clear return pc.c.Do(commandName, args...) 
} func (pc *pooledConnection) Send(commandName string, args ...interface{}) error { ci := internal.LookupCommandInfo(commandName) pc.state = (pc.state | ci.Set) &^ ci.Clear return pc.c.Send(commandName, args...) } func (pc *pooledConnection) Flush() error { return pc.c.Flush() } func (pc *pooledConnection) Receive() (reply interface{}, err error) { return pc.c.Receive() } type errorConnection struct{ err error } func (ec errorConnection) Do(string, ...interface{}) (interface{}, error) { return nil, ec.err } func (ec errorConnection) Send(string, ...interface{}) error { return ec.err } func (ec errorConnection) Err() error { return ec.err } func (ec errorConnection) Close() error { return ec.err } func (ec errorConnection) Flush() error { return ec.err } func (ec errorConnection) Receive() (interface{}, error) { return nil, ec.err }