diff --git a/consumer.go b/consumer.go index 358dc84..4c3f8c4 100644 --- a/consumer.go +++ b/consumer.go @@ -37,11 +37,6 @@ func (c *Consumer) Start() error { return fmt.Errorf("%s ConsumerCtx is nil", c.logHeader()) } - reader := oek.Reader(c.ConsumerHandler.GetConsumerName()) - if nil == reader { - return fmt.Errorf("%s Reader[%s] is not valid", c.logHeader(), c.ConsumerHandler.GetConsumerName()) - } - if err := c.ConsumerHandler.Init(c.ctx); nil != err { return fmt.Errorf("%s Init error %v", c.logHeader(), err) } @@ -49,7 +44,7 @@ func (c *Consumer) Start() error { c.stopChan = make(chan struct{}) c.stopWg.Add(1) - go c.handleConsumer(reader) + go c.handleConsumer() return nil } @@ -72,7 +67,7 @@ func (c *Consumer) logHeader() string { return fmt.Sprintf("Consumer[%s]:", c.ConsumerHandler.GetName()) } -func (c *Consumer) handleConsumer(reader *kafka.Reader) { +func (c *Consumer) handleConsumer() { defer func() { c.ConsumerHandler.OnStop(c.ctx) logging.Logger().Infof("%s Stopped", c.logHeader()) @@ -87,7 +82,7 @@ func (c *Consumer) handleConsumer(reader *kafka.Reader) { stopChan := make(chan struct{}) receiveDoneChan := make(chan error) - go c.handleReceive(stopChan, receiveDoneChan, reader) + go c.handleReceive(stopChan, receiveDoneChan) logging.Logger().Infof("%s Started", c.logHeader()) @@ -100,7 +95,7 @@ func (c *Consumer) handleConsumer(reader *kafka.Reader) { } } -func (c *Consumer) handleReceive(stopChan <-chan struct{}, doneChan chan<- error, reader *kafka.Reader) { +func (c *Consumer) handleReceive(stopChan <-chan struct{}, doneChan chan<- error) { var ( m kafka.Message stopping atomic.Value @@ -111,6 +106,12 @@ func (c *Consumer) handleReceive(stopChan <-chan struct{}, doneChan chan<- error doneChan <- err }() + reader := oek.Reader(c.ConsumerHandler.GetConsumerName()) + if nil == reader { + err = fmt.Errorf("%s Reader[%s] is not valid", c.logHeader(), c.ConsumerHandler.GetConsumerName()) + return + } + for { readChan := make(chan struct{}) go func() {