This commit is contained in:
crusader 2018-03-23 18:45:53 +09:00
parent 91fb8f2093
commit 9f1aa13de1
10 changed files with 17 additions and 62 deletions

View File

@ -36,8 +36,7 @@ type client struct {
ch ClientHandler ch ClientHandler
rwcHandler ClientReadWriteCloseHandler rwcHandler ClientReadWriteCloseHandler
conn interface{} conn interface{}
decoder interface{}
pendingRequestsCount uint32 pendingRequestsCount uint32
pendingRequests map[uint64]*RequestState pendingRequests map[uint64]*RequestState
@ -78,11 +77,6 @@ func (c *client) Connect() error {
return err return err
} }
c.decoder, err = c.rwcHandler.NewDecoder(c.ctx, c.ch.GetCodec(), c.conn)
if nil != err {
return fmt.Errorf("RPC Client: Cannot build rpc decoder")
}
c.stopChan = make(chan struct{}) c.stopChan = make(chan struct{})
c.requestQueueChan = make(chan *RequestState, c.ch.GetPendingRequests()) c.requestQueueChan = make(chan *RequestState, c.ch.GetPendingRequests())
c.pendingRequests = make(map[uint64]*RequestState) c.pendingRequests = make(map[uint64]*RequestState)
@ -350,7 +344,7 @@ func (c *client) rpcReader(readerDone chan<- error) {
return return
} }
c.responseMtx.Lock() c.responseMtx.Lock()
resCodec, err := c.rwcHandler.ReadResponse(c.ctx, c.ch.GetCodec(), c.decoder) resCodec, err := c.rwcHandler.ReadResponse(c.ctx, c.ch.GetCodec(), c.conn)
c.responseMtx.Unlock() c.responseMtx.Unlock()
if nil != err { if nil != err {
if err == io.ErrUnexpectedEOF || err == io.EOF { if err == io.ErrUnexpectedEOF || err == io.EOF {

View File

@ -4,8 +4,7 @@ import "git.loafle.net/commons_go/rpc/protocol"
type ClientReadWriteCloseHandler interface { type ClientReadWriteCloseHandler interface {
Connect(clientCTX ClientContext) (interface{}, error) Connect(clientCTX ClientContext) (interface{}, error)
NewDecoder(clientCTX ClientContext, codec protocol.ClientCodec, conn interface{}) (interface{}, error) ReadResponse(clientCTX ClientContext, codec protocol.ClientCodec, conn interface{}) (protocol.ClientResponseCodec, error)
ReadResponse(clientCTX ClientContext, codec protocol.ClientCodec, decoder interface{}) (protocol.ClientResponseCodec, error)
WriteRequest(clientCTX ClientContext, codec protocol.ClientCodec, conn interface{}, method string, params []interface{}, id interface{}) error WriteRequest(clientCTX ClientContext, codec protocol.ClientCodec, conn interface{}, method string, params []interface{}, id interface{}) error
Disconnect(clientCTX ClientContext, conn interface{}) Disconnect(clientCTX ClientContext, conn interface{})

View File

@ -13,11 +13,7 @@ func (crwch *ClientReadWriteCloseHandlers) Connect(clientCTX ClientContext) (int
return nil, fmt.Errorf("RPC Client RWC Handler: ClientHandlers method[Connect] is not implement") return nil, fmt.Errorf("RPC Client RWC Handler: ClientHandlers method[Connect] is not implement")
} }
func (crwch *ClientReadWriteCloseHandlers) NewDecoder(clientCTX ClientContext, codec protocol.ClientCodec, conn interface{}) (interface{}, error) { func (crwch *ClientReadWriteCloseHandlers) ReadResponse(clientCTX ClientContext, codec protocol.ClientCodec, conn interface{}) (protocol.ClientResponseCodec, error) {
return nil, fmt.Errorf("RPC Client RWC Handler: NewDecoder is not implemented")
}
func (crwch *ClientReadWriteCloseHandlers) ReadResponse(clientCTX ClientContext, codec protocol.ClientCodec, decoder interface{}) (protocol.ClientResponseCodec, error) {
return nil, fmt.Errorf("RPC Client RWC Handler: ClientHandlers method[ReadResponse] is not implement") return nil, fmt.Errorf("RPC Client RWC Handler: ClientHandlers method[ReadResponse] is not implement")
} }

View File

@ -24,16 +24,10 @@ func (crwch *ClientReadWriteCloseHandlers) Connect(clientCTX client.ClientContex
return csc.NewSocket(crwch.SocketBuilder, clientCTX) return csc.NewSocket(crwch.SocketBuilder, clientCTX)
} }
func (crwch *ClientReadWriteCloseHandlers) NewDecoder(clientCTX client.ClientContext, codec protocol.ClientCodec, conn interface{}) (interface{}, error) { func (crwch *ClientReadWriteCloseHandlers) ReadResponse(clientCTX client.ClientContext, codec protocol.ClientCodec, conn interface{}) (protocol.ClientResponseCodec, error) {
soc := conn.(csc.Socket) soc := conn.(csc.Socket)
return codec.NewDecoder(soc), nil return codec.NewResponse(soc)
}
func (crwch *ClientReadWriteCloseHandlers) ReadResponse(clientCTX client.ClientContext, codec protocol.ClientCodec, decoder interface{}) (protocol.ClientResponseCodec, error) {
resCodec, err := codec.NewResponse(decoder)
return resCodec, err
} }
func (crwch *ClientReadWriteCloseHandlers) WriteRequest(clientCTX client.ClientContext, codec protocol.ClientCodec, conn interface{}, method string, params []interface{}, id interface{}) error { func (crwch *ClientReadWriteCloseHandlers) WriteRequest(clientCTX client.ClientContext, codec protocol.ClientCodec, conn interface{}, method string, params []interface{}, id interface{}) error {

View File

@ -27,20 +27,15 @@ func (crwch *ClientReadWriteCloseHandlers) Connect(clientCTX client.ClientContex
return cwfc.NewSocket(crwch.SocketBuilder, clientCTX) return cwfc.NewSocket(crwch.SocketBuilder, clientCTX)
} }
func (crwch *ClientReadWriteCloseHandlers) NewDecoder(clientCTX client.ClientContext, codec protocol.ClientCodec, conn interface{}) (interface{}, error) { func (crwch *ClientReadWriteCloseHandlers) ReadResponse(clientCTX client.ClientContext, codec protocol.ClientCodec, conn interface{}) (protocol.ClientResponseCodec, error) {
soc := conn.(cwfc.Socket) soc := conn.(cwfc.Socket)
_, r, err := soc.NextReader() _, r, err := soc.NextReader()
if nil != err { if nil != err {
return nil, err return nil, err
} }
return codec.NewDecoder(r), nil return codec.NewResponse(r)
}
func (crwch *ClientReadWriteCloseHandlers) ReadResponse(clientCTX client.ClientContext, codec protocol.ClientCodec, decoder interface{}) (protocol.ClientResponseCodec, error) {
resCodec, err := codec.NewResponse(decoder)
return resCodec, err
} }
func (crwch *ClientReadWriteCloseHandlers) WriteRequest(clientCTX client.ClientContext, codec protocol.ClientCodec, conn interface{}, method string, params []interface{}, id interface{}) error { func (crwch *ClientReadWriteCloseHandlers) WriteRequest(clientCTX client.ClientContext, codec protocol.ClientCodec, conn interface{}, method string, params []interface{}, id interface{}) error {

View File

@ -14,16 +14,10 @@ type ServletReadWriteCloseHandlers struct {
rpc.ServletReadWriteCloseHandlers rpc.ServletReadWriteCloseHandlers
} }
func (srwch *ServletReadWriteCloseHandlers) NewDecoder(servletCTX rpc.ServletContext, codec protocol.ServerCodec, conn interface{}) (interface{}, error) { func (srwch *ServletReadWriteCloseHandlers) ReadRequest(servletCTX rpc.ServletContext, codec protocol.ServerCodec, conn interface{}) (protocol.ServerRequestCodec, error) {
soc := conn.(server.Socket) soc := conn.(server.Socket)
return codec.NewDecoder(soc), nil return codec.NewRequest(soc)
}
func (srwch *ServletReadWriteCloseHandlers) ReadRequest(servletCTX rpc.ServletContext, codec protocol.ServerCodec, decoder interface{}) (protocol.ServerRequestCodec, error) {
reqCodec, err := codec.NewRequest(decoder)
return reqCodec, err
} }
func (srwch *ServletReadWriteCloseHandlers) WriteResponse(servletCTX rpc.ServletContext, conn interface{}, reqCodec protocol.ServerRequestCodec, result interface{}, err error) error { func (srwch *ServletReadWriteCloseHandlers) WriteResponse(servletCTX rpc.ServletContext, conn interface{}, reqCodec protocol.ServerRequestCodec, result interface{}, err error) error {

View File

@ -17,20 +17,15 @@ type ServletReadWriteCloseHandlers struct {
rpc.ServletReadWriteCloseHandlers rpc.ServletReadWriteCloseHandlers
} }
func (srwch *ServletReadWriteCloseHandlers) NewDecoder(servletCTX rpc.ServletContext, codec protocol.ServerCodec, conn interface{}) (interface{}, error) { func (srwch *ServletReadWriteCloseHandlers) ReadRequest(servletCTX rpc.ServletContext, codec protocol.ServerCodec, conn interface{}) (protocol.ServerRequestCodec, error) {
soc := conn.(cwf.Socket) soc := conn.(cwf.Socket)
_, r, err := soc.NextReader() _, r, err := soc.NextReader()
if nil != err { if nil != err {
return nil, err return nil, err
} }
return codec.NewDecoder(r), nil return codec.NewRequest(r)
}
func (srwch *ServletReadWriteCloseHandlers) ReadRequest(servletCTX rpc.ServletContext, codec protocol.ServerCodec, decoder interface{}) (protocol.ServerRequestCodec, error) {
requestCodec, err := codec.NewRequest(decoder)
return requestCodec, err
} }
func (srwch *ServletReadWriteCloseHandlers) WriteResponse(servletCTX rpc.ServletContext, conn interface{}, requestCodec protocol.ServerRequestCodec, result interface{}, err error) error { func (srwch *ServletReadWriteCloseHandlers) WriteResponse(servletCTX rpc.ServletContext, conn interface{}, requestCodec protocol.ServerRequestCodec, result interface{}, err error) error {

View File

@ -36,7 +36,6 @@ type rpcServlet struct {
conn interface{} conn interface{}
serverCodec protocol.ServerCodec serverCodec protocol.ServerCodec
decoder interface{}
stopChan chan struct{} stopChan chan struct{}
stopWg sync.WaitGroup stopWg sync.WaitGroup
@ -72,10 +71,6 @@ func (s *rpcServlet) Start(parentCTX cuc.Context, conn interface{}, doneChan cha
s.conn = conn s.conn = conn
s.serverCodec = sc s.serverCodec = sc
s.decoder, err = s.rwcSH.NewDecoder(s.ctx, sc, conn)
if nil != err {
return fmt.Errorf("RPC Servlet: Cannot build rpc decoder")
}
if err := s.sh.Init(s.ctx); nil != err { if err := s.sh.Init(s.ctx); nil != err {
return fmt.Errorf("RPC Servlet: Initialization of servlet has been failed %v", err) return fmt.Errorf("RPC Servlet: Initialization of servlet has been failed %v", err)
@ -173,7 +168,7 @@ func handleReader(s *rpcServlet, stopChan chan struct{}, doneChan chan error) {
} }
s.requestMtx.Lock() s.requestMtx.Lock()
requestCodec, err := s.rwcSH.ReadRequest(s.ctx, s.serverCodec, s.decoder) requestCodec, err := s.rwcSH.ReadRequest(s.ctx, s.serverCodec, s.conn)
s.requestMtx.Unlock() s.requestMtx.Unlock()
if nil != err { if nil != err {
if err == io.ErrUnexpectedEOF || err == io.EOF { if err == io.ErrUnexpectedEOF || err == io.EOF {

View File

@ -3,9 +3,8 @@ package rpc
import "git.loafle.net/commons_go/rpc/protocol" import "git.loafle.net/commons_go/rpc/protocol"
type ServletReadWriteCloseHandler interface { type ServletReadWriteCloseHandler interface {
NewDecoder(servletCTX ServletContext, codec protocol.ServerCodec, conn interface{}) (interface{}, error)
ReadRequest(servletCTX ServletContext, codec protocol.ServerCodec, decoder interface{}) (protocol.ServerRequestCodec, error) ReadRequest(servletCTX ServletContext, codec protocol.ServerCodec, conn interface{}) (protocol.ServerRequestCodec, error)
WriteResponse(servletCTX ServletContext, conn interface{}, requestCodec protocol.ServerRequestCodec, result interface{}, err error) error WriteResponse(servletCTX ServletContext, conn interface{}, requestCodec protocol.ServerRequestCodec, result interface{}, err error) error
WriteNotification(servletCTX ServletContext, conn interface{}, codec protocol.ServerCodec, method string, params []interface{}) error WriteNotification(servletCTX ServletContext, conn interface{}, codec protocol.ServerCodec, method string, params []interface{}) error

View File

@ -3,19 +3,13 @@ package rpc
import ( import (
"fmt" "fmt"
"git.loafle.net/commons_go/logging"
"git.loafle.net/commons_go/rpc/protocol" "git.loafle.net/commons_go/rpc/protocol"
) )
type ServletReadWriteCloseHandlers struct { type ServletReadWriteCloseHandlers struct {
} }
func (srwch *ServletReadWriteCloseHandlers) NewDecoder(servletCTX ServletContext, codec protocol.ServerCodec, conn interface{}) interface{} { func (srwch *ServletReadWriteCloseHandlers) ReadRequest(servletCTX ServletContext, codec protocol.ServerCodec, conn interface{}) (protocol.ServerRequestCodec, error) {
logging.Logger().Errorf("Servlet RWC Handler: NewDecoder is not implemented")
return nil
}
func (srwch *ServletReadWriteCloseHandlers) ReadRequest(servletCTX ServletContext, codec protocol.ServerCodec, decoder interface{}) (protocol.ServerRequestCodec, error) {
return nil, fmt.Errorf("Servlet RWC Handler: ReadRequest is not implemented") return nil, fmt.Errorf("Servlet RWC Handler: ReadRequest is not implemented")
} }