diff --git a/config.json b/config.json index 7a2088b..eef73c5 100644 --- a/config.json +++ b/config.json @@ -16,6 +16,19 @@ "maxBytes": 1000000 } } + }, + "influxDB": { + "clientConfigs": { + "metric": { + "clientType": "http", + "address": "192.168.1.50:18086" + } + }, + "batchPointsConfigs": { + "metric": { + "database": "metric" + } + } } } } diff --git a/consumer/consumer-handler.go b/consumer/consumer-handler.go index d2e4bb9..9454161 100644 --- a/consumer/consumer-handler.go +++ b/consumer/consumer-handler.go @@ -1,11 +1,17 @@ package consumer import ( + "fmt" + "strings" "sync/atomic" + "time" + "git.loafle.net/commons/logging-go" oc "git.loafle.net/overflow/consumer" "git.loafle.net/overflow/consumer_metric/config" oe "git.loafle.net/overflow/external-go" + oei "git.loafle.net/overflow/external-go/influxdb" + "github.com/influxdata/influxdb/client/v2" "github.com/segmentio/kafka-go" ) @@ -18,6 +24,8 @@ type ConsumerHandlers struct { Config *config.Config + InfluxDBClientName string + validated atomic.Value } @@ -52,7 +60,35 @@ func (ch *ConsumerHandlers) Destroy(consumerCtx oc.ConsumerCtx) { } func (ch *ConsumerHandlers) OnMessage(msg *kafka.Message) { + c := oei.ClientFor(ch.InfluxDBClientName) + if nil == c { + logging.Logger().Errorf("Client of InfluxDB is not valid") + return + } + bp := oei.BatchPointsFor("") + if nil == bp { + logging.Logger().Errorf("BatchPoints of InfluxDB is not valid") + return + } + + // Create a point and add to batch + tags := map[string]string{"cpu": "cpu-total"} + fields := map[string]interface{}{ + "idle": 10.1, + "system": 53.3, + "user": 46.6, + } + + pt, err := client.NewPoint("cpu_usage", tags, fields, time.Now()) + if nil != err { + logging.Logger().Error(err) + } + bp.AddPoint(pt) + + if err := c.Write(bp); err != nil { + logging.Logger().Error(err) + } } func (ch *ConsumerHandlers) Validate() error { @@ -65,5 +101,9 @@ func (ch *ConsumerHandlers) Validate() error { } ch.validated.Store(true) + if "" == strings.TrimSpace(ch.InfluxDBClientName) { + return fmt.Errorf("InfluxDBClientName is not valid") + } + return nil } diff --git a/glide.yaml b/glide.yaml index 5ab7ed7..1fd23e6 100644 --- a/glide.yaml +++ b/glide.yaml @@ -8,3 +8,7 @@ import: - package: github.com/segmentio/kafka-go - package: git.loafle.net/overflow/consumer - package: git.loafle.net/overflow/external-go +- package: github.com/influxdata/influxdb + version: ^1.5.2 + subpackages: + - client/v2