diff --git a/client/rwc/socket/client_handlers.go b/client/rwc/socket/client_handlers.go index 36ec13c..11218a3 100644 --- a/client/rwc/socket/client_handlers.go +++ b/client/rwc/socket/client_handlers.go @@ -2,6 +2,7 @@ package socket import ( "fmt" + "io" "git.loafle.net/commons_go/logging" "git.loafle.net/commons_go/rpc/client" @@ -26,6 +27,10 @@ func (crwch *ClientReadWriteCloseHandlers) Connect() (interface{}, error) { } func (crwch *ClientReadWriteCloseHandlers) ReadResponse(clientCTX ClientContext, codec protocol.ClientCodec, conn interface{}) (protocol.ClientResponseCodec, error) { + if nil == conn { + return nil, io.EOF + } + soc := conn.(server.Socket) resCodec, err := codec.NewResponse(soc) @@ -33,6 +38,10 @@ func (crwch *ClientReadWriteCloseHandlers) ReadResponse(clientCTX ClientContext, } func (crwch *ClientReadWriteCloseHandlers) WriteRequest(clientCTX ClientContext, codec protocol.ClientCodec, conn interface{}, method string, params interface{}, id interface{}) error { + if nil == conn { + return io.EOF + } + soc := conn.(server.Socket) if wErr := codec.WriteRequest(soc, method, params); nil != wErr { @@ -43,6 +52,10 @@ func (crwch *ClientReadWriteCloseHandlers) WriteRequest(clientCTX ClientContext, } func (crwch *ClientReadWriteCloseHandlers) Disconnect(conn interface{}) { + if nil == conn { + return + } + soc := conn.(server.Socket) soc.Close() } diff --git a/client/rwc/websocket/fasthttp/client_handlers.go b/client/rwc/websocket/fasthttp/client_handlers.go index 9b83bee..f79b365 100644 --- a/client/rwc/websocket/fasthttp/client_handlers.go +++ b/client/rwc/websocket/fasthttp/client_handlers.go @@ -2,6 +2,7 @@ package fasthttp import ( "fmt" + "io" "github.com/gorilla/websocket" @@ -23,6 +24,10 @@ func (crwch *ClientReadWriteCloseHandlers) Connect() (interface{}, error) { } func (crwch *ClientReadWriteCloseHandlers) ReadResponse(clientCTX ClientContext, codec protocol.ClientCodec, conn interface{}) (protocol.ClientResponseCodec, error) { + if nil == conn { + return nil, io.EOF + } + soc := conn.(cwf.Socket) _, r, err := soc.NextReader() @@ -32,12 +37,19 @@ func (crwch *ClientReadWriteCloseHandlers) ReadResponse(clientCTX ClientContext, } func (crwch *ClientReadWriteCloseHandlers) WriteRequest(clientCTX ClientContext, codec protocol.ClientCodec, conn interface{}, method string, params interface{}, id interface{}) error { + if nil == conn { + return io.EOF + } + soc := conn.(cwf.Socket) wc, wErr := soc.NextWriter(websocket.TextMessage) if nil != wErr { return wErr } + defer func() { + wc.Close() + }() if wErr := codec.WriteRequest(wc, method, params); nil != wErr { return wErr @@ -47,6 +59,10 @@ func (crwch *ClientReadWriteCloseHandlers) WriteRequest(clientCTX ClientContext, } func (crwch *ClientReadWriteCloseHandlers) Disconnect(conn interface{}) { + if nil == conn { + return + } + soc := conn.(cwf.Socket) soc.Close() } diff --git a/protocol/json/client_response.go b/protocol/json/client_response.go index 0d5ede2..3fc0562 100644 --- a/protocol/json/client_response.go +++ b/protocol/json/client_response.go @@ -65,8 +65,13 @@ func (crc *ClientResponseCodec) Notification() (protocol.ClientNotificationCodec // newClientMessageCodec returns a new ClientMessageCodec. func newClientResponseCodec(r io.Reader, codec codec.Codec) (protocol.ClientResponseCodec, error) { + decoder := json.NewDecoder(r) + if nil == r { + return nil, io.EOF + } + res := &clientResponse{} - err := json.NewDecoder(r).Decode(res) + err := decoder.Decode(res) if err != nil { if err == io.ErrUnexpectedEOF || err == io.EOF { return nil, err diff --git a/protocol/json/server_request.go b/protocol/json/server_request.go index ed294d4..4935031 100644 --- a/protocol/json/server_request.go +++ b/protocol/json/server_request.go @@ -36,8 +36,13 @@ type serverRequest struct { // newRequestCodec returns a new ServerRequestCodec. func newServerRequestCodec(r io.Reader, codec codec.Codec) (protocol.ServerRequestCodec, error) { // Decode the request body and check if RPC method is valid. + decoder := json.NewDecoder(r) + if nil == r { + return nil, io.EOF + } + req := &serverRequest{} - err := json.NewDecoder(r).Decode(req) + err := decoder.Decode(req) if err == io.ErrUnexpectedEOF || err == io.EOF { return nil, err } diff --git a/server/rwc/socket/servlet_handlers.go b/server/rwc/socket/servlet_handlers.go index a33e468..14724e5 100644 --- a/server/rwc/socket/servlet_handlers.go +++ b/server/rwc/socket/servlet_handlers.go @@ -1,6 +1,8 @@ package socket import ( + "io" + "git.loafle.net/commons_go/rpc" "git.loafle.net/commons_go/rpc/protocol" "git.loafle.net/commons_go/server" @@ -15,6 +17,10 @@ type ServletReadWriteCloseHandlers struct { } func (srwch *ServletReadWriteCloseHandlers) ReadRequest(servletCTX rpc.ServletContext, codec protocol.ServerCodec, conn interface{}) (protocol.ServerRequestCodec, error) { + if nil == conn { + return nil, io.EOF + } + soc := conn.(server.Socket) reqCodec, err := codec.NewRequest(soc) @@ -22,6 +28,10 @@ func (srwch *ServletReadWriteCloseHandlers) ReadRequest(servletCTX rpc.ServletCo } func (srwch *ServletReadWriteCloseHandlers) WriteResponse(servletCTX rpc.ServletContext, conn interface{}, reqCodec protocol.ServerRequestCodec, result interface{}, err error) error { + if nil == conn { + return io.EOF + } + soc := conn.(server.Socket) if nil != err { @@ -38,6 +48,10 @@ func (srwch *ServletReadWriteCloseHandlers) WriteResponse(servletCTX rpc.Servlet } func (srwch *ServletReadWriteCloseHandlers) WriteNotification(servletCTX rpc.ServletContext, conn interface{}, codec protocol.ServerCodec, method string, params interface{}) error { + if nil == conn { + return io.EOF + } + soc := conn.(server.Socket) if wErr := codec.WriteNotification(soc, method, params); nil != wErr { diff --git a/server/rwc/websocket/fasthttp/servlet_handlers.go b/server/rwc/websocket/fasthttp/servlet_handlers.go index 0459970..cc60128 100644 --- a/server/rwc/websocket/fasthttp/servlet_handlers.go +++ b/server/rwc/websocket/fasthttp/servlet_handlers.go @@ -1,9 +1,8 @@ package fasthttp import ( - "fmt" + "io" - "git.loafle.net/commons_go/logging" "git.loafle.net/commons_go/rpc" "git.loafle.net/commons_go/rpc/protocol" cwf "git.loafle.net/commons_go/websocket_fasthttp" @@ -19,7 +18,10 @@ type ServletReadWriteCloseHandlers struct { } func (srwch *ServletReadWriteCloseHandlers) ReadRequest(servletCTX rpc.ServletContext, codec protocol.ServerCodec, conn interface{}) (protocol.ServerRequestCodec, error) { - logging.Logger().Debug(fmt.Sprintf("ReadRequest: conn %v", conn)) + if nil == conn { + return nil, io.EOF + } + soc := conn.(cwf.Socket) _, r, err := soc.NextReader() @@ -29,7 +31,10 @@ func (srwch *ServletReadWriteCloseHandlers) ReadRequest(servletCTX rpc.ServletCo } func (srwch *ServletReadWriteCloseHandlers) WriteResponse(servletCTX rpc.ServletContext, conn interface{}, requestCodec protocol.ServerRequestCodec, result interface{}, err error) error { - logging.Logger().Debug(fmt.Sprintf("WriteResponse: conn %v", conn)) + if nil == conn { + return io.EOF + } + soc := conn.(cwf.Socket) wc, wErr := soc.NextWriter(websocket.TextMessage) @@ -55,7 +60,10 @@ func (srwch *ServletReadWriteCloseHandlers) WriteResponse(servletCTX rpc.Servlet } func (srwch *ServletReadWriteCloseHandlers) WriteNotification(servletCTX rpc.ServletContext, conn interface{}, codec protocol.ServerCodec, method string, params interface{}) error { - logging.Logger().Debug(fmt.Sprintf("WriteNotification: conn %v", conn)) + if nil == conn { + return io.EOF + } + soc := conn.(cwf.Socket) wc, wErr := soc.NextWriter(websocket.TextMessage)