package consumer import ( "context" "fmt" "sync" "sync/atomic" "time" "git.loafle.net/commons/logging-go" oek "git.loafle.net/overflow/external-go/kafka" "github.com/segmentio/kafka-go" ) type Consumer struct { ConsumerHandler ConsumerHandler ctx ConsumerCtx stopChan chan struct{} stopWg sync.WaitGroup } func (c *Consumer) Start() error { if c.stopChan != nil { return fmt.Errorf("%s already running. Stop it before starting it again", c.logHeader()) } if nil == c.ConsumerHandler { return fmt.Errorf("%s ConsumerHandler must be specified", c.logHeader()) } if err := c.ConsumerHandler.Validate(); nil != err { return fmt.Errorf("%s validate error %v", c.logHeader(), err) } c.ctx = c.ConsumerHandler.ConsumerCtx() if nil == c.ctx { return fmt.Errorf("%s ConsumerCtx is nil", c.logHeader()) } if err := c.ConsumerHandler.Init(c.ctx); nil != err { return fmt.Errorf("%s Init error %v", c.logHeader(), err) } reader := oek.Reader(c.ConsumerHandler.GetConsumerName()) if nil == reader { return fmt.Errorf("%s Reader[%s] is not valid", c.logHeader(), c.ConsumerHandler.GetConsumerName()) } c.stopChan = make(chan struct{}) c.stopWg.Add(1) go c.handleConsumer(reader) return nil } func (c *Consumer) Stop(ctx context.Context) error { if c.stopChan == nil { return fmt.Errorf("%s must be started before stopping it", c.logHeader()) } close(c.stopChan) c.stopWg.Wait() c.ConsumerHandler.Destroy(c.ctx) c.stopChan = nil return nil } func (c *Consumer) logHeader() string { return fmt.Sprintf("Consumer[%s]:", c.ConsumerHandler.GetName()) } func (c *Consumer) handleConsumer(reader *kafka.Reader) { defer func() { c.ConsumerHandler.OnStop(c.ctx) logging.Logger().Infof("%s Stopped", c.logHeader()) c.stopWg.Done() }() if err := c.ConsumerHandler.OnStart(c.ctx); nil != err { logging.Logger().Error(err) return } stopChan := make(chan struct{}) receiveDoneChan := make(chan error) go c.handleReceive(stopChan, receiveDoneChan, reader) logging.Logger().Infof("%s Started", c.logHeader()) select { case <-receiveDoneChan: close(stopChan) case <-c.stopChan: close(stopChan) <-receiveDoneChan } } func (c *Consumer) handleReceive(stopChan <-chan struct{}, doneChan chan<- error, reader *kafka.Reader) { var ( m kafka.Message stopping atomic.Value err error ) defer func() { doneChan <- err }() for { readChan := make(chan struct{}) go func() { m, err = reader.ReadMessage(context.Background()) if err != nil { if nil == stopping.Load() { logging.Logger().Errorf("%s %v", c.logHeader(), err) } } close(readChan) }() select { case <-c.stopChan: stopping.Store(true) return case <-readChan: } if nil != err { select { case <-c.stopChan: return case <-time.After(time.Second): } continue } fmt.Printf("message at topic/partition/offset %v/%v/%v: %s = %s\n", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value)) } }