diff --git a/client/client.go b/client/client.go index 3f281fd..5814076 100644 --- a/client/client.go +++ b/client/client.go @@ -36,7 +36,8 @@ type client struct { ch ClientHandler rwcHandler ClientReadWriteCloseHandler - conn interface{} + conn interface{} + decoder interface{} pendingRequestsCount uint32 pendingRequests map[uint64]*RequestState @@ -46,18 +47,21 @@ type client struct { stopChan chan struct{} stopWg sync.WaitGroup + + requestMtx sync.Mutex + responseMtx sync.Mutex } func (c *client) Connect() error { var err error if nil == c.ch { - panic("RPC Client: Client handler must be specified.") + return fmt.Errorf("RPC Client: Client handler must be specified") } c.ch.Validate() if nil == c.rwcHandler { - panic("RPC Client: Client RWC handler must be specified.") + return fmt.Errorf("RPC Client: Client RWC handler must be specified") } c.rwcHandler.Validate() @@ -67,12 +71,18 @@ func (c *client) Connect() error { c.ctx = c.ch.ClientContext(nil) if err := c.ch.Init(c.ctx); nil != err { - logging.Logger().Panicf("RPC Client: Initialization of client has been failed %v", err) + return fmt.Errorf("RPC Client: Initialization of client has been failed %v", err) } if c.conn, err = c.rwcHandler.Connect(c.ctx); nil != err { return err } + + c.decoder, err = c.rwcHandler.NewDecoder(c.ctx, c.ch.GetCodec(), c.conn) + if nil != err { + return fmt.Errorf("RPC Client: Cannot build rpc decoder") + } + c.stopChan = make(chan struct{}) c.requestQueueChan = make(chan *RequestState, c.ch.GetPendingRequests()) c.pendingRequests = make(map[uint64]*RequestState) @@ -303,7 +313,9 @@ func (c *client) rpcWriter(stopChan <-chan struct{}, writerDone chan<- error) { return } + c.requestMtx.Lock() err = c.rwcHandler.WriteRequest(c.ctx, c.ch.GetCodec(), c.conn, rs.Method, rs.Args, requestID) + c.requestMtx.Unlock() if !rs.hasResponse { rs.Error = err rs.Done() @@ -337,7 +349,9 @@ func (c *client) rpcReader(readerDone chan<- error) { err = io.EOF return } - resCodec, err := c.rwcHandler.ReadResponse(c.ctx, c.ch.GetCodec(), c.conn) + c.responseMtx.Lock() + resCodec, err := c.rwcHandler.ReadResponse(c.ctx, c.ch.GetCodec(), c.decoder) + c.responseMtx.Unlock() if nil != err { if err == io.ErrUnexpectedEOF || err == io.EOF { logging.Logger().Infof("Client: disconnected from server") diff --git a/client/client_rwc_handler.go b/client/client_rwc_handler.go index 9693c51..d5670c7 100644 --- a/client/client_rwc_handler.go +++ b/client/client_rwc_handler.go @@ -4,7 +4,8 @@ import "git.loafle.net/commons_go/rpc/protocol" type ClientReadWriteCloseHandler interface { Connect(clientCTX ClientContext) (interface{}, error) - ReadResponse(clientCTX ClientContext, codec protocol.ClientCodec, conn interface{}) (protocol.ClientResponseCodec, error) + NewDecoder(clientCTX ClientContext, codec protocol.ClientCodec, conn interface{}) (interface{}, error) + ReadResponse(clientCTX ClientContext, codec protocol.ClientCodec, decoder interface{}) (protocol.ClientResponseCodec, error) WriteRequest(clientCTX ClientContext, codec protocol.ClientCodec, conn interface{}, method string, params []interface{}, id interface{}) error Disconnect(clientCTX ClientContext, conn interface{}) diff --git a/client/client_rwc_handlers.go b/client/client_rwc_handlers.go index 6e98f42..559d5c0 100644 --- a/client/client_rwc_handlers.go +++ b/client/client_rwc_handlers.go @@ -2,21 +2,22 @@ package client import ( "fmt" - "sync" "git.loafle.net/commons_go/rpc/protocol" ) type ClientReadWriteCloseHandlers struct { - ReadMtx sync.Mutex - WriteMtx sync.Mutex } func (crwch *ClientReadWriteCloseHandlers) Connect(clientCTX ClientContext) (interface{}, error) { return nil, fmt.Errorf("RPC Client RWC Handler: ClientHandlers method[Connect] is not implement") } -func (crwch *ClientReadWriteCloseHandlers) ReadResponse(clientCTX ClientContext, codec protocol.ClientCodec, conn interface{}) (protocol.ClientResponseCodec, error) { +func (crwch *ClientReadWriteCloseHandlers) NewDecoder(clientCTX ClientContext, codec protocol.ClientCodec, conn interface{}) (interface{}, error) { + return nil, fmt.Errorf("RPC Client RWC Handler: NewDecoder is not implemented") +} + +func (crwch *ClientReadWriteCloseHandlers) ReadResponse(clientCTX ClientContext, codec protocol.ClientCodec, decoder interface{}) (protocol.ClientResponseCodec, error) { return nil, fmt.Errorf("RPC Client RWC Handler: ClientHandlers method[ReadResponse] is not implement") } diff --git a/client/rwc/socket/client_rwc_handlers.go b/client/rwc/socket/client_rwc_handlers.go index 95d2516..56a52eb 100644 --- a/client/rwc/socket/client_rwc_handlers.go +++ b/client/rwc/socket/client_rwc_handlers.go @@ -24,12 +24,14 @@ func (crwch *ClientReadWriteCloseHandlers) Connect(clientCTX client.ClientContex return csc.NewSocket(crwch.SocketBuilder, clientCTX) } -func (crwch *ClientReadWriteCloseHandlers) ReadResponse(clientCTX client.ClientContext, codec protocol.ClientCodec, conn interface{}) (protocol.ClientResponseCodec, error) { +func (crwch *ClientReadWriteCloseHandlers) NewDecoder(clientCTX client.ClientContext, codec protocol.ClientCodec, conn interface{}) (interface{}, error) { soc := conn.(csc.Socket) - crwch.ReadMtx.Lock() - resCodec, err := codec.NewResponse(soc) - crwch.ReadMtx.Unlock() + return codec.NewDecoder(soc), nil +} + +func (crwch *ClientReadWriteCloseHandlers) ReadResponse(clientCTX client.ClientContext, codec protocol.ClientCodec, decoder interface{}) (protocol.ClientResponseCodec, error) { + resCodec, err := codec.NewResponse(decoder) return resCodec, err } @@ -40,9 +42,7 @@ func (crwch *ClientReadWriteCloseHandlers) WriteRequest(clientCTX client.ClientC ) soc := conn.(csc.Socket) - crwch.WriteMtx.Lock() wErr = codec.WriteRequest(soc, method, params, id) - crwch.WriteMtx.Unlock() return wErr } diff --git a/client/rwc/websocket/fasthttp/client_rwc_handlers.go b/client/rwc/websocket/fasthttp/client_rwc_handlers.go index 05c85f8..d216301 100644 --- a/client/rwc/websocket/fasthttp/client_rwc_handlers.go +++ b/client/rwc/websocket/fasthttp/client_rwc_handlers.go @@ -27,13 +27,18 @@ func (crwch *ClientReadWriteCloseHandlers) Connect(clientCTX client.ClientContex return cwfc.NewSocket(crwch.SocketBuilder, clientCTX) } -func (crwch *ClientReadWriteCloseHandlers) ReadResponse(clientCTX client.ClientContext, codec protocol.ClientCodec, conn interface{}) (protocol.ClientResponseCodec, error) { +func (crwch *ClientReadWriteCloseHandlers) NewDecoder(clientCTX client.ClientContext, codec protocol.ClientCodec, conn interface{}) (interface{}, error) { soc := conn.(cwfc.Socket) - - crwch.ReadMtx.Lock() _, r, err := soc.NextReader() - resCodec, err := codec.NewResponse(r) - crwch.ReadMtx.Unlock() + if nil != err { + return nil, err + } + + return codec.NewDecoder(r), nil +} + +func (crwch *ClientReadWriteCloseHandlers) ReadResponse(clientCTX client.ClientContext, codec protocol.ClientCodec, decoder interface{}) (protocol.ClientResponseCodec, error) { + resCodec, err := codec.NewResponse(decoder) return resCodec, err } @@ -45,10 +50,8 @@ func (crwch *ClientReadWriteCloseHandlers) WriteRequest(clientCTX client.ClientC ) soc := conn.(cwfc.Socket) - crwch.WriteMtx.Lock() wc, wErr = soc.NextWriter(websocket.TextMessage) if nil != wErr { - crwch.WriteMtx.Unlock() return wErr } defer func() { @@ -57,8 +60,6 @@ func (crwch *ClientReadWriteCloseHandlers) WriteRequest(clientCTX client.ClientC wErr = codec.WriteRequest(wc, method, params, id) - crwch.WriteMtx.Unlock() - return wErr } diff --git a/codec/codec.go b/codec/codec.go deleted file mode 100644 index 52643d0..0000000 --- a/codec/codec.go +++ /dev/null @@ -1,23 +0,0 @@ -package codec - -import "io" - -// Codec interface contains the encoder for response or decoder for request. -// Eg. gzip, flate compressions. -type Codec interface { - Encode(w io.Writer) io.Writer - Decode(r io.Reader) io.Reader -} - -type codec struct { -} - -func (_ *codec) Encode(w io.Writer) io.Writer { - return w -} - -func (_ *codec) Decode(r io.Reader) io.Reader { - return r -} - -var DefaultCodec = &codec{} diff --git a/codec/selector.go b/codec/selector.go deleted file mode 100644 index 902117f..0000000 --- a/codec/selector.go +++ /dev/null @@ -1,25 +0,0 @@ -package codec - -import "io" - -// CodecSelector interface provides a way to select encoder using the http -// request. Typically people can use this to check HEADER of the request and -// figure out client capabilities. -// Eg. "Accept-Encoding" tells about supported compressions. -type CodecSelector interface { - SelectByReader(r io.Reader) Codec - SelectByWriter(w io.Writer) Codec -} - -type codecSelector struct { -} - -func (_ *codecSelector) SelectByReader(_ io.Reader) Codec { - return DefaultCodec -} - -func (_ *codecSelector) SelectByWriter(_ io.Writer) Codec { - return DefaultCodec -} - -var DefaultCodecSelector = &codecSelector{} diff --git a/protocol/client_codec.go b/protocol/client_codec.go index b5a58bc..db3b291 100644 --- a/protocol/client_codec.go +++ b/protocol/client_codec.go @@ -6,8 +6,10 @@ import ( // ClientCodec creates a ClientCodecRequest to process each request. type ClientCodec interface { + NewDecoder(r io.Reader) interface{} + WriteRequest(w io.Writer, method string, args []interface{}, id interface{}) error - NewResponse(rc io.Reader) (ClientResponseCodec, error) + NewResponse(decoder interface{}) (ClientResponseCodec, error) } type ClientResponseCodec interface { diff --git a/protocol/json/client.go b/protocol/json/client.go index 3ead2d6..1a8f1c0 100644 --- a/protocol/json/client.go +++ b/protocol/json/client.go @@ -4,7 +4,6 @@ import ( "encoding/json" "io" - "git.loafle.net/commons_go/rpc/codec" "git.loafle.net/commons_go/rpc/protocol" ) @@ -12,19 +11,17 @@ import ( // Codec // ---------------------------------------------------------------------------- -// NewCustomClientCodec returns a new JSON Codec based on passed encoder selector. -func NewCustomClientCodec(codecSel codec.CodecSelector) protocol.ClientCodec { - return &ClientCodec{codecSel: codecSel} -} - // NewClientCodec returns a new JSON Codec. func NewClientCodec() protocol.ClientCodec { - return NewCustomClientCodec(codec.DefaultCodecSelector) + return &ClientCodec{} } // ClientCodec creates a ClientCodecRequest to process each request. type ClientCodec struct { - codecSel codec.CodecSelector +} + +func (cc *ClientCodec) NewDecoder(r io.Reader) interface{} { + return json.NewDecoder(r) } func (cc *ClientCodec) WriteRequest(w io.Writer, method string, args []interface{}, id interface{}) error { @@ -40,7 +37,7 @@ func (cc *ClientCodec) WriteRequest(w io.Writer, method string, args []interface ID: id, } - encoder := json.NewEncoder(cc.codecSel.SelectByWriter(w).Encode(w)) + encoder := json.NewEncoder(w) if err := encoder.Encode(req); nil != err { return err } @@ -49,6 +46,6 @@ func (cc *ClientCodec) WriteRequest(w io.Writer, method string, args []interface } // NewMessage returns a ClientMessageCodec. -func (cc *ClientCodec) NewResponse(r io.Reader) (protocol.ClientResponseCodec, error) { - return newClientResponseCodec(r, cc.codecSel.SelectByReader(r)) +func (cc *ClientCodec) NewResponse(decoder interface{}) (protocol.ClientResponseCodec, error) { + return newClientResponseCodec(decoder.(*json.Decoder)) } diff --git a/protocol/json/client_notification.go b/protocol/json/client_notification.go index 0c96da2..1229427 100644 --- a/protocol/json/client_notification.go +++ b/protocol/json/client_notification.go @@ -3,7 +3,6 @@ package json import ( "encoding/json" - "git.loafle.net/commons_go/rpc/codec" crp "git.loafle.net/commons_go/rpc/protocol" cuej "git.loafle.net/commons_go/util/encoding/json" ) @@ -21,9 +20,8 @@ type clientNotification struct { // ClientNotificationCodec decodes and encodes a single notification. type ClientNotificationCodec struct { - noti *clientNotification - err error - codec codec.Codec + noti *clientNotification + err error } func (cnc *ClientNotificationCodec) HasResponse() bool { diff --git a/protocol/json/client_response.go b/protocol/json/client_response.go index eddb1da..2ec41d3 100644 --- a/protocol/json/client_response.go +++ b/protocol/json/client_response.go @@ -5,7 +5,6 @@ import ( "fmt" "io" - "git.loafle.net/commons_go/rpc/codec" "git.loafle.net/commons_go/rpc/protocol" crp "git.loafle.net/commons_go/rpc/protocol" ) @@ -23,9 +22,8 @@ type clientResponse struct { // ClientResponseCodec decodes and encodes a single request. type ClientResponseCodec struct { - res *clientResponse - err error - codec codec.Codec + res *clientResponse + err error } func (crc *ClientResponseCodec) ID() interface{} { @@ -61,15 +59,11 @@ func (crc *ClientResponseCodec) Notification() (protocol.ClientNotificationCodec return nil, err } - return &ClientNotificationCodec{noti: noti, err: err, codec: crc.codec}, nil + return &ClientNotificationCodec{noti: noti, err: err}, nil } // newClientMessageCodec returns a new ClientMessageCodec. -func newClientResponseCodec(r io.Reader, codec codec.Codec) (protocol.ClientResponseCodec, error) { - decoder := json.NewDecoder(r) - if nil == r { - return nil, io.EOF - } +func newClientResponseCodec(decoder *json.Decoder) (protocol.ClientResponseCodec, error) { decoder.UseNumber() res := &clientResponse{} @@ -92,5 +86,5 @@ func newClientResponseCodec(r io.Reader, codec codec.Codec) (protocol.ClientResp } } - return &ClientResponseCodec{res: res, err: err, codec: codec}, nil + return &ClientResponseCodec{res: res, err: err}, nil } diff --git a/protocol/json/server.go b/protocol/json/server.go index cbc38f9..1eb5fb2 100644 --- a/protocol/json/server.go +++ b/protocol/json/server.go @@ -4,7 +4,6 @@ import ( "encoding/json" "io" - "git.loafle.net/commons_go/rpc/codec" "git.loafle.net/commons_go/rpc/protocol" ) @@ -14,24 +13,22 @@ var null = json.RawMessage([]byte("null")) // Codec // ---------------------------------------------------------------------------- -// NewCustomServerCodec returns a new JSON Codec based on passed encoder selector. -func NewCustomServerCodec(codecSel codec.CodecSelector) protocol.ServerCodec { - return &ServerCodec{codecSel: codecSel} -} - // NewServerCodec returns a new JSON Codec. func NewServerCodec() protocol.ServerCodec { - return NewCustomServerCodec(codec.DefaultCodecSelector) + return &ServerCodec{} } // ServerCodec creates a ServerRequestCodec to process each request. type ServerCodec struct { - codecSel codec.CodecSelector +} + +func (sc *ServerCodec) NewDecoder(r io.Reader) interface{} { + return json.NewDecoder(r) } // NewRequest returns a ServerRequestCodec. -func (sc *ServerCodec) NewRequest(r io.Reader) (protocol.ServerRequestCodec, error) { - return newServerRequestCodec(r, sc.codecSel.SelectByReader(r)) +func (sc *ServerCodec) NewRequest(decoder interface{}) (protocol.ServerRequestCodec, error) { + return newServerRequestCodec(decoder.(*json.Decoder)) } // WriteNotification send a notification from server to client. @@ -44,7 +41,7 @@ func (sc *ServerCodec) WriteNotification(w io.Writer, method string, args []inte noti := &serverNotification{Method: method, Params: params} res := &serverResponse{Version: Version, Result: noti} - encoder := json.NewEncoder(sc.codecSel.SelectByWriter(w).Encode(w)) + encoder := json.NewEncoder(w) // Not sure in which case will this happen. But seems harmless. if err := encoder.Encode(res); nil != err { return err diff --git a/protocol/json/server_request.go b/protocol/json/server_request.go index a446101..1361527 100644 --- a/protocol/json/server_request.go +++ b/protocol/json/server_request.go @@ -4,7 +4,6 @@ import ( "encoding/json" "io" - "git.loafle.net/commons_go/rpc/codec" "git.loafle.net/commons_go/rpc/protocol" crp "git.loafle.net/commons_go/rpc/protocol" cuej "git.loafle.net/commons_go/util/encoding/json" @@ -36,14 +35,8 @@ type serverRequest struct { // ---------------------------------------------------------------------------- // newRequestCodec returns a new ServerRequestCodec. -func newServerRequestCodec(r io.Reader, codec codec.Codec) (protocol.ServerRequestCodec, error) { +func newServerRequestCodec(decoder *json.Decoder) (protocol.ServerRequestCodec, error) { // Decode the request body and check if RPC method is valid. - decoder := json.NewDecoder(r) - if nil == r { - return nil, io.EOF - } - decoder.UseNumber() - req := &serverRequest{} err := decoder.Decode(req) if err == io.ErrUnexpectedEOF || err == io.EOF { @@ -64,14 +57,13 @@ func newServerRequestCodec(r io.Reader, codec codec.Codec) (protocol.ServerReque } } - return &ServerRequestCodec{req: req, err: err, codec: codec}, nil + return &ServerRequestCodec{req: req, err: err}, nil } // ServerRequestCodec decodes and encodes a single request. type ServerRequestCodec struct { - req *serverRequest - err error - codec codec.Codec + req *serverRequest + err error } func (src *ServerRequestCodec) HasResponse() bool { @@ -155,7 +147,7 @@ func (src *ServerRequestCodec) WriteError(w io.Writer, status int, err error) er func (src *ServerRequestCodec) writeServerResponse(w io.Writer, res *serverResponse) error { // ID is null for notifications and they don't have a response. if src.req.ID != nil { - encoder := json.NewEncoder(src.codec.Encode(w)) + encoder := json.NewEncoder(w) // Not sure in which case will this happen. But seems harmless. if err := encoder.Encode(res); nil != err { return err diff --git a/protocol/server_codec.go b/protocol/server_codec.go index 92f98de..15d74b7 100644 --- a/protocol/server_codec.go +++ b/protocol/server_codec.go @@ -6,7 +6,9 @@ import ( // ServerCodec creates a ServerRequestCodec to process each request. type ServerCodec interface { - NewRequest(r io.Reader) (ServerRequestCodec, error) + NewDecoder(r io.Reader) interface{} + + NewRequest(decoder interface{}) (ServerRequestCodec, error) WriteNotification(w io.Writer, method string, args []interface{}) error } diff --git a/server/rwc/socket/servlet_rwc_handlers.go b/server/rwc/socket/servlet_rwc_handlers.go index e3b9b1c..7206ceb 100644 --- a/server/rwc/socket/servlet_rwc_handlers.go +++ b/server/rwc/socket/servlet_rwc_handlers.go @@ -14,12 +14,14 @@ type ServletReadWriteCloseHandlers struct { rpc.ServletReadWriteCloseHandlers } -func (srwch *ServletReadWriteCloseHandlers) ReadRequest(servletCTX rpc.ServletContext, codec protocol.ServerCodec, conn interface{}) (protocol.ServerRequestCodec, error) { +func (srwch *ServletReadWriteCloseHandlers) NewDecoder(servletCTX rpc.ServletContext, codec protocol.ServerCodec, conn interface{}) (interface{}, error) { soc := conn.(server.Socket) - srwch.ReadMtx.Lock() - reqCodec, err := codec.NewRequest(soc) - srwch.ReadMtx.Unlock() + return codec.NewDecoder(soc), nil +} + +func (srwch *ServletReadWriteCloseHandlers) ReadRequest(servletCTX rpc.ServletContext, codec protocol.ServerCodec, decoder interface{}) (protocol.ServerRequestCodec, error) { + reqCodec, err := codec.NewRequest(decoder) return reqCodec, err } @@ -30,13 +32,11 @@ func (srwch *ServletReadWriteCloseHandlers) WriteResponse(servletCTX rpc.Servlet ) soc := conn.(server.Socket) - srwch.WriteMtx.Lock() if nil != err { wErr = reqCodec.WriteError(soc, 500, err) } else { wErr = reqCodec.WriteResponse(soc, result) } - srwch.WriteMtx.Unlock() return wErr } @@ -47,9 +47,7 @@ func (srwch *ServletReadWriteCloseHandlers) WriteNotification(servletCTX rpc.Ser ) soc := conn.(server.Socket) - srwch.WriteMtx.Lock() wErr = codec.WriteNotification(soc, method, params) - srwch.WriteMtx.Unlock() return wErr } diff --git a/server/rwc/websocket/fasthttp/servlet_rwc_handlers.go b/server/rwc/websocket/fasthttp/servlet_rwc_handlers.go index 2b4f0c2..3a8e120 100644 --- a/server/rwc/websocket/fasthttp/servlet_rwc_handlers.go +++ b/server/rwc/websocket/fasthttp/servlet_rwc_handlers.go @@ -17,13 +17,18 @@ type ServletReadWriteCloseHandlers struct { rpc.ServletReadWriteCloseHandlers } -func (srwch *ServletReadWriteCloseHandlers) ReadRequest(servletCTX rpc.ServletContext, codec protocol.ServerCodec, conn interface{}) (protocol.ServerRequestCodec, error) { +func (srwch *ServletReadWriteCloseHandlers) NewDecoder(servletCTX rpc.ServletContext, codec protocol.ServerCodec, conn interface{}) (interface{}, error) { soc := conn.(cwf.Socket) - - srwch.ReadMtx.Lock() _, r, err := soc.NextReader() - requestCodec, err := codec.NewRequest(r) - srwch.ReadMtx.Unlock() + if nil != err { + return nil, err + } + + return codec.NewDecoder(r), nil +} + +func (srwch *ServletReadWriteCloseHandlers) ReadRequest(servletCTX rpc.ServletContext, codec protocol.ServerCodec, decoder interface{}) (protocol.ServerRequestCodec, error) { + requestCodec, err := codec.NewRequest(decoder) return requestCodec, err } @@ -35,10 +40,8 @@ func (srwch *ServletReadWriteCloseHandlers) WriteResponse(servletCTX rpc.Servlet ) soc := conn.(cwf.Socket) - srwch.WriteMtx.Lock() wc, wErr = soc.NextWriter(websocket.TextMessage) if nil != wErr { - srwch.WriteMtx.Unlock() return wErr } @@ -51,7 +54,6 @@ func (srwch *ServletReadWriteCloseHandlers) WriteResponse(servletCTX rpc.Servlet } else { wErr = requestCodec.WriteResponse(wc, result) } - srwch.WriteMtx.Unlock() return wErr } @@ -63,10 +65,8 @@ func (srwch *ServletReadWriteCloseHandlers) WriteNotification(servletCTX rpc.Ser ) soc := conn.(cwf.Socket) - srwch.WriteMtx.Lock() wc, wErr = soc.NextWriter(websocket.TextMessage) if nil != wErr { - srwch.WriteMtx.Unlock() return wErr } @@ -75,7 +75,6 @@ func (srwch *ServletReadWriteCloseHandlers) WriteNotification(servletCTX rpc.Ser }() wErr = codec.WriteNotification(wc, method, params) - srwch.WriteMtx.Unlock() return wErr } diff --git a/servlet.go b/servlet.go index 2574b8c..1e5f5d6 100644 --- a/servlet.go +++ b/servlet.go @@ -36,11 +36,13 @@ type rpcServlet struct { conn interface{} serverCodec protocol.ServerCodec + decoder interface{} stopChan chan struct{} stopWg sync.WaitGroup - sendMtx sync.RWMutex + requestMtx sync.Mutex + responseMTX sync.Mutex } func (s *rpcServlet) Context() ServletContext { @@ -70,6 +72,10 @@ func (s *rpcServlet) Start(parentCTX cuc.Context, conn interface{}, doneChan cha s.conn = conn s.serverCodec = sc + s.decoder, err = s.rwcSH.NewDecoder(s.ctx, sc, conn) + if nil != err { + return fmt.Errorf("RPC Servlet: Cannot build rpc decoder") + } if err := s.sh.Init(s.ctx); nil != err { return fmt.Errorf("RPC Servlet: Initialization of servlet has been failed %v", err) @@ -168,7 +174,9 @@ func handleReader(s *rpcServlet, stopChan chan struct{}, doneChan chan error) { return } - requestCodec, err := s.rwcSH.ReadRequest(s.ctx, s.serverCodec, s.conn) + s.requestMtx.Lock() + requestCodec, err := s.rwcSH.ReadRequest(s.ctx, s.serverCodec, s.decoder) + s.requestMtx.Unlock() if nil != err { if err == io.ErrUnexpectedEOF || err == io.EOF { err = fmt.Errorf("RPC Server: Disconnected from client") @@ -221,6 +229,7 @@ func handleWriter(s *rpcServlet, stopChan chan struct{}, doneChan chan error) { return } + s.responseMTX.Lock() if nil != rs.requestCodec { if err := s.rwcSH.WriteResponse(s.ctx, s.conn, rs.requestCodec, rs.result, rs.err); nil != err { logging.Logger().Errorf("RPC Servlet: response error %v", err) @@ -230,9 +239,9 @@ func handleWriter(s *rpcServlet, stopChan chan struct{}, doneChan chan error) { logging.Logger().Errorf("RPC Servlet: notification error %v", err) } } + s.responseMTX.Unlock() } - } func handleRequest(s *rpcServlet, requestCodec protocol.ServerRequestCodec) { diff --git a/servlet_rwc_handler.go b/servlet_rwc_handler.go index c0e2ef7..cd2b2b5 100644 --- a/servlet_rwc_handler.go +++ b/servlet_rwc_handler.go @@ -3,7 +3,9 @@ package rpc import "git.loafle.net/commons_go/rpc/protocol" type ServletReadWriteCloseHandler interface { - ReadRequest(servletCTX ServletContext, codec protocol.ServerCodec, conn interface{}) (protocol.ServerRequestCodec, error) + NewDecoder(servletCTX ServletContext, codec protocol.ServerCodec, conn interface{}) (interface{}, error) + + ReadRequest(servletCTX ServletContext, codec protocol.ServerCodec, decoder interface{}) (protocol.ServerRequestCodec, error) WriteResponse(servletCTX ServletContext, conn interface{}, requestCodec protocol.ServerRequestCodec, result interface{}, err error) error WriteNotification(servletCTX ServletContext, conn interface{}, codec protocol.ServerCodec, method string, params []interface{}) error diff --git a/servlet_rwc_handlers.go b/servlet_rwc_handlers.go index 59a2973..d97649f 100644 --- a/servlet_rwc_handlers.go +++ b/servlet_rwc_handlers.go @@ -2,17 +2,20 @@ package rpc import ( "fmt" - "sync" + "git.loafle.net/commons_go/logging" "git.loafle.net/commons_go/rpc/protocol" ) type ServletReadWriteCloseHandlers struct { - ReadMtx sync.Mutex - WriteMtx sync.Mutex } -func (srwch *ServletReadWriteCloseHandlers) ReadRequest(servletCTX ServletContext, codec protocol.ServerCodec, conn interface{}) (protocol.ServerRequestCodec, error) { +func (srwch *ServletReadWriteCloseHandlers) NewDecoder(servletCTX ServletContext, codec protocol.ServerCodec, conn interface{}) interface{} { + logging.Logger().Errorf("Servlet RWC Handler: NewDecoder is not implemented") + return nil +} + +func (srwch *ServletReadWriteCloseHandlers) ReadRequest(servletCTX ServletContext, codec protocol.ServerCodec, decoder interface{}) (protocol.ServerRequestCodec, error) { return nil, fmt.Errorf("Servlet RWC Handler: ReadRequest is not implemented") }