172 lines
2.8 KiB
Go
172 lines
2.8 KiB
Go
|
package overflow_grpc_pool
|
||
|
|
||
|
import (
|
||
|
"context"
|
||
|
"errors"
|
||
|
"time"
|
||
|
|
||
|
"google.golang.org/grpc"
|
||
|
)
|
||
|
|
||
|
// ErrPoolFull is the error returned when a new instance is requested but the
// pool already tracks as many instances as its maximum capacity allows.
var ErrPoolFull = errors.New("grpc pool: Pool is reached maximum capacity.")
|
||
|
|
||
|
type pooledInstance struct {
|
||
|
clientConn *grpc.ClientConn
|
||
|
idleTime time.Time
|
||
|
instance interface{}
|
||
|
inUse bool
|
||
|
}
|
||
|
|
||
|
// Pool is a bounded pool of reusable instances.
type Pool interface {
	// Get hands out an instance, or returns an error when none can be provided.
	Get() (interface{}, error)
	// Put gives an instance previously obtained from Get back to the pool.
	Put(i interface{})
	// Capacity reports the maximum number of idle instances the pool can hold.
	Capacity() int
	// Available reports how many instances are currently idle in the pool.
	Available() int
	// Destroy shuts the pool down and destroys the instances it tracks.
	Destroy()
}
|
||
|
|
||
|
type pool struct {
|
||
|
ctx context.Context
|
||
|
handler PoolHandler
|
||
|
instances map[interface{}]*pooledInstance
|
||
|
instancesCh chan *pooledInstance
|
||
|
}
|
||
|
|
||
|
func New(ctx context.Context, handler PoolHandler) (Pool, error) {
|
||
|
var err error
|
||
|
p := &pool{
|
||
|
ctx: ctx,
|
||
|
handler: handler,
|
||
|
instances: make(map[interface{}]*pooledInstance, handler.GetMaxCapacity()),
|
||
|
instancesCh: make(chan *pooledInstance, handler.GetMaxCapacity()),
|
||
|
}
|
||
|
|
||
|
if 0 < handler.GetMaxIdle() {
|
||
|
for i := 0; i < handler.GetMaxIdle(); i++ {
|
||
|
err = p.createInstance()
|
||
|
if nil != err {
|
||
|
return nil, err
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
return p, nil
|
||
|
}
|
||
|
|
||
|
func (p *pool) createInstance() error {
|
||
|
maxCapacity := p.handler.GetMaxCapacity()
|
||
|
currentInstanceCount := len(p.instances)
|
||
|
if currentInstanceCount >= maxCapacity {
|
||
|
return ErrPoolFull
|
||
|
}
|
||
|
|
||
|
conn, i, err := p.handler.OnCreate()
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
pi := &pooledInstance{
|
||
|
clientConn: conn,
|
||
|
idleTime: time.Now(),
|
||
|
instance: i,
|
||
|
inUse: false,
|
||
|
}
|
||
|
p.instances[i] = pi
|
||
|
p.instancesCh <- pi
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (p *pool) destroyInstance(i interface{}) error {
|
||
|
var err error
|
||
|
pi, ok := p.instances[i]
|
||
|
if !ok {
|
||
|
return nil
|
||
|
}
|
||
|
err = pi.clientConn.Close()
|
||
|
if nil != err {
|
||
|
return err
|
||
|
}
|
||
|
delete(p.instances, i)
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (p *pool) get() (*pooledInstance, error) {
|
||
|
var pi *pooledInstance
|
||
|
var err error
|
||
|
|
||
|
avail := len(p.instancesCh)
|
||
|
if 0 == avail {
|
||
|
if err = p.createInstance(); nil != err {
|
||
|
return nil, err
|
||
|
}
|
||
|
}
|
||
|
|
||
|
for {
|
||
|
select {
|
||
|
case pi = <-p.instancesCh:
|
||
|
// All good
|
||
|
default:
|
||
|
}
|
||
|
idleTimeout := p.handler.GetIdleTimeout()
|
||
|
if 1 < len(p.instancesCh) && 0 < idleTimeout && pi.idleTime.Add(idleTimeout).Before(time.Now()) {
|
||
|
go p.destroyInstance(pi.instance)
|
||
|
continue
|
||
|
}
|
||
|
break
|
||
|
}
|
||
|
|
||
|
return pi, nil
|
||
|
}
|
||
|
|
||
|
func (p *pool) Get() (interface{}, error) {
|
||
|
var err error
|
||
|
var pi *pooledInstance
|
||
|
if pi, err = p.get(); nil != err {
|
||
|
return nil, err
|
||
|
}
|
||
|
|
||
|
pi.inUse = true
|
||
|
|
||
|
return pi.instance, nil
|
||
|
}
|
||
|
|
||
|
func (p *pool) Put(i interface{}) {
|
||
|
pi, ok := p.instances[i]
|
||
|
if !ok {
|
||
|
return
|
||
|
}
|
||
|
|
||
|
pi.idleTime = time.Now()
|
||
|
pi.inUse = false
|
||
|
|
||
|
select {
|
||
|
case p.instancesCh <- pi:
|
||
|
// All good
|
||
|
default:
|
||
|
// channel is full
|
||
|
return
|
||
|
}
|
||
|
|
||
|
}
|
||
|
|
||
|
func (p *pool) Capacity() int {
|
||
|
return cap(p.instancesCh)
|
||
|
}
|
||
|
|
||
|
func (p *pool) Available() int {
|
||
|
return len(p.instancesCh)
|
||
|
}
|
||
|
|
||
|
func (p *pool) Destroy() {
|
||
|
close(p.instancesCh)
|
||
|
|
||
|
for k, _ := range p.instances {
|
||
|
p.destroyInstance(k)
|
||
|
}
|
||
|
|
||
|
}
|