consumer_metric/consumer/consumer-handler.go

110 lines
2.2 KiB
Go
Raw Normal View History

2018-05-11 10:07:17 +00:00
package consumer
import (
2018-05-11 11:15:26 +00:00
"fmt"
"strings"
2018-05-11 10:07:17 +00:00
"sync/atomic"
2018-05-11 11:15:26 +00:00
"time"
2018-05-11 10:07:17 +00:00
2018-05-11 11:15:26 +00:00
"git.loafle.net/commons/logging-go"
2018-05-11 10:07:17 +00:00
oc "git.loafle.net/overflow/consumer"
"git.loafle.net/overflow/consumer_metric/config"
oe "git.loafle.net/overflow/external-go"
2018-05-11 11:15:26 +00:00
oei "git.loafle.net/overflow/external-go/influxdb"
"github.com/influxdata/influxdb/client/v2"
2018-05-11 10:07:17 +00:00
"github.com/segmentio/kafka-go"
)
// ConsumerHandler is this package's handler interface. It embeds
// oc.ConsumerHandler and declares no additional methods.
type ConsumerHandler interface {
	oc.ConsumerHandler
}
type ConsumerHandlers struct {
oc.ConsumerHandlers
Config *config.Config
2018-05-11 11:15:26 +00:00
InfluxDBClientName string
2018-05-11 10:07:17 +00:00
validated atomic.Value
}
// Init runs the embedded handlers' Init and, if that succeeds, initializes
// the external package with ch.Config.External.
//
// An error from the embedded Init is returned unchanged, in which case the
// external package is not initialized.
func (ch *ConsumerHandlers) Init(consumerCtx oc.ConsumerCtx) error {
	err := ch.ConsumerHandlers.Init(consumerCtx)
	if err != nil {
		return err
	}

	oe.InitPackage(ch.Config.External)

	return nil
}
// OnStart runs the embedded handlers' OnStart and, if that succeeds, starts
// the external package with ch.Config.External.
//
// An error from the embedded OnStart is returned unchanged, in which case
// the external package is not started.
func (ch *ConsumerHandlers) OnStart(consumerCtx oc.ConsumerCtx) error {
	err := ch.ConsumerHandlers.OnStart(consumerCtx)
	if err != nil {
		return err
	}

	oe.StartPackage(ch.Config.External)

	return nil
}
// OnStop stops the external package first and then forwards the stop to the
// embedded handlers, the reverse of the order used in OnStart.
func (ch *ConsumerHandlers) OnStop(consumerCtx oc.ConsumerCtx) {
	externalCfg := ch.Config.External
	oe.StopPackage(externalCfg)

	ch.ConsumerHandlers.OnStop(consumerCtx)
}
// Destroy tears down the external package first and then forwards the
// destroy to the embedded handlers, the reverse of the order used in Init.
func (ch *ConsumerHandlers) Destroy(consumerCtx oc.ConsumerCtx) {
	externalCfg := ch.Config.External
	oe.DestroyPackage(externalCfg)

	ch.ConsumerHandlers.Destroy(consumerCtx)
}
func (ch *ConsumerHandlers) OnMessage(msg *kafka.Message) {
2018-05-11 11:15:26 +00:00
c := oei.ClientFor(ch.InfluxDBClientName)
if nil == c {
logging.Logger().Errorf("Client of InfluxDB is not valid")
return
}
bp := oei.BatchPointsFor("")
if nil == bp {
logging.Logger().Errorf("BatchPoints of InfluxDB is not valid")
return
}
// Create a point and add to batch
tags := map[string]string{"cpu": "cpu-total"}
fields := map[string]interface{}{
"idle": 10.1,
"system": 53.3,
"user": 46.6,
}
2018-05-11 10:07:17 +00:00
2018-05-11 11:15:26 +00:00
pt, err := client.NewPoint("cpu_usage", tags, fields, time.Now())
if nil != err {
logging.Logger().Error(err)
}
bp.AddPoint(pt)
if err := c.Write(bp); err != nil {
logging.Logger().Error(err)
}
2018-05-11 10:07:17 +00:00
}
func (ch *ConsumerHandlers) Validate() error {
if err := ch.ConsumerHandlers.Validate(); nil != err {
return err
}
if nil != ch.validated.Load() {
return nil
}
ch.validated.Store(true)
2018-05-11 11:15:26 +00:00
if "" == strings.TrimSpace(ch.InfluxDBClientName) {
return fmt.Errorf("InfluxDBClientName is not valid")
}
2018-05-11 10:07:17 +00:00
return nil
}