package consumer import ( "context" "encoding/json" "fmt" "strings" "sync/atomic" "time" "git.loafle.net/commons/logging-go" ocmd "git.loafle.net/overflow/commons-go/model/data" ocmm "git.loafle.net/overflow/commons-go/model/meta" ocms "git.loafle.net/overflow/commons-go/model/sensor" oc "git.loafle.net/overflow/consumer" "git.loafle.net/overflow/consumer_metric/config" oe "git.loafle.net/overflow/external-go" oeg "git.loafle.net/overflow/external-go/grpc" oei "git.loafle.net/overflow/external-go/influxdb" "github.com/influxdata/influxdb/client/v2" "github.com/segmentio/kafka-go" ) type ConsumerHandler interface { oc.ConsumerHandler } type ConsumerHandlers struct { oc.ConsumerHandlers Config *config.Config InfluxDBClientName string validated atomic.Value } func (ch *ConsumerHandlers) Init(consumerCtx oc.ConsumerCtx) error { if err := ch.ConsumerHandlers.Init(consumerCtx); nil != err { return err } oe.InitPackage(ch.Config.External) return nil } func (ch *ConsumerHandlers) OnStart(consumerCtx oc.ConsumerCtx) error { if err := ch.ConsumerHandlers.OnStart(consumerCtx); nil != err { return err } oe.StartPackage(ch.Config.External) return nil } func (ch *ConsumerHandlers) OnStop(consumerCtx oc.ConsumerCtx) { oe.StopPackage(ch.Config.External) ch.ConsumerHandlers.OnStop(consumerCtx) } func (ch *ConsumerHandlers) Destroy(consumerCtx oc.ConsumerCtx) { oe.DestroyPackage(ch.Config.External) ch.ConsumerHandlers.Destroy(consumerCtx) } func (ch *ConsumerHandlers) OnMessage(msg *kafka.Message) { sensorID := string(msg.Key) metric := &ocmd.Metric{} if err := json.Unmarshal(msg.Value, metric); nil != err { logging.Logger().Error(err) return } if nil == metric.Data || 0 == len(metric.Data) { logging.Logger().Errorf("Data of Metric is not valid") return } c := oei.ClientFor(ch.InfluxDBClientName) if nil == c { logging.Logger().Errorf("Client of InfluxDB is not valid") return } // sensor, err := ch.getSensor(sensorID) // if nil != err { // logging.Logger().Error(err) // return // } sensorItems, err := ch.getSensorItems(sensorID) if nil != err { logging.Logger().Error(err) return } bp := oei.BatchPointsFor(ch.GetConsumerName()) if nil == bp { logging.Logger().Errorf("BatchPoints of InfluxDB is not valid") return } ch.processSensoring(metric, bp) for _, sensorItem := range sensorItems { if err := ch.processSensorItem(sensorItem, metric, bp); nil != err { logging.Logger().Error(err) return } } if err := c.Write(bp); err != nil { logging.Logger().Error(err) } } func (ch *ConsumerHandlers) processSensoring(metric *ocmd.Metric, bp client.BatchPoints) error { tags := map[string]string{ "SensorID": metric.SensorID, } fields := map[string]interface{}{ "StartDate": metric.StartDate, "EndDate": metric.EndDate, "Success": metric.Success, } pt, err := client.NewPoint("sensor", tags, fields, time.Now()) if nil != err { return err } bp.AddPoint(pt) return nil } func (ch *ConsumerHandlers) processSensorItem(sensorItem *ocms.SensorItem, metric *ocmd.Metric, bp client.BatchPoints) error { tags := map[string]string{ "SensorID": metric.SensorID, } fields := map[string]interface{}{ "StartDate": metric.StartDate, "Success": metric.Success, } metaCollectionItemMappings, err := ch.getMetaCollectionItemMappings(sensorItem.MetaDisplayItemMapping.ID.String()) if nil != err { return err } metaCollectionItemKeys := make([]string, 0) for _, metaCollectionItemMapping := range metaCollectionItemMappings { metaCollectionItemKeys = append(metaCollectionItemKeys, metaCollectionItemMapping.MetaCollectionItem.Key) } for _, metaCollectionItemKey := range metaCollectionItemKeys { collectionItemValue, ok := metric.Data[metaCollectionItemKey] if !ok { return fmt.Errorf("Value of CollectionItem[%s] is not exist", metaCollectionItemKey) } fields[metaCollectionItemKey] = collectionItemValue } displayItemKey := sensorItem.MetaDisplayItemMapping.MetaDisplayItem.Key displayItemValue, err := ch.getDisplayItemEval(sensorItem.MetaDisplayItemMapping.Formula, metaCollectionItemKeys, metric.Data) if nil != err { return err } tags[displayItemKey] = displayItemValue pt, err := client.NewPoint("metric", tags, fields, time.Now()) if nil != err { return err } bp.AddPoint(pt) return nil } func (ch *ConsumerHandlers) getDisplayItemEval(formula string, metaCollectionItemKeys []string, items map[string]string) (string, error) { exp := formula for _, metaCollectionItemKey := range metaCollectionItemKeys { collectionItemValue, ok := items[metaCollectionItemKey] if !ok { return "", fmt.Errorf("Value of CollectionItem[%s] is not exist", metaCollectionItemKey) } exp = strings.Replace(exp, fmt.Sprintf("{{%s}}", metaCollectionItemKey), collectionItemValue, -1) } return exp, nil } func (ch *ConsumerHandlers) getSensor(sensorID string) (*ocms.Sensor, error) { grpcCTX := context.Background() r, err := oeg.Exec(grpcCTX, "SensorService.read", sensorID) if nil != err { return nil, err } sensor := &ocms.Sensor{} err = json.Unmarshal([]byte(r), sensor) if nil != err { return nil, err } return sensor, nil } func (ch *ConsumerHandlers) getSensorItems(sensorID string) ([]*ocms.SensorItem, error) { grpcCTX := context.Background() r, err := oeg.Exec(grpcCTX, "SensorItemService.readAllBySensorID", sensorID) if nil != err { return nil, err } sensorItems := make([]*ocms.SensorItem, 0) err = json.Unmarshal([]byte(r), &sensorItems) if nil != err { return nil, err } return sensorItems, nil } func (ch *ConsumerHandlers) getMetaCollectionItemMappings(metaDisplayItemMappingID string) ([]*ocmm.MetaCollectionItemMapping, error) { grpcCTX := context.Background() r, err := oeg.Exec(grpcCTX, "MetaCollectionItemMappingService.readAllByMetaDisplayItemMappingID", metaDisplayItemMappingID) if nil != err { return nil, err } metaCollectionItemMappings := make([]*ocmm.MetaCollectionItemMapping, 0) err = json.Unmarshal([]byte(r), &metaCollectionItemMappings) if nil != err { return nil, err } return metaCollectionItemMappings, nil } func (ch *ConsumerHandlers) Validate() error { if err := ch.ConsumerHandlers.Validate(); nil != err { return err } if nil != ch.validated.Load() { return nil } ch.validated.Store(true) if "" == strings.TrimSpace(ch.InfluxDBClientName) { return fmt.Errorf("InfluxDBClientName is not valid") } return nil }