crusader f49a711b5a ing
2017-07-13 21:01:28 +09:00

168 lines
3.3 KiB
Go

package grpc
import (
"fmt"
"log"
"sync"
"git.loafle.net/overflow/overflow_service_websocket/pool"
"google.golang.org/grpc"
)
// ClientCreator builds a client value on top of an already established
// gRPC connection. It returns the client, or an error if the client
// could not be created.
type ClientCreator func(*grpc.ClientConn) (interface{}, error)
// ConnectionCreator establishes a new gRPC connection. It returns the
// connection, or an error if it could not be created.
type ConnectionCreator func() (*grpc.ClientConn, error)
// grpcPool keeps idle clients in a channel and remembers which gRPC
// connection each client was created on, so the connection can be closed
// when the client is discarded.
type grpcPool struct {
// creator builds a client from a connection (used by create).
creator ClientCreator
// connectionCreator opens the connection passed to creator.
connectionCreator ConnectionCreator
// clients holds the idle clients; it is set to nil by Destroy.
clients chan interface{}
// connections maps each client to the connection it was created on.
connections map[interface{}]*grpc.ClientConn
// mtx is held by Destroy while the pool is being torn down.
mtx sync.Mutex
// initCapacity is the initCap value passed to New.
initCapacity int
// maxCapacity is the maxCap value passed to New.
maxCapacity int
}
// New builds a client pool that is pre-filled with initCap clients and can
// hold up to maxCap idle clients.
//
// initCap must be >= 0, maxCap must be > 0 and initCap must not exceed
// maxCap. creator and connectionCreator must both be non-nil.
//
// If any of the initial clients cannot be created, the clients created so
// far are destroyed (their connections closed) and a nil pool is returned
// together with the error.
func New(initCap int, maxCap int, creator ClientCreator, connectionCreator ConnectionCreator) (pool.Pool, error) {
	if initCap < 0 || maxCap <= 0 || initCap > maxCap {
		return nil, fmt.Errorf("invalid capacity settings")
	}
	if creator == nil {
		return nil, fmt.Errorf("invalid ClientCreator settings")
	}
	if connectionCreator == nil {
		return nil, fmt.Errorf("invalid ConnectionCreator settings")
	}

	p := &grpcPool{
		initCapacity: initCap,
		maxCapacity:  maxCap,
		// The idle channel must be sized by maxCap: sizing it by initCap
		// would cap the pool at its initial size and, for initCap == 0,
		// produce an unbuffered channel that can never hold a client.
		clients:           make(chan interface{}, maxCap),
		connections:       make(map[interface{}]*grpc.ClientConn, initCap),
		creator:           creator,
		connectionCreator: connectionCreator,
	}

	for i := 0; i < initCap; i++ {
		c, err := p.create()
		if err != nil {
			// Do not leak the connections opened so far.
			p.Destroy()
			return nil, err
		}
		p.clients <- c
	}
	return p, nil
}
// Capacity reports the buffer capacity of the idle-client channel.
func (p *grpcPool) Capacity() int {
	size := cap(p.clients)
	return size
}
// Available reports how many idle clients are currently queued in the pool.
func (p *grpcPool) Available() int {
	idle := len(p.clients)
	return idle
}
// Get returns an idle client from the pool, or creates a new one when no
// idle client is available or the pool has already been destroyed.
//
// The returned error comes from create and is non-nil only when a new
// client had to be created and that failed.
func (p *grpcPool) Get() (interface{}, error) {
	// Destroy replaces p.clients with nil while holding the mutex, so take
	// a snapshot of the channel under the same lock.
	p.mtx.Lock()
	clients := p.clients
	p.mtx.Unlock()

	if clients == nil {
		// pool already destroyed, return a new client
		return p.create()
	}

	select {
	case c, ok := <-clients:
		if !ok {
			// The channel was closed by a concurrent Destroy; the received
			// zero value is not a client, so create a fresh one instead.
			return p.create()
		}
		return c, nil
	default:
		// no idle client available
		return p.create()
	}
}
// Put hands a client back to the pool. If the pool is full or has already
// been destroyed, the client is destroyed instead (its connection is
// closed).
func (p *grpcPool) Put(c interface{}) {
	// Hold the mutex for the whole operation: Destroy closes the channel
	// under this lock, and sending on a closed channel would panic.
	p.mtx.Lock()
	defer p.mtx.Unlock()

	if p.clients != nil {
		select {
		case p.clients <- c:
			return
		default:
			// pool is full, fall through and close the passed client
		}
	}
	p.destroy(c)
}
// Destroy closes the idle-client channel, destroys every client still
// queued in it and marks the pool as destroyed by setting the channel to
// nil. Calling it again on a destroyed pool does nothing.
func (p *grpcPool) Destroy() {
	p.mtx.Lock()
	defer p.mtx.Unlock()

	if p.clients != nil {
		close(p.clients)
		// Drain whatever is still buffered; the receive reports !ok once
		// the closed channel is empty.
		for {
			idle, ok := <-p.clients
			if !ok {
				break
			}
			p.destroy(idle)
		}
		p.clients = nil
	}
}
// putConnection records conn as the connection backing client c.
// It returns an error when c already has a non-nil connection registered;
// otherwise the mapping is stored and nil is returned.
func (p *grpcPool) putConnection(c interface{}, conn *grpc.ClientConn) error {
	if existing := p.connections[c]; existing != nil {
		return fmt.Errorf("Connection alread exist")
	}
	p.connections[c] = conn
	return nil
}
// removeConnection unregisters client c and returns the connection that
// was recorded for it. It returns an error when no non-nil connection is
// registered for c.
func (p *grpcPool) removeConnection(c interface{}) (*grpc.ClientConn, error) {
	conn := p.connections[c]
	if conn == nil {
		return nil, fmt.Errorf("Connection is not exist")
	}
	// The map is keyed by the client, not by the connection; deleting by
	// conn would leave the entry in place forever.
	delete(p.connections, c)
	return conn, nil
}
// create opens a new connection, builds a client on top of it and records
// the client-to-connection mapping.
//
// On any failure after the connection was opened, the connection is closed
// and the original failure is returned. A failure to close is only logged,
// so that it can never mask the original error or turn it into a nil error.
func (p *grpcPool) create() (interface{}, error) {
	if p.creator == nil {
		return nil, fmt.Errorf("ClientCreator is nil, can not create client")
	}

	conn, err := p.connectionCreate()
	if err != nil {
		return nil, err
	}

	c, err := p.creator(conn)
	if err != nil {
		if cerr := conn.Close(); cerr != nil {
			log.Println(fmt.Errorf("%v", cerr))
		}
		return nil, err
	}

	if err := p.putConnection(c, conn); err != nil {
		if cerr := conn.Close(); cerr != nil {
			log.Println(fmt.Errorf("%v", cerr))
		}
		return nil, err
	}
	return c, nil
}
// destroy unregisters client c and closes the connection recorded for it.
// A failure in either step is logged, not returned; when the client is
// not registered nothing is closed.
func (p *grpcPool) destroy(c interface{}) {
	conn, err := p.removeConnection(c)
	if err == nil {
		// Only a successfully looked-up connection is closed.
		err = conn.Close()
	}
	if err != nil {
		log.Println(fmt.Errorf("%v", err))
	}
}
// connectionCreate opens a new connection through the configured
// ConnectionCreator, or returns an error when none is configured.
func (p *grpcPool) connectionCreate() (*grpc.ClientConn, error) {
	if factory := p.connectionCreator; factory != nil {
		return factory()
	}
	return nil, fmt.Errorf("ConnectionCreator is nil, can not create connection")
}