147 lines
2.9 KiB
Go
147 lines
2.9 KiB
Go
package consumer
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"git.loafle.net/commons/logging-go"
|
|
oek "git.loafle.net/overflow/external-go/kafka"
|
|
"github.com/segmentio/kafka-go"
|
|
)
|
|
|
|
type Consumer struct {
|
|
ConsumerHandler ConsumerHandler
|
|
|
|
ctx ConsumerCtx
|
|
stopChan chan struct{}
|
|
stopWg sync.WaitGroup
|
|
}
|
|
|
|
func (c *Consumer) Start() error {
|
|
if c.stopChan != nil {
|
|
return fmt.Errorf("%s already running. Stop it before starting it again", c.logHeader())
|
|
}
|
|
|
|
if nil == c.ConsumerHandler {
|
|
return fmt.Errorf("%s ConsumerHandler must be specified", c.logHeader())
|
|
}
|
|
if err := c.ConsumerHandler.Validate(); nil != err {
|
|
return fmt.Errorf("%s validate error %v", c.logHeader(), err)
|
|
}
|
|
|
|
c.ctx = c.ConsumerHandler.ConsumerCtx()
|
|
if nil == c.ctx {
|
|
return fmt.Errorf("%s ConsumerCtx is nil", c.logHeader())
|
|
}
|
|
|
|
if err := c.ConsumerHandler.Init(c.ctx); nil != err {
|
|
return fmt.Errorf("%s Init error %v", c.logHeader(), err)
|
|
}
|
|
|
|
reader := oek.Reader(c.ConsumerHandler.GetConsumerName())
|
|
if nil == reader {
|
|
return fmt.Errorf("%s Reader[%s] is not valid", c.logHeader(), c.ConsumerHandler.GetConsumerName())
|
|
}
|
|
|
|
c.stopChan = make(chan struct{})
|
|
|
|
c.stopWg.Add(1)
|
|
go c.handleConsumer(reader)
|
|
|
|
return nil
|
|
}
|
|
|
|
func (c *Consumer) Stop(ctx context.Context) error {
|
|
if c.stopChan == nil {
|
|
return fmt.Errorf("%s must be started before stopping it", c.logHeader())
|
|
}
|
|
close(c.stopChan)
|
|
c.stopWg.Wait()
|
|
|
|
c.ConsumerHandler.Destroy(c.ctx)
|
|
|
|
c.stopChan = nil
|
|
|
|
return nil
|
|
}
|
|
|
|
func (c *Consumer) logHeader() string {
|
|
return fmt.Sprintf("Consumer[%s]:", c.ConsumerHandler.GetName())
|
|
}
|
|
|
|
func (c *Consumer) handleConsumer(reader *kafka.Reader) {
|
|
defer func() {
|
|
c.ConsumerHandler.OnStop(c.ctx)
|
|
logging.Logger().Infof("%s Stopped", c.logHeader())
|
|
c.stopWg.Done()
|
|
}()
|
|
|
|
if err := c.ConsumerHandler.OnStart(c.ctx); nil != err {
|
|
logging.Logger().Error(err)
|
|
return
|
|
}
|
|
|
|
stopChan := make(chan struct{})
|
|
receiveDoneChan := make(chan error)
|
|
|
|
go c.handleReceive(stopChan, receiveDoneChan, reader)
|
|
|
|
logging.Logger().Infof("%s Started", c.logHeader())
|
|
|
|
select {
|
|
case <-receiveDoneChan:
|
|
close(stopChan)
|
|
case <-c.stopChan:
|
|
close(stopChan)
|
|
<-receiveDoneChan
|
|
}
|
|
}
|
|
|
|
func (c *Consumer) handleReceive(stopChan <-chan struct{}, doneChan chan<- error, reader *kafka.Reader) {
|
|
var (
|
|
m kafka.Message
|
|
stopping atomic.Value
|
|
err error
|
|
)
|
|
|
|
defer func() {
|
|
doneChan <- err
|
|
}()
|
|
|
|
for {
|
|
readChan := make(chan struct{})
|
|
go func() {
|
|
m, err = reader.ReadMessage(context.Background())
|
|
if err != nil {
|
|
if nil == stopping.Load() {
|
|
logging.Logger().Errorf("%s %v", c.logHeader(), err)
|
|
}
|
|
}
|
|
|
|
close(readChan)
|
|
}()
|
|
|
|
select {
|
|
case <-c.stopChan:
|
|
stopping.Store(true)
|
|
return
|
|
case <-readChan:
|
|
}
|
|
|
|
if nil != err {
|
|
select {
|
|
case <-c.stopChan:
|
|
return
|
|
case <-time.After(time.Second):
|
|
}
|
|
continue
|
|
}
|
|
|
|
c.ConsumerHandler.OnMessage(&m)
|
|
}
|
|
|
|
}
|