diff --git a/consumer/consumer-handler.go b/consumer/consumer-handler.go index 7ac020d..0660772 100644 --- a/consumer/consumer-handler.go +++ b/consumer/consumer-handler.go @@ -102,6 +102,8 @@ func (ch *ConsumerHandlers) OnMessage(msg *kafka.Message) { return } + ch.processSensoring(metric, bp) + for _, sensorItem := range sensorItems { if err := ch.processSensorItem(sensorItem, metric, bp); nil != err { logging.Logger().Error(err) @@ -112,7 +114,25 @@ func (ch *ConsumerHandlers) OnMessage(msg *kafka.Message) { if err := c.Write(bp); err != nil { logging.Logger().Error(err) } +} +func (ch *ConsumerHandlers) processSensoring(metric *ocmd.Metric, bp client.BatchPoints) error { + tags := map[string]string{ + "SensorID": metric.SensorID, + } + fields := map[string]interface{}{ + "StartDate": metric.StartDate, + "EndDate": metric.EndDate, + "Success": metric.Success, + } + + pt, err := client.NewPoint("sensor", tags, fields, time.Now()) + if nil != err { + return err + } + bp.AddPoint(pt) + + return nil } func (ch *ConsumerHandlers) processSensorItem(sensorItem *ocms.SensorItem, metric *ocmd.Metric, bp client.BatchPoints) error {