197 lines
3.3 KiB
Go
197 lines
3.3 KiB
Go
package grpc_pool
|
|
|
|
import (
|
|
"errors"
|
|
"fmt"
|
|
"sync"
|
|
"time"
|
|
|
|
"google.golang.org/grpc"
|
|
)
|
|
|
|
// ErrPoolFull is returned when a new instance cannot be created because the
// pool already tracks as many instances as the handler's maximum capacity.
var ErrPoolFull = errors.New("grpc pool: Pool is reached maximum capacity.")
|
|
|
|
type pooledInstance struct {
|
|
clientConn *grpc.ClientConn
|
|
idleTime time.Time
|
|
instance interface{}
|
|
inUse bool
|
|
}
|
|
|
|
// Pool is the public surface of the gRPC instance pool.
type Pool interface {
	// Start prepares the pool for use and must be called before Get or Put.
	Start() error
	// Stop shuts the pool down and destroys the instances it tracks.
	Stop()

	// Get hands out an instance, or an error when none can be provided.
	Get() (interface{}, error)
	// Put gives an instance previously obtained from Get back to the pool.
	Put(i interface{})

	// Capacity reports the size of the pool's idle queue.
	Capacity() int
	// Available reports how many instances are currently waiting in the
	// idle queue.
	Available() int
}
|
|
|
|
type pool struct {
|
|
ph PoolHandler
|
|
instances map[interface{}]*pooledInstance
|
|
instanceQueueChan chan *pooledInstance
|
|
|
|
stopChan chan struct{}
|
|
stopWg sync.WaitGroup
|
|
}
|
|
|
|
func New(ph PoolHandler) (Pool, error) {
|
|
p := &pool{
|
|
ph: ph,
|
|
}
|
|
|
|
return p, nil
|
|
}
|
|
|
|
func (p *pool) Start() error {
|
|
if nil == p.ph {
|
|
panic("GRPC Pool: pool handler must be specified.")
|
|
}
|
|
p.ph.Validate()
|
|
|
|
if p.stopChan != nil {
|
|
return fmt.Errorf("GRPC Pool: pool is already running. Stop it before starting it again")
|
|
}
|
|
p.stopChan = make(chan struct{})
|
|
p.instances = make(map[interface{}]*pooledInstance, p.ph.GetMaxCapacity())
|
|
p.instanceQueueChan = make(chan *pooledInstance, p.ph.GetMaxCapacity())
|
|
|
|
if 0 < p.ph.GetMaxIdle() {
|
|
for i := 0; i < p.ph.GetMaxIdle(); i++ {
|
|
if err := p.createInstance(); nil != err {
|
|
p.Stop()
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (p *pool) Stop() {
|
|
if p.stopChan == nil {
|
|
panic("GRPC Pool: pool must be started before stopping it")
|
|
}
|
|
close(p.instanceQueueChan)
|
|
|
|
for k, _ := range p.instances {
|
|
p.destroyInstance(k)
|
|
}
|
|
|
|
p.stopWg.Wait()
|
|
p.stopChan = nil
|
|
}
|
|
|
|
func (p *pool) createInstance() error {
|
|
maxCapacity := p.ph.GetMaxCapacity()
|
|
currentInstanceCount := len(p.instances)
|
|
if currentInstanceCount >= maxCapacity {
|
|
return ErrPoolFull
|
|
}
|
|
|
|
conn, i, err := p.ph.Dial()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
pi := &pooledInstance{
|
|
clientConn: conn,
|
|
idleTime: time.Now(),
|
|
instance: i,
|
|
inUse: false,
|
|
}
|
|
p.instances[i] = pi
|
|
p.instanceQueueChan <- pi
|
|
p.stopWg.Add(1)
|
|
|
|
return nil
|
|
}
|
|
|
|
func (p *pool) destroyInstance(i interface{}) error {
|
|
var err error
|
|
pi, ok := p.instances[i]
|
|
if !ok {
|
|
return nil
|
|
}
|
|
err = pi.clientConn.Close()
|
|
if nil != err {
|
|
return err
|
|
}
|
|
delete(p.instances, i)
|
|
p.stopWg.Done()
|
|
|
|
return nil
|
|
}
|
|
|
|
func (p *pool) get() (*pooledInstance, error) {
|
|
var pi *pooledInstance
|
|
var err error
|
|
|
|
avail := len(p.instanceQueueChan)
|
|
if 0 == avail {
|
|
if err = p.createInstance(); nil != err {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
for {
|
|
select {
|
|
case pi = <-p.instanceQueueChan:
|
|
// All good
|
|
default:
|
|
}
|
|
idleTimeout := p.ph.GetIdleTimeout()
|
|
if 1 < len(p.instanceQueueChan) && 0 < idleTimeout && pi.idleTime.Add(idleTimeout).Before(time.Now()) {
|
|
go p.destroyInstance(pi.instance)
|
|
continue
|
|
}
|
|
break
|
|
}
|
|
|
|
return pi, nil
|
|
}
|
|
|
|
func (p *pool) Get() (interface{}, error) {
|
|
var err error
|
|
var pi *pooledInstance
|
|
if pi, err = p.get(); nil != err {
|
|
return nil, err
|
|
}
|
|
|
|
pi.inUse = true
|
|
|
|
return pi.instance, nil
|
|
}
|
|
|
|
func (p *pool) Put(i interface{}) {
|
|
pi, ok := p.instances[i]
|
|
if !ok {
|
|
return
|
|
}
|
|
|
|
pi.idleTime = time.Now()
|
|
pi.inUse = false
|
|
|
|
select {
|
|
case p.instanceQueueChan <- pi:
|
|
// All good
|
|
default:
|
|
// channel is full
|
|
return
|
|
}
|
|
|
|
}
|
|
|
|
func (p *pool) Capacity() int {
|
|
return cap(p.instanceQueueChan)
|
|
}
|
|
|
|
func (p *pool) Available() int {
|
|
return len(p.instanceQueueChan)
|
|
}
|