diff --git a/internal/server/rpc/Untitled-1 b/internal/server/rpc/Untitled-1 deleted file mode 100644 index b620f24..0000000 --- a/internal/server/rpc/Untitled-1 +++ /dev/null @@ -1,42 +0,0 @@ - codec, err := sh.gwRPCSH.GetCodec(soc.Context().GetAttribute("contentType").(string)) - if nil != err { - log.Printf("RPC Handle: %v", err) - doneChan <- struct{}{} - return - } - - var socConn *cwf.SocketConn - ctx := context.WithValue(context.Background(), oogw.ServletSocketKey, rpcSoc) - // conn.SetPongHandler(func(string) error { c.conn.SetReadDeadline(time.Now().Add(pongWait)); return nil }) - - for { - if socConn, err = rpcSoc.WaitRequest(); nil != err { - doneChan <- struct{}{} - return - } - - // // "git.loafle.net/commons_go/websocket_fasthttp/websocket" - // switch socConn.MessageType { - // case websocket.TextMessage: - // case websocket.BinaryMessage: - // } - - if err = gateway.Handle(ctx, sh.gwRPCSH, codec, socConn, socConn); nil != err { - if server.IsClientDisconnect(err) { - doneChan <- struct{}{} - return - } - log.Printf("RPC: %v", err) - } - - if err = socConn.Close(); nil != err { - doneChan <- struct{}{} - return - } - - select { - case <-stopChan: - return - default: - } - } diff --git a/internal/server/rpc/gateway_rpc_servlet_handlers.go b/internal/server/rpc/gateway_rpc_servlet_handlers.go index fc419e6..e62b7de 100644 --- a/internal/server/rpc/gateway_rpc_servlet_handlers.go +++ b/internal/server/rpc/gateway_rpc_servlet_handlers.go @@ -40,8 +40,6 @@ func (sh *GatewayRPCServletHandlers) Invoke(servletCTX rpc.ServletContext, reque } return r, nil - - return } func (sh *GatewayRPCServletHandlers) Validate() { diff --git a/internal/server/rpc/rpc_gateway_servlet_handlers.go b/internal/server/rpc/rpc_gateway_servlet_handlers.go index 3ea3799..5a81f09 100644 --- a/internal/server/rpc/rpc_gateway_servlet_handlers.go +++ b/internal/server/rpc/rpc_gateway_servlet_handlers.go @@ -60,18 +60,25 @@ func (sh *RPCGatewayServletHandlers) OnConnect(soc cwf.Socket) { soc.Context().SetAttribute(cRPC.ContentTypeKey, json.Name) } -func (sh *RPCGatewayServletHandlers) Handle(soc cwf.Socket, stopChan <-chan struct{}, doneChan chan<- struct{}) { +func (sh *RPCGatewayServletHandlers) Handle(soc cwf.Socket, stopChan <-chan struct{}, doneChan chan<- error) { + var err error rpcServlet := retainRPCServlet(sh.gwRPCSH) - if err := rpcServlet.Start(soc.Context(), soc, soc); nil != err { - doneChan <- struct{}{} + defer func() { + releaseRPCServlet(rpcServlet) + doneChan <- err + }() + + rpcDoneChan := make(chan error, 1) + + if err = rpcServlet.Start(soc.Context(), soc, rpcDoneChan); nil != err { return } select { + case err = <-rpcDoneChan: case <-stopChan: rpcServlet.Stop() - return } }