diff --git a/client/client_rwc_handlers.go b/client/client_rwc_handlers.go index 2461cfc..b2e8f8d 100644 --- a/client/client_rwc_handlers.go +++ b/client/client_rwc_handlers.go @@ -2,11 +2,14 @@ package client import ( "fmt" + "sync" "git.loafle.net/commons_go/rpc/protocol" ) type ClientReadWriteCloseHandlers struct { + ReadMtx sync.RWMutex + WriteMtx sync.RWMutex } func (crwch *ClientReadWriteCloseHandlers) Connect(clientCTX ClientContext) (interface{}, error) { diff --git a/client/rwc/socket/client_rwc_handlers.go b/client/rwc/socket/client_rwc_handlers.go index e9f25f1..f7ad15b 100644 --- a/client/rwc/socket/client_rwc_handlers.go +++ b/client/rwc/socket/client_rwc_handlers.go @@ -26,19 +26,25 @@ func (crwch *ClientReadWriteCloseHandlers) Connect(clientCTX client.ClientContex func (crwch *ClientReadWriteCloseHandlers) ReadResponse(clientCTX client.ClientContext, codec protocol.ClientCodec, conn interface{}) (protocol.ClientResponseCodec, error) { soc := conn.(csc.Socket) + + crwch.ReadMtx.RLock() resCodec, err := codec.NewResponse(soc) + crwch.ReadMtx.RUnlock() return resCodec, err } func (crwch *ClientReadWriteCloseHandlers) WriteRequest(clientCTX client.ClientContext, codec protocol.ClientCodec, conn interface{}, method string, params []interface{}, id interface{}) error { + var ( + wErr error + ) soc := conn.(csc.Socket) - if wErr := codec.WriteRequest(soc, method, params, id); nil != wErr { - return wErr - } + crwch.WriteMtx.RLock() + wErr = codec.WriteRequest(soc, method, params, id) + crwch.WriteMtx.RUnlock() - return nil + return wErr } func (crwch *ClientReadWriteCloseHandlers) Disconnect(clientCTX client.ClientContext, conn interface{}) { diff --git a/client/rwc/websocket/fasthttp/client_rwc_handlers.go b/client/rwc/websocket/fasthttp/client_rwc_handlers.go index 9401be2..071605c 100644 --- a/client/rwc/websocket/fasthttp/client_rwc_handlers.go +++ b/client/rwc/websocket/fasthttp/client_rwc_handlers.go @@ -1,6 +1,8 @@ package fasthttp import ( + "io" + "git.loafle.net/commons_go/logging" "git.loafle.net/commons_go/rpc/client" "git.loafle.net/commons_go/rpc/protocol" @@ -27,29 +29,37 @@ func (crwch *ClientReadWriteCloseHandlers) Connect(clientCTX client.ClientContex func (crwch *ClientReadWriteCloseHandlers) ReadResponse(clientCTX client.ClientContext, codec protocol.ClientCodec, conn interface{}) (protocol.ClientResponseCodec, error) { soc := conn.(cwfc.Socket) - _, r, err := soc.NextReader() + crwch.ReadMtx.RLock() + _, r, err := soc.NextReader() resCodec, err := codec.NewResponse(r) + crwch.ReadMtx.RUnlock() return resCodec, err } func (crwch *ClientReadWriteCloseHandlers) WriteRequest(clientCTX client.ClientContext, codec protocol.ClientCodec, conn interface{}, method string, params []interface{}, id interface{}) error { + var ( + wc io.WriteCloser + wErr error + ) soc := conn.(cwfc.Socket) - wc, wErr := soc.NextWriter(websocket.TextMessage) + crwch.WriteMtx.RLock() + wc, wErr = soc.NextWriter(websocket.TextMessage) if nil != wErr { + crwch.WriteMtx.RUnlock() return wErr } defer func() { wc.Close() }() - if wErr := codec.WriteRequest(wc, method, params, id); nil != wErr { - return wErr - } + wErr = codec.WriteRequest(wc, method, params, id) - return nil + crwch.WriteMtx.RUnlock() + + return wErr } func (crwch *ClientReadWriteCloseHandlers) Disconnect(clientCTX client.ClientContext, conn interface{}) { diff --git a/protocol/json/client_response.go b/protocol/json/client_response.go index 5c53b1a..6a2d59b 100644 --- a/protocol/json/client_response.go +++ b/protocol/json/client_response.go @@ -66,6 +66,7 @@ func (crc *ClientResponseCodec) Notification() (protocol.ClientNotificationCodec // newClientMessageCodec returns a new ClientMessageCodec. func newClientResponseCodec(r io.Reader, codec codec.Codec) (protocol.ClientResponseCodec, error) { + decoder := json.NewDecoder(r) if nil == r { return nil, io.EOF diff --git a/server/rwc/socket/servlet_rwc_handlers.go b/server/rwc/socket/servlet_rwc_handlers.go index ccf374e..372e348 100644 --- a/server/rwc/socket/servlet_rwc_handlers.go +++ b/server/rwc/socket/servlet_rwc_handlers.go @@ -16,35 +16,42 @@ type ServletReadWriteCloseHandlers struct { func (srwch *ServletReadWriteCloseHandlers) ReadRequest(servletCTX rpc.ServletContext, codec protocol.ServerCodec, conn interface{}) (protocol.ServerRequestCodec, error) { soc := conn.(server.Socket) + + srwch.ReadMtx.RLock() reqCodec, err := codec.NewRequest(soc) + srwch.ReadMtx.RUnlock() return reqCodec, err } func (srwch *ServletReadWriteCloseHandlers) WriteResponse(servletCTX rpc.ServletContext, conn interface{}, reqCodec protocol.ServerRequestCodec, result interface{}, err error) error { + var ( + wErr error + ) soc := conn.(server.Socket) + srwch.WriteMtx.RLock() if nil != err { - if wErr := reqCodec.WriteError(soc, 500, err); nil != wErr { - return wErr - } + wErr = reqCodec.WriteError(soc, 500, err) } else { - if wErr := reqCodec.WriteResponse(soc, result); nil != wErr { - return wErr - } + wErr = reqCodec.WriteResponse(soc, result) } + srwch.WriteMtx.RUnlock() - return nil + return wErr } func (srwch *ServletReadWriteCloseHandlers) WriteNotification(servletCTX rpc.ServletContext, conn interface{}, codec protocol.ServerCodec, method string, params []interface{}) error { + var ( + wErr error + ) soc := conn.(server.Socket) - if wErr := codec.WriteNotification(soc, method, params); nil != wErr { - return wErr - } + srwch.WriteMtx.RLock() + wErr = codec.WriteNotification(soc, method, params) + srwch.WriteMtx.RUnlock() - return nil + return wErr } func (srwch *ServletReadWriteCloseHandlers) Validate() { diff --git a/server/rwc/websocket/fasthttp/servlet_rwc_handlers.go b/server/rwc/websocket/fasthttp/servlet_rwc_handlers.go index 99767ee..2c5d935 100644 --- a/server/rwc/websocket/fasthttp/servlet_rwc_handlers.go +++ b/server/rwc/websocket/fasthttp/servlet_rwc_handlers.go @@ -1,6 +1,8 @@ package fasthttp import ( + "io" + "git.loafle.net/commons_go/rpc" "git.loafle.net/commons_go/rpc/protocol" cwf "git.loafle.net/commons_go/websocket_fasthttp" @@ -17,18 +19,26 @@ type ServletReadWriteCloseHandlers struct { func (srwch *ServletReadWriteCloseHandlers) ReadRequest(servletCTX rpc.ServletContext, codec protocol.ServerCodec, conn interface{}) (protocol.ServerRequestCodec, error) { soc := conn.(cwf.Socket) - _, r, err := soc.NextReader() + srwch.ReadMtx.RLock() + _, r, err := soc.NextReader() requestCodec, err := codec.NewRequest(r) + srwch.ReadMtx.RUnlock() return requestCodec, err } func (srwch *ServletReadWriteCloseHandlers) WriteResponse(servletCTX rpc.ServletContext, conn interface{}, requestCodec protocol.ServerRequestCodec, result interface{}, err error) error { + var ( + wc io.WriteCloser + wErr error + ) soc := conn.(cwf.Socket) - wc, wErr := soc.NextWriter(websocket.TextMessage) + srwch.WriteMtx.RLock() + wc, wErr = soc.NextWriter(websocket.TextMessage) if nil != wErr { + srwch.WriteMtx.RUnlock() return wErr } @@ -37,23 +47,26 @@ func (srwch *ServletReadWriteCloseHandlers) WriteResponse(servletCTX rpc.Servlet }() if nil != err { - if wErr := requestCodec.WriteError(wc, 500, err); nil != wErr { - return wErr - } + wErr = requestCodec.WriteError(wc, 500, err) } else { - if wErr := requestCodec.WriteResponse(wc, result); nil != wErr { - return wErr - } + wErr = requestCodec.WriteResponse(wc, result) } + srwch.WriteMtx.RUnlock() - return nil + return wErr } func (srwch *ServletReadWriteCloseHandlers) WriteNotification(servletCTX rpc.ServletContext, conn interface{}, codec protocol.ServerCodec, method string, params []interface{}) error { + var ( + wc io.WriteCloser + wErr error + ) soc := conn.(cwf.Socket) - wc, wErr := soc.NextWriter(websocket.TextMessage) + srwch.WriteMtx.RLock() + wc, wErr = soc.NextWriter(websocket.TextMessage) if nil != wErr { + srwch.WriteMtx.RUnlock() return wErr } @@ -61,11 +74,10 @@ func (srwch *ServletReadWriteCloseHandlers) WriteNotification(servletCTX rpc.Ser wc.Close() }() - if wErr := codec.WriteNotification(wc, method, params); nil != wErr { - return wErr - } + wErr = codec.WriteNotification(wc, method, params) + srwch.WriteMtx.RUnlock() - return nil + return wErr } func (srwch *ServletReadWriteCloseHandlers) Validate() { diff --git a/servlet_rwc_handlers.go b/servlet_rwc_handlers.go index 033c638..b6fc252 100644 --- a/servlet_rwc_handlers.go +++ b/servlet_rwc_handlers.go @@ -2,11 +2,14 @@ package rpc import ( "fmt" + "sync" "git.loafle.net/commons_go/rpc/protocol" ) type ServletReadWriteCloseHandlers struct { + ReadMtx sync.RWMutex + WriteMtx sync.RWMutex } func (srwch *ServletReadWriteCloseHandlers) ReadRequest(servletCTX ServletContext, codec protocol.ServerCodec, conn interface{}) (protocol.ServerRequestCodec, error) {