diff --git a/Gopkg.lock b/Gopkg.lock index fcf50f0..1c4ea85 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -28,8 +28,20 @@ [[projects]] branch = "master" name = "git.loafle.net/overflow/commons-go" - packages = ["config/external"] - revision = "e2c0324eb632822050f3d0379ddd1a13c2aac2b1" + packages = [ + "config/external", + "core/interfaces", + "core/util", + "model/data", + "model/domain", + "model/infra", + "model/member", + "model/meta", + "model/probe", + "model/sensor", + "model/target" + ] + revision = "7b089c913a281a3286b3c72456e3aab67f402720" [[projects]] branch = "master" @@ -199,6 +211,6 @@ [solve-meta] analyzer-name = "dep" analyzer-version = 1 - inputs-digest = "8c2fe7c8806019dedf322b9b494ae951084c859e7478067090a27775b324fcd8" + inputs-digest = "7146cea9108db4e1d0409f3072f2c3f091e9d0013298c896143e3903f61ea308" solver-name = "gps-cdcl" solver-version = 1 diff --git a/consumer/consumer-handler.go b/consumer/consumer-handler.go index 48ba89a..7ac020d 100644 --- a/consumer/consumer-handler.go +++ b/consumer/consumer-handler.go @@ -1,6 +1,7 @@ package consumer import ( + "context" "encoding/json" "fmt" "strings" @@ -9,9 +10,12 @@ import ( "git.loafle.net/commons/logging-go" ocmd "git.loafle.net/overflow/commons-go/model/data" + ocmm "git.loafle.net/overflow/commons-go/model/meta" + ocms "git.loafle.net/overflow/commons-go/model/sensor" oc "git.loafle.net/overflow/consumer" "git.loafle.net/overflow/consumer_metric/config" oe "git.loafle.net/overflow/external-go" + oeg "git.loafle.net/overflow/external-go/grpc" oei "git.loafle.net/overflow/external-go/influxdb" "github.com/influxdata/influxdb/client/v2" "github.com/segmentio/kafka-go" @@ -62,47 +66,158 @@ func (ch *ConsumerHandlers) Destroy(consumerCtx oc.ConsumerCtx) { } func (ch *ConsumerHandlers) OnMessage(msg *kafka.Message) { + sensorID := string(msg.Key) metric := &ocmd.Metric{} if err := json.Unmarshal(msg.Value, metric); nil != err { logging.Logger().Error(err) return } + if nil == metric.Data || 0 == len(metric.Data) { + logging.Logger().Errorf("Data of Metric is not valid") + return + } + c := oei.ClientFor(ch.InfluxDBClientName) if nil == c { logging.Logger().Errorf("Client of InfluxDB is not valid") return } + // sensor, err := ch.getSensor(sensorID) + // if nil != err { + // logging.Logger().Error(err) + // return + // } + + sensorItems, err := ch.getSensorItems(sensorID) + if nil != err { + logging.Logger().Error(err) + return + } + bp := oei.BatchPointsFor(ch.GetConsumerName()) if nil == bp { logging.Logger().Errorf("BatchPoints of InfluxDB is not valid") return } - // Create a point and add to batch + for _, sensorItem := range sensorItems { + if err := ch.processSensorItem(sensorItem, metric, bp); nil != err { + logging.Logger().Error(err) + return + } + } + + if err := c.Write(bp); err != nil { + logging.Logger().Error(err) + } + +} + +func (ch *ConsumerHandlers) processSensorItem(sensorItem *ocms.SensorItem, metric *ocmd.Metric, bp client.BatchPoints) error { tags := map[string]string{ "SensorID": metric.SensorID, } fields := map[string]interface{}{ "StartDate": metric.StartDate, - "EndDate": metric.EndDate, "Success": metric.Success, } - for key, value := range metric.Data { - fields[key] = value + metaCollectionItemMappings, err := ch.getMetaCollectionItemMappings(sensorItem.MetaDisplayItemMapping.ID.String()) + if nil != err { + return err } + metaCollectionItemKeys := make([]string, 0) + + for _, metaCollectionItemMapping := range metaCollectionItemMappings { + metaCollectionItemKeys = append(metaCollectionItemKeys, metaCollectionItemMapping.MetaCollectionItem.Key) + } + + for _, metaCollectionItemKey := range metaCollectionItemKeys { + collectionItemValue, ok := metric.Data[metaCollectionItemKey] + if !ok { + return fmt.Errorf("Value of CollectionItem[%s] is not exist", metaCollectionItemKey) + } + + fields[metaCollectionItemKey] = collectionItemValue + } + + displayItemKey := sensorItem.MetaDisplayItemMapping.MetaDisplayItem.Key + displayItemValue, err := ch.getDisplayItemEval(sensorItem.MetaDisplayItemMapping.Formula, metaCollectionItemKeys, metric.Data) + if nil != err { + return err + } + tags[displayItemKey] = displayItemValue + pt, err := client.NewPoint("metric", tags, fields, time.Now()) if nil != err { - logging.Logger().Error(err) + return err } bp.AddPoint(pt) - if err := c.Write(bp); err != nil { - logging.Logger().Error(err) + return nil +} + +func (ch *ConsumerHandlers) getDisplayItemEval(formula string, metaCollectionItemKeys []string, items map[string]string) (string, error) { + exp := formula + + for _, metaCollectionItemKey := range metaCollectionItemKeys { + collectionItemValue, ok := items[metaCollectionItemKey] + if !ok { + return "", fmt.Errorf("Value of CollectionItem[%s] is not exist", metaCollectionItemKey) + } + + exp = strings.Replace(exp, fmt.Sprintf("{{%s}}", metaCollectionItemKey), collectionItemValue, -1) } + + return exp, nil +} + +func (ch *ConsumerHandlers) getSensor(sensorID string) (*ocms.Sensor, error) { + grpcCTX := context.Background() + r, err := oeg.Exec(grpcCTX, "SensorService.read", sensorID) + if nil != err { + return nil, err + } + + sensor := &ocms.Sensor{} + err = json.Unmarshal([]byte(r), sensor) + if nil != err { + return nil, err + } + return sensor, nil +} + +func (ch *ConsumerHandlers) getSensorItems(sensorID string) ([]*ocms.SensorItem, error) { + grpcCTX := context.Background() + r, err := oeg.Exec(grpcCTX, "SensorItemService.readAllBySensorID", sensorID) + if nil != err { + return nil, err + } + + sensorItems := make([]*ocms.SensorItem, 0) + err = json.Unmarshal([]byte(r), &sensorItems) + if nil != err { + return nil, err + } + return sensorItems, nil +} + +func (ch *ConsumerHandlers) getMetaCollectionItemMappings(metaDisplayItemMappingID string) ([]*ocmm.MetaCollectionItemMapping, error) { + grpcCTX := context.Background() + r, err := oeg.Exec(grpcCTX, "MetaCollectionItemMappingService.readAllByMetaDisplayItemMappingID", metaDisplayItemMappingID) + if nil != err { + return nil, err + } + + metaCollectionItemMappings := make([]*ocmm.MetaCollectionItemMapping, 0) + err = json.Unmarshal([]byte(r), &metaCollectionItemMappings) + if nil != err { + return nil, err + } + return metaCollectionItemMappings, nil } func (ch *ConsumerHandlers) Validate() error {