145 lines
2.8 KiB
Go
145 lines
2.8 KiB
Go
package backend
|
|
|
|
import (
|
|
"fmt"
|
|
"sync"
|
|
|
|
"google.golang.org/grpc"
|
|
)
|
|
|
|
type Pool interface {
|
|
// Get gets a client. The application must close the returned client.
|
|
// This method always returns a valid client so that applications can defer
|
|
// error handling to the first use of the client. If there is an error
|
|
// getting an underlying client, then the client Err, Do, Send, Flush
|
|
// and Receive methods return that error.
|
|
Get() (PooledClient, error)
|
|
|
|
// Capacity returns the number of maximum clients in the pool.
|
|
Capacity() int
|
|
|
|
// Available returns the number of avaliable clients in the pool.
|
|
Available() int
|
|
|
|
// Destroy releases the resources used by the pool.
|
|
Destroy()
|
|
}
|
|
|
|
type pool struct {
|
|
dial func() (*grpc.ClientConn, error)
|
|
newClient func(*grpc.ClientConn) (interface{}, error)
|
|
exec func(client interface{}, target string, method string, params []string) (string, error)
|
|
initCapacity int
|
|
maxCapacity int
|
|
|
|
conn *grpc.ClientConn
|
|
mtx sync.Mutex
|
|
clients chan PooledClient
|
|
}
|
|
|
|
func NewPool(o Options) (Pool, error) {
|
|
if o.Dial == nil {
|
|
return nil, fmt.Errorf("invalid Dial settings")
|
|
}
|
|
|
|
if o.NewClient == nil {
|
|
return nil, fmt.Errorf("invalid NewClient settings")
|
|
}
|
|
|
|
if o.InitCapacity < 0 || o.MaxCapacity < 0 {
|
|
return nil, fmt.Errorf("invalid capacity settings")
|
|
}
|
|
|
|
p := &pool{
|
|
dial: o.Dial,
|
|
newClient: o.NewClient,
|
|
exec: o.Exec,
|
|
initCapacity: o.InitCapacity,
|
|
maxCapacity: o.MaxCapacity,
|
|
clients: make(chan PooledClient, o.InitCapacity),
|
|
}
|
|
|
|
var err error
|
|
p.conn, err = p.dial()
|
|
if nil != err {
|
|
return nil, err
|
|
}
|
|
|
|
for i := 0; i < p.initCapacity; i++ {
|
|
pc, err := p.create()
|
|
if err != nil {
|
|
p.conn.Close()
|
|
return nil, err
|
|
}
|
|
p.clients <- pc
|
|
}
|
|
|
|
return p, nil
|
|
}
|
|
|
|
func (p *pool) Capacity() int {
|
|
return cap(p.clients)
|
|
}
|
|
|
|
func (p *pool) Available() int {
|
|
return len(p.clients)
|
|
}
|
|
|
|
func (p *pool) Get() (PooledClient, error) {
|
|
if p.clients == nil {
|
|
// pool aleardy destroyed, returns new client
|
|
return p.create()
|
|
}
|
|
for {
|
|
select {
|
|
case c := <-p.clients:
|
|
return c, nil
|
|
default:
|
|
return p.create()
|
|
}
|
|
}
|
|
}
|
|
|
|
func (p *pool) Destroy() {
|
|
p.mtx.Lock()
|
|
defer p.mtx.Unlock()
|
|
if p.clients == nil {
|
|
// pool aleardy destroyed
|
|
return
|
|
}
|
|
close(p.clients)
|
|
p.clients = nil
|
|
p.conn.Close()
|
|
}
|
|
|
|
func (p *pool) create() (PooledClient, error) {
|
|
c, err := p.newClient(p.conn)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
pc := &pooledClient{
|
|
p: p,
|
|
c: c,
|
|
}
|
|
return pc, nil
|
|
}
|
|
|
|
type pooledClient struct {
|
|
p *pool
|
|
c interface{}
|
|
}
|
|
|
|
func (pc *pooledClient) Exec(target string, method string, params []string) (string, error) {
|
|
return pc.p.exec(pc.c, target, method, params)
|
|
}
|
|
|
|
func (pc *pooledClient) Close() {
|
|
select {
|
|
case pc.p.clients <- pc:
|
|
return
|
|
default:
|
|
// pool is full, close passed connection
|
|
return
|
|
}
|
|
}
|