diff --git a/config.json b/config.json index ff2ec6a..80e4868 100644 --- a/config.json +++ b/config.json @@ -24,6 +24,16 @@ "redis": { "network": "tcp4", "address": "192.168.1.50:6379" + }, + "kafka": { + "producers": { + "metric": { + "brokers": [ + "192.168.1.50:9092" + ], + "topic": "overflow-metric-topic" + } + } } } } diff --git a/server/server.go b/server/server.go index 924b87d..7653ac9 100644 --- a/server/server.go +++ b/server/server.go @@ -1,17 +1,23 @@ package server import ( + crpj "git.loafle.net/commons/rpc-go/protocol/json" cssw "git.loafle.net/commons/server-go/socket/web" "git.loafle.net/overflow/probe_gateway_rpc/config" "git.loafle.net/overflow/probe_gateway_rpc/servlet" ) func New(_config *config.Config) *cssw.Server { + rpcServerCodec := crpj.NewServerCodec() as := &servlet.AuthServlets{} + as.RPCServerCodec = rpcServerCodec as.UseSession = true + ps := &servlet.ProbeServlets{} + ps.RPCServerCodec = rpcServerCodec ps.UseSession = true + ds := &servlet.DataServlets{} sh := &ServerHandlers{ diff --git a/servlet/data-servlet.go b/servlet/data-servlet.go index 186397a..439c913 100644 --- a/servlet/data-servlet.go +++ b/servlet/data-servlet.go @@ -2,35 +2,33 @@ package servlet import ( "context" - "crypto/rsa" "fmt" - "sync" "github.com/valyala/fasthttp" + logging "git.loafle.net/commons/logging-go" + crp "git.loafle.net/commons/rpc-go/protocol" "git.loafle.net/commons/server-go" "git.loafle.net/commons/server-go/socket" + cssw "git.loafle.net/commons/server-go/socket/web" occp "git.loafle.net/overflow/commons-go/config/probe" og "git.loafle.net/overflow/gateway" "git.loafle.net/overflow/gateway/external/grpc" - ogrs "git.loafle.net/overflow/gateway_rpc/servlet" + "git.loafle.net/overflow/gateway/external/kafka" ) type DataServlet interface { - ogrs.RPCServlet + cssw.Servlet } type DataServlets struct { - ogrs.RPCServlets + cssw.Servlets - VerifyKey *rsa.PublicKey - SignKey *rsa.PrivateKey - - connections sync.Map + RPCServerCodec crp.ServerCodec } func (s *DataServlets) Init(serverCtx server.ServerCtx) error { - if err := s.RPCServlets.Init(serverCtx); nil != err { + if err := s.Servlets.Init(serverCtx); nil != err { return err } @@ -38,7 +36,7 @@ func (s *DataServlets) Init(serverCtx server.ServerCtx) error { } func (s *DataServlets) OnStart(serverCtx server.ServerCtx) error { - if err := s.RPCServlets.OnStart(serverCtx); nil != err { + if err := s.Servlets.OnStart(serverCtx); nil != err { return err } @@ -47,12 +45,12 @@ func (s *DataServlets) OnStart(serverCtx server.ServerCtx) error { func (s *DataServlets) OnStop(serverCtx server.ServerCtx) { - s.RPCServlets.OnStop(serverCtx) + s.Servlets.OnStop(serverCtx) } func (s *DataServlets) Destroy(serverCtx server.ServerCtx) { - s.RPCServlets.Destroy(serverCtx) + s.Servlets.Destroy(serverCtx) } func (s *DataServlets) Handshake(servletCtx server.ServletCtx, ctx *fasthttp.RequestCtx) (*fasthttp.ResponseHeader, error) { @@ -91,9 +89,57 @@ func (s *DataServlets) Handshake(servletCtx server.ServletCtx, ctx *fasthttp.Req } func (s *DataServlets) OnConnect(servletCtx server.ServletCtx, conn socket.Conn) { - s.RPCServlets.OnConnect(servletCtx, conn) + s.Servlets.OnConnect(servletCtx, conn) } func (s *DataServlets) OnDisconnect(servletCtx server.ServletCtx) { - s.RPCServlets.OnDisconnect(servletCtx) + s.Servlets.OnDisconnect(servletCtx) +} + +func (s *DataServlets) Handle(servletCtx server.ServletCtx, + stopChan <-chan struct{}, doneChan chan<- struct{}, + readChan <-chan []byte, writeChan chan<- []byte) { + defer func() { + doneChan <- struct{}{} + }() + + var ( + src crp.ServerRequestCodec + params []string + err error + ) + + for { + select { + case msg, ok := <-readChan: + if !ok { + return + } + src, err = s.RPCServerCodec.NewRequest(msg) + if nil != err { + logging.Logger().Error(err) + break + } + + params, err = src.Params() + if nil != err { + logging.Logger().Error(err) + break + } + if nil == params || 2 != len(params) { + logging.Logger().Errorf("metric is not valid %v", params) + break + } + err = kafka.Write("metric", []byte(params[0]), []byte(params[1])) + if nil != err { + logging.Logger().Error(err) + break + } + + case <-stopChan: + return + } + + } + }