This commit is contained in:
crusader 2017-11-30 13:59:46 +09:00
parent 92448ee89d
commit 824265e5a7
6 changed files with 36 additions and 68 deletions

View File

@ -222,7 +222,9 @@ func (c *client) handleRPC() {
<-writerDone <-writerDone
} }
c.rwcHandler.Disconnect(c.ctx, c.conn) if nil != c.conn {
c.rwcHandler.Disconnect(c.ctx, c.conn)
}
if err != nil { if err != nil {
//c.LogError("%s", err) //c.LogError("%s", err)
@ -295,6 +297,11 @@ func (c *client) rpcWriter(stopChan <-chan struct{}, writerDone chan<- error) {
requestID = rs.ID requestID = rs.ID
} }
if nil == c.conn {
err = io.EOF
return
}
err = c.rwcHandler.WriteRequest(c.ctx, c.ch.GetCodec(), c.conn, rs.Method, rs.Args, requestID) err = c.rwcHandler.WriteRequest(c.ctx, c.ch.GetCodec(), c.conn, rs.Method, rs.Args, requestID)
if !rs.hasResponse { if !rs.hasResponse {
rs.Error = err rs.Error = err
@ -325,6 +332,10 @@ func (c *client) rpcReader(readerDone chan<- error) {
}() }()
for { for {
if nil == c.conn {
err = io.EOF
return
}
resCodec, err := c.rwcHandler.ReadResponse(c.ctx, c.ch.GetCodec(), c.conn) resCodec, err := c.rwcHandler.ReadResponse(c.ctx, c.ch.GetCodec(), c.conn)
if nil != err { if nil != err {
if err == io.ErrUnexpectedEOF || err == io.EOF { if err == io.ErrUnexpectedEOF || err == io.EOF {

View File

@ -1,18 +1,13 @@
package socket package socket
import ( import (
"io"
"git.loafle.net/commons_go/logging"
"git.loafle.net/commons_go/rpc/client" "git.loafle.net/commons_go/rpc/client"
"git.loafle.net/commons_go/rpc/protocol" "git.loafle.net/commons_go/rpc/protocol"
"git.loafle.net/commons_go/server" "git.loafle.net/commons_go/server"
) )
func New(address string) client.ClientReadWriteCloseHandler { func New() client.ClientReadWriteCloseHandler {
return &ClientReadWriteCloseHandlers{ return &ClientReadWriteCloseHandlers{}
Address: address,
}
} }
type ClientReadWriteCloseHandlers struct { type ClientReadWriteCloseHandlers struct {
@ -20,10 +15,6 @@ type ClientReadWriteCloseHandlers struct {
} }
func (crwch *ClientReadWriteCloseHandlers) ReadResponse(clientCTX client.ClientContext, codec protocol.ClientCodec, conn interface{}) (protocol.ClientResponseCodec, error) { func (crwch *ClientReadWriteCloseHandlers) ReadResponse(clientCTX client.ClientContext, codec protocol.ClientCodec, conn interface{}) (protocol.ClientResponseCodec, error) {
if nil == conn {
return nil, io.EOF
}
soc := conn.(server.Socket) soc := conn.(server.Socket)
resCodec, err := codec.NewResponse(soc) resCodec, err := codec.NewResponse(soc)
@ -31,13 +22,9 @@ func (crwch *ClientReadWriteCloseHandlers) ReadResponse(clientCTX client.ClientC
} }
func (crwch *ClientReadWriteCloseHandlers) WriteRequest(clientCTX client.ClientContext, codec protocol.ClientCodec, conn interface{}, method string, params interface{}, id interface{}) error { func (crwch *ClientReadWriteCloseHandlers) WriteRequest(clientCTX client.ClientContext, codec protocol.ClientCodec, conn interface{}, method string, params interface{}, id interface{}) error {
if nil == conn {
return io.EOF
}
soc := conn.(server.Socket) soc := conn.(server.Socket)
if wErr := codec.WriteRequest(soc, method, params); nil != wErr { if wErr := codec.WriteRequest(soc, method, params, id); nil != wErr {
return wErr return wErr
} }
@ -45,16 +32,10 @@ func (crwch *ClientReadWriteCloseHandlers) WriteRequest(clientCTX client.ClientC
} }
func (crwch *ClientReadWriteCloseHandlers) Disconnect(clientCTX client.ClientContext, conn interface{}) { func (crwch *ClientReadWriteCloseHandlers) Disconnect(clientCTX client.ClientContext, conn interface{}) {
if nil == conn {
return
}
soc := conn.(server.Socket) soc := conn.(server.Socket)
soc.Close() soc.Close()
} }
func (crwch *ClientReadWriteCloseHandlers) Validate() { func (crwch *ClientReadWriteCloseHandlers) Validate() {
if "" == crwch.Address { crwch.ClientReadWriteCloseHandlers.Validate()
logging.Logger().Panic("RPC Client RWC Handler: Address must be specified")
}
} }

View File

@ -1,8 +1,6 @@
package fasthttp package fasthttp
import ( import (
"io"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
"git.loafle.net/commons_go/rpc/client" "git.loafle.net/commons_go/rpc/client"
@ -19,10 +17,6 @@ type ClientReadWriteCloseHandlers struct {
} }
func (crwch *ClientReadWriteCloseHandlers) ReadResponse(clientCTX client.ClientContext, codec protocol.ClientCodec, conn interface{}) (protocol.ClientResponseCodec, error) { func (crwch *ClientReadWriteCloseHandlers) ReadResponse(clientCTX client.ClientContext, codec protocol.ClientCodec, conn interface{}) (protocol.ClientResponseCodec, error) {
if nil == conn {
return nil, io.EOF
}
soc := conn.(cwf.Socket) soc := conn.(cwf.Socket)
_, r, err := soc.NextReader() _, r, err := soc.NextReader()
@ -32,10 +26,6 @@ func (crwch *ClientReadWriteCloseHandlers) ReadResponse(clientCTX client.ClientC
} }
func (crwch *ClientReadWriteCloseHandlers) WriteRequest(clientCTX client.ClientContext, codec protocol.ClientCodec, conn interface{}, method string, params interface{}, id interface{}) error { func (crwch *ClientReadWriteCloseHandlers) WriteRequest(clientCTX client.ClientContext, codec protocol.ClientCodec, conn interface{}, method string, params interface{}, id interface{}) error {
if nil == conn {
return io.EOF
}
soc := conn.(cwf.Socket) soc := conn.(cwf.Socket)
wc, wErr := soc.NextWriter(websocket.TextMessage) wc, wErr := soc.NextWriter(websocket.TextMessage)
@ -46,7 +36,7 @@ func (crwch *ClientReadWriteCloseHandlers) WriteRequest(clientCTX client.ClientC
wc.Close() wc.Close()
}() }()
if wErr := codec.WriteRequest(wc, method, params); nil != wErr { if wErr := codec.WriteRequest(wc, method, params, id); nil != wErr {
return wErr return wErr
} }
@ -54,14 +44,10 @@ func (crwch *ClientReadWriteCloseHandlers) WriteRequest(clientCTX client.ClientC
} }
func (crwch *ClientReadWriteCloseHandlers) Disconnect(clientCTX client.ClientContext, conn interface{}) { func (crwch *ClientReadWriteCloseHandlers) Disconnect(clientCTX client.ClientContext, conn interface{}) {
if nil == conn {
return
}
soc := conn.(cwf.Socket) soc := conn.(cwf.Socket)
soc.Close() soc.Close()
} }
func (crwch *ClientReadWriteCloseHandlers) Validate() { func (crwch *ClientReadWriteCloseHandlers) Validate() {
crwch.ClientReadWriteCloseHandlers.Validate()
} }

View File

@ -1,8 +1,6 @@
package socket package socket
import ( import (
"io"
"git.loafle.net/commons_go/rpc" "git.loafle.net/commons_go/rpc"
"git.loafle.net/commons_go/rpc/protocol" "git.loafle.net/commons_go/rpc/protocol"
"git.loafle.net/commons_go/server" "git.loafle.net/commons_go/server"
@ -17,10 +15,6 @@ type ServletReadWriteCloseHandlers struct {
} }
func (srwch *ServletReadWriteCloseHandlers) ReadRequest(servletCTX rpc.ServletContext, codec protocol.ServerCodec, conn interface{}) (protocol.ServerRequestCodec, error) { func (srwch *ServletReadWriteCloseHandlers) ReadRequest(servletCTX rpc.ServletContext, codec protocol.ServerCodec, conn interface{}) (protocol.ServerRequestCodec, error) {
if nil == conn {
return nil, io.EOF
}
soc := conn.(server.Socket) soc := conn.(server.Socket)
reqCodec, err := codec.NewRequest(soc) reqCodec, err := codec.NewRequest(soc)
@ -28,10 +22,6 @@ func (srwch *ServletReadWriteCloseHandlers) ReadRequest(servletCTX rpc.ServletCo
} }
func (srwch *ServletReadWriteCloseHandlers) WriteResponse(servletCTX rpc.ServletContext, conn interface{}, reqCodec protocol.ServerRequestCodec, result interface{}, err error) error { func (srwch *ServletReadWriteCloseHandlers) WriteResponse(servletCTX rpc.ServletContext, conn interface{}, reqCodec protocol.ServerRequestCodec, result interface{}, err error) error {
if nil == conn {
return io.EOF
}
soc := conn.(server.Socket) soc := conn.(server.Socket)
if nil != err { if nil != err {
@ -48,10 +38,6 @@ func (srwch *ServletReadWriteCloseHandlers) WriteResponse(servletCTX rpc.Servlet
} }
func (srwch *ServletReadWriteCloseHandlers) WriteNotification(servletCTX rpc.ServletContext, conn interface{}, codec protocol.ServerCodec, method string, params interface{}) error { func (srwch *ServletReadWriteCloseHandlers) WriteNotification(servletCTX rpc.ServletContext, conn interface{}, codec protocol.ServerCodec, method string, params interface{}) error {
if nil == conn {
return io.EOF
}
soc := conn.(server.Socket) soc := conn.(server.Socket)
if wErr := codec.WriteNotification(soc, method, params); nil != wErr { if wErr := codec.WriteNotification(soc, method, params); nil != wErr {
@ -60,3 +46,7 @@ func (srwch *ServletReadWriteCloseHandlers) WriteNotification(servletCTX rpc.Ser
return nil return nil
} }
func (srwch *ServletReadWriteCloseHandlers) Validate() {
srwch.ServletReadWriteCloseHandlers.Validate()
}

View File

@ -1,8 +1,6 @@
package fasthttp package fasthttp
import ( import (
"io"
"git.loafle.net/commons_go/rpc" "git.loafle.net/commons_go/rpc"
"git.loafle.net/commons_go/rpc/protocol" "git.loafle.net/commons_go/rpc/protocol"
cwf "git.loafle.net/commons_go/websocket_fasthttp" cwf "git.loafle.net/commons_go/websocket_fasthttp"
@ -18,10 +16,6 @@ type ServletReadWriteCloseHandlers struct {
} }
func (srwch *ServletReadWriteCloseHandlers) ReadRequest(servletCTX rpc.ServletContext, codec protocol.ServerCodec, conn interface{}) (protocol.ServerRequestCodec, error) { func (srwch *ServletReadWriteCloseHandlers) ReadRequest(servletCTX rpc.ServletContext, codec protocol.ServerCodec, conn interface{}) (protocol.ServerRequestCodec, error) {
if nil == conn {
return nil, io.EOF
}
soc := conn.(cwf.Socket) soc := conn.(cwf.Socket)
_, r, err := soc.NextReader() _, r, err := soc.NextReader()
@ -31,10 +25,6 @@ func (srwch *ServletReadWriteCloseHandlers) ReadRequest(servletCTX rpc.ServletCo
} }
func (srwch *ServletReadWriteCloseHandlers) WriteResponse(servletCTX rpc.ServletContext, conn interface{}, requestCodec protocol.ServerRequestCodec, result interface{}, err error) error { func (srwch *ServletReadWriteCloseHandlers) WriteResponse(servletCTX rpc.ServletContext, conn interface{}, requestCodec protocol.ServerRequestCodec, result interface{}, err error) error {
if nil == conn {
return io.EOF
}
soc := conn.(cwf.Socket) soc := conn.(cwf.Socket)
wc, wErr := soc.NextWriter(websocket.TextMessage) wc, wErr := soc.NextWriter(websocket.TextMessage)
@ -60,10 +50,6 @@ func (srwch *ServletReadWriteCloseHandlers) WriteResponse(servletCTX rpc.Servlet
} }
func (srwch *ServletReadWriteCloseHandlers) WriteNotification(servletCTX rpc.ServletContext, conn interface{}, codec protocol.ServerCodec, method string, params interface{}) error { func (srwch *ServletReadWriteCloseHandlers) WriteNotification(servletCTX rpc.ServletContext, conn interface{}, codec protocol.ServerCodec, method string, params interface{}) error {
if nil == conn {
return io.EOF
}
soc := conn.(cwf.Socket) soc := conn.(cwf.Socket)
wc, wErr := soc.NextWriter(websocket.TextMessage) wc, wErr := soc.NextWriter(websocket.TextMessage)
@ -77,3 +63,7 @@ func (srwch *ServletReadWriteCloseHandlers) WriteNotification(servletCTX rpc.Ser
return nil return nil
} }
func (srwch *ServletReadWriteCloseHandlers) Validate() {
srwch.ServletReadWriteCloseHandlers.Validate()
}

View File

@ -163,6 +163,11 @@ func handleReader(s *rpcServlet, stopChan chan struct{}, doneChan chan error) {
}() }()
for { for {
if nil == s.conn {
err = fmt.Errorf("RPC Server: disconnected from client")
return
}
requestCodec, err := s.rwcSH.ReadRequest(s.ctx, s.serverCodec, s.conn) requestCodec, err := s.rwcSH.ReadRequest(s.ctx, s.serverCodec, s.conn)
if nil != err { if nil != err {
if err == io.ErrUnexpectedEOF || err == io.EOF { if err == io.ErrUnexpectedEOF || err == io.EOF {
@ -213,6 +218,11 @@ func handleWriter(s *rpcServlet, stopChan chan struct{}, doneChan chan error) {
} }
} }
if nil == s.conn {
err = fmt.Errorf("RPC Server: disconnected from client")
return
}
if nil != rs.requestCodec { if nil != rs.requestCodec {
if err := s.rwcSH.WriteResponse(s.ctx, s.conn, rs.requestCodec, rs.result, rs.err); nil != err { if err := s.rwcSH.WriteResponse(s.ctx, s.conn, rs.requestCodec, rs.result, rs.err); nil != err {
logging.Logger().Error(fmt.Sprintf("RPC Server: response error %v", err)) logging.Logger().Error(fmt.Sprintf("RPC Server: response error %v", err))