overflow_server_app/backend/pool.go
2017-08-23 18:19:21 +09:00

145 lines
2.8 KiB
Go

package backend
import (
"fmt"
"sync"
"google.golang.org/grpc"
)
// Pool is a pool of reusable PooledClient values.
type Pool interface {
// Get returns an idle client from the pool, or a newly created client when
// no idle one is available. A non-nil error means that creating the client
// failed. The caller should call Close on the returned client when done so
// that it can be offered back to the pool.
Get() (PooledClient, error)
// Capacity returns the maximum number of idle clients the pool can hold.
Capacity() int
// Available returns the number of idle clients currently held by the pool.
Available() int
// Destroy releases the resources used by the pool.
Destroy()
}
// pool is the Pool implementation in this file. Idle clients wait in the
// buffered clients channel, and every client is built on the connection held
// in conn.
type pool struct {
// dial opens the connection; NewPool calls it once.
dial func() (*grpc.ClientConn, error)
// newClient builds a client value on top of the given connection.
newClient func(*grpc.ClientConn) (interface{}, error)
// exec performs a call using a client value produced by newClient; it is
// invoked by pooledClient.Exec.
exec func(client interface{}, target string, method string, params []string) (string, error)
// initCapacity is the number of clients NewPool creates up front.
initCapacity int
// maxCapacity is copied from Options.MaxCapacity.
maxCapacity int
// conn is the connection returned by dial; Destroy closes it.
conn *grpc.ClientConn
// mtx is held by Destroy while it tears the pool down.
mtx sync.Mutex
// clients holds the idle clients; Destroy closes it and sets it to nil.
clients chan PooledClient
}
// NewPool validates o, dials the shared connection and returns a Pool that
// already holds o.InitCapacity idle clients.
//
// The pool can keep up to o.MaxCapacity idle clients. When o.MaxCapacity is
// smaller than o.InitCapacity (for example left at zero), o.InitCapacity is
// used as the limit instead so that the initial clients always fit.
//
// An error is returned when o.Dial or o.NewClient is nil, when either
// capacity is negative, when dialing fails, or when one of the initial
// clients cannot be created. In the last case the freshly dialed connection
// is closed before returning.
func NewPool(o Options) (Pool, error) {
	if o.Dial == nil {
		return nil, fmt.Errorf("invalid Dial settings")
	}
	if o.NewClient == nil {
		return nil, fmt.Errorf("invalid NewClient settings")
	}
	if o.InitCapacity < 0 || o.MaxCapacity < 0 {
		return nil, fmt.Errorf("invalid capacity settings")
	}

	// Size the idle queue by the configured maximum. Sizing it by
	// InitCapacity would silently ignore MaxCapacity and make Capacity()
	// report the initial size rather than the maximum.
	capacity := o.MaxCapacity
	if capacity < o.InitCapacity {
		capacity = o.InitCapacity
	}

	p := &pool{
		dial:         o.Dial,
		newClient:    o.NewClient,
		exec:         o.Exec,
		initCapacity: o.InitCapacity,
		maxCapacity:  o.MaxCapacity,
		clients:      make(chan PooledClient, capacity),
	}

	conn, err := p.dial()
	if err != nil {
		return nil, err
	}
	p.conn = conn

	for i := 0; i < p.initCapacity; i++ {
		pc, err := p.create()
		if err != nil {
			// Best-effort cleanup; the creation error is the one reported.
			p.conn.Close()
			return nil, err
		}
		// Never blocks: capacity >= initCapacity.
		p.clients <- pc
	}
	return p, nil
}
// Capacity returns the maximum number of idle clients the pool can hold,
// which is the capacity of the idle-client channel. It returns 0 once the
// pool has been destroyed.
func (p *pool) Capacity() int {
	// Destroy replaces p.clients under mtx, so read it under the same lock.
	p.mtx.Lock()
	defer p.mtx.Unlock()
	return cap(p.clients)
}
// Available returns the number of idle clients currently waiting in the
// pool. It returns 0 once the pool has been destroyed.
func (p *pool) Available() int {
	// Destroy replaces p.clients under mtx, so read it under the same lock.
	p.mtx.Lock()
	defer p.mtx.Unlock()
	return len(p.clients)
}
// Get returns an idle client from the pool when one is available and
// otherwise creates a new one. It never blocks waiting for a client.
//
// After Destroy the pool holds no idle clients, so Get always creates a new
// client; note that Destroy has closed the connection that client is built on.
//
// The returned error is the one reported by the pool's newClient function.
func (p *pool) Get() (PooledClient, error) {
	// Destroy swaps p.clients to nil under mtx; take a consistent snapshot.
	p.mtx.Lock()
	idle := p.clients
	p.mtx.Unlock()

	select {
	case c, ok := <-idle:
		// ok is false when the channel was closed by a concurrent Destroy and
		// has been drained; the received value is then nil and must not be
		// handed to the caller.
		if ok {
			return c, nil
		}
	default:
		// Nothing idle. A nil channel (destroyed pool) also ends up here
		// because a receive from a nil channel is never ready.
	}
	return p.create()
}
// Destroy closes the idle-client channel, clears it and closes the shared
// connection. Calling it again on an already destroyed pool does nothing.
func (p *pool) Destroy() {
	p.mtx.Lock()
	defer p.mtx.Unlock()

	// A nil channel marks a pool that has already been torn down.
	if idle := p.clients; idle != nil {
		close(idle)
		p.clients = nil
		p.conn.Close()
	}
}
// create builds a new client on the pool's connection via newClient and wraps
// it in a pooledClient that points back at this pool. The error from
// newClient is returned unchanged.
func (p *pool) create() (PooledClient, error) {
	raw, err := p.newClient(p.conn)
	if err != nil {
		return nil, err
	}
	return &pooledClient{p: p, c: raw}, nil
}
// pooledClient is the client value that pool hands out from Get.
type pooledClient struct {
// p is the pool that created this client.
p *pool
// c is the value returned by the pool's newClient function.
c interface{}
}
// Exec forwards the call to the pool's exec function, passing the wrapped
// client along with target, method and params, and returns its result.
//
// NewPool does not require Options.Exec to be set, so a pool may carry a nil
// exec function; in that case an error is returned instead of panicking on
// the nil function call.
func (pc *pooledClient) Exec(target string, method string, params []string) (string, error) {
	exec := pc.p.exec
	if exec == nil {
		return "", fmt.Errorf("invalid Exec settings")
	}
	return exec(pc.c, target, method, params)
}
// Close offers the client back to its pool. The client is queued as idle when
// the pool has room; when the pool is full or has been destroyed the client
// is simply dropped. Close never blocks on a full pool.
func (pc *pooledClient) Close() {
	p := pc.p

	// Destroy closes p.clients while holding mtx. Sending under the same lock
	// guarantees the channel cannot be closed between the check and the send,
	// which would otherwise panic.
	p.mtx.Lock()
	defer p.mtx.Unlock()

	if p.clients == nil {
		// Pool already destroyed; nothing to return the client to.
		return
	}
	select {
	case p.clients <- pc:
	default:
		// Pool is full; drop this client.
	}
}