diff --git a/client/client.go b/client/client.go index aba638c..40da88e 100644 --- a/client/client.go +++ b/client/client.go @@ -330,11 +330,6 @@ func (c *client) rpcWriter(stopChan <-chan struct{}, writerDone chan<- error) { func (c *client) rpcReader(readerDone chan<- error) { var err error defer func() { - if r := recover(); r != nil { - if err == nil { - err = fmt.Errorf("Client: Panic when reading data from server: %v", r) - } - } readerDone <- err }() diff --git a/client/rwc/socket/client_rwc_handlers.go b/client/rwc/socket/client_rwc_handlers.go index 381cc9e..828f6b3 100644 --- a/client/rwc/socket/client_rwc_handlers.go +++ b/client/rwc/socket/client_rwc_handlers.go @@ -2,6 +2,7 @@ package socket import ( "git.loafle.net/commons_go/logging" + "git.loafle.net/commons_go/rpc" "git.loafle.net/commons_go/rpc/client" "git.loafle.net/commons_go/rpc/protocol" csc "git.loafle.net/commons_go/server/client" @@ -25,20 +26,38 @@ func (crwch *ClientReadWriteCloseHandlers) Connect(clientCTX client.ClientContex } func (crwch *ClientReadWriteCloseHandlers) ReadResponse(clientCTX client.ClientContext, codec protocol.ClientCodec, conn interface{}) (protocol.ClientResponseCodec, error) { + var ( + buf []byte + rErr error + ) soc := conn.(csc.Socket) - return codec.NewResponse(soc) + buf, rErr = rpc.TCPReadData(soc) + if nil != rErr { + return nil, rErr + } + + return codec.NewResponseB(buf) } func (crwch *ClientReadWriteCloseHandlers) WriteRequest(clientCTX client.ClientContext, codec protocol.ClientCodec, conn interface{}, method string, params []interface{}, id interface{}) error { var ( + buf []byte wErr error ) soc := conn.(csc.Socket) - wErr = codec.WriteRequest(soc, method, params, id) + buf, wErr = codec.NewRequestB(method, params, id) + if nil != wErr { + return wErr + } - return wErr + wErr = rpc.TCPWriteData(soc, buf) + if nil != wErr { + return wErr + } + + return nil } func (crwch *ClientReadWriteCloseHandlers) Disconnect(clientCTX client.ClientContext, conn interface{}) { diff --git a/client/rwc/websocket/fasthttp/client_rwc_handlers.go b/client/rwc/websocket/fasthttp/client_rwc_handlers.go index 5f4bc3d..3f93f88 100644 --- a/client/rwc/websocket/fasthttp/client_rwc_handlers.go +++ b/client/rwc/websocket/fasthttp/client_rwc_handlers.go @@ -30,7 +30,7 @@ func (crwch *ClientReadWriteCloseHandlers) Connect(clientCTX client.ClientContex func (crwch *ClientReadWriteCloseHandlers) ReadResponse(clientCTX client.ClientContext, codec protocol.ClientCodec, conn interface{}) (protocol.ClientResponseCodec, error) { soc := conn.(cwfc.Socket) - _, r, err := soc.NextReader() + _, buf, err := soc.ReadMessage() if nil != err { if websocket.IsUnexpectedCloseError(err) { return nil, io.EOF @@ -38,30 +38,30 @@ func (crwch *ClientReadWriteCloseHandlers) ReadResponse(clientCTX client.ClientC return nil, err } - return codec.NewResponse(r) + return codec.NewResponseB(buf) } func (crwch *ClientReadWriteCloseHandlers) WriteRequest(clientCTX client.ClientContext, codec protocol.ClientCodec, conn interface{}, method string, params []interface{}, id interface{}) error { var ( - wc io.WriteCloser + buf []byte wErr error ) soc := conn.(cwfc.Socket) - wc, wErr = soc.NextWriter(websocket.TextMessage) + buf, wErr = codec.NewRequestB(method, params, id) + if nil != wErr { + return wErr + } + + wErr = soc.WriteMessage(websocket.TextMessage, buf) if nil != wErr { if websocket.IsUnexpectedCloseError(wErr) { return io.EOF } return wErr } - defer func() { - wc.Close() - }() - wErr = codec.WriteRequest(wc, method, params, id) - - return wErr + return nil } func (crwch *ClientReadWriteCloseHandlers) Disconnect(clientCTX client.ClientContext, conn interface{}) { diff --git a/protocol/client_codec.go b/protocol/client_codec.go index 7f1a27e..2be739d 100644 --- a/protocol/client_codec.go +++ b/protocol/client_codec.go @@ -6,6 +6,9 @@ import ( // ClientCodec creates a ClientCodecRequest to process each request. type ClientCodec interface { + NewRequestB(method string, args []interface{}, id interface{}) ([]byte, error) + NewResponseB(buf []byte) (ClientResponseCodec, error) + WriteRequest(w io.Writer, method string, args []interface{}, id interface{}) error NewResponse(r io.Reader) (ClientResponseCodec, error) } diff --git a/protocol/json/client.go b/protocol/json/client.go index 7365066..9550936 100644 --- a/protocol/json/client.go +++ b/protocol/json/client.go @@ -45,3 +45,22 @@ func (cc *ClientCodec) WriteRequest(w io.Writer, method string, args []interface func (cc *ClientCodec) NewResponse(r io.Reader) (protocol.ClientResponseCodec, error) { return newClientResponseCodec(r) } + +func (cc *ClientCodec) NewRequestB(method string, args []interface{}, id interface{}) ([]byte, error) { + params, err := convertParamsToStringArray(args) + if nil != err { + return nil, err + } + + req := &clientRequest{ + Version: Version, + Method: method, + Params: params, + ID: id, + } + + return json.Marshal(req) +} +func (cc *ClientCodec) NewResponseB(buf []byte) (protocol.ClientResponseCodec, error) { + return newClientResponseCodecB(buf) +} diff --git a/protocol/json/client_response.go b/protocol/json/client_response.go index 1101007..2787cc4 100644 --- a/protocol/json/client_response.go +++ b/protocol/json/client_response.go @@ -92,3 +92,27 @@ func newClientResponseCodec(r io.Reader) (protocol.ClientResponseCodec, error) { return &ClientResponseCodec{res: res, err: err}, nil } + +// newClientMessageCodec returns a new ClientMessageCodec. +func newClientResponseCodecB(buf []byte) (protocol.ClientResponseCodec, error) { + + res := &clientResponse{} + err := json.Unmarshal(buf, res) + + if err != nil { + err = &Error{ + Code: crp.E_PARSE, + Message: err.Error(), + Data: res, + } + } + if res.Version != Version { + err = &Error{ + Code: crp.E_INVALID_REQ, + Message: "jsonrpc must be " + Version, + Data: res, + } + } + + return &ClientResponseCodec{res: res, err: err}, nil +} diff --git a/protocol/json/server.go b/protocol/json/server.go index 39e44b9..244ef55 100644 --- a/protocol/json/server.go +++ b/protocol/json/server.go @@ -22,6 +22,22 @@ func NewServerCodec() protocol.ServerCodec { type ServerCodec struct { } +func (sc *ServerCodec) NewRequestB(buf []byte) (protocol.ServerRequestCodec, error) { + return newServerRequestCodecB(buf) +} + +func (sc *ServerCodec) NewNotificationB(method string, args []interface{}) ([]byte, error) { + params, err := convertParamsToStringArray(args) + if nil != err { + return nil, err + } + + noti := &serverNotification{Method: method, Params: params} + res := &serverResponse{Version: Version, Result: noti} + + return json.Marshal(res) +} + // NewRequest returns a ServerRequestCodec. func (sc *ServerCodec) NewRequest(r io.Reader) (protocol.ServerRequestCodec, error) { return newServerRequestCodec(r) diff --git a/protocol/json/server_request.go b/protocol/json/server_request.go index 5ca81e0..baa313a 100644 --- a/protocol/json/server_request.go +++ b/protocol/json/server_request.go @@ -66,6 +66,31 @@ func newServerRequestCodec(r io.Reader) (protocol.ServerRequestCodec, error) { return &ServerRequestCodec{req: req, err: err}, nil } +// newRequestCodec returns a new ServerRequestCodec. +func newServerRequestCodecB(buf []byte) (protocol.ServerRequestCodec, error) { + + req := &serverRequest{} + err := json.Unmarshal(buf, req) + + if err != nil { + err = &Error{ + Code: crp.E_PARSE, + Message: err.Error(), + Data: req, + } + } + + if req.Version != Version { + err = &Error{ + Code: crp.E_INVALID_REQ, + Message: "jsonrpc must be " + Version, + Data: req, + } + } + + return &ServerRequestCodec{req: req, err: err}, nil +} + // ServerRequestCodec decodes and encodes a single request. type ServerRequestCodec struct { req *serverRequest @@ -161,3 +186,23 @@ func (src *ServerRequestCodec) writeServerResponse(w io.Writer, res *serverRespo } return nil } + +func (src *ServerRequestCodec) NewResponseB(reply interface{}) ([]byte, error) { + res := &serverResponse{Version: Version, Result: reply, ID: src.req.ID} + return src.newServerResponseB(res) +} +func (src *ServerRequestCodec) NewErrorB(status int, err error) ([]byte, error) { + jsonErr, ok := err.(*Error) + if !ok { + jsonErr = &Error{ + Code: crp.E_SERVER, + Message: err.Error(), + } + } + res := &serverResponse{Version: Version, Error: jsonErr, ID: src.req.ID} + return src.newServerResponseB(res) +} +func (src *ServerRequestCodec) newServerResponseB(res *serverResponse) ([]byte, error) { + + return json.Marshal(res) +} diff --git a/protocol/server_codec.go b/protocol/server_codec.go index 92f98de..3e22301 100644 --- a/protocol/server_codec.go +++ b/protocol/server_codec.go @@ -8,6 +8,9 @@ import ( type ServerCodec interface { NewRequest(r io.Reader) (ServerRequestCodec, error) WriteNotification(w io.Writer, method string, args []interface{}) error + + NewRequestB(buf []byte) (ServerRequestCodec, error) + NewNotificationB(method string, args []interface{}) ([]byte, error) } // ServerRequestCodec decodes a request and encodes a response using a specific @@ -15,6 +18,9 @@ type ServerCodec interface { type ServerRequestCodec interface { RegistryCodec + NewResponseB(reply interface{}) ([]byte, error) + NewErrorB(status int, err error) ([]byte, error) + WriteResponse(w io.Writer, reply interface{}) error WriteError(w io.Writer, status int, err error) error } diff --git a/server/rwc/socket/servlet_rwc_handlers.go b/server/rwc/socket/servlet_rwc_handlers.go index c40539f..bf34278 100644 --- a/server/rwc/socket/servlet_rwc_handlers.go +++ b/server/rwc/socket/servlet_rwc_handlers.go @@ -15,35 +15,62 @@ type ServletReadWriteCloseHandlers struct { } func (srwch *ServletReadWriteCloseHandlers) ReadRequest(servletCTX rpc.ServletContext, codec protocol.ServerCodec, conn interface{}) (protocol.ServerRequestCodec, error) { + var ( + buf []byte + rErr error + ) soc := conn.(server.Socket) - return codec.NewRequest(soc) + buf, rErr = rpc.TCPReadData(soc) + if nil != rErr { + return nil, rErr + } + + return codec.NewRequestB(buf) } func (srwch *ServletReadWriteCloseHandlers) WriteResponse(servletCTX rpc.ServletContext, conn interface{}, reqCodec protocol.ServerRequestCodec, result interface{}, err error) error { var ( + buf []byte wErr error ) soc := conn.(server.Socket) if nil != err { - wErr = reqCodec.WriteError(soc, 500, err) + buf, wErr = reqCodec.NewErrorB(500, err) } else { - wErr = reqCodec.WriteResponse(soc, result) + buf, wErr = reqCodec.NewResponseB(result) + } + if nil != wErr { + return wErr } - return wErr + wErr = rpc.TCPWriteData(soc, buf) + if nil != wErr { + return wErr + } + + return nil } func (srwch *ServletReadWriteCloseHandlers) WriteNotification(servletCTX rpc.ServletContext, conn interface{}, codec protocol.ServerCodec, method string, params []interface{}) error { var ( + buf []byte wErr error ) soc := conn.(server.Socket) - wErr = codec.WriteNotification(soc, method, params) + buf, wErr = codec.NewNotificationB(method, params) + if nil != wErr { + return wErr + } - return wErr + wErr = rpc.TCPWriteData(soc, buf) + if nil != wErr { + return wErr + } + + return nil } func (srwch *ServletReadWriteCloseHandlers) Validate() { diff --git a/server/rwc/websocket/fasthttp/servlet_rwc_handlers.go b/server/rwc/websocket/fasthttp/servlet_rwc_handlers.go index fe2b1ab..439a5d6 100644 --- a/server/rwc/websocket/fasthttp/servlet_rwc_handlers.go +++ b/server/rwc/websocket/fasthttp/servlet_rwc_handlers.go @@ -20,7 +20,7 @@ type ServletReadWriteCloseHandlers struct { func (srwch *ServletReadWriteCloseHandlers) ReadRequest(servletCTX rpc.ServletContext, codec protocol.ServerCodec, conn interface{}) (protocol.ServerRequestCodec, error) { soc := conn.(cwf.Socket) - _, r, err := soc.NextReader() + _, buf, err := soc.ReadMessage() if nil != err { if websocket.IsUnexpectedCloseError(err) { return nil, io.EOF @@ -28,17 +28,30 @@ func (srwch *ServletReadWriteCloseHandlers) ReadRequest(servletCTX rpc.ServletCo return nil, err } - return codec.NewRequest(r) + return codec.NewRequestB(buf) } func (srwch *ServletReadWriteCloseHandlers) WriteResponse(servletCTX rpc.ServletContext, conn interface{}, requestCodec protocol.ServerRequestCodec, result interface{}, err error) error { + if !requestCodec.HasResponse() { + return nil + } + var ( - wc io.WriteCloser + buf []byte wErr error ) soc := conn.(cwf.Socket) - wc, wErr = soc.NextWriter(websocket.TextMessage) + if nil != err { + buf, wErr = requestCodec.NewErrorB(500, err) + } else { + buf, wErr = requestCodec.NewResponseB(result) + } + if nil != wErr { + return wErr + } + + wErr = soc.WriteMessage(websocket.TextMessage, buf) if nil != wErr { if websocket.IsUnexpectedCloseError(wErr) { return io.EOF @@ -46,27 +59,22 @@ func (srwch *ServletReadWriteCloseHandlers) WriteResponse(servletCTX rpc.Servlet return wErr } - defer func() { - wc.Close() - }() - - if nil != err { - wErr = requestCodec.WriteError(wc, 500, err) - } else { - wErr = requestCodec.WriteResponse(wc, result) - } - - return wErr + return nil } func (srwch *ServletReadWriteCloseHandlers) WriteNotification(servletCTX rpc.ServletContext, conn interface{}, codec protocol.ServerCodec, method string, params []interface{}) error { var ( - wc io.WriteCloser + buf []byte wErr error ) soc := conn.(cwf.Socket) - wc, wErr = soc.NextWriter(websocket.TextMessage) + buf, wErr = codec.NewNotificationB(method, params) + if nil != wErr { + return wErr + } + + wErr = soc.WriteMessage(websocket.TextMessage, buf) if nil != wErr { if websocket.IsUnexpectedCloseError(wErr) { return io.EOF @@ -74,13 +82,7 @@ func (srwch *ServletReadWriteCloseHandlers) WriteNotification(servletCTX rpc.Ser return wErr } - defer func() { - wc.Close() - }() - - wErr = codec.WriteNotification(wc, method, params) - - return wErr + return nil } func (srwch *ServletReadWriteCloseHandlers) Validate() { diff --git a/tcp.go b/tcp.go new file mode 100644 index 0000000..06f8014 --- /dev/null +++ b/tcp.go @@ -0,0 +1,48 @@ +package rpc + +import ( + "encoding/binary" + "io" + "net" +) + +func TCPWriteData(conn net.Conn, buf []byte) error { + var wErr error + + prefix := make([]byte, 4) + binary.BigEndian.PutUint32(prefix, uint32(len(buf))) + + _, wErr = conn.Write(prefix) + if nil != wErr { + return wErr + } + _, wErr = conn.Write(buf) + if nil != wErr { + return wErr + } + + return nil +} + +func TCPReadData(conn net.Conn) ([]byte, error) { + var ( + buf []byte + rErr error + ) + + prefix := make([]byte, 4) + _, rErr = io.ReadFull(conn, prefix) + if nil != rErr { + return nil, rErr + } + + length := binary.BigEndian.Uint32(prefix) + + buf = make([]byte, int(length)) + _, rErr = io.ReadFull(conn, buf) + if nil != rErr { + return nil, rErr + } + + return buf, nil +}