diff --git a/consumer.go b/consumer.go index 4c3f8c4..c3606e5 100644 --- a/consumer.go +++ b/consumer.go @@ -41,10 +41,15 @@ func (c *Consumer) Start() error { return fmt.Errorf("%s Init error %v", c.logHeader(), err) } + reader := oek.Reader(c.ConsumerHandler.GetConsumerName()) + if nil == reader { + return fmt.Errorf("%s Reader[%s] is not valid", c.logHeader(), c.ConsumerHandler.GetConsumerName()) + } + c.stopChan = make(chan struct{}) c.stopWg.Add(1) - go c.handleConsumer() + go c.handleConsumer(reader) return nil } @@ -67,7 +72,7 @@ func (c *Consumer) logHeader() string { return fmt.Sprintf("Consumer[%s]:", c.ConsumerHandler.GetName()) } -func (c *Consumer) handleConsumer() { +func (c *Consumer) handleConsumer(reader *kafka.Reader) { defer func() { c.ConsumerHandler.OnStop(c.ctx) logging.Logger().Infof("%s Stopped", c.logHeader()) @@ -82,7 +87,7 @@ func (c *Consumer) handleConsumer() { stopChan := make(chan struct{}) receiveDoneChan := make(chan error) - go c.handleReceive(stopChan, receiveDoneChan) + go c.handleReceive(stopChan, receiveDoneChan, reader) logging.Logger().Infof("%s Started", c.logHeader()) @@ -95,7 +100,7 @@ func (c *Consumer) handleConsumer() { } } -func (c *Consumer) handleReceive(stopChan <-chan struct{}, doneChan chan<- error) { +func (c *Consumer) handleReceive(stopChan <-chan struct{}, doneChan chan<- error, reader *kafka.Reader) { var ( m kafka.Message stopping atomic.Value @@ -106,12 +111,6 @@ func (c *Consumer) handleReceive(stopChan <-chan struct{}, doneChan chan<- error doneChan <- err }() - reader := oek.Reader(c.ConsumerHandler.GetConsumerName()) - if nil == reader { - err = fmt.Errorf("%s Reader[%s] is not valid", c.logHeader(), c.ConsumerHandler.GetConsumerName()) - return - } - for { readChan := make(chan struct{}) go func() {