// Package overflow_grpc_pool implements a bounded pool of gRPC client
// instances. Creating instances and supplying the sizing/timeout settings is
// delegated to a PoolHandler.
package overflow_grpc_pool

import (
	"errors"
	"fmt"
	"sync"
	"time"

	"google.golang.org/grpc"
)

var (
	// ErrPoolFull is returned when a new instance is required but the pool
	// already tracks as many instances as the handler's maximum capacity.
	ErrPoolFull = errors.New("grpc pool: Pool is reached maximum capacity.")

	// ErrPoolNotRunning is returned by Get when the pool has not been started
	// or has already been stopped.
	ErrPoolNotRunning = errors.New("grpc pool: pool is not running")
)

// pooledInstance is the pool's bookkeeping record for one instance produced
// by PoolHandler.OnCreate.
type pooledInstance struct {
	clientConn *grpc.ClientConn // connection returned by OnCreate alongside the instance
	idleTime   time.Time        // when the instance was created or last returned via Put
	instance   interface{}      // value handed to callers; also the key in pool.instances
	inUse      bool             // true between Get and the matching Put
}

// Pool is a bounded pool of client instances.
type Pool interface {
	// Start allocates the pool's internal state and pre-creates GetMaxIdle
	// instances. It returns an error if the pool is already running or an
	// instance cannot be created.
	Start() error
	// Stop closes every tracked connection and marks the pool as stopped.
	// It panics if the pool is not running.
	Stop()
	// Get returns an idle instance, creating one when none is idle.
	Get() (interface{}, error)
	// Put returns an instance obtained from Get to the pool. Values the pool
	// does not track, or that are already idle, are ignored.
	Put(i interface{})
	// Capacity reports the size of the idle queue allocated by Start.
	Capacity() int
	// Available reports how many instances are currently idle.
	Available() int
}

// pool is the default Pool implementation.
type pool struct {
	ph PoolHandler

	// mu guards every field below. Exported methods take it; unexported
	// helpers expect the caller to hold it.
	mu sync.Mutex
	// instances maps each handed-out value to its record. The values returned
	// by OnCreate are used as map keys and therefore must be comparable.
	instances map[interface{}]*pooledInstance
	// instanceQueueChan is the FIFO queue of idle instances.
	instanceQueueChan chan *pooledInstance
	// stopChan is non-nil exactly while the pool is running.
	stopChan chan struct{}
	// stopWg tracks background connection closes started by evict, so that
	// Stop does not return while one is still in flight.
	stopWg sync.WaitGroup
}

// New returns a pool that creates its instances through ph. The handler is
// not inspected until Start is called; the returned error is always nil.
func New(ph PoolHandler) (Pool, error) {
	p := &pool{
		ph: ph,
	}
	return p, nil
}

// Start initialises the pool and pre-creates GetMaxIdle instances.
//
// It panics when the pool was built without a handler. It returns an error
// when the pool is already running, or when pre-creating an instance fails;
// in the latter case everything created so far is torn down again.
func (p *pool) Start() error {
	if nil == p.ph {
		panic("GRPC Pool: pool handler must be specified.")
	}
	// NOTE(review): if Validate returns a value it is discarded here —
	// confirm against the PoolHandler definition.
	p.ph.Validate()

	p.mu.Lock()
	defer p.mu.Unlock()

	if p.stopChan != nil {
		return fmt.Errorf("GRPC Pool: pool is already running. Stop it before starting it again")
	}

	maxCapacity := p.ph.GetMaxCapacity()
	p.stopChan = make(chan struct{})
	p.instances = make(map[interface{}]*pooledInstance, maxCapacity)
	p.instanceQueueChan = make(chan *pooledInstance, maxCapacity)

	for i := 0; i < p.ph.GetMaxIdle(); i++ {
		if err := p.createInstance(); nil != err {
			p.stopLocked()
			return err
		}
	}
	return nil
}

// Stop closes every connection the pool tracks (including ones currently
// handed out) and marks the pool as stopped. It panics if the pool is not
// running.
func (p *pool) Stop() {
	p.mu.Lock()
	defer p.mu.Unlock()

	if p.stopChan == nil {
		panic("GRPC Pool: pool must be started before stopping it")
	}
	p.stopLocked()
}

// stopLocked tears the pool down. The caller must hold p.mu and the pool
// must be running.
func (p *pool) stopLocked() {
	close(p.instanceQueueChan)
	// Deleting from a map while ranging over it is permitted in Go.
	for k := range p.instances {
		// Stop has no error result, so a failed Close cannot be reported.
		// The instance is forgotten either way so shutdown always completes.
		_ = p.destroyInstance(k)
	}
	// The background closers never take p.mu, so waiting here cannot deadlock.
	p.stopWg.Wait()
	p.stopChan = nil
}

// createInstance asks the handler for a new instance, registers it and places
// it on the idle queue. It returns ErrPoolFull when the pool already tracks
// GetMaxCapacity instances. The caller must hold p.mu.
func (p *pool) createInstance() error {
	if len(p.instances) >= p.ph.GetMaxCapacity() {
		return ErrPoolFull
	}

	conn, i, err := p.ph.OnCreate()
	if err != nil {
		return err
	}

	pi := &pooledInstance{
		clientConn: conn,
		idleTime:   time.Now(),
		instance:   i,
		inUse:      false,
	}
	p.instances[i] = pi
	// This is only reached during pre-fill or when the idle queue is empty,
	// and the queue was sized from GetMaxCapacity at Start, so the send does
	// not block as long as the handler reports a stable capacity.
	p.instanceQueueChan <- pi
	return nil
}

// destroyInstance forgets the instance registered under i and closes its
// connection synchronously. The instance is removed from the pool even when
// Close fails; the Close error is returned. Unknown keys are a no-op.
// The caller must hold p.mu.
func (p *pool) destroyInstance(i interface{}) error {
	pi, ok := p.instances[i]
	if !ok {
		return nil
	}
	delete(p.instances, i)
	if pi.clientConn == nil {
		return nil
	}
	return pi.clientConn.Close()
}

// evict forgets pi and closes its connection in the background so the caller
// is not delayed by the close. The caller must hold p.mu.
func (p *pool) evict(pi *pooledInstance) {
	delete(p.instances, pi.instance)
	conn := pi.clientConn
	if conn == nil {
		return
	}
	p.stopWg.Add(1)
	go func() {
		defer p.stopWg.Done()
		// Best effort: there is no caller left to report a Close error to.
		_ = conn.Close()
	}()
}

// get pops the next usable idle instance, creating one when the queue is
// empty. Instances that have been idle longer than GetIdleTimeout are evicted
// instead of returned, but only while more than one other idle instance
// remains queued. The caller must hold p.mu.
func (p *pool) get() (*pooledInstance, error) {
	if p.stopChan == nil {
		return nil, ErrPoolNotRunning
	}

	idleTimeout := p.ph.GetIdleTimeout()
	for {
		var pi *pooledInstance
		select {
		case pi = <-p.instanceQueueChan:
		default:
		}

		if pi == nil {
			// Nothing idle: grow the pool, then pick the new instance up
			// from the queue on the next iteration.
			if err := p.createInstance(); nil != err {
				return nil, err
			}
			continue
		}

		expired := 0 < idleTimeout && time.Since(pi.idleTime) > idleTimeout
		if expired && 1 < len(p.instanceQueueChan) {
			p.evict(pi)
			continue
		}
		return pi, nil
	}
}

// Get returns an idle instance and marks it as in use. It returns
// ErrPoolNotRunning when the pool is not running, ErrPoolFull when no
// instance is idle and the pool is at capacity, or the handler's error when
// creating a new instance fails.
func (p *pool) Get() (interface{}, error) {
	p.mu.Lock()
	defer p.mu.Unlock()

	pi, err := p.get()
	if nil != err {
		return nil, err
	}
	pi.inUse = true
	return pi.instance, nil
}

// Put hands an instance obtained from Get back to the pool and restarts its
// idle clock. Values the pool does not track are ignored, as are instances
// that are already idle, so a double Put cannot queue the same instance twice.
func (p *pool) Put(i interface{}) {
	p.mu.Lock()
	defer p.mu.Unlock()

	if p.stopChan == nil {
		return
	}
	pi, ok := p.instances[i]
	if !ok || !pi.inUse {
		return
	}

	pi.idleTime = time.Now()
	pi.inUse = false

	select {
	case p.instanceQueueChan <- pi:
		// Queued for reuse.
	default:
		// The idle queue is full. Release the instance instead of leaving it
		// registered but unreachable, which would consume capacity forever.
		p.evict(pi)
	}
}

// Capacity reports the capacity of the idle queue, i.e. the maximum capacity
// the handler reported when the pool was started (0 before the first Start).
func (p *pool) Capacity() int {
	p.mu.Lock()
	defer p.mu.Unlock()
	return cap(p.instanceQueueChan)
}

// Available reports the number of idle instances currently queued.
func (p *pool) Available() int {
	p.mu.Lock()
	defer p.mu.Unlock()
	return len(p.instanceQueueChan)
}