consumer/consumer-handler.go
crusader fedd128649 ing
2018-05-11 19:06:30 +09:00

78 lines
1.5 KiB
Go

package consumer
import (
"fmt"
"strings"
"sync/atomic"
"github.com/segmentio/kafka-go"
)
// ConsumerHandler is the set of methods a Kafka consumer handler in this
// package has to provide. ConsumerHandlers (declared in this file) supplies a
// default implementation of every method.
//
// NOTE(review): Init, OnStart, OnStop and Destroy look like lifecycle hooks,
// but the order and moment at which they are invoked is decided by callers
// that are not visible in this file.
type ConsumerHandler interface {
	// GetName returns the handler's name.
	GetName() string
	// GetConsumerName returns the consumer name configured on the handler.
	GetConsumerName() string
	// Validate reports whether the handler's configuration is usable.
	Validate() error

	// ConsumerCtx returns the ConsumerCtx associated with the handler.
	ConsumerCtx() ConsumerCtx

	// Init and OnStart receive a ConsumerCtx and may fail with an error.
	Init(consumerCtx ConsumerCtx) error
	OnStart(consumerCtx ConsumerCtx) error
	// OnStop and Destroy receive a ConsumerCtx and cannot report failure.
	OnStop(consumerCtx ConsumerCtx)
	Destroy(consumerCtx ConsumerCtx)

	// OnMessage is handed a single Kafka message.
	OnMessage(msg *kafka.Message)
}
// ConsumerHandlers holds the configuration shared by consumer handlers and is
// the receiver of the default ConsumerHandler method implementations in this
// file.
//
// Because it contains an atomic.Value, a ConsumerHandlers must not be copied
// after Validate has been called on it; pass it around by pointer.
type ConsumerHandlers struct {
	// Name is the value returned by GetName; JSON key "name".
	Name string `json:"name,omitempty"`

	// ConsumerName is the value returned by GetConsumerName; JSON key
	// "consumerName". Validate rejects a value that is empty or only
	// whitespace.
	ConsumerName string `json:"consumerName,omitempty"`

	// validated is written by Validate. While it holds a non-nil value,
	// Validate returns nil without re-checking the fields. It is unexported
	// and therefore never part of the JSON form.
	validated atomic.Value
}
// ConsumerCtx is the default implementation: every call builds a new context
// with NewConsumerCtx(nil) and returns it. Nothing is stored on ch.
func (ch *ConsumerHandlers) ConsumerCtx() ConsumerCtx {
	ctx := NewConsumerCtx(nil)
	return ctx
}
// Init is the default implementation of ConsumerHandler.Init. It ignores
// consumerCtx, does nothing, and always returns nil.
func (ch *ConsumerHandlers) Init(consumerCtx ConsumerCtx) error {
	// Intentionally empty: there is no default initialisation work.
	return nil
}
// OnStart is the default implementation of ConsumerHandler.OnStart. It
// ignores consumerCtx, does nothing, and always returns nil.
func (ch *ConsumerHandlers) OnStart(consumerCtx ConsumerCtx) error {
	// Intentionally empty: there is no default start-up work.
	return nil
}
// OnStop is the default implementation of ConsumerHandler.OnStop. It ignores
// consumerCtx and does nothing.
func (ch *ConsumerHandlers) OnStop(consumerCtx ConsumerCtx) {
	// Intentionally empty: there is no default stop behaviour.
}
// Destroy is the default implementation of ConsumerHandler.Destroy. It
// ignores consumerCtx and does nothing.
func (ch *ConsumerHandlers) Destroy(consumerCtx ConsumerCtx) {
	// Intentionally empty: the default handler holds nothing to release.
}
// OnMessage is the default message callback. It writes a single line to
// standard output containing the message's topic, partition and offset,
// followed by its key and value rendered as strings.
//
// msg must not be nil: its fields are read without a nil check.
func (ch *ConsumerHandlers) OnMessage(msg *kafka.Message) {
	key, value := string(msg.Key), string(msg.Value)
	fmt.Printf(
		"message at topic/partition/offset %v/%v/%v: %s = %s\n",
		msg.Topic, msg.Partition, msg.Offset, key, value,
	)
}
// GetName returns the handler's Name field unchanged.
func (ch *ConsumerHandlers) GetName() string {
	return ch.Name
}
// GetConsumerName returns the handler's ConsumerName field unchanged (no
// trimming is applied here).
func (ch *ConsumerHandlers) GetConsumerName() string {
	return ch.ConsumerName
}
// Validate checks that ConsumerName contains at least one non-whitespace
// character.
//
// A successful check is remembered in ch.validated, so later calls return nil
// without inspecting the fields again. A failed check is NOT remembered: the
// flag is stored only after the check has passed, so calling Validate again
// on a still-invalid handler keeps returning the error instead of silently
// reporting success.
//
// It returns nil when the handler is valid (or was already validated), and
// otherwise an error stating that ConsumerName is not valid.
func (ch *ConsumerHandlers) Validate() error {
	// Fast path: a previous call already accepted this handler.
	if ch.validated.Load() != nil {
		return nil
	}

	if strings.TrimSpace(ch.ConsumerName) == "" {
		return fmt.Errorf("ConsumerName is not valid")
	}

	// Record success only now, so that a failure is never cached.
	ch.validated.Store(true)
	return nil
}