This commit is contained in:
crusader 2018-05-11 17:24:12 +09:00
parent 82625bdaae
commit 71acf24d9c
3 changed files with 21 additions and 12 deletions

View File

@ -22,3 +22,4 @@ import:
- server
- package: github.com/valyala/fasthttp
version: ^20160617.0.0
- package: git.loafle.net/overflow/external-go

View File

@ -2,7 +2,7 @@ package server
import (
cs "git.loafle.net/commons/server-go"
oge "git.loafle.net/overflow/gateway/external"
oe "git.loafle.net/overflow/external-go"
ogrs "git.loafle.net/overflow/gateway_rpc/server"
"git.loafle.net/overflow/probe_gateway_metric/config"
)
@ -21,7 +21,7 @@ func (sh *ServerHandlers) Init(serverCtx cs.ServerCtx) error {
if err := sh.ServerHandlers.Init(serverCtx); nil != err {
return err
}
oge.InitPackage(sh.Config.External)
oe.InitPackage(sh.Config.External)
return nil
}
@ -31,19 +31,19 @@ func (sh *ServerHandlers) OnStart(serverCtx cs.ServerCtx) error {
return err
}
oge.StartPackage(sh.Config.External)
oe.StartPackage(sh.Config.External)
return nil
}
func (sh *ServerHandlers) OnStop(serverCtx cs.ServerCtx) {
oge.StopPackage(sh.Config.External)
oe.StopPackage(sh.Config.External)
sh.ServerHandlers.OnStop(serverCtx)
}
func (sh *ServerHandlers) Destroy(serverCtx cs.ServerCtx) {
oge.DestroyPackage(sh.Config.External)
oe.DestroyPackage(sh.Config.External)
sh.ServerHandlers.Destroy(serverCtx)
}

View File

@ -2,6 +2,7 @@ package servlet
import (
"context"
"encoding/json"
"fmt"
"github.com/valyala/fasthttp"
@ -11,9 +12,11 @@ import (
"git.loafle.net/commons/server-go/socket"
cssw "git.loafle.net/commons/server-go/socket/web"
occp "git.loafle.net/overflow/commons-go/config/probe"
ocmd "git.loafle.net/overflow/commons-go/model/data"
oe "git.loafle.net/overflow/external-go"
oeg "git.loafle.net/overflow/external-go/grpc"
oek "git.loafle.net/overflow/external-go/kafka"
og "git.loafle.net/overflow/gateway"
"git.loafle.net/overflow/gateway/external/grpc"
"git.loafle.net/overflow/gateway/external/kafka"
)
type DataServlet interface {
@ -73,13 +76,13 @@ func (s *DataServlets) Handshake(servletCtx server.ServletCtx, ctx *fasthttp.Req
probeKey := string(bProbeKey)
grpcCTX := context.Background()
_, err := grpc.Exec(grpcCTX, "ProbeService.readByProbeKey", probeKey)
_, err := oeg.Exec(grpcCTX, "ProbeService.readByProbeKey", probeKey)
if nil != err {
return nil, fmt.Errorf("grpc call Error: %s", err.Error())
}
servletCtx.SetAttribute(og.SessionIDKey, probeKey)
servletCtx.SetAttribute(og.SessionClientTypeKey, og.PROBE)
servletCtx.SetAttribute(og.SessionClientTypeKey, oe.PROBE)
servletCtx.SetAttribute(og.SessionTargetIDKey, probeKey)
return nil, nil
@ -111,7 +114,14 @@ func (s *DataServlets) Handle(servletCtx server.ServletCtx,
return
}
err = kafka.Write("metric", msg, msg)
m := &ocmd.Metric{}
err = json.Unmarshal(msg, m)
if nil != err {
logging.Logger().Error(err)
break
}
err = oek.Write("metric", []byte(m.SensorConfigID), msg)
if nil != err {
logging.Logger().Error(err)
break
@ -120,7 +130,5 @@ func (s *DataServlets) Handle(servletCtx server.ServletCtx,
case <-stopChan:
return
}
}
}