package consumer import ( "fmt" "strings" "sync/atomic" "github.com/segmentio/kafka-go" ) type ConsumerHandler interface { GetName() string GetConsumerName() string ConsumerCtx() ConsumerCtx Init(consumerCtx ConsumerCtx) error OnStart(consumerCtx ConsumerCtx) error OnStop(consumerCtx ConsumerCtx) Destroy(consumerCtx ConsumerCtx) OnMessage(msg *kafka.Message) Validate() error } type ConsumerHandlers struct { Name string `json:"name,omitempty"` ConsumerName string `json:"consumerName,omitempty"` validated atomic.Value } func (ch *ConsumerHandlers) ConsumerCtx() ConsumerCtx { return NewConsumerCtx(nil) } func (ch *ConsumerHandlers) Init(consumerCtx ConsumerCtx) error { return nil } func (ch *ConsumerHandlers) OnStart(consumerCtx ConsumerCtx) error { return nil } func (ch *ConsumerHandlers) OnStop(consumerCtx ConsumerCtx) { } func (ch *ConsumerHandlers) Destroy(consumerCtx ConsumerCtx) { } func (ch *ConsumerHandlers) OnMessage(msg *kafka.Message) { fmt.Printf("message at topic/partition/offset %v/%v/%v: %s = %s\n", msg.Topic, msg.Partition, msg.Offset, string(msg.Key), string(msg.Value)) } func (ch *ConsumerHandlers) GetName() string { return ch.Name } func (ch *ConsumerHandlers) GetConsumerName() string { return ch.ConsumerName } func (ch *ConsumerHandlers) Validate() error { if nil != ch.validated.Load() { return nil } ch.validated.Store(true) if "" == strings.TrimSpace(ch.ConsumerName) { return fmt.Errorf("ConsumerName is not valid") } return nil }