diff --git a/constants.go b/constants.go new file mode 100644 index 0000000..bd308dc --- /dev/null +++ b/constants.go @@ -0,0 +1,8 @@ +package overflow_grpc_pool + +const ( + // DefaultWriteTimeout is default value of Write Timeout + DefaultIdleTimeout = 0 + DefaultMaxIdle = 0 + DefaultMaxCapacity = 1 +) diff --git a/pool.go b/pool.go index 392408c..94aa757 100644 --- a/pool.go +++ b/pool.go @@ -1,8 +1,9 @@ package overflow_grpc_pool import ( - "context" "errors" + "fmt" + "sync" "time" "google.golang.org/grpc" @@ -21,49 +22,80 @@ type pooledInstance struct { } type Pool interface { + Start() error + Stop() + Get() (interface{}, error) Put(i interface{}) + Capacity() int Available() int - Destroy() } type pool struct { - ctx context.Context - handler PoolHandler - instances map[interface{}]*pooledInstance - instancesCh chan *pooledInstance + ph PoolHandler + instances map[interface{}]*pooledInstance + instanceQueueChan chan *pooledInstance + + stopChan chan struct{} + stopWg sync.WaitGroup } -func New(ctx context.Context, handler PoolHandler) (Pool, error) { - var err error +func New(ph PoolHandler) (Pool, error) { p := &pool{ - ctx: ctx, - handler: handler, - instances: make(map[interface{}]*pooledInstance, handler.GetMaxCapacity()), - instancesCh: make(chan *pooledInstance, handler.GetMaxCapacity()), - } - - if 0 < handler.GetMaxIdle() { - for i := 0; i < handler.GetMaxIdle(); i++ { - err = p.createInstance() - if nil != err { - return nil, err - } - } + ph: ph, } return p, nil } +func (p *pool) Start() error { + if nil == p.ph { + panic("GRPC Pool: pool handler must be specified.") + } + p.ph.Validate() + + if p.stopChan != nil { + return fmt.Errorf("GRPC Pool: pool is already running. Stop it before starting it again") + } + p.stopChan = make(chan struct{}) + p.instances = make(map[interface{}]*pooledInstance, p.ph.GetMaxCapacity()) + p.instanceQueueChan = make(chan *pooledInstance, p.ph.GetMaxCapacity()) + + if 0 < p.ph.GetMaxIdle() { + for i := 0; i < p.ph.GetMaxIdle(); i++ { + if err := p.createInstance(); nil != err { + p.Stop() + return err + } + } + } + + return nil +} + +func (p *pool) Stop() { + if p.stopChan == nil { + panic("GRPC Pool: pool must be started before stopping it") + } + close(p.instanceQueueChan) + + for k, _ := range p.instances { + p.destroyInstance(k) + } + + p.stopWg.Wait() + p.stopChan = nil +} + func (p *pool) createInstance() error { - maxCapacity := p.handler.GetMaxCapacity() + maxCapacity := p.ph.GetMaxCapacity() currentInstanceCount := len(p.instances) if currentInstanceCount >= maxCapacity { return ErrPoolFull } - conn, i, err := p.handler.OnCreate() + conn, i, err := p.ph.OnCreate() if err != nil { return err } @@ -74,7 +106,8 @@ func (p *pool) createInstance() error { inUse: false, } p.instances[i] = pi - p.instancesCh <- pi + p.instanceQueueChan <- pi + p.stopWg.Add(1) return nil } @@ -90,6 +123,7 @@ func (p *pool) destroyInstance(i interface{}) error { return err } delete(p.instances, i) + p.stopWg.Done() return nil } @@ -98,7 +132,7 @@ func (p *pool) get() (*pooledInstance, error) { var pi *pooledInstance var err error - avail := len(p.instancesCh) + avail := len(p.instanceQueueChan) if 0 == avail { if err = p.createInstance(); nil != err { return nil, err @@ -107,12 +141,12 @@ func (p *pool) get() (*pooledInstance, error) { for { select { - case pi = <-p.instancesCh: + case pi = <-p.instanceQueueChan: // All good default: } - idleTimeout := p.handler.GetIdleTimeout() - if 1 < len(p.instancesCh) && 0 < idleTimeout && pi.idleTime.Add(idleTimeout).Before(time.Now()) { + idleTimeout := p.ph.GetIdleTimeout() + if 1 < len(p.instanceQueueChan) && 0 < idleTimeout && pi.idleTime.Add(idleTimeout).Before(time.Now()) { go p.destroyInstance(pi.instance) continue } @@ -144,7 +178,7 @@ func (p *pool) Put(i interface{}) { pi.inUse = false select { - case p.instancesCh <- pi: + case p.instanceQueueChan <- pi: // All good default: // channel is full @@ -154,18 +188,9 @@ func (p *pool) Put(i interface{}) { } func (p *pool) Capacity() int { - return cap(p.instancesCh) + return cap(p.instanceQueueChan) } func (p *pool) Available() int { - return len(p.instancesCh) -} - -func (p *pool) Destroy() { - close(p.instancesCh) - - for k, _ := range p.instances { - p.destroyInstance(k) - } - + return len(p.instanceQueueChan) } diff --git a/pool_handler.go b/pool_handler.go index 72986c8..062f215 100644 --- a/pool_handler.go +++ b/pool_handler.go @@ -6,18 +6,11 @@ import ( "google.golang.org/grpc" ) -const ( - // DefaultWriteTimeout is default value of Write Timeout - DefaultIdleTimeout = 0 - DefaultMaxIdle = 0 - DefaultMaxCapacity = 1 -) - type PoolHandler interface { + Dial() (*grpc.ClientConn, interface{}, error) + GetIdleTimeout() time.Duration GetMaxIdle() int GetMaxCapacity() int - - OnCreate() (*grpc.ClientConn, interface{}, error) Validate() } diff --git a/pool_handlers.go b/pool_handlers.go index 79116e9..c015b4d 100644 --- a/pool_handlers.go +++ b/pool_handlers.go @@ -13,6 +13,10 @@ type PoolHandlers struct { MaxCapacity int } +func (h *PoolHandlers) Dial() (*grpc.ClientConn, interface{}, error) { + return nil, nil, nil +} + func (h *PoolHandlers) GetIdleTimeout() time.Duration { return h.IdleTimeout } @@ -22,9 +26,6 @@ func (h *PoolHandlers) GetMaxIdle() int { func (h *PoolHandlers) GetMaxCapacity() int { return h.MaxCapacity } -func (h *PoolHandlers) OnCreate() (*grpc.ClientConn, interface{}, error) { - return nil, nil, nil -} // Validate validates the configuration func (o *PoolHandlers) Validate() {