diff --git a/client/client.go b/client/client.go index 5814076..aba638c 100644 --- a/client/client.go +++ b/client/client.go @@ -36,8 +36,7 @@ type client struct { ch ClientHandler rwcHandler ClientReadWriteCloseHandler - conn interface{} - decoder interface{} + conn interface{} pendingRequestsCount uint32 pendingRequests map[uint64]*RequestState @@ -78,11 +77,6 @@ func (c *client) Connect() error { return err } - c.decoder, err = c.rwcHandler.NewDecoder(c.ctx, c.ch.GetCodec(), c.conn) - if nil != err { - return fmt.Errorf("RPC Client: Cannot build rpc decoder") - } - c.stopChan = make(chan struct{}) c.requestQueueChan = make(chan *RequestState, c.ch.GetPendingRequests()) c.pendingRequests = make(map[uint64]*RequestState) @@ -350,7 +344,7 @@ func (c *client) rpcReader(readerDone chan<- error) { return } c.responseMtx.Lock() - resCodec, err := c.rwcHandler.ReadResponse(c.ctx, c.ch.GetCodec(), c.decoder) + resCodec, err := c.rwcHandler.ReadResponse(c.ctx, c.ch.GetCodec(), c.conn) c.responseMtx.Unlock() if nil != err { if err == io.ErrUnexpectedEOF || err == io.EOF { diff --git a/client/client_rwc_handler.go b/client/client_rwc_handler.go index d5670c7..9693c51 100644 --- a/client/client_rwc_handler.go +++ b/client/client_rwc_handler.go @@ -4,8 +4,7 @@ import "git.loafle.net/commons_go/rpc/protocol" type ClientReadWriteCloseHandler interface { Connect(clientCTX ClientContext) (interface{}, error) - NewDecoder(clientCTX ClientContext, codec protocol.ClientCodec, conn interface{}) (interface{}, error) - ReadResponse(clientCTX ClientContext, codec protocol.ClientCodec, decoder interface{}) (protocol.ClientResponseCodec, error) + ReadResponse(clientCTX ClientContext, codec protocol.ClientCodec, conn interface{}) (protocol.ClientResponseCodec, error) WriteRequest(clientCTX ClientContext, codec protocol.ClientCodec, conn interface{}, method string, params []interface{}, id interface{}) error Disconnect(clientCTX ClientContext, conn interface{}) diff --git a/client/client_rwc_handlers.go b/client/client_rwc_handlers.go index 559d5c0..2461cfc 100644 --- a/client/client_rwc_handlers.go +++ b/client/client_rwc_handlers.go @@ -13,11 +13,7 @@ func (crwch *ClientReadWriteCloseHandlers) Connect(clientCTX ClientContext) (int return nil, fmt.Errorf("RPC Client RWC Handler: ClientHandlers method[Connect] is not implement") } -func (crwch *ClientReadWriteCloseHandlers) NewDecoder(clientCTX ClientContext, codec protocol.ClientCodec, conn interface{}) (interface{}, error) { - return nil, fmt.Errorf("RPC Client RWC Handler: NewDecoder is not implemented") -} - -func (crwch *ClientReadWriteCloseHandlers) ReadResponse(clientCTX ClientContext, codec protocol.ClientCodec, decoder interface{}) (protocol.ClientResponseCodec, error) { +func (crwch *ClientReadWriteCloseHandlers) ReadResponse(clientCTX ClientContext, codec protocol.ClientCodec, conn interface{}) (protocol.ClientResponseCodec, error) { return nil, fmt.Errorf("RPC Client RWC Handler: ClientHandlers method[ReadResponse] is not implement") } diff --git a/client/rwc/socket/client_rwc_handlers.go b/client/rwc/socket/client_rwc_handlers.go index 56a52eb..381cc9e 100644 --- a/client/rwc/socket/client_rwc_handlers.go +++ b/client/rwc/socket/client_rwc_handlers.go @@ -24,16 +24,10 @@ func (crwch *ClientReadWriteCloseHandlers) Connect(clientCTX client.ClientContex return csc.NewSocket(crwch.SocketBuilder, clientCTX) } -func (crwch *ClientReadWriteCloseHandlers) NewDecoder(clientCTX client.ClientContext, codec protocol.ClientCodec, conn interface{}) (interface{}, error) { +func (crwch *ClientReadWriteCloseHandlers) ReadResponse(clientCTX client.ClientContext, codec protocol.ClientCodec, conn interface{}) (protocol.ClientResponseCodec, error) { soc := conn.(csc.Socket) - return codec.NewDecoder(soc), nil -} - -func (crwch *ClientReadWriteCloseHandlers) ReadResponse(clientCTX client.ClientContext, codec protocol.ClientCodec, decoder interface{}) (protocol.ClientResponseCodec, error) { - resCodec, err := codec.NewResponse(decoder) - - return resCodec, err + return codec.NewResponse(soc) } func (crwch *ClientReadWriteCloseHandlers) WriteRequest(clientCTX client.ClientContext, codec protocol.ClientCodec, conn interface{}, method string, params []interface{}, id interface{}) error { diff --git a/client/rwc/websocket/fasthttp/client_rwc_handlers.go b/client/rwc/websocket/fasthttp/client_rwc_handlers.go index d216301..a0f6d93 100644 --- a/client/rwc/websocket/fasthttp/client_rwc_handlers.go +++ b/client/rwc/websocket/fasthttp/client_rwc_handlers.go @@ -27,20 +27,15 @@ func (crwch *ClientReadWriteCloseHandlers) Connect(clientCTX client.ClientContex return cwfc.NewSocket(crwch.SocketBuilder, clientCTX) } -func (crwch *ClientReadWriteCloseHandlers) NewDecoder(clientCTX client.ClientContext, codec protocol.ClientCodec, conn interface{}) (interface{}, error) { +func (crwch *ClientReadWriteCloseHandlers) ReadResponse(clientCTX client.ClientContext, codec protocol.ClientCodec, conn interface{}) (protocol.ClientResponseCodec, error) { soc := conn.(cwfc.Socket) + _, r, err := soc.NextReader() if nil != err { return nil, err } - return codec.NewDecoder(r), nil -} - -func (crwch *ClientReadWriteCloseHandlers) ReadResponse(clientCTX client.ClientContext, codec protocol.ClientCodec, decoder interface{}) (protocol.ClientResponseCodec, error) { - resCodec, err := codec.NewResponse(decoder) - - return resCodec, err + return codec.NewResponse(r) } func (crwch *ClientReadWriteCloseHandlers) WriteRequest(clientCTX client.ClientContext, codec protocol.ClientCodec, conn interface{}, method string, params []interface{}, id interface{}) error { diff --git a/server/rwc/socket/servlet_rwc_handlers.go b/server/rwc/socket/servlet_rwc_handlers.go index 7206ceb..c40539f 100644 --- a/server/rwc/socket/servlet_rwc_handlers.go +++ b/server/rwc/socket/servlet_rwc_handlers.go @@ -14,16 +14,10 @@ type ServletReadWriteCloseHandlers struct { rpc.ServletReadWriteCloseHandlers } -func (srwch *ServletReadWriteCloseHandlers) NewDecoder(servletCTX rpc.ServletContext, codec protocol.ServerCodec, conn interface{}) (interface{}, error) { +func (srwch *ServletReadWriteCloseHandlers) ReadRequest(servletCTX rpc.ServletContext, codec protocol.ServerCodec, conn interface{}) (protocol.ServerRequestCodec, error) { soc := conn.(server.Socket) - return codec.NewDecoder(soc), nil -} - -func (srwch *ServletReadWriteCloseHandlers) ReadRequest(servletCTX rpc.ServletContext, codec protocol.ServerCodec, decoder interface{}) (protocol.ServerRequestCodec, error) { - reqCodec, err := codec.NewRequest(decoder) - - return reqCodec, err + return codec.NewRequest(soc) } func (srwch *ServletReadWriteCloseHandlers) WriteResponse(servletCTX rpc.ServletContext, conn interface{}, reqCodec protocol.ServerRequestCodec, result interface{}, err error) error { diff --git a/server/rwc/websocket/fasthttp/servlet_rwc_handlers.go b/server/rwc/websocket/fasthttp/servlet_rwc_handlers.go index 3a8e120..1599bac 100644 --- a/server/rwc/websocket/fasthttp/servlet_rwc_handlers.go +++ b/server/rwc/websocket/fasthttp/servlet_rwc_handlers.go @@ -17,20 +17,15 @@ type ServletReadWriteCloseHandlers struct { rpc.ServletReadWriteCloseHandlers } -func (srwch *ServletReadWriteCloseHandlers) NewDecoder(servletCTX rpc.ServletContext, codec protocol.ServerCodec, conn interface{}) (interface{}, error) { +func (srwch *ServletReadWriteCloseHandlers) ReadRequest(servletCTX rpc.ServletContext, codec protocol.ServerCodec, conn interface{}) (protocol.ServerRequestCodec, error) { soc := conn.(cwf.Socket) + _, r, err := soc.NextReader() if nil != err { return nil, err } - return codec.NewDecoder(r), nil -} - -func (srwch *ServletReadWriteCloseHandlers) ReadRequest(servletCTX rpc.ServletContext, codec protocol.ServerCodec, decoder interface{}) (protocol.ServerRequestCodec, error) { - requestCodec, err := codec.NewRequest(decoder) - - return requestCodec, err + return codec.NewRequest(r) } func (srwch *ServletReadWriteCloseHandlers) WriteResponse(servletCTX rpc.ServletContext, conn interface{}, requestCodec protocol.ServerRequestCodec, result interface{}, err error) error { diff --git a/servlet.go b/servlet.go index 2446a34..ef1e77a 100644 --- a/servlet.go +++ b/servlet.go @@ -36,7 +36,6 @@ type rpcServlet struct { conn interface{} serverCodec protocol.ServerCodec - decoder interface{} stopChan chan struct{} stopWg sync.WaitGroup @@ -72,10 +71,6 @@ func (s *rpcServlet) Start(parentCTX cuc.Context, conn interface{}, doneChan cha s.conn = conn s.serverCodec = sc - s.decoder, err = s.rwcSH.NewDecoder(s.ctx, sc, conn) - if nil != err { - return fmt.Errorf("RPC Servlet: Cannot build rpc decoder") - } if err := s.sh.Init(s.ctx); nil != err { return fmt.Errorf("RPC Servlet: Initialization of servlet has been failed %v", err) @@ -173,7 +168,7 @@ func handleReader(s *rpcServlet, stopChan chan struct{}, doneChan chan error) { } s.requestMtx.Lock() - requestCodec, err := s.rwcSH.ReadRequest(s.ctx, s.serverCodec, s.decoder) + requestCodec, err := s.rwcSH.ReadRequest(s.ctx, s.serverCodec, s.conn) s.requestMtx.Unlock() if nil != err { if err == io.ErrUnexpectedEOF || err == io.EOF { diff --git a/servlet_rwc_handler.go b/servlet_rwc_handler.go index cd2b2b5..c76337b 100644 --- a/servlet_rwc_handler.go +++ b/servlet_rwc_handler.go @@ -3,9 +3,8 @@ package rpc import "git.loafle.net/commons_go/rpc/protocol" type ServletReadWriteCloseHandler interface { - NewDecoder(servletCTX ServletContext, codec protocol.ServerCodec, conn interface{}) (interface{}, error) - ReadRequest(servletCTX ServletContext, codec protocol.ServerCodec, decoder interface{}) (protocol.ServerRequestCodec, error) + ReadRequest(servletCTX ServletContext, codec protocol.ServerCodec, conn interface{}) (protocol.ServerRequestCodec, error) WriteResponse(servletCTX ServletContext, conn interface{}, requestCodec protocol.ServerRequestCodec, result interface{}, err error) error WriteNotification(servletCTX ServletContext, conn interface{}, codec protocol.ServerCodec, method string, params []interface{}) error diff --git a/servlet_rwc_handlers.go b/servlet_rwc_handlers.go index d97649f..033c638 100644 --- a/servlet_rwc_handlers.go +++ b/servlet_rwc_handlers.go @@ -3,19 +3,13 @@ package rpc import ( "fmt" - "git.loafle.net/commons_go/logging" "git.loafle.net/commons_go/rpc/protocol" ) type ServletReadWriteCloseHandlers struct { } -func (srwch *ServletReadWriteCloseHandlers) NewDecoder(servletCTX ServletContext, codec protocol.ServerCodec, conn interface{}) interface{} { - logging.Logger().Errorf("Servlet RWC Handler: NewDecoder is not implemented") - return nil -} - -func (srwch *ServletReadWriteCloseHandlers) ReadRequest(servletCTX ServletContext, codec protocol.ServerCodec, decoder interface{}) (protocol.ServerRequestCodec, error) { +func (srwch *ServletReadWriteCloseHandlers) ReadRequest(servletCTX ServletContext, codec protocol.ServerCodec, conn interface{}) (protocol.ServerRequestCodec, error) { return nil, fmt.Errorf("Servlet RWC Handler: ReadRequest is not implemented") }