2018-05-11 10:07:17 +00:00
|
|
|
package consumer
|
|
|
|
|
|
|
|
import (
|
2018-07-04 11:04:45 +00:00
|
|
|
"encoding/json"
|
2018-05-11 11:15:26 +00:00
|
|
|
"fmt"
|
|
|
|
"strings"
|
2018-05-11 10:07:17 +00:00
|
|
|
"sync/atomic"
|
2018-05-11 11:15:26 +00:00
|
|
|
"time"
|
2018-05-11 10:07:17 +00:00
|
|
|
|
2018-05-11 11:15:26 +00:00
|
|
|
"git.loafle.net/commons/logging-go"
|
2018-07-04 11:04:45 +00:00
|
|
|
ocmd "git.loafle.net/overflow/commons-go/model/data"
|
2018-05-11 10:07:17 +00:00
|
|
|
oc "git.loafle.net/overflow/consumer"
|
|
|
|
"git.loafle.net/overflow/consumer_metric/config"
|
|
|
|
oe "git.loafle.net/overflow/external-go"
|
2018-05-11 11:15:26 +00:00
|
|
|
oei "git.loafle.net/overflow/external-go/influxdb"
|
|
|
|
"github.com/influxdata/influxdb/client/v2"
|
2018-05-11 10:07:17 +00:00
|
|
|
"github.com/segmentio/kafka-go"
|
|
|
|
)
|
|
|
|
|
|
|
|
type ConsumerHandler interface {
|
|
|
|
oc.ConsumerHandler
|
|
|
|
}
|
|
|
|
|
|
|
|
type ConsumerHandlers struct {
|
|
|
|
oc.ConsumerHandlers
|
|
|
|
|
|
|
|
Config *config.Config
|
|
|
|
|
2018-05-11 11:15:26 +00:00
|
|
|
InfluxDBClientName string
|
|
|
|
|
2018-05-11 10:07:17 +00:00
|
|
|
validated atomic.Value
|
|
|
|
}
|
|
|
|
|
|
|
|
func (ch *ConsumerHandlers) Init(consumerCtx oc.ConsumerCtx) error {
|
|
|
|
if err := ch.ConsumerHandlers.Init(consumerCtx); nil != err {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
oe.InitPackage(ch.Config.External)
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (ch *ConsumerHandlers) OnStart(consumerCtx oc.ConsumerCtx) error {
|
|
|
|
if err := ch.ConsumerHandlers.OnStart(consumerCtx); nil != err {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
oe.StartPackage(ch.Config.External)
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (ch *ConsumerHandlers) OnStop(consumerCtx oc.ConsumerCtx) {
|
|
|
|
oe.StopPackage(ch.Config.External)
|
|
|
|
|
|
|
|
ch.ConsumerHandlers.OnStop(consumerCtx)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (ch *ConsumerHandlers) Destroy(consumerCtx oc.ConsumerCtx) {
|
|
|
|
oe.DestroyPackage(ch.Config.External)
|
|
|
|
|
|
|
|
ch.ConsumerHandlers.Destroy(consumerCtx)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (ch *ConsumerHandlers) OnMessage(msg *kafka.Message) {
|
2018-07-04 11:04:45 +00:00
|
|
|
metric := &ocmd.Metric{}
|
|
|
|
if err := json.Unmarshal(msg.Value, metric); nil != err {
|
|
|
|
logging.Logger().Error(err)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
2018-05-11 11:15:26 +00:00
|
|
|
c := oei.ClientFor(ch.InfluxDBClientName)
|
|
|
|
if nil == c {
|
|
|
|
logging.Logger().Errorf("Client of InfluxDB is not valid")
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
2018-07-04 11:14:25 +00:00
|
|
|
bp := oei.BatchPointsFor(ch.GetConsumerName())
|
2018-05-11 11:15:26 +00:00
|
|
|
if nil == bp {
|
|
|
|
logging.Logger().Errorf("BatchPoints of InfluxDB is not valid")
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
// Create a point and add to batch
|
2018-07-04 11:04:45 +00:00
|
|
|
tags := map[string]string{
|
|
|
|
"SensorID": metric.SensorID,
|
|
|
|
}
|
2018-05-11 11:15:26 +00:00
|
|
|
fields := map[string]interface{}{
|
2018-07-04 11:04:45 +00:00
|
|
|
"StartDate": metric.StartDate,
|
|
|
|
"EndDate": metric.EndDate,
|
|
|
|
"Success": metric.Success,
|
|
|
|
}
|
|
|
|
|
|
|
|
for key, value := range metric.Data {
|
|
|
|
fields[key] = value
|
2018-05-11 11:15:26 +00:00
|
|
|
}
|
2018-05-11 10:07:17 +00:00
|
|
|
|
2018-07-04 11:04:45 +00:00
|
|
|
pt, err := client.NewPoint("metric", tags, fields, time.Now())
|
2018-05-11 11:15:26 +00:00
|
|
|
if nil != err {
|
|
|
|
logging.Logger().Error(err)
|
|
|
|
}
|
|
|
|
bp.AddPoint(pt)
|
|
|
|
|
|
|
|
if err := c.Write(bp); err != nil {
|
|
|
|
logging.Logger().Error(err)
|
|
|
|
}
|
2018-05-11 10:07:17 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
func (ch *ConsumerHandlers) Validate() error {
|
|
|
|
if err := ch.ConsumerHandlers.Validate(); nil != err {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
if nil != ch.validated.Load() {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
ch.validated.Store(true)
|
|
|
|
|
2018-05-11 11:15:26 +00:00
|
|
|
if "" == strings.TrimSpace(ch.InfluxDBClientName) {
|
|
|
|
return fmt.Errorf("InfluxDBClientName is not valid")
|
|
|
|
}
|
|
|
|
|
2018-05-11 10:07:17 +00:00
|
|
|
return nil
|
|
|
|
}
|