239 lines
5.9 KiB
Go
239 lines
5.9 KiB
Go
package consumer
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"strings"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"git.loafle.net/commons/logging-go"
|
|
ocmd "git.loafle.net/overflow/commons-go/model/data"
|
|
ocmm "git.loafle.net/overflow/commons-go/model/meta"
|
|
ocms "git.loafle.net/overflow/commons-go/model/sensor"
|
|
oc "git.loafle.net/overflow/consumer"
|
|
"git.loafle.net/overflow/consumer_metric/config"
|
|
oe "git.loafle.net/overflow/external-go"
|
|
oeg "git.loafle.net/overflow/external-go/grpc"
|
|
oei "git.loafle.net/overflow/external-go/influxdb"
|
|
"github.com/influxdata/influxdb/client/v2"
|
|
"github.com/segmentio/kafka-go"
|
|
)
|
|
|
|
type ConsumerHandler interface {
|
|
oc.ConsumerHandler
|
|
}
|
|
|
|
type ConsumerHandlers struct {
|
|
oc.ConsumerHandlers
|
|
|
|
Config *config.Config
|
|
|
|
InfluxDBClientName string
|
|
|
|
validated atomic.Value
|
|
}
|
|
|
|
func (ch *ConsumerHandlers) Init(consumerCtx oc.ConsumerCtx) error {
|
|
if err := ch.ConsumerHandlers.Init(consumerCtx); nil != err {
|
|
return err
|
|
}
|
|
oe.InitPackage(ch.Config.External)
|
|
|
|
return nil
|
|
}
|
|
|
|
func (ch *ConsumerHandlers) OnStart(consumerCtx oc.ConsumerCtx) error {
|
|
if err := ch.ConsumerHandlers.OnStart(consumerCtx); nil != err {
|
|
return err
|
|
}
|
|
oe.StartPackage(ch.Config.External)
|
|
|
|
return nil
|
|
}
|
|
|
|
func (ch *ConsumerHandlers) OnStop(consumerCtx oc.ConsumerCtx) {
|
|
oe.StopPackage(ch.Config.External)
|
|
|
|
ch.ConsumerHandlers.OnStop(consumerCtx)
|
|
}
|
|
|
|
func (ch *ConsumerHandlers) Destroy(consumerCtx oc.ConsumerCtx) {
|
|
oe.DestroyPackage(ch.Config.External)
|
|
|
|
ch.ConsumerHandlers.Destroy(consumerCtx)
|
|
}
|
|
|
|
func (ch *ConsumerHandlers) OnMessage(msg *kafka.Message) {
|
|
sensorID := string(msg.Key)
|
|
metric := &ocmd.Metric{}
|
|
if err := json.Unmarshal(msg.Value, metric); nil != err {
|
|
logging.Logger().Error(err)
|
|
return
|
|
}
|
|
|
|
if nil == metric.Data || 0 == len(metric.Data) {
|
|
logging.Logger().Errorf("Data of Metric is not valid")
|
|
return
|
|
}
|
|
|
|
c := oei.ClientFor(ch.InfluxDBClientName)
|
|
if nil == c {
|
|
logging.Logger().Errorf("Client of InfluxDB is not valid")
|
|
return
|
|
}
|
|
|
|
// sensor, err := ch.getSensor(sensorID)
|
|
// if nil != err {
|
|
// logging.Logger().Error(err)
|
|
// return
|
|
// }
|
|
|
|
sensorItems, err := ch.getSensorItems(sensorID)
|
|
if nil != err {
|
|
logging.Logger().Error(err)
|
|
return
|
|
}
|
|
|
|
bp := oei.BatchPointsFor(ch.GetConsumerName())
|
|
if nil == bp {
|
|
logging.Logger().Errorf("BatchPoints of InfluxDB is not valid")
|
|
return
|
|
}
|
|
|
|
for _, sensorItem := range sensorItems {
|
|
if err := ch.processSensorItem(sensorItem, metric, bp); nil != err {
|
|
logging.Logger().Error(err)
|
|
return
|
|
}
|
|
}
|
|
|
|
if err := c.Write(bp); err != nil {
|
|
logging.Logger().Error(err)
|
|
}
|
|
|
|
}
|
|
|
|
func (ch *ConsumerHandlers) processSensorItem(sensorItem *ocms.SensorItem, metric *ocmd.Metric, bp client.BatchPoints) error {
|
|
tags := map[string]string{
|
|
"SensorID": metric.SensorID,
|
|
}
|
|
fields := map[string]interface{}{
|
|
"StartDate": metric.StartDate,
|
|
"Success": metric.Success,
|
|
}
|
|
|
|
metaCollectionItemMappings, err := ch.getMetaCollectionItemMappings(sensorItem.MetaDisplayItemMapping.ID.String())
|
|
if nil != err {
|
|
return err
|
|
}
|
|
|
|
metaCollectionItemKeys := make([]string, 0)
|
|
|
|
for _, metaCollectionItemMapping := range metaCollectionItemMappings {
|
|
metaCollectionItemKeys = append(metaCollectionItemKeys, metaCollectionItemMapping.MetaCollectionItem.Key)
|
|
}
|
|
|
|
for _, metaCollectionItemKey := range metaCollectionItemKeys {
|
|
collectionItemValue, ok := metric.Data[metaCollectionItemKey]
|
|
if !ok {
|
|
return fmt.Errorf("Value of CollectionItem[%s] is not exist", metaCollectionItemKey)
|
|
}
|
|
|
|
fields[metaCollectionItemKey] = collectionItemValue
|
|
}
|
|
|
|
displayItemKey := sensorItem.MetaDisplayItemMapping.MetaDisplayItem.Key
|
|
displayItemValue, err := ch.getDisplayItemEval(sensorItem.MetaDisplayItemMapping.Formula, metaCollectionItemKeys, metric.Data)
|
|
if nil != err {
|
|
return err
|
|
}
|
|
tags[displayItemKey] = displayItemValue
|
|
|
|
pt, err := client.NewPoint("metric", tags, fields, time.Now())
|
|
if nil != err {
|
|
return err
|
|
}
|
|
bp.AddPoint(pt)
|
|
|
|
return nil
|
|
}
|
|
|
|
func (ch *ConsumerHandlers) getDisplayItemEval(formula string, metaCollectionItemKeys []string, items map[string]string) (string, error) {
|
|
exp := formula
|
|
|
|
for _, metaCollectionItemKey := range metaCollectionItemKeys {
|
|
collectionItemValue, ok := items[metaCollectionItemKey]
|
|
if !ok {
|
|
return "", fmt.Errorf("Value of CollectionItem[%s] is not exist", metaCollectionItemKey)
|
|
}
|
|
|
|
exp = strings.Replace(exp, fmt.Sprintf("{{%s}}", metaCollectionItemKey), collectionItemValue, -1)
|
|
}
|
|
|
|
return exp, nil
|
|
}
|
|
|
|
func (ch *ConsumerHandlers) getSensor(sensorID string) (*ocms.Sensor, error) {
|
|
grpcCTX := context.Background()
|
|
r, err := oeg.Exec(grpcCTX, "SensorService.read", sensorID)
|
|
if nil != err {
|
|
return nil, err
|
|
}
|
|
|
|
sensor := &ocms.Sensor{}
|
|
err = json.Unmarshal([]byte(r), sensor)
|
|
if nil != err {
|
|
return nil, err
|
|
}
|
|
return sensor, nil
|
|
}
|
|
|
|
func (ch *ConsumerHandlers) getSensorItems(sensorID string) ([]*ocms.SensorItem, error) {
|
|
grpcCTX := context.Background()
|
|
r, err := oeg.Exec(grpcCTX, "SensorItemService.readAllBySensorID", sensorID)
|
|
if nil != err {
|
|
return nil, err
|
|
}
|
|
|
|
sensorItems := make([]*ocms.SensorItem, 0)
|
|
err = json.Unmarshal([]byte(r), &sensorItems)
|
|
if nil != err {
|
|
return nil, err
|
|
}
|
|
return sensorItems, nil
|
|
}
|
|
|
|
func (ch *ConsumerHandlers) getMetaCollectionItemMappings(metaDisplayItemMappingID string) ([]*ocmm.MetaCollectionItemMapping, error) {
|
|
grpcCTX := context.Background()
|
|
r, err := oeg.Exec(grpcCTX, "MetaCollectionItemMappingService.readAllByMetaDisplayItemMappingID", metaDisplayItemMappingID)
|
|
if nil != err {
|
|
return nil, err
|
|
}
|
|
|
|
metaCollectionItemMappings := make([]*ocmm.MetaCollectionItemMapping, 0)
|
|
err = json.Unmarshal([]byte(r), &metaCollectionItemMappings)
|
|
if nil != err {
|
|
return nil, err
|
|
}
|
|
return metaCollectionItemMappings, nil
|
|
}
|
|
|
|
func (ch *ConsumerHandlers) Validate() error {
|
|
if err := ch.ConsumerHandlers.Validate(); nil != err {
|
|
return err
|
|
}
|
|
|
|
if nil != ch.validated.Load() {
|
|
return nil
|
|
}
|
|
ch.validated.Store(true)
|
|
|
|
if "" == strings.TrimSpace(ch.InfluxDBClientName) {
|
|
return fmt.Errorf("InfluxDBClientName is not valid")
|
|
}
|
|
|
|
return nil
|
|
}
|