refactoring

This commit is contained in:
crusader 2017-11-09 14:46:04 +09:00
parent 1e3c1c1446
commit ea670e520a
4 changed files with 79 additions and 52 deletions

8
constants.go Normal file
View File

@ -0,0 +1,8 @@
package overflow_grpc_pool
const (
// DefaultWriteTimeout is default value of Write Timeout
DefaultIdleTimeout = 0
DefaultMaxIdle = 0
DefaultMaxCapacity = 1
)

105
pool.go
View File

@ -1,8 +1,9 @@
package overflow_grpc_pool package overflow_grpc_pool
import ( import (
"context"
"errors" "errors"
"fmt"
"sync"
"time" "time"
"google.golang.org/grpc" "google.golang.org/grpc"
@ -21,49 +22,80 @@ type pooledInstance struct {
} }
type Pool interface { type Pool interface {
Start() error
Stop()
Get() (interface{}, error) Get() (interface{}, error)
Put(i interface{}) Put(i interface{})
Capacity() int Capacity() int
Available() int Available() int
Destroy()
} }
type pool struct { type pool struct {
ctx context.Context ph PoolHandler
handler PoolHandler instances map[interface{}]*pooledInstance
instances map[interface{}]*pooledInstance instanceQueueChan chan *pooledInstance
instancesCh chan *pooledInstance
stopChan chan struct{}
stopWg sync.WaitGroup
} }
func New(ctx context.Context, handler PoolHandler) (Pool, error) { func New(ph PoolHandler) (Pool, error) {
var err error
p := &pool{ p := &pool{
ctx: ctx, ph: ph,
handler: handler,
instances: make(map[interface{}]*pooledInstance, handler.GetMaxCapacity()),
instancesCh: make(chan *pooledInstance, handler.GetMaxCapacity()),
}
if 0 < handler.GetMaxIdle() {
for i := 0; i < handler.GetMaxIdle(); i++ {
err = p.createInstance()
if nil != err {
return nil, err
}
}
} }
return p, nil return p, nil
} }
func (p *pool) Start() error {
if nil == p.ph {
panic("GRPC Pool: pool handler must be specified.")
}
p.ph.Validate()
if p.stopChan != nil {
return fmt.Errorf("GRPC Pool: pool is already running. Stop it before starting it again")
}
p.stopChan = make(chan struct{})
p.instances = make(map[interface{}]*pooledInstance, p.ph.GetMaxCapacity())
p.instanceQueueChan = make(chan *pooledInstance, p.ph.GetMaxCapacity())
if 0 < p.ph.GetMaxIdle() {
for i := 0; i < p.ph.GetMaxIdle(); i++ {
if err := p.createInstance(); nil != err {
p.Stop()
return err
}
}
}
return nil
}
func (p *pool) Stop() {
if p.stopChan == nil {
panic("GRPC Pool: pool must be started before stopping it")
}
close(p.instanceQueueChan)
for k, _ := range p.instances {
p.destroyInstance(k)
}
p.stopWg.Wait()
p.stopChan = nil
}
func (p *pool) createInstance() error { func (p *pool) createInstance() error {
maxCapacity := p.handler.GetMaxCapacity() maxCapacity := p.ph.GetMaxCapacity()
currentInstanceCount := len(p.instances) currentInstanceCount := len(p.instances)
if currentInstanceCount >= maxCapacity { if currentInstanceCount >= maxCapacity {
return ErrPoolFull return ErrPoolFull
} }
conn, i, err := p.handler.OnCreate() conn, i, err := p.ph.OnCreate()
if err != nil { if err != nil {
return err return err
} }
@ -74,7 +106,8 @@ func (p *pool) createInstance() error {
inUse: false, inUse: false,
} }
p.instances[i] = pi p.instances[i] = pi
p.instancesCh <- pi p.instanceQueueChan <- pi
p.stopWg.Add(1)
return nil return nil
} }
@ -90,6 +123,7 @@ func (p *pool) destroyInstance(i interface{}) error {
return err return err
} }
delete(p.instances, i) delete(p.instances, i)
p.stopWg.Done()
return nil return nil
} }
@ -98,7 +132,7 @@ func (p *pool) get() (*pooledInstance, error) {
var pi *pooledInstance var pi *pooledInstance
var err error var err error
avail := len(p.instancesCh) avail := len(p.instanceQueueChan)
if 0 == avail { if 0 == avail {
if err = p.createInstance(); nil != err { if err = p.createInstance(); nil != err {
return nil, err return nil, err
@ -107,12 +141,12 @@ func (p *pool) get() (*pooledInstance, error) {
for { for {
select { select {
case pi = <-p.instancesCh: case pi = <-p.instanceQueueChan:
// All good // All good
default: default:
} }
idleTimeout := p.handler.GetIdleTimeout() idleTimeout := p.ph.GetIdleTimeout()
if 1 < len(p.instancesCh) && 0 < idleTimeout && pi.idleTime.Add(idleTimeout).Before(time.Now()) { if 1 < len(p.instanceQueueChan) && 0 < idleTimeout && pi.idleTime.Add(idleTimeout).Before(time.Now()) {
go p.destroyInstance(pi.instance) go p.destroyInstance(pi.instance)
continue continue
} }
@ -144,7 +178,7 @@ func (p *pool) Put(i interface{}) {
pi.inUse = false pi.inUse = false
select { select {
case p.instancesCh <- pi: case p.instanceQueueChan <- pi:
// All good // All good
default: default:
// channel is full // channel is full
@ -154,18 +188,9 @@ func (p *pool) Put(i interface{}) {
} }
func (p *pool) Capacity() int { func (p *pool) Capacity() int {
return cap(p.instancesCh) return cap(p.instanceQueueChan)
} }
func (p *pool) Available() int { func (p *pool) Available() int {
return len(p.instancesCh) return len(p.instanceQueueChan)
}
func (p *pool) Destroy() {
close(p.instancesCh)
for k, _ := range p.instances {
p.destroyInstance(k)
}
} }

View File

@ -6,18 +6,11 @@ import (
"google.golang.org/grpc" "google.golang.org/grpc"
) )
const (
// DefaultWriteTimeout is default value of Write Timeout
DefaultIdleTimeout = 0
DefaultMaxIdle = 0
DefaultMaxCapacity = 1
)
type PoolHandler interface { type PoolHandler interface {
Dial() (*grpc.ClientConn, interface{}, error)
GetIdleTimeout() time.Duration GetIdleTimeout() time.Duration
GetMaxIdle() int GetMaxIdle() int
GetMaxCapacity() int GetMaxCapacity() int
OnCreate() (*grpc.ClientConn, interface{}, error)
Validate() Validate()
} }

View File

@ -13,6 +13,10 @@ type PoolHandlers struct {
MaxCapacity int MaxCapacity int
} }
func (h *PoolHandlers) Dial() (*grpc.ClientConn, interface{}, error) {
return nil, nil, nil
}
func (h *PoolHandlers) GetIdleTimeout() time.Duration { func (h *PoolHandlers) GetIdleTimeout() time.Duration {
return h.IdleTimeout return h.IdleTimeout
} }
@ -22,9 +26,6 @@ func (h *PoolHandlers) GetMaxIdle() int {
func (h *PoolHandlers) GetMaxCapacity() int { func (h *PoolHandlers) GetMaxCapacity() int {
return h.MaxCapacity return h.MaxCapacity
} }
func (h *PoolHandlers) OnCreate() (*grpc.ClientConn, interface{}, error) {
return nil, nil, nil
}
// Validate validates the configuration // Validate validates the configuration
func (o *PoolHandlers) Validate() { func (o *PoolHandlers) Validate() {