diff --git a/Gopkg.lock b/Gopkg.lock index 9e30179..fcf50f0 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -17,7 +17,7 @@ branch = "master" name = "git.loafle.net/commons/util-go" packages = ["context"] - revision = "4d4017d214d2a8fdde59d774254f421991fabe7e" + revision = "dd4f956c587b1947f84ad5eebd64722b3d6c5224" [[projects]] branch = "master" @@ -29,13 +29,13 @@ branch = "master" name = "git.loafle.net/overflow/commons-go" packages = ["config/external"] - revision = "846c510a1ae54597ef1b1589613dfca7a3f7a3ed" + revision = "e2c0324eb632822050f3d0379ddd1a13c2aac2b1" [[projects]] branch = "master" name = "git.loafle.net/overflow/consumer" packages = ["."] - revision = "d08498b62bce09b0301ac70186e2d68ad4203557" + revision = "722f1b56e01f547b3819cd1f34fcd020d46e72b1" [[projects]] branch = "master" @@ -47,7 +47,7 @@ "kafka", "redis" ] - revision = "42e9cb7c30edf264548145a08505fc36cf041ef5" + revision = "bcba3516af4af873066d818623c1870d80f1f9d1" [[projects]] name = "github.com/BurntSushi/toml" @@ -83,8 +83,8 @@ "models", "pkg/escape" ] - revision = "89e084a80fb1e0bf5e7d38038e3367f821fdf3d7" - version = "v1.5.3" + revision = "4e4e00bc5ab85a3ff5e988c91020cf0399a87026" + version = "v1.5.4" [[projects]] branch = "master" @@ -129,7 +129,7 @@ "internal/timeseries", "trace" ] - revision = "db08ff08e8622530d9ed3a0e8ac279f6d4c02196" + revision = "ed29d75add3d7c4bf7ca65aac0c6df3d1420216f" [[projects]] name = "golang.org/x/text" @@ -156,7 +156,7 @@ branch = "master" name = "google.golang.org/genproto" packages = ["googleapis/rpc/status"] - revision = "32ee49c4dd805befd833990acba36cb75042378c" + revision = "ff3583edef7de132f219f0efc00e097cabcc0ec0" [[projects]] name = "google.golang.org/grpc" @@ -165,15 +165,16 @@ "balancer", "balancer/base", "balancer/roundrobin", - "channelz", "codes", "connectivity", "credentials", "encoding", "encoding/proto", - "grpclb/grpc_lb_v1/messages", "grpclog", "internal", + "internal/backoff", + "internal/channelz", + "internal/grpcrand", "keepalive", "metadata", "naming", @@ -186,8 +187,8 @@ "tap", "transport" ] - revision = "7a6a684ca69eb4cae85ad0a484f2e531598c047b" - version = "v1.12.2" + revision = "168a6198bcb0ef175f7dacec0b8691fc141dc9b8" + version = "v1.13.0" [[projects]] name = "gopkg.in/yaml.v2" diff --git a/consumer/consumer-handler.go b/consumer/consumer-handler.go index 9454161..37a5412 100644 --- a/consumer/consumer-handler.go +++ b/consumer/consumer-handler.go @@ -1,12 +1,14 @@ package consumer import ( + "encoding/json" "fmt" "strings" "sync/atomic" "time" "git.loafle.net/commons/logging-go" + ocmd "git.loafle.net/overflow/commons-go/model/data" oc "git.loafle.net/overflow/consumer" "git.loafle.net/overflow/consumer_metric/config" oe "git.loafle.net/overflow/external-go" @@ -60,27 +62,39 @@ func (ch *ConsumerHandlers) Destroy(consumerCtx oc.ConsumerCtx) { } func (ch *ConsumerHandlers) OnMessage(msg *kafka.Message) { + metric := &ocmd.Metric{} + if err := json.Unmarshal(msg.Value, metric); nil != err { + logging.Logger().Error(err) + return + } + c := oei.ClientFor(ch.InfluxDBClientName) if nil == c { logging.Logger().Errorf("Client of InfluxDB is not valid") return } - bp := oei.BatchPointsFor("") + bp := oei.BatchPointsFor("metric") if nil == bp { logging.Logger().Errorf("BatchPoints of InfluxDB is not valid") return } // Create a point and add to batch - tags := map[string]string{"cpu": "cpu-total"} + tags := map[string]string{ + "SensorID": metric.SensorID, + } fields := map[string]interface{}{ - "idle": 10.1, - "system": 53.3, - "user": 46.6, + "StartDate": metric.StartDate, + "EndDate": metric.EndDate, + "Success": metric.Success, } - pt, err := client.NewPoint("cpu_usage", tags, fields, time.Now()) + for key, value := range metric.Data { + fields[key] = value + } + + pt, err := client.NewPoint("metric", tags, fields, time.Now()) if nil != err { logging.Logger().Error(err) }