diff --git a/glide.yaml b/glide.yaml index 5c0e68d..8c95f7e 100644 --- a/glide.yaml +++ b/glide.yaml @@ -22,3 +22,4 @@ import: - server - package: github.com/valyala/fasthttp version: ^20160617.0.0 +- package: git.loafle.net/overflow/external-go diff --git a/server/server-handler.go b/server/server-handler.go index 03809da..35be625 100644 --- a/server/server-handler.go +++ b/server/server-handler.go @@ -2,7 +2,7 @@ package server import ( cs "git.loafle.net/commons/server-go" - oge "git.loafle.net/overflow/gateway/external" + oe "git.loafle.net/overflow/external-go" ogrs "git.loafle.net/overflow/gateway_rpc/server" "git.loafle.net/overflow/probe_gateway_metric/config" ) @@ -21,7 +21,7 @@ func (sh *ServerHandlers) Init(serverCtx cs.ServerCtx) error { if err := sh.ServerHandlers.Init(serverCtx); nil != err { return err } - oge.InitPackage(sh.Config.External) + oe.InitPackage(sh.Config.External) return nil } @@ -31,19 +31,19 @@ func (sh *ServerHandlers) OnStart(serverCtx cs.ServerCtx) error { return err } - oge.StartPackage(sh.Config.External) + oe.StartPackage(sh.Config.External) return nil } func (sh *ServerHandlers) OnStop(serverCtx cs.ServerCtx) { - oge.StopPackage(sh.Config.External) + oe.StopPackage(sh.Config.External) sh.ServerHandlers.OnStop(serverCtx) } func (sh *ServerHandlers) Destroy(serverCtx cs.ServerCtx) { - oge.DestroyPackage(sh.Config.External) + oe.DestroyPackage(sh.Config.External) sh.ServerHandlers.Destroy(serverCtx) } diff --git a/servlet/data-servlet.go b/servlet/data-servlet.go index 0271a98..3e70442 100644 --- a/servlet/data-servlet.go +++ b/servlet/data-servlet.go @@ -2,6 +2,7 @@ package servlet import ( "context" + "encoding/json" "fmt" "github.com/valyala/fasthttp" @@ -11,9 +12,11 @@ import ( "git.loafle.net/commons/server-go/socket" cssw "git.loafle.net/commons/server-go/socket/web" occp "git.loafle.net/overflow/commons-go/config/probe" + ocmd "git.loafle.net/overflow/commons-go/model/data" + oe "git.loafle.net/overflow/external-go" + oeg "git.loafle.net/overflow/external-go/grpc" + oek "git.loafle.net/overflow/external-go/kafka" og "git.loafle.net/overflow/gateway" - "git.loafle.net/overflow/gateway/external/grpc" - "git.loafle.net/overflow/gateway/external/kafka" ) type DataServlet interface { @@ -73,13 +76,13 @@ func (s *DataServlets) Handshake(servletCtx server.ServletCtx, ctx *fasthttp.Req probeKey := string(bProbeKey) grpcCTX := context.Background() - _, err := grpc.Exec(grpcCTX, "ProbeService.readByProbeKey", probeKey) + _, err := oeg.Exec(grpcCTX, "ProbeService.readByProbeKey", probeKey) if nil != err { return nil, fmt.Errorf("grpc call Error: %s", err.Error()) } servletCtx.SetAttribute(og.SessionIDKey, probeKey) - servletCtx.SetAttribute(og.SessionClientTypeKey, og.PROBE) + servletCtx.SetAttribute(og.SessionClientTypeKey, oe.PROBE) servletCtx.SetAttribute(og.SessionTargetIDKey, probeKey) return nil, nil @@ -111,7 +114,14 @@ func (s *DataServlets) Handle(servletCtx server.ServletCtx, return } - err = kafka.Write("metric", msg, msg) + m := &ocmd.Metric{} + err = json.Unmarshal(msg, m) + if nil != err { + logging.Logger().Error(err) + break + } + + err = oek.Write("metric", []byte(m.SensorConfigID), msg) if nil != err { logging.Logger().Error(err) break @@ -120,7 +130,5 @@ func (s *DataServlets) Handle(servletCtx server.ServletCtx, case <-stopChan: return } - } - }