consumer/consumer-handler.go

78 lines
1.5 KiB
Go
Raw Permalink Normal View History

2018-05-11 09:23:08 +00:00
package consumer
import (
"fmt"
"strings"
"sync/atomic"
2018-05-11 10:06:30 +00:00
"github.com/segmentio/kafka-go"
2018-05-11 09:23:08 +00:00
)
type ConsumerHandler interface {
GetName() string
GetConsumerName() string
ConsumerCtx() ConsumerCtx
Init(consumerCtx ConsumerCtx) error
OnStart(consumerCtx ConsumerCtx) error
OnStop(consumerCtx ConsumerCtx)
Destroy(consumerCtx ConsumerCtx)
2018-05-11 10:06:30 +00:00
OnMessage(msg *kafka.Message)
2018-05-11 09:23:08 +00:00
Validate() error
}
// ConsumerHandlers carries the configurable identity of a handler (its own
// name and the name of the consumer it targets) plus a private marker used
// by Validate.
type ConsumerHandlers struct {
	// Name is the handler's name; encoded as "name" in JSON and omitted
	// when empty.
	Name string `json:"name,omitempty"`

	// ConsumerName is the name of the consumer the handler refers to;
	// encoded as "consumerName" in JSON and omitted when empty.
	ConsumerName string `json:"consumerName,omitempty"`

	// validated is empty (Load returns nil) until Validate records a value
	// in it. Being unexported, it is never part of the JSON form.
	validated atomic.Value
}
// ConsumerCtx returns the result of NewConsumerCtx(nil). The call is made
// on every invocation; nothing is stored on the receiver.
func (ch *ConsumerHandlers) ConsumerCtx() ConsumerCtx {
	return NewConsumerCtx(
		nil,
	)
}
// Init is a no-op: it ignores consumerCtx and always returns nil.
func (ch *ConsumerHandlers) Init(consumerCtx ConsumerCtx) error {
	// Nothing to set up.
	return nil
}
// OnStart is a no-op: it ignores consumerCtx and always returns nil.
func (ch *ConsumerHandlers) OnStart(consumerCtx ConsumerCtx) error {
	// Nothing to do on start.
	return nil
}
// OnStop is a no-op: it ignores consumerCtx and has no effect.
func (ch *ConsumerHandlers) OnStop(consumerCtx ConsumerCtx) {
	// Intentionally empty.
}
// Destroy is a no-op: it ignores consumerCtx and has no effect.
func (ch *ConsumerHandlers) Destroy(consumerCtx ConsumerCtx) {
	// Intentionally empty.
}
2018-05-11 10:06:30 +00:00
// OnMessage writes one line to standard output describing msg: its topic,
// partition and offset, followed by the key and value rendered as text.
// msg is dereferenced unconditionally, so it must not be nil.
func (ch *ConsumerHandlers) OnMessage(msg *kafka.Message) {
	// %s prints a []byte exactly as it prints the equivalent string, so the
	// key and value are handed to Printf without an intermediate conversion.
	fmt.Printf(
		"message at topic/partition/offset %v/%v/%v: %s = %s\n",
		msg.Topic,
		msg.Partition,
		msg.Offset,
		msg.Key,
		msg.Value,
	)
}
2018-05-11 09:23:08 +00:00
// GetName returns the Name field unchanged.
func (ch *ConsumerHandlers) GetName() string {
	name := ch.Name
	return name
}
// GetConsumerName returns the ConsumerName field unchanged.
func (ch *ConsumerHandlers) GetConsumerName() string {
	consumerName := ch.ConsumerName
	return consumerName
}
// Validate checks that ConsumerName contains at least one non-whitespace
// character.
//
// A successful result is remembered: once the check has passed, later calls
// return nil immediately without re-inspecting the fields. A failed check is
// NOT remembered, so every call on a misconfigured handler keeps returning
// the error until ConsumerName is fixed.
//
// Returns nil when the handler is valid, otherwise an error stating that
// ConsumerName is not valid.
func (ch *ConsumerHandlers) Validate() error {
	// Fast path: a previous call already succeeded.
	if ch.validated.Load() != nil {
		return nil
	}

	if strings.TrimSpace(ch.ConsumerName) == "" {
		return fmt.Errorf("ConsumerName is not valid")
	}

	// Record success only after the check has passed. Setting the marker
	// before checking would make a second call on an invalid handler
	// report success.
	ch.validated.Store(true)
	return nil
}