2017-07-10 20:54:40 +09:00

105 lines
1.9 KiB
Go

package service
import (
"fmt"
"sync"
grpc "google.golang.org/grpc"
)
type ConnectionGenerator func() (*grpc.ClientConn, error)
type ConnectionChecker func(*grpc.ClientConn) bool
type ConnectionCloser func(*grpc.ClientConn)
type Pool struct {
New ConnectionGenerator
Ping ConnectionChecker
Close ConnectionCloser
Get func() (*grpc.ClientConn, error)
Put func(conn *grpc.ClientConn)
Destroy func()
store chan *grpc.ClientConn
mtx sync.Mutex
initCapacity int
maxCapacity int
}
func New(initCap int, maxCap int, connector ConnectionGenerator) (*Pool, error) {
if initCap < 0 || maxCap <= 0 || initCap > maxCap {
return nil, fmt.Errorf("invalid capacity settings")
}
p := new(Pool)
p.store = make(chan *grpc.ClientConn, maxCap)
if connector != nil {
p.New = connector
}
for i := 0; i < initCap; i++ {
conn, err := p.create()
if err != nil {
return p, err
}
p.store <- conn
}
return p, nil
}
func (p *Pool) Len() int {
return len(p.store)
}
func (p *Pool) Get() (*grpc.ClientConn, error) {
if p.store == nil {
// pool aleardy destroyed, returns new connection
return p.create()
}
for {
select {
case conn := <-p.store:
if p.Ping != nil && p.Ping(conn) == false {
continue
}
return conn, nil
default:
return p.create()
}
}
}
func (p *Pool) Put(conn *grpc.ClientConn) {
select {
case p.store <- conn:
return
default:
// pool is full, close passed connection
if p.Close != nil {
p.Close(conn)
}
return
}
}
func (p *Pool) Destroy() {
p.mtx.Lock()
defer p.mtx.Unlock()
if p.store == nil {
// pool aleardy destroyed
return
}
close(p.store)
for conn := range p.store {
if p.Close != nil {
p.Close(conn)
}
}
p.store = nil
}
func (p *Pool) create() (*grpc.ClientConn, error) {
if p.New == nil {
return nil, fmt.Errorf("Pool.New is nil, can not create connection")
}
return p.New()
}