This commit is contained in:
crusader 2018-07-05 17:18:14 +09:00
parent 6fa4e03ab3
commit 36d04bd10b

View File

@ -102,6 +102,8 @@ func (ch *ConsumerHandlers) OnMessage(msg *kafka.Message) {
return
}
ch.processSensoring(metric, bp)
for _, sensorItem := range sensorItems {
if err := ch.processSensorItem(sensorItem, metric, bp); nil != err {
logging.Logger().Error(err)
@ -112,7 +114,25 @@ func (ch *ConsumerHandlers) OnMessage(msg *kafka.Message) {
if err := c.Write(bp); err != nil {
logging.Logger().Error(err)
}
}
func (ch *ConsumerHandlers) processSensoring(metric *ocmd.Metric, bp client.BatchPoints) error {
tags := map[string]string{
"SensorID": metric.SensorID,
}
fields := map[string]interface{}{
"StartDate": metric.StartDate,
"EndDate": metric.EndDate,
"Success": metric.Success,
}
pt, err := client.NewPoint("sensor", tags, fields, time.Now())
if nil != err {
return err
}
bp.AddPoint(pt)
return nil
}
func (ch *ConsumerHandlers) processSensorItem(sensorItem *ocms.SensorItem, metric *ocmd.Metric, bp client.BatchPoints) error {