From 7ec23e0ba2e22e798b87c91ec7af1eb807bbfd42 Mon Sep 17 00:00:00 2001 From: crusader Date: Thu, 19 Apr 2018 23:07:14 +0900 Subject: [PATCH] ing --- constants.go | 5 +++++ service/ProbeService.go | 12 +++--------- servlet/rpc-servlet.go | 22 ++++++++++++++++------ 3 files changed, 24 insertions(+), 15 deletions(-) diff --git a/constants.go b/constants.go index 1facbab..ff32f2a 100644 --- a/constants.go +++ b/constants.go @@ -5,3 +5,8 @@ const ( CONTAINER_RPC_CLIENT_CODEC = "CONTAINER_RPC_CLIENT_CODEC" CONTAINER_RPC_WRITE_CHAN = "CONTAINER_RPC_WRITE_CHAN" ) + +type RPCNotification struct { + Method string + Params []interface{} +} diff --git a/service/ProbeService.go b/service/ProbeService.go index e23bba2..c6d2c86 100644 --- a/service/ProbeService.go +++ b/service/ProbeService.go @@ -6,7 +6,7 @@ import ( cda "git.loafle.net/commons/di-go/annotation" cdr "git.loafle.net/commons/di-go/registry" - crp "git.loafle.net/commons/rpc-go/protocol" + "git.loafle.net/overflow/container-go" // For annotation _ "git.loafle.net/overflow/commons-go/core/annotation" @@ -21,8 +21,7 @@ func init() { type ProbeService struct { cda.TypeAnnotation `annotation:"@overflow:RPCService()"` - RPCClientCodec crp.ClientCodec `annotation:"@Resource(name='CONTAINER_RPC_CLIENT_CODEC')"` - RPCWriteChan chan<- []byte `annotation:"@Resource(name='CONTAINER_RPC_WRITE_CHAN')"` + RPCWriteChan chan<- *container.RPCNotification `annotation:"@Resource(name='CONTAINER_RPC_WRITE_CHAN')"` } func (s *ProbeService) InitService() error { @@ -43,13 +42,8 @@ func (s *ProbeService) DestroyService() { } func (s *ProbeService) Send(method string, params ...interface{}) error { - buf, err := s.RPCClientCodec.NewRequest(method, params, nil) - if nil != err { - return err - } - select { - case s.RPCWriteChan <- buf: + case s.RPCWriteChan <- &container.RPCNotification{Method: method, Params: params}: default: return fmt.Errorf("cannot write to rpcWriteChan") } diff --git a/servlet/rpc-servlet.go b/servlet/rpc-servlet.go index b907432..82f2b46 100644 --- a/servlet/rpc-servlet.go +++ b/servlet/rpc-servlet.go @@ -5,11 +5,11 @@ import ( "git.loafle.net/commons/logging-go" crp "git.loafle.net/commons/rpc-go/protocol" - crpj "git.loafle.net/commons/rpc-go/protocol/json" crr "git.loafle.net/commons/rpc-go/registry" "git.loafle.net/commons/server-go" css "git.loafle.net/commons/server-go/socket" cssn "git.loafle.net/commons/server-go/socket/net" + "git.loafle.net/overflow/container-go" ) type RPCServlet interface { @@ -19,8 +19,9 @@ type RPCServlet interface { type RPCServlets struct { cssn.Servlets - RPCInvoker crr.RPCInvoker - RPCWriteChan <-chan []byte + RPCServerCodec crp.ServerCodec + RPCInvoker crr.RPCInvoker + RPCWriteChan <-chan *container.RPCNotification } func (s *RPCServlets) Handshake(servletCtx server.ServletCtx, conn net.Conn) error { @@ -49,7 +50,7 @@ func (s *RPCServlets) Handle(servletCtx server.ServletCtx, replyBuff []byte err error ) - sc := crpj.NewServerCodec() + go s.handleRPCWrite(stopChan, writeChan) for { @@ -59,7 +60,7 @@ func (s *RPCServlets) Handle(servletCtx server.ServletCtx, return } // grpc exec method call - src, err = sc.NewRequest(msg) + src, err = s.RPCServerCodec.NewRequest(msg) if nil != err { logging.Logger().Error(err) break @@ -81,13 +82,22 @@ func (s *RPCServlets) Handle(servletCtx server.ServletCtx, } func (s *RPCServlets) handleRPCWrite(stopChan <-chan struct{}, writeChan chan<- []byte) { + var ( + buf []byte + err error + ) for { select { - case buf, ok := <-s.RPCWriteChan: + case noti, ok := <-s.RPCWriteChan: if !ok { break } + buf, err = s.RPCServerCodec.NewNotification(noti.Method, noti.Params) + if nil != err { + logging.Logger().Error(err) + } + writeChan <- buf case <-stopChan: return