70 lines
1.3 KiB
Go
70 lines
1.3 KiB
Go
package consumer
|
|
|
|
import (
|
|
"sync/atomic"
|
|
|
|
oc "git.loafle.net/overflow/consumer"
|
|
"git.loafle.net/overflow/consumer_metric/config"
|
|
oe "git.loafle.net/overflow/external-go"
|
|
"github.com/segmentio/kafka-go"
|
|
)
|
|
|
|
type ConsumerHandler interface {
|
|
oc.ConsumerHandler
|
|
}
|
|
|
|
type ConsumerHandlers struct {
|
|
oc.ConsumerHandlers
|
|
|
|
Config *config.Config
|
|
|
|
validated atomic.Value
|
|
}
|
|
|
|
func (ch *ConsumerHandlers) Init(consumerCtx oc.ConsumerCtx) error {
|
|
if err := ch.ConsumerHandlers.Init(consumerCtx); nil != err {
|
|
return err
|
|
}
|
|
oe.InitPackage(ch.Config.External)
|
|
|
|
return nil
|
|
}
|
|
|
|
func (ch *ConsumerHandlers) OnStart(consumerCtx oc.ConsumerCtx) error {
|
|
if err := ch.ConsumerHandlers.OnStart(consumerCtx); nil != err {
|
|
return err
|
|
}
|
|
oe.StartPackage(ch.Config.External)
|
|
|
|
return nil
|
|
}
|
|
|
|
func (ch *ConsumerHandlers) OnStop(consumerCtx oc.ConsumerCtx) {
|
|
oe.StopPackage(ch.Config.External)
|
|
|
|
ch.ConsumerHandlers.OnStop(consumerCtx)
|
|
}
|
|
|
|
func (ch *ConsumerHandlers) Destroy(consumerCtx oc.ConsumerCtx) {
|
|
oe.DestroyPackage(ch.Config.External)
|
|
|
|
ch.ConsumerHandlers.Destroy(consumerCtx)
|
|
}
|
|
|
|
func (ch *ConsumerHandlers) OnMessage(msg *kafka.Message) {
|
|
|
|
}
|
|
|
|
func (ch *ConsumerHandlers) Validate() error {
|
|
if err := ch.ConsumerHandlers.Validate(); nil != err {
|
|
return err
|
|
}
|
|
|
|
if nil != ch.validated.Load() {
|
|
return nil
|
|
}
|
|
ch.validated.Store(true)
|
|
|
|
return nil
|
|
}
|