// Package consumer decodes Kafka messages into metrics and writes them to
// InfluxDB.
package consumer

import (
	"encoding/json"
	"fmt"
	"strings"
	"sync/atomic"
	"time"

	"git.loafle.net/commons/logging-go"
	ocmd "git.loafle.net/overflow/commons-go/model/data"
	oc "git.loafle.net/overflow/consumer"
	"git.loafle.net/overflow/consumer_metric/config"
	oe "git.loafle.net/overflow/external-go"
	oei "git.loafle.net/overflow/external-go/influxdb"
	"github.com/influxdata/influxdb/client/v2"
	"github.com/segmentio/kafka-go"
)

// ConsumerHandler is the handler contract of this package; it currently adds
// nothing on top of oc.ConsumerHandler.
type ConsumerHandler interface {
	oc.ConsumerHandler
}

// ConsumerHandlers extends the embedded oc.ConsumerHandlers with the
// lifecycle of the external package (oe) and with the InfluxDB write performed
// for every consumed message.
type ConsumerHandlers struct {
	oc.ConsumerHandlers

	// Config supplies the External section handed to the oe package on every
	// lifecycle transition. It is dereferenced without a nil check.
	Config *config.Config
	// InfluxDBClientName is the name used to look up the InfluxDB client on
	// every message. Validate rejects a blank value.
	InfluxDBClientName string

	// validated is set (to true) once Validate has succeeded.
	validated atomic.Value
}

// Init initializes the embedded handlers and then the external package.
// An error from the embedded Init is returned as-is and skips the external
// package initialization.
func (ch *ConsumerHandlers) Init(consumerCtx oc.ConsumerCtx) error {
	if err := ch.ConsumerHandlers.Init(consumerCtx); err != nil {
		return err
	}

	oe.InitPackage(ch.Config.External)

	return nil
}

// OnStart starts the embedded handlers and then the external package.
// An error from the embedded OnStart is returned as-is and skips starting the
// external package.
func (ch *ConsumerHandlers) OnStart(consumerCtx oc.ConsumerCtx) error {
	if err := ch.ConsumerHandlers.OnStart(consumerCtx); err != nil {
		return err
	}

	oe.StartPackage(ch.Config.External)

	return nil
}

// OnStop stops the external package first and the embedded handlers second,
// i.e. in the reverse order of OnStart.
func (ch *ConsumerHandlers) OnStop(consumerCtx oc.ConsumerCtx) {
	oe.StopPackage(ch.Config.External)

	ch.ConsumerHandlers.OnStop(consumerCtx)
}

// Destroy destroys the external package first and the embedded handlers
// second, i.e. in the reverse order of Init.
func (ch *ConsumerHandlers) Destroy(consumerCtx oc.ConsumerCtx) {
	oe.DestroyPackage(ch.Config.External)

	ch.ConsumerHandlers.Destroy(consumerCtx)
}

// OnMessage decodes msg.Value as a JSON ocmd.Metric and writes it to InfluxDB
// as one point of the "metric" measurement, tagged with the sensor ID and
// timestamped with the time of processing.
//
// The method has no error return: every failure (undecodable payload, missing
// client or batch, point construction, write) is logged and the message is
// dropped.
func (ch *ConsumerHandlers) OnMessage(msg *kafka.Message) {
	metric := &ocmd.Metric{}
	if err := json.Unmarshal(msg.Value, metric); err != nil {
		logging.Logger().Error(err)
		return
	}

	c := oei.ClientFor(ch.InfluxDBClientName)
	if c == nil {
		logging.Logger().Errorf("Client of InfluxDB is not valid")
		return
	}

	bp := oei.BatchPointsFor("metric")
	if bp == nil {
		logging.Logger().Errorf("BatchPoints of InfluxDB is not valid")
		return
	}

	tags := map[string]string{
		"SensorID": metric.SensorID,
	}

	fields := map[string]interface{}{
		"StartDate": metric.StartDate,
		"EndDate":   metric.EndDate,
		"Success":   metric.Success,
	}
	// Entries of metric.Data are copied last, so a key equal to one of the
	// fixed field names above replaces that field.
	for key, value := range metric.Data {
		fields[key] = value
	}

	pt, err := client.NewPoint("metric", tags, fields, time.Now())
	if err != nil {
		logging.Logger().Error(err)
		// Without a point there is nothing to add or write; do not push a nil
		// point into the batch.
		return
	}
	bp.AddPoint(pt)

	if err := c.Write(bp); err != nil {
		logging.Logger().Error(err)
	}
}

// Validate validates the embedded handlers and then checks that
// InfluxDBClientName is not blank.
//
// The embedded Validate is called on every invocation. The local check is
// skipped once it has succeeded; a failed check is not remembered, so an
// invalid handler keeps returning the error on later calls.
func (ch *ConsumerHandlers) Validate() error {
	if err := ch.ConsumerHandlers.Validate(); err != nil {
		return err
	}

	if ch.validated.Load() != nil {
		return nil
	}

	if strings.TrimSpace(ch.InfluxDBClientName) == "" {
		return fmt.Errorf("InfluxDBClientName is not valid")
	}

	// Mark as validated only after the check has passed.
	ch.validated.Store(true)

	return nil
}