diff --git a/client/client.go b/client/client.go index 82cd311..b96ced0 100644 --- a/client/client.go +++ b/client/client.go @@ -303,7 +303,7 @@ func (c *client) rpcReader(readerDone chan<- error) { }() for { - msg, err := c.ch.GetCodec().NewMessage(c.conn) + resCodec, err := c.ch.GetCodec().NewResponse(c.conn) if nil != err { if err == io.ErrUnexpectedEOF || err == io.EOF { logging.Logger().Info("Client: disconnected from server") @@ -314,12 +314,10 @@ func (c *client) rpcReader(readerDone chan<- error) { continue } - switch msg.MessageType() { - case protocol.MessageTypeResponse: - c.handleResponse(msg) - case protocol.MessageTypeNotification: - c.handleNotification(msg) - default: + if nil != resCodec.ID() { + err = c.handleResponse(resCodec) + } else { + err = c.handleNotification(resCodec) } if nil != err { @@ -330,14 +328,7 @@ func (c *client) rpcReader(readerDone chan<- error) { } -func (c *client) handleResponse(msg protocol.ClientMessageCodec) error { - codec, err := msg.MessageCodec() - if nil != err { - return err - } - - resCodec := codec.(protocol.ClientResponseCodec) - +func (c *client) handleResponse(resCodec protocol.ClientResponseCodec) error { c.pendingRequestsLock.Lock() id := reflect.ValueOf(resCodec.ID()).Convert(uint64Type).Uint() @@ -369,14 +360,12 @@ func (c *client) handleResponse(msg protocol.ClientMessageCodec) error { return nil } -func (c *client) handleNotification(msg protocol.ClientMessageCodec) error { - codec, err := msg.MessageCodec() +func (c *client) handleNotification(resCodec protocol.ClientResponseCodec) error { + notiCodec, err := resCodec.Notification() if nil != err { return err } - notiCodec := codec.(protocol.ClientNotificationCodec) - _, err = c.ch.GetRPCRegistry().Invoke(notiCodec) return err diff --git a/connection/socket/servlet_handlers.go b/connection/socket/servlet_handlers.go index 09b8525..add09a9 100644 --- a/connection/socket/servlet_handlers.go +++ b/connection/socket/servlet_handlers.go @@ -10,14 +10,14 @@ import ( type ServletHandlers struct { } -func (sh *ServletHandlers) GetRequest(servletCTX rpc.ServletContext, codec protocol.ServerCodec, conn interface{}) (protocol.ServerRequestCodec, error) { +func (sh *ServletHandlers) ReadRequest(servletCTX rpc.RPCServletContext, codec protocol.ServerCodec, conn interface{}) (protocol.ServerRequestCodec, error) { nConn := conn.(net.Conn) requestCodec, err := codec.NewRequest(nConn) return requestCodec, err } -func (sh *ServletHandlers) SendResponse(servletCTX rpc.ServletContext, conn interface{}, requestCodec protocol.ServerRequestCodec, result interface{}, err error) error { +func (sh *ServletHandlers) WriteResponse(servletCTX rpc.RPCServletContext, conn interface{}, requestCodec protocol.ServerRequestCodec, result interface{}, err error) error { nConn := conn.(net.Conn) if nil != err { @@ -33,7 +33,7 @@ func (sh *ServletHandlers) SendResponse(servletCTX rpc.ServletContext, conn inte return nil } -func (sh *ServletHandlers) SendNotification(servletCTX rpc.ServletContext, conn interface{}, codec protocol.ServerCodec, method string, args ...interface{}) error { +func (sh *ServletHandlers) WriteNotification(servletCTX rpc.RPCServletContext, conn interface{}, codec protocol.ServerCodec, method string, args []interface{}) error { nConn := conn.(net.Conn) if wErr := codec.WriteNotification(nConn, method, args); nil != wErr { diff --git a/connection/websocket/fasthttp/servlet_handlers.go b/connection/websocket/fasthttp/servlet_handlers.go index 33958c6..3c9f497 100644 --- a/connection/websocket/fasthttp/servlet_handlers.go +++ b/connection/websocket/fasthttp/servlet_handlers.go @@ -1,17 +1,16 @@ package fasthttp import ( - "github.com/gorilla/websocket" - "git.loafle.net/commons_go/rpc" "git.loafle.net/commons_go/rpc/protocol" cwf "git.loafle.net/commons_go/websocket_fasthttp" + "git.loafle.net/commons_go/websocket_fasthttp/websocket" ) type ServletHandlers struct { } -func (sh *ServletHandlers) GetRequest(servletCTX rpc.ServletContext, codec protocol.ServerCodec, conn interface{}) (protocol.ServerRequestCodec, error) { +func (sh *ServletHandlers) ReadRequest(servletCTX rpc.RPCServletContext, codec protocol.ServerCodec, conn interface{}) (protocol.ServerRequestCodec, error) { soc := conn.(cwf.Socket) _, r, err := soc.NextReader() @@ -20,7 +19,7 @@ func (sh *ServletHandlers) GetRequest(servletCTX rpc.ServletContext, codec proto return requestCodec, err } -func (sh *ServletHandlers) SendResponse(servletCTX rpc.ServletContext, conn interface{}, requestCodec protocol.ServerRequestCodec, result interface{}, err error) error { +func (sh *ServletHandlers) WriteResponse(servletCTX rpc.RPCServletContext, conn interface{}, requestCodec protocol.ServerRequestCodec, result interface{}, err error) error { soc := conn.(cwf.Socket) wc, wErr := soc.NextWriter(websocket.TextMessage) @@ -41,7 +40,7 @@ func (sh *ServletHandlers) SendResponse(servletCTX rpc.ServletContext, conn inte return nil } -func (sh *ServletHandlers) SendNotification(servletCTX rpc.ServletContext, conn interface{}, codec protocol.ServerCodec, method string, args ...interface{}) error { +func (sh *ServletHandlers) WriteNotification(servletCTX rpc.RPCServletContext, conn interface{}, codec protocol.ServerCodec, method string, args []interface{}) error { soc := conn.(cwf.Socket) wc, wErr := soc.NextWriter(websocket.TextMessage) diff --git a/constants.go b/constants.go index 4cd6c6c..eab1ee4 100644 --- a/constants.go +++ b/constants.go @@ -3,7 +3,7 @@ package rpc import cuc "git.loafle.net/commons_go/util/context" const ( - DefaultPendingMessages = 32 * 1024 + DefaultPendingResponses = 32 * 1024 ) var ( diff --git a/protocol/client_codec.go b/protocol/client_codec.go index c77b25b..1f2a134 100644 --- a/protocol/client_codec.go +++ b/protocol/client_codec.go @@ -4,40 +4,17 @@ import ( "io" ) -type MessageType int - -const ( - MessageTypeUnknown MessageType = iota - MessageTypeRequest - MessageTypeResponse - MessageTypeNotification -) - // ClientCodec creates a ClientCodecRequest to process each request. type ClientCodec interface { WriteRequest(w io.Writer, method string, args interface{}, id interface{}) error - NewMessage(rc io.Reader) (ClientMessageCodec, error) -} - -type ClientRequestCodec interface { - RegistryCodec -} - -// ClientMessageCodec decodes a response or notification using a specific -// serialization scheme. -type ClientMessageCodec interface { - MessageType() MessageType - // Reads the message filling the RPC response or notification. - MessageCodec() (interface{}, error) - - Close() + NewResponse(rc io.Reader) (ClientResponseCodec, error) } type ClientResponseCodec interface { + Notification() (ClientNotificationCodec, error) Result(result interface{}) error Error() error ID() interface{} - Close() } type ClientNotificationCodec interface { diff --git a/protocol/json/client.go b/protocol/json/client.go index f6555eb..550ed3a 100644 --- a/protocol/json/client.go +++ b/protocol/json/client.go @@ -2,21 +2,12 @@ package json import ( "encoding/json" - "fmt" "io" - "sync" "git.loafle.net/commons_go/rpc/codec" "git.loafle.net/commons_go/rpc/protocol" ) -// clientMessage represents a JSON-RPC message sent to a client. -type clientMessage struct { - Version string `json:"jsonrpc"` - MessageType protocol.MessageType `json:"messageType"` - Message *json.RawMessage `json:"message"` -} - // ---------------------------------------------------------------------------- // Codec // ---------------------------------------------------------------------------- @@ -37,10 +28,12 @@ type ClientCodec struct { } func (cc *ClientCodec) WriteRequest(w io.Writer, method string, args interface{}, id interface{}) error { - req := retainClientRequest(method, args, id) - defer func() { - releaseClientRequest(req) - }() + req := &clientRequest{ + Version: Version, + Method: method, + Params: args, + ID: id, + } encoder := json.NewEncoder(cc.codecSel.SelectByWriter(w).Encode(w)) if err := encoder.Encode(req); nil != err { @@ -51,96 +44,6 @@ func (cc *ClientCodec) WriteRequest(w io.Writer, method string, args interface{} } // NewMessage returns a ClientMessageCodec. -func (cc *ClientCodec) NewMessage(r io.Reader) (protocol.ClientMessageCodec, error) { - return newClientMessageCodec(r, cc.codecSel.SelectByReader(r)) -} - -// newClientMessageCodec returns a new ClientMessageCodec. -func newClientMessageCodec(r io.Reader, codec codec.Codec) (protocol.ClientMessageCodec, error) { - msg := retainClientMessage() - err := json.NewDecoder(r).Decode(msg) - if err != nil { - if err == io.ErrUnexpectedEOF || err == io.EOF { - return nil, err - } - err = &Error{ - Code: E_PARSE, - Message: err.Error(), - Data: msg, - } - } - if msg.Version != Version { - err = &Error{ - Code: E_INVALID_REQ, - Message: "jsonrpc must be " + Version, - Data: msg, - } - } - - return retainClientMessageCodec(msg, err, codec), nil -} - -type ClientMessageCodec struct { - msg *clientMessage - err error - codec codec.Codec -} - -func (ccm *ClientMessageCodec) MessageType() protocol.MessageType { - return ccm.msg.MessageType -} - -func (ccm *ClientMessageCodec) MessageCodec() (interface{}, error) { - switch ccm.msg.MessageType { - case protocol.MessageTypeResponse: - return newClientResponseCodec(ccm.msg.Message, ccm.codec) - case protocol.MessageTypeNotification: - return newClientNotificationCodec(ccm.msg.Message, ccm.codec) - } - return nil, fmt.Errorf("RPC: Not supported message type[%v]", ccm.msg.MessageType) -} - -func (ccm *ClientMessageCodec) Close() { - if nil != ccm.msg { - releaseClientMessage(ccm.msg) - } - - releaseClientMessageCodec(ccm) -} - -var clientMessagePool sync.Pool - -func retainClientMessage() *clientMessage { - v := clientMessagePool.Get() - if v == nil { - return &clientMessage{} - } - return v.(*clientMessage) -} - -func releaseClientMessage(cm *clientMessage) { - clientMessagePool.Put(cm) -} - -var clientMessageCodecPool sync.Pool - -func retainClientMessageCodec(msg *clientMessage, err error, codec codec.Codec) *ClientMessageCodec { - var ccm *ClientMessageCodec - v := clientMessageCodecPool.Get() - if v == nil { - ccm = &ClientMessageCodec{} - } else { - ccm = v.(*ClientMessageCodec) - } - - ccm.msg = msg - ccm.err = err - ccm.codec = codec - - return ccm -} - -func releaseClientMessageCodec(cr *ClientMessageCodec) { - - clientMessageCodecPool.Put(cr) +func (cc *ClientCodec) NewResponse(r io.Reader) (protocol.ClientResponseCodec, error) { + return newClientResponseCodec(r, cc.codecSel.SelectByReader(r)) } diff --git a/protocol/json/client_notification.go b/protocol/json/client_notification.go index 9314c25..32dad5e 100644 --- a/protocol/json/client_notification.go +++ b/protocol/json/client_notification.go @@ -2,10 +2,8 @@ package json import ( "encoding/json" - "sync" "git.loafle.net/commons_go/rpc/codec" - "git.loafle.net/commons_go/rpc/protocol" ) // ---------------------------------------------------------------------------- @@ -19,79 +17,61 @@ type clientNotification struct { Params *json.RawMessage `json:"params,omitempty"` } -// newClientNotificationCodec returns a new ClientNotificationCodec. -func newClientNotificationCodec(raw *json.RawMessage, codec codec.Codec) (protocol.ClientNotificationCodec, error) { - // Decode the request body and check if RPC method is valid. - cnc := retainClientNotificationCodec() - - if err := json.Unmarshal(*raw, &cnc.notification); nil != err { - releaseClientNotificationCodec(cnc) - return nil, err - } - - return cnc, nil -} - // ClientNotificationCodec decodes and encodes a single notification. type ClientNotificationCodec struct { - notification clientNotification - err error + noti *clientNotification + err error + codec codec.Codec } -func (crc *ClientNotificationCodec) Method() string { - return crc.notification.Method +func (cnc *ClientNotificationCodec) Method() string { + return cnc.noti.Method } -func (crc *ClientNotificationCodec) ReadParams(args []interface{}) error { - if crc.err == nil && crc.notification.Params != nil { +func (cnc *ClientNotificationCodec) ReadParams(args []interface{}) error { + if cnc.err == nil && cnc.noti.Params != nil { // Note: if scr.request.Params is nil it's not an error, it's an optional member. // JSON params structured object. Unmarshal to the args object. - if err := json.Unmarshal(*crc.notification.Params, &args); err != nil { - crc.err = &Error{ + raws := make([]json.RawMessage, len(args)) + if err := json.Unmarshal(*cnc.noti.Params, &raws); err != nil { + cnc.err = &Error{ Code: E_INVALID_REQ, Message: err.Error(), - Data: crc.notification.Params, + Data: cnc.noti.Params, + } + return cnc.err + } + + for indexI := 0; indexI < len(args); indexI++ { + raw := raws[indexI] + arg := args[indexI] + if err := json.Unmarshal(raw, &arg); err != nil { + cnc.err = &Error{ + Code: E_INVALID_REQ, + Message: err.Error(), + Data: cnc.noti.Params, + } + return cnc.err } } } - return crc.err + return cnc.err } -func (crc *ClientNotificationCodec) Params() ([]string, error) { - if crc.err == nil && crc.notification.Params != nil { +func (cnc *ClientNotificationCodec) Params() ([]string, error) { + if cnc.err == nil && cnc.noti.Params != nil { var results []string - if err := json.Unmarshal(*crc.notification.Params, &results); err != nil { - crc.err = &Error{ + if err := json.Unmarshal(*cnc.noti.Params, &results); err != nil { + cnc.err = &Error{ Code: E_INVALID_REQ, Message: err.Error(), - Data: crc.notification.Params, + Data: cnc.noti.Params, } - return nil, crc.err + return nil, cnc.err } return results, nil } - return nil, crc.err -} - -func (crc *ClientNotificationCodec) Close() { - releaseClientNotificationCodec(crc) -} - -var clientNotificationCodecPool sync.Pool - -func retainClientNotificationCodec() *ClientNotificationCodec { - v := clientNotificationCodecPool.Get() - if v == nil { - return &ClientNotificationCodec{} - } - return v.(*ClientNotificationCodec) -} - -func releaseClientNotificationCodec(crc *ClientNotificationCodec) { - crc.notification.Method = "" - crc.notification.Params = nil - - clientNotificationCodecPool.Put(crc) + return nil, cnc.err } diff --git a/protocol/json/client_request.go b/protocol/json/client_request.go index 967954f..52c3828 100644 --- a/protocol/json/client_request.go +++ b/protocol/json/client_request.go @@ -1,7 +1,5 @@ package json -import "sync" - // ---------------------------------------------------------------------------- // Request and Response // ---------------------------------------------------------------------------- @@ -21,31 +19,3 @@ type clientRequest struct { // response with the request that it is replying to. ID interface{} `json:"id"` } - -var clientRequestPool sync.Pool - -func retainClientRequest(method string, params interface{}, id interface{}) *clientRequest { - var cr *clientRequest - v := clientRequestPool.Get() - if v == nil { - cr = &clientRequest{} - } else { - cr = v.(*clientRequest) - } - - cr.Version = Version - cr.Method = method - cr.Params = params - cr.ID = id - - return cr -} - -func releaseClientRequest(cr *clientRequest) { - cr.Version = "" - cr.Method = "" - cr.Params = nil - cr.ID = nil - - clientRequestPool.Put(cr) -} diff --git a/protocol/json/client_response.go b/protocol/json/client_response.go index e0c003c..0d5ede2 100644 --- a/protocol/json/client_response.go +++ b/protocol/json/client_response.go @@ -3,7 +3,7 @@ package json import ( "encoding/json" "fmt" - "sync" + "io" "git.loafle.net/commons_go/rpc/codec" "git.loafle.net/commons_go/rpc/protocol" @@ -14,73 +14,76 @@ import ( // ---------------------------------------------------------------------------- // clientResponse represents a JSON-RPC response returned to a client. type clientResponse struct { - Result *json.RawMessage `json:"result,omitempty"` - Error error `json:"error,omitempty"` - ID interface{} `json:"id,omitempty"` -} - -// newClientResponseCodec returns a new ClientResponseCodec. -func newClientResponseCodec(raw *json.RawMessage, codec codec.Codec) (protocol.ClientResponseCodec, error) { - // Decode the request body and check if RPC method is valid. - ccr := retainClientResponseCodec() - - err := json.Unmarshal(*raw, &ccr.res) - if err != nil { - releaseClientResponseCodec(ccr) - return nil, err - } - if nil == ccr.res.Result && nil == ccr.res.Error { - releaseClientResponseCodec(ccr) - return nil, fmt.Errorf("This is not Response") - } - - return ccr, nil + Version string `json:"jsonrpc"` + Result *json.RawMessage `json:"result,omitempty"` + Error error `json:"error,omitempty"` + ID interface{} `json:"id,omitempty"` } // ClientResponseCodec decodes and encodes a single request. type ClientResponseCodec struct { - res clientResponse - err error + res *clientResponse + err error + codec codec.Codec } -func (ccr *ClientResponseCodec) ID() interface{} { - return ccr.res.ID +func (crc *ClientResponseCodec) ID() interface{} { + return crc.res.ID } -func (ccr *ClientResponseCodec) Result(result interface{}) error { - if ccr.err == nil && ccr.res.Result != nil { - if err := json.Unmarshal(*ccr.res.Result, &result); err != nil { - params := [1]interface{}{result} - if err = json.Unmarshal(*ccr.res.Result, ¶ms); err != nil { - ccr.err = err +func (crc *ClientResponseCodec) Result(result interface{}) error { + if nil == crc.err && nil != crc.res.Result { + if err := json.Unmarshal(*crc.res.Result, result); nil != err { + crc.err = &Error{ + Code: E_PARSE, + Message: err.Error(), + Data: crc.res.Result, } } } - return ccr.err + + return crc.err } -func (ccr *ClientResponseCodec) Error() error { - return ccr.res.Error +func (crc *ClientResponseCodec) Error() error { + return crc.res.Error } -func (ccr *ClientResponseCodec) Close() { - releaseClientResponseCodec(ccr) -} - -var clientResponseCodecPool sync.Pool - -func retainClientResponseCodec() *ClientResponseCodec { - v := clientResponseCodecPool.Get() - if v == nil { - return &ClientResponseCodec{} +func (crc *ClientResponseCodec) Notification() (protocol.ClientNotificationCodec, error) { + if nil != crc.res.ID || nil == crc.res.Result { + return nil, fmt.Errorf("RPC[JSON]: This is not notification") } - return v.(*ClientResponseCodec) + + noti := &clientNotification{} + err := json.Unmarshal(*crc.res.Result, noti) + if nil != err { + return nil, err + } + + return &ClientNotificationCodec{noti: noti, err: err, codec: crc.codec}, nil } -func releaseClientResponseCodec(ccr *ClientResponseCodec) { - ccr.res.Result = nil - ccr.res.Error = nil - ccr.res.ID = 0 +// newClientMessageCodec returns a new ClientMessageCodec. +func newClientResponseCodec(r io.Reader, codec codec.Codec) (protocol.ClientResponseCodec, error) { + res := &clientResponse{} + err := json.NewDecoder(r).Decode(res) + if err != nil { + if err == io.ErrUnexpectedEOF || err == io.EOF { + return nil, err + } + err = &Error{ + Code: E_PARSE, + Message: err.Error(), + Data: res, + } + } + if res.Version != Version { + err = &Error{ + Code: E_INVALID_REQ, + Message: "jsonrpc must be " + Version, + Data: res, + } + } - clientResponseCodecPool.Put(ccr) + return &ClientResponseCodec{res: res, err: err, codec: codec}, nil } diff --git a/protocol/json/server.go b/protocol/json/server.go index 837c6b3..32e1940 100644 --- a/protocol/json/server.go +++ b/protocol/json/server.go @@ -3,7 +3,6 @@ package json import ( "encoding/json" "io" - "sync" "git.loafle.net/commons_go/rpc/codec" "git.loafle.net/commons_go/rpc/protocol" @@ -11,13 +10,6 @@ import ( var null = json.RawMessage([]byte("null")) -type serverMessage struct { - // JSON-RPC protocol. - Version string `json:"jsonrpc"` - MessageType protocol.MessageType `json:"messageType"` - Message interface{} `json:"message"` -} - // ---------------------------------------------------------------------------- // Codec // ---------------------------------------------------------------------------- @@ -44,44 +36,13 @@ func (sc *ServerCodec) NewRequest(r io.Reader) (protocol.ServerRequestCodec, err // WriteNotification send a notification from server to client. func (sc *ServerCodec) WriteNotification(w io.Writer, method string, args interface{}) error { - noti := retainServerNotification(method, args) - defer func() { - releaseServerNotification(noti) - }() - msg := retainServerMessage(protocol.MessageTypeNotification, noti) - defer func() { - releaseServerMessage(msg) - }() + noti := &serverNotification{Method: method, Params: args} + res := &serverResponse{Version: Version, Result: noti} + encoder := json.NewEncoder(sc.codecSel.SelectByWriter(w).Encode(w)) // Not sure in which case will this happen. But seems harmless. - if err := encoder.Encode(msg); nil != err { + if err := encoder.Encode(res); nil != err { return err } return nil } - -var serverMessagePool sync.Pool - -func retainServerMessage(msgType protocol.MessageType, msg interface{}) *serverMessage { - var sm *serverMessage - v := serverMessagePool.Get() - if v == nil { - sm = &serverMessage{} - } else { - sm = v.(*serverMessage) - } - - sm.Version = Version - sm.MessageType = msgType - sm.Message = msg - - return sm -} - -func releaseServerMessage(sm *serverMessage) { - sm.Version = "" - sm.MessageType = protocol.MessageTypeUnknown - sm.Message = nil - - serverMessagePool.Put(sm) -} diff --git a/protocol/json/server_notification.go b/protocol/json/server_notification.go index 4441add..124670a 100644 --- a/protocol/json/server_notification.go +++ b/protocol/json/server_notification.go @@ -1,9 +1,5 @@ package json -import ( - "sync" -) - // clientRequest represents a JSON-RPC request sent by a client. type serverNotification struct { // A String containing the name of the method to be invoked. @@ -12,27 +8,3 @@ type serverNotification struct { // Object to pass as request parameter to the method. Params interface{} `json:"params"` } - -var serverNotificationPool sync.Pool - -func retainServerNotification(method string, args interface{}) *serverNotification { - var sn *serverNotification - v := serverNotificationPool.Get() - if v == nil { - sn = &serverNotification{} - } else { - sn = v.(*serverNotification) - } - - sn.Method = method - sn.Params = args - - return sn -} - -func releaseServerNotification(sn *serverNotification) { - sn.Method = "" - sn.Params = nil - - serverNotificationPool.Put(sn) -} diff --git a/protocol/json/server_request.go b/protocol/json/server_request.go index faab653..db9d221 100644 --- a/protocol/json/server_request.go +++ b/protocol/json/server_request.go @@ -3,7 +3,6 @@ package json import ( "encoding/json" "io" - "sync" "git.loafle.net/commons_go/rpc/codec" "git.loafle.net/commons_go/rpc/protocol" @@ -37,7 +36,7 @@ type serverRequest struct { // newRequestCodec returns a new ServerRequestCodec. func newServerRequestCodec(r io.Reader, codec codec.Codec) (protocol.ServerRequestCodec, error) { // Decode the request body and check if RPC method is valid. - req := retainServerRequest() + req := &serverRequest{} err := json.NewDecoder(r).Decode(req) if err == io.ErrUnexpectedEOF || err == io.EOF { return nil, err @@ -57,7 +56,7 @@ func newServerRequestCodec(r io.Reader, codec codec.Codec) (protocol.ServerReque } } - return retainServerRequestCodec(req, err, codec), nil + return &ServerRequestCodec{req: req, err: err, codec: codec}, nil } // ServerRequestCodec decodes and encodes a single request. @@ -67,14 +66,6 @@ type ServerRequestCodec struct { codec codec.Codec } -// Close is callback function that end of request. -func (src *ServerRequestCodec) Close() { - if nil != src.req { - releaseServerRequest(src.req) - } - releaseServerRequestCodec(src) -} - // Method returns the RPC method for the current request. // // The method uses a dotted notation as in "Service.Method". @@ -157,7 +148,7 @@ func (src *ServerRequestCodec) Params() ([]string, error) { // WriteResponse encodes the response and writes it to the ResponseWriter. func (src *ServerRequestCodec) WriteResponse(w io.Writer, reply interface{}) error { - res := retainServerResponse(reply, nil, src.req.ID) + res := &serverResponse{Version: Version, Result: reply, ID: src.req.ID} return src.writeServerResponse(w, res) } @@ -170,69 +161,18 @@ func (src *ServerRequestCodec) WriteError(w io.Writer, status int, err error) er Message: err.Error(), } } - res := retainServerResponse(nil, jsonErr, src.req.ID) + res := &serverResponse{Version: Version, Error: jsonErr, ID: src.req.ID} return src.writeServerResponse(w, res) } func (src *ServerRequestCodec) writeServerResponse(w io.Writer, res *serverResponse) error { - defer func() { - releaseServerResponse(res) - }() // ID is null for notifications and they don't have a response. - if src.req.ID != nil { - msg := retainServerMessage(protocol.MessageTypeResponse, res) - defer func() { - releaseServerMessage(msg) - }() + if req.ID != nil { encoder := json.NewEncoder(src.codec.Encode(w)) // Not sure in which case will this happen. But seems harmless. - if err := encoder.Encode(msg); nil != err { + if err := encoder.Encode(res); nil != err { return err } } return nil } - -var serverRequestCodecPool sync.Pool - -func retainServerRequestCodec(req *serverRequest, err error, codec codec.Codec) *ServerRequestCodec { - var src *ServerRequestCodec - v := serverRequestCodecPool.Get() - if v == nil { - src = &ServerRequestCodec{} - } else { - src = v.(*ServerRequestCodec) - } - - src.req = req - src.err = err - src.codec = codec - - return src -} - -func releaseServerRequestCodec(src *ServerRequestCodec) { - src.req = nil - src.err = nil - src.codec = nil - - serverRequestCodecPool.Put(src) -} - -var serverRequestPool sync.Pool - -func retainServerRequest() *serverRequest { - v := serverRequestPool.Get() - if v == nil { - return &serverRequest{} - } - return v.(*serverRequest) -} - -func releaseServerRequest(sr *serverRequest) { - sr.Method = "" - sr.Params = nil - sr.ID = nil - - serverRequestPool.Put(sr) -} diff --git a/protocol/json/server_response.go b/protocol/json/server_response.go index 1c15515..4038b55 100644 --- a/protocol/json/server_response.go +++ b/protocol/json/server_response.go @@ -2,7 +2,6 @@ package json import ( "encoding/json" - "sync" ) // ---------------------------------------------------------------------------- @@ -11,6 +10,8 @@ import ( // serverResponse represents a JSON-RPC response returned by the server. type serverResponse struct { + // JSON-RPC protocol. + Version string `json:"jsonrpc"` // The Object that was returned by the invoked method. This must be null // in case there was an error invoking the method. // As per spec the member will be omitted if there was an error. @@ -24,29 +25,3 @@ type serverResponse struct { // This must be the same id as the request it is responding to. ID *json.RawMessage `json:"id,omitempty"` } - -var serverResponsePool sync.Pool - -func retainServerResponse(result interface{}, err *Error, id *json.RawMessage) *serverResponse { - var sr *serverResponse - v := serverResponsePool.Get() - if v == nil { - sr = &serverResponse{} - } else { - sr = v.(*serverResponse) - } - - sr.Result = result - sr.Error = err - sr.ID = id - - return sr -} - -func releaseServerResponse(sr *serverResponse) { - sr.Result = nil - sr.Error = nil - sr.ID = nil - - serverResponsePool.Put(sr) -} diff --git a/protocol/registry_codec.go b/protocol/registry_codec.go index 429663b..b0ecc68 100644 --- a/protocol/registry_codec.go +++ b/protocol/registry_codec.go @@ -10,5 +10,4 @@ type RegistryCodec interface { // Reads the request filling the RPC method args. ReadParams(args []interface{}) error Params() ([]string, error) - Close() } diff --git a/protocol/server_codec.go b/protocol/server_codec.go index 911322b..6dbcc89 100644 --- a/protocol/server_codec.go +++ b/protocol/server_codec.go @@ -14,6 +14,7 @@ type ServerCodec interface { // serialization scheme. type ServerRequestCodec interface { RegistryCodec + WriteResponse(w io.Writer, reply interface{}) error WriteError(w io.Writer, status int, err error) error } diff --git a/servlet.go b/rpc_servlet.go similarity index 54% rename from servlet.go rename to rpc_servlet.go index c5d3c19..8b5e6ae 100644 --- a/servlet.go +++ b/rpc_servlet.go @@ -11,25 +11,25 @@ import ( cuc "git.loafle.net/commons_go/util/context" ) -func NewServlet(sh ServletHandler) Servlet { - return &servlet{ +func NewRPCServlet(sh RPCServletHandler) RPCServlet { + return &rpcServlet{ sh: sh, } } -type Servlet interface { +type RPCServlet interface { Start(parentCTX cuc.Context, conn interface{}, doneChan chan<- error) error Stop() Send(method string, args ...interface{}) (err error) - Context() ServletContext + Context() RPCServletContext } -type servlet struct { - ctx ServletContext - sh ServletHandler - messageQueueChan chan *messageState +type rpcServlet struct { + ctx RPCServletContext + sh RPCServletHandler + responseQueueChan chan *responseState doneChan chan<- error conn interface{} @@ -39,7 +39,7 @@ type servlet struct { stopWg sync.WaitGroup } -func (s *servlet) Start(parentCTX cuc.Context, conn interface{}, doneChan chan<- error) error { +func (s *rpcServlet) Start(parentCTX cuc.Context, conn interface{}, doneChan chan<- error) error { if nil == s.sh { panic("Servlet: servlet handler must be specified.") } @@ -64,7 +64,7 @@ func (s *servlet) Start(parentCTX cuc.Context, conn interface{}, doneChan chan<- } s.stopChan = make(chan struct{}) - s.messageQueueChan = make(chan *messageState, s.sh.GetPendingMessages()) + s.responseQueueChan = make(chan *responseState, s.sh.GetPendingResponses()) s.stopWg.Add(1) go handleServlet(s) @@ -72,7 +72,7 @@ func (s *servlet) Start(parentCTX cuc.Context, conn interface{}, doneChan chan<- return nil } -func (s *servlet) Stop() { +func (s *rpcServlet) Stop() { if s.stopChan == nil { panic("Server: server must be started before stopping it") } @@ -81,7 +81,7 @@ func (s *servlet) Stop() { s.stopChan = nil s.sh.Destroy(s.ctx) - s.messageQueueChan = nil + s.responseQueueChan = nil s.conn = nil s.serverCodec = nil @@ -89,21 +89,25 @@ func (s *servlet) Stop() { logging.Logger().Info(fmt.Sprintf("Servlet is stopped")) } -func (s *servlet) Send(method string, args ...interface{}) (err error) { - ms := retainMessageState(protocol.MessageTypeNotification) - ms.noti.method = method - ms.noti.args = args +func (s *rpcServlet) Send(method string, args ...interface{}) (err error) { + noti := ¬ification{ + method: method, + args: args, + } + rs := &responseState{ + noti: noti, + } - s.messageQueueChan <- ms + s.responseQueueChan <- rs return nil } -func (s *servlet) Context() ServletContext { +func (s *rpcServlet) Context() RPCServletContext { return s.ctx } -func handleServlet(s *servlet) { +func handleServlet(s *rpcServlet) { var err error defer func() { @@ -137,7 +141,7 @@ func handleServlet(s *servlet) { } } -func handleReader(s *servlet, stopChan chan struct{}, doneChan chan error) { +func handleReader(s *rpcServlet, stopChan chan struct{}, doneChan chan error) { var err error defer func() { if r := recover(); r != nil { @@ -149,7 +153,7 @@ func handleReader(s *servlet, stopChan chan struct{}, doneChan chan error) { }() for { - requestCodec, err := s.sh.GetRequest(s.ctx, s.serverCodec, s.conn) + requestCodec, err := s.sh.ReadRequest(s.ctx, s.serverCodec, s.conn) if nil != err { if err == io.ErrUnexpectedEOF || err == io.EOF { err = fmt.Errorf("RPC Server: disconnected from client") @@ -171,108 +175,72 @@ func handleReader(s *servlet, stopChan chan struct{}, doneChan chan error) { } } -func handleWriter(s *servlet, stopChan chan struct{}, doneChan chan error) { +func handleWriter(s *rpcServlet, stopChan chan struct{}, doneChan chan error) { var err error defer func() { if r := recover(); r != nil { if err == nil { - err = fmt.Errorf("RPC Server: Panic when writing message to client: %v", r) + err = fmt.Errorf("RPC Server: Panic when writing response to client: %v", r) } } doneChan <- err }() for { - var ms *messageState + var rs *responseState select { - case ms = <-s.messageQueueChan: + case rs = <-s.responseQueueChan: default: - // Give the last chance for ready goroutines filling s.messageQueueChan :) + // Give the last chance for ready goroutines filling s.responseQueueChan :) runtime.Gosched() select { case <-stopChan: err = fmt.Errorf("RPC Server: writing message stopped because get stop channel") return - case ms = <-s.messageQueueChan: + case rs = <-s.responseQueueChan: } } - switch ms.messageType { - case protocol.MessageTypeResponse: - if err := s.sh.SendResponse(s.ctx, s.conn, ms.res.requestCodec, ms.res.result, ms.res.err); nil != err { - logging.Logger().Error(fmt.Sprintf("RPC Server: response message error %v", err)) + if nil != rs.requestCodec { + if err := s.sh.WriteResponse(s.ctx, s.conn, rs.requestCodec, rs.result, rs.err); nil != err { + logging.Logger().Error(fmt.Sprintf("RPC Server: response error %v", err)) } - ms.res.requestCodec.Close() - case protocol.MessageTypeNotification: - if err := s.sh.SendNotification(s.ctx, s.conn, s.serverCodec, ms.noti.method, ms.noti.args...); nil != err { - logging.Logger().Error(fmt.Sprintf("RPC Server: response message error %v", err)) + } else { + if err := s.sh.WriteNotification(s.ctx, s.conn, s.serverCodec, rs.noti.method, rs.noti.args); nil != err { + logging.Logger().Error(fmt.Sprintf("RPC Server: notification error %v", err)) } - default: } } } -func handleRequest(s *servlet, requestCodec protocol.ServerRequestCodec) { +func handleRequest(s *rpcServlet, requestCodec protocol.ServerRequestCodec) { defer func() { s.stopWg.Done() }() result, err := s.sh.Invoke(s.ctx, requestCodec) - ms := retainMessageState(protocol.MessageTypeResponse) - ms.res.requestCodec = requestCodec - ms.res.result = result - ms.res.err = err + rs := &responseState{ + requestCodec: requestCodec, + result: result, + err: err, + } - s.messageQueueChan <- ms + s.responseQueueChan <- rs } -type messageState struct { - messageType protocol.MessageType - res messageResponse - noti messageNotification -} - -type messageResponse struct { +type responseState struct { requestCodec protocol.ServerRequestCodec result interface{} + noti *notification err error } -type messageNotification struct { +type notification struct { method string args []interface{} } - -var messageStatePool sync.Pool - -func retainMessageState(messageType protocol.MessageType) *messageState { - var ms *messageState - v := messageStatePool.Get() - if v == nil { - ms = &messageState{} - } else { - ms = v.(*messageState) - } - - ms.messageType = messageType - - return ms -} - -func releaseMessageState(ms *messageState) { - ms.messageType = protocol.MessageTypeUnknown - - ms.res.requestCodec = nil - ms.res.result = nil - ms.res.err = nil - - ms.noti.method = "" - ms.noti.args = nil - - messageStatePool.Put(ms) -} diff --git a/rpc_servlet_context.go b/rpc_servlet_context.go new file mode 100644 index 0000000..4642905 --- /dev/null +++ b/rpc_servlet_context.go @@ -0,0 +1,20 @@ +package rpc + +import ( + cuc "git.loafle.net/commons_go/util/context" +) + +type RPCServletContext interface { + cuc.Context +} + +type rpcServletContext struct { + cuc.Context +} + +func newRPCServletContext(parent cuc.Context) RPCServletContext { + sCTX := &rpcServletContext{} + sCTX.Context = cuc.NewContext(parent) + + return sCTX +} diff --git a/rpc_servlet_handler.go b/rpc_servlet_handler.go new file mode 100644 index 0000000..44c236b --- /dev/null +++ b/rpc_servlet_handler.go @@ -0,0 +1,25 @@ +package rpc + +import ( + "git.loafle.net/commons_go/rpc/protocol" + cuc "git.loafle.net/commons_go/util/context" +) + +type RPCServletHandler interface { + ServletContext(parent cuc.Context) RPCServletContext + + Init(servletCTX RPCServletContext) error + + ReadRequest(servletCTX RPCServletContext, codec protocol.ServerCodec, conn interface{}) (protocol.ServerRequestCodec, error) + Invoke(servletCTX RPCServletContext, requestCodec protocol.RegistryCodec) (result interface{}, err error) + WriteResponse(servletCTX RPCServletContext, conn interface{}, requestCodec protocol.ServerRequestCodec, result interface{}, err error) error + WriteNotification(servletCTX RPCServletContext, conn interface{}, codec protocol.ServerCodec, method string, args []interface{}) error + + Destroy(servletCTX RPCServletContext) + + RegisterCodec(contentType string, codec protocol.ServerCodec) + getCodec(contentType string) (protocol.ServerCodec, error) + + GetPendingResponses() int + Validate() +} diff --git a/rpc_servlet_handlers.go b/rpc_servlet_handlers.go new file mode 100644 index 0000000..4732e97 --- /dev/null +++ b/rpc_servlet_handlers.go @@ -0,0 +1,87 @@ +package rpc + +import ( + "fmt" + "strings" + + "git.loafle.net/commons_go/rpc/protocol" + cuc "git.loafle.net/commons_go/util/context" +) + +type RPCServletHandlers struct { + // The maximum number of pending messages in the queue. + // + // The number of pending requsts should exceed the expected number + // of concurrent goroutines calling client's methods. + // Otherwise a lot of ClientError.Overflow errors may appear. + // + // Default is DefaultPendingMessages. + PendingResponses int + + codecs map[string]protocol.ServerCodec +} + +func (sh *RPCServletHandlers) ServletContext(parent cuc.Context) RPCServletContext { + return newRPCServletContext(parent) +} + +func (sh *RPCServletHandlers) Init(servletCTX RPCServletContext) error { + return nil +} + +func (sh *RPCServletHandlers) ReadRequest(servletCTX RPCServletContext, codec protocol.ServerCodec, conn interface{}) (protocol.ServerRequestCodec, error) { + return nil, fmt.Errorf("Servlet Handler: ReadRequest is not implemented") +} + +func (sh *RPCServletHandlers) Invoke(servletCTX RPCServletContext, requestCodec protocol.RegistryCodec) (result interface{}, err error) { + return nil, fmt.Errorf("Servlet Handler: Invoke is not implemented") +} + +func (sh *RPCServletHandlers) WriteResponse(servletCTX RPCServletContext, conn interface{}, requestCodec protocol.ServerRequestCodec, result interface{}, err error) error { + return fmt.Errorf("Servlet Handler: WriteResponse is not implemented") +} + +func (sh *RPCServletHandlers) WriteNotification(servletCTX RPCServletContext, conn interface{}, codec protocol.ServerCodec, method string, args []interface{}) error { + return fmt.Errorf("Servlet Handler: WriteNotification is not implemented") +} + +func (sh *RPCServletHandlers) Destroy(servletCTX RPCServletContext) { + // no op +} + +// RegisterCodec adds a new codec to the server. +// +// Codecs are defined to process a given serialization scheme, e.g., JSON or +// XML. A codec is chosen based on the "Content-Type" header from the request, +// excluding the charset definition. +func (sh *RPCServletHandlers) RegisterCodec(contentType string, codec protocol.ServerCodec) { + if nil == sh.codecs { + sh.codecs = make(map[string]protocol.ServerCodec) + } + sh.codecs[strings.ToLower(contentType)] = codec +} + +func (sh *RPCServletHandlers) getCodec(contentType string) (protocol.ServerCodec, error) { + var codec protocol.ServerCodec + if contentType == "" && len(sh.codecs) == 1 { + // If Content-Type is not set and only one codec has been registered, + // then default to that codec. + for _, c := range sh.codecs { + codec = c + } + } else if codec = sh.codecs[strings.ToLower(contentType)]; codec == nil { + return nil, fmt.Errorf("Unrecognized Content-Type: %s", contentType) + } + + return codec, nil +} + +func (sh *RPCServletHandlers) GetPendingResponses() int { + return sh.PendingResponses +} + +func (sh *RPCServletHandlers) Validate() { + if 0 >= sh.PendingResponses { + sh.PendingResponses = DefaultPendingResponses + } +} diff --git a/server/servlet_handler.go b/server/servlet_handler.go index 05b9903..13d1d83 100644 --- a/server/servlet_handler.go +++ b/server/servlet_handler.go @@ -3,5 +3,5 @@ package server import "git.loafle.net/commons_go/rpc" type ServletHandler interface { - rpc.ServletHandler + rpc.RPCServletHandler } diff --git a/server/servlet_handlers.go b/server/servlet_handlers.go index 1dc7c45..e31e4ab 100644 --- a/server/servlet_handlers.go +++ b/server/servlet_handlers.go @@ -9,12 +9,12 @@ import ( ) type ServletHandlers struct { - rpc.ServletHandlers + rpc.RPCServletHandlers RPCRegistry rpc.Registry } -func (sh *ServletHandlers) Invoke(requestCodec protocol.RegistryCodec) (result interface{}, err error) { +func (sh *ServletHandlers) Invoke(servletCTX rpc.RPCServletContext, requestCodec protocol.RegistryCodec) (result interface{}, err error) { if !sh.RPCRegistry.HasMethod(requestCodec.Method()) { return nil, fmt.Errorf("RPC Servlet Handler: Method[%s] is not exist", requestCodec.Method()) } @@ -32,7 +32,7 @@ func (sh *ServletHandlers) Invoke(requestCodec protocol.RegistryCodec) (result i } func (sh *ServletHandlers) Validate() { - sh.ServletHandlers.Validate() + sh.RPCServletHandlers.Validate() if nil == sh.RPCRegistry { logging.Logger().Panic(fmt.Sprintf("RPC Servlet Handler: RPC Registry must be specified")) diff --git a/servlet_context.go b/servlet_context.go deleted file mode 100644 index 391f340..0000000 --- a/servlet_context.go +++ /dev/null @@ -1,20 +0,0 @@ -package rpc - -import ( - cuc "git.loafle.net/commons_go/util/context" -) - -type ServletContext interface { - cuc.Context -} - -type servletContext struct { - cuc.Context -} - -func newServletContext(parent cuc.Context) ServletContext { - sCTX := &servletContext{} - sCTX.Context = cuc.NewContext(parent) - - return sCTX -} diff --git a/servlet_handler.go b/servlet_handler.go deleted file mode 100644 index c46c98b..0000000 --- a/servlet_handler.go +++ /dev/null @@ -1,26 +0,0 @@ -package rpc - -import ( - "git.loafle.net/commons_go/rpc/protocol" - cuc "git.loafle.net/commons_go/util/context" -) - -type ServletHandler interface { - ServletContext(parent cuc.Context) ServletContext - - Init(servletCTX ServletContext) error - - GetRequest(servletCTX ServletContext, codec protocol.ServerCodec, conn interface{}) (protocol.ServerRequestCodec, error) - Invoke(servletCTX ServletContext, requestCodec protocol.RegistryCodec) (result interface{}, err error) - SendResponse(servletCTX ServletContext, conn interface{}, requestCodec protocol.ServerRequestCodec, result interface{}, err error) error - SendNotification(servletCTX ServletContext, conn interface{}, codec protocol.ServerCodec, method string, args ...interface{}) error - - Destroy(servletCTX ServletContext) - - RegisterCodec(contentType string, codec protocol.ServerCodec) - - getCodec(contentType string) (protocol.ServerCodec, error) - - GetPendingMessages() int - Validate() -} diff --git a/servlet_handlers.go b/servlet_handlers.go deleted file mode 100644 index 093ef9e..0000000 --- a/servlet_handlers.go +++ /dev/null @@ -1,87 +0,0 @@ -package rpc - -import ( - "fmt" - "strings" - - "git.loafle.net/commons_go/rpc/protocol" - cuc "git.loafle.net/commons_go/util/context" -) - -type ServletHandlers struct { - // The maximum number of pending messages in the queue. - // - // The number of pending requsts should exceed the expected number - // of concurrent goroutines calling client's methods. - // Otherwise a lot of ClientError.Overflow errors may appear. - // - // Default is DefaultPendingMessages. - PendingMessages int - - codecs map[string]protocol.ServerCodec -} - -func (sh *ServletHandlers) ServletContext(parent cuc.Context) ServletContext { - return newServletContext(parent) -} - -func (sh *ServletHandlers) Init(servletCTX ServletContext) error { - return nil -} - -func (sh *ServletHandlers) GetRequest(servletCTX ServletContext, codec protocol.ServerCodec, conn interface{}) (protocol.ServerRequestCodec, error) { - return nil, fmt.Errorf("Servlet Handler: GetRequest is not implemented") -} - -func (sh *ServletHandlers) Invoke(servletCTX ServletContext, requestCodec protocol.RegistryCodec) (result interface{}, err error) { - return nil, fmt.Errorf("Servlet Handler: Invoke is not implemented") -} - -func (sh *ServletHandlers) SendResponse(servletCTX ServletContext, conn interface{}, requestCodec protocol.ServerRequestCodec, result interface{}, err error) error { - return fmt.Errorf("Servlet Handler: SendResponse is not implemented") -} - -func (sh *ServletHandlers) SendNotification(servletCTX ServletContext, conn interface{}, codec protocol.ServerCodec, method string, args ...interface{}) error { - return fmt.Errorf("Servlet Handler: SendNotification is not implemented") -} - -func (sh *ServletHandlers) Destroy(servletCTX ServletContext) { - // no op -} - -// RegisterCodec adds a new codec to the server. -// -// Codecs are defined to process a given serialization scheme, e.g., JSON or -// XML. A codec is chosen based on the "Content-Type" header from the request, -// excluding the charset definition. -func (sh *ServletHandlers) RegisterCodec(contentType string, codec protocol.ServerCodec) { - if nil == sh.codecs { - sh.codecs = make(map[string]protocol.ServerCodec) - } - sh.codecs[strings.ToLower(contentType)] = codec -} - -func (sh *ServletHandlers) getCodec(contentType string) (protocol.ServerCodec, error) { - var codec protocol.ServerCodec - if contentType == "" && len(sh.codecs) == 1 { - // If Content-Type is not set and only one codec has been registered, - // then default to that codec. - for _, c := range sh.codecs { - codec = c - } - } else if codec = sh.codecs[strings.ToLower(contentType)]; codec == nil { - return nil, fmt.Errorf("Unrecognized Content-Type: %s", contentType) - } - - return codec, nil -} - -func (sh *ServletHandlers) GetPendingMessages() int { - return sh.PendingMessages -} - -func (sh *ServletHandlers) Validate() { - if 0 >= sh.PendingMessages { - sh.PendingMessages = DefaultPendingMessages - } -}