From 9d30cf97269cedc72e7c978b4be5159cb06cf920 Mon Sep 17 00:00:00 2001 From: crusader Date: Fri, 11 May 2018 12:24:29 +0900 Subject: [PATCH] ing --- config.json | 10 --- server/server.go | 3 - servlet/data-servlet.go | 145 ---------------------------------------- 3 files changed, 158 deletions(-) delete mode 100644 servlet/data-servlet.go diff --git a/config.json b/config.json index 80e4868..ff2ec6a 100644 --- a/config.json +++ b/config.json @@ -24,16 +24,6 @@ "redis": { "network": "tcp4", "address": "192.168.1.50:6379" - }, - "kafka": { - "producers": { - "metric": { - "brokers": [ - "192.168.1.50:9092" - ], - "topic": "overflow-metric-topic" - } - } } } } diff --git a/server/server.go b/server/server.go index 7653ac9..cc1c079 100644 --- a/server/server.go +++ b/server/server.go @@ -18,8 +18,6 @@ func New(_config *config.Config) *cssw.Server { ps.RPCServerCodec = rpcServerCodec ps.UseSession = true - ds := &servlet.DataServlets{} - sh := &ServerHandlers{ ServerHandlers: *_config.ServerHandler, Config: _config, @@ -27,7 +25,6 @@ func New(_config *config.Config) *cssw.Server { sh.RegisterServlet("/auth", as) sh.RegisterServlet("/probe", ps) - sh.RegisterServlet("/data", ds) s := &cssw.Server{ ServerHandler: sh, diff --git a/servlet/data-servlet.go b/servlet/data-servlet.go deleted file mode 100644 index 439c913..0000000 --- a/servlet/data-servlet.go +++ /dev/null @@ -1,145 +0,0 @@ -package servlet - -import ( - "context" - "fmt" - - "github.com/valyala/fasthttp" - - logging "git.loafle.net/commons/logging-go" - crp "git.loafle.net/commons/rpc-go/protocol" - "git.loafle.net/commons/server-go" - "git.loafle.net/commons/server-go/socket" - cssw "git.loafle.net/commons/server-go/socket/web" - occp "git.loafle.net/overflow/commons-go/config/probe" - og "git.loafle.net/overflow/gateway" - "git.loafle.net/overflow/gateway/external/grpc" - "git.loafle.net/overflow/gateway/external/kafka" -) - -type DataServlet interface { - cssw.Servlet -} - -type DataServlets struct { - cssw.Servlets - - RPCServerCodec crp.ServerCodec -} - -func (s *DataServlets) Init(serverCtx server.ServerCtx) error { - if err := s.Servlets.Init(serverCtx); nil != err { - return err - } - - return nil -} - -func (s *DataServlets) OnStart(serverCtx server.ServerCtx) error { - if err := s.Servlets.OnStart(serverCtx); nil != err { - return err - } - - return nil -} - -func (s *DataServlets) OnStop(serverCtx server.ServerCtx) { - - s.Servlets.OnStop(serverCtx) -} - -func (s *DataServlets) Destroy(serverCtx server.ServerCtx) { - - s.Servlets.Destroy(serverCtx) -} - -func (s *DataServlets) Handshake(servletCtx server.ServletCtx, ctx *fasthttp.RequestCtx) (*fasthttp.ResponseHeader, error) { - // probe key extraction - bMethod := ctx.Request.Header.Peek(occp.HTTPRequestHeaderKey_Probe_Method) - if nil == bMethod { - return nil, fmt.Errorf("Unexpected probe method: %v", bMethod) - } - - method := string(bMethod) - - switch method { - case occp.HTTPRequestHeaderValue_Probe_Method_Connect: - default: - return nil, fmt.Errorf("Unexpected probe method: %s", method) - } - - bProbeKey := ctx.Request.Header.Peek(occp.HTTPRequestHeaderKey_Probe_ProbeKey) - if nil == bProbeKey { - return nil, fmt.Errorf("Unexpected probe key : %v", bProbeKey) - } - - probeKey := string(bProbeKey) - - grpcCTX := context.Background() - _, err := grpc.Exec(grpcCTX, "ProbeService.readByProbeKey", probeKey) - if nil != err { - return nil, fmt.Errorf("grpc call Error: %s", err.Error()) - } - - servletCtx.SetAttribute(og.SessionIDKey, probeKey) - servletCtx.SetAttribute(og.SessionClientTypeKey, og.PROBE) - servletCtx.SetAttribute(og.SessionTargetIDKey, probeKey) - - return nil, nil -} - -func (s *DataServlets) OnConnect(servletCtx server.ServletCtx, conn socket.Conn) { - s.Servlets.OnConnect(servletCtx, conn) -} - -func (s *DataServlets) OnDisconnect(servletCtx server.ServletCtx) { - s.Servlets.OnDisconnect(servletCtx) -} - -func (s *DataServlets) Handle(servletCtx server.ServletCtx, - stopChan <-chan struct{}, doneChan chan<- struct{}, - readChan <-chan []byte, writeChan chan<- []byte) { - defer func() { - doneChan <- struct{}{} - }() - - var ( - src crp.ServerRequestCodec - params []string - err error - ) - - for { - select { - case msg, ok := <-readChan: - if !ok { - return - } - src, err = s.RPCServerCodec.NewRequest(msg) - if nil != err { - logging.Logger().Error(err) - break - } - - params, err = src.Params() - if nil != err { - logging.Logger().Error(err) - break - } - if nil == params || 2 != len(params) { - logging.Logger().Errorf("metric is not valid %v", params) - break - } - err = kafka.Write("metric", []byte(params[0]), []byte(params[1])) - if nil != err { - logging.Logger().Error(err) - break - } - - case <-stopChan: - return - } - - } - -}