This commit is contained in:
crusader 2018-03-23 22:45:48 +09:00
parent 32361ec6f4
commit 3e82c65689
12 changed files with 252 additions and 48 deletions

View File

@ -330,11 +330,6 @@ func (c *client) rpcWriter(stopChan <-chan struct{}, writerDone chan<- error) {
func (c *client) rpcReader(readerDone chan<- error) {
var err error
defer func() {
if r := recover(); r != nil {
if err == nil {
err = fmt.Errorf("Client: Panic when reading data from server: %v", r)
}
}
readerDone <- err
}()

View File

@ -2,6 +2,7 @@ package socket
import (
"git.loafle.net/commons_go/logging"
"git.loafle.net/commons_go/rpc"
"git.loafle.net/commons_go/rpc/client"
"git.loafle.net/commons_go/rpc/protocol"
csc "git.loafle.net/commons_go/server/client"
@ -25,20 +26,38 @@ func (crwch *ClientReadWriteCloseHandlers) Connect(clientCTX client.ClientContex
}
func (crwch *ClientReadWriteCloseHandlers) ReadResponse(clientCTX client.ClientContext, codec protocol.ClientCodec, conn interface{}) (protocol.ClientResponseCodec, error) {
var (
buf []byte
rErr error
)
soc := conn.(csc.Socket)
return codec.NewResponse(soc)
buf, rErr = rpc.TCPReadData(soc)
if nil != rErr {
return nil, rErr
}
return codec.NewResponseB(buf)
}
func (crwch *ClientReadWriteCloseHandlers) WriteRequest(clientCTX client.ClientContext, codec protocol.ClientCodec, conn interface{}, method string, params []interface{}, id interface{}) error {
var (
buf []byte
wErr error
)
soc := conn.(csc.Socket)
wErr = codec.WriteRequest(soc, method, params, id)
buf, wErr = codec.NewRequestB(method, params, id)
if nil != wErr {
return wErr
}
return wErr
wErr = rpc.TCPWriteData(soc, buf)
if nil != wErr {
return wErr
}
return nil
}
func (crwch *ClientReadWriteCloseHandlers) Disconnect(clientCTX client.ClientContext, conn interface{}) {

View File

@ -30,7 +30,7 @@ func (crwch *ClientReadWriteCloseHandlers) Connect(clientCTX client.ClientContex
func (crwch *ClientReadWriteCloseHandlers) ReadResponse(clientCTX client.ClientContext, codec protocol.ClientCodec, conn interface{}) (protocol.ClientResponseCodec, error) {
soc := conn.(cwfc.Socket)
_, r, err := soc.NextReader()
_, buf, err := soc.ReadMessage()
if nil != err {
if websocket.IsUnexpectedCloseError(err) {
return nil, io.EOF
@ -38,30 +38,30 @@ func (crwch *ClientReadWriteCloseHandlers) ReadResponse(clientCTX client.ClientC
return nil, err
}
return codec.NewResponse(r)
return codec.NewResponseB(buf)
}
func (crwch *ClientReadWriteCloseHandlers) WriteRequest(clientCTX client.ClientContext, codec protocol.ClientCodec, conn interface{}, method string, params []interface{}, id interface{}) error {
var (
wc io.WriteCloser
buf []byte
wErr error
)
soc := conn.(cwfc.Socket)
wc, wErr = soc.NextWriter(websocket.TextMessage)
buf, wErr = codec.NewRequestB(method, params, id)
if nil != wErr {
return wErr
}
wErr = soc.WriteMessage(websocket.TextMessage, buf)
if nil != wErr {
if websocket.IsUnexpectedCloseError(wErr) {
return io.EOF
}
return wErr
}
defer func() {
wc.Close()
}()
wErr = codec.WriteRequest(wc, method, params, id)
return wErr
return nil
}
func (crwch *ClientReadWriteCloseHandlers) Disconnect(clientCTX client.ClientContext, conn interface{}) {

View File

@ -6,6 +6,9 @@ import (
// ClientCodec creates a ClientCodecRequest to process each request.
type ClientCodec interface {
NewRequestB(method string, args []interface{}, id interface{}) ([]byte, error)
NewResponseB(buf []byte) (ClientResponseCodec, error)
WriteRequest(w io.Writer, method string, args []interface{}, id interface{}) error
NewResponse(r io.Reader) (ClientResponseCodec, error)
}

View File

@ -45,3 +45,22 @@ func (cc *ClientCodec) WriteRequest(w io.Writer, method string, args []interface
func (cc *ClientCodec) NewResponse(r io.Reader) (protocol.ClientResponseCodec, error) {
return newClientResponseCodec(r)
}
func (cc *ClientCodec) NewRequestB(method string, args []interface{}, id interface{}) ([]byte, error) {
params, err := convertParamsToStringArray(args)
if nil != err {
return nil, err
}
req := &clientRequest{
Version: Version,
Method: method,
Params: params,
ID: id,
}
return json.Marshal(req)
}
func (cc *ClientCodec) NewResponseB(buf []byte) (protocol.ClientResponseCodec, error) {
return newClientResponseCodecB(buf)
}

View File

@ -92,3 +92,27 @@ func newClientResponseCodec(r io.Reader) (protocol.ClientResponseCodec, error) {
return &ClientResponseCodec{res: res, err: err}, nil
}
// newClientMessageCodec returns a new ClientMessageCodec.
func newClientResponseCodecB(buf []byte) (protocol.ClientResponseCodec, error) {
res := &clientResponse{}
err := json.Unmarshal(buf, res)
if err != nil {
err = &Error{
Code: crp.E_PARSE,
Message: err.Error(),
Data: res,
}
}
if res.Version != Version {
err = &Error{
Code: crp.E_INVALID_REQ,
Message: "jsonrpc must be " + Version,
Data: res,
}
}
return &ClientResponseCodec{res: res, err: err}, nil
}

View File

@ -22,6 +22,22 @@ func NewServerCodec() protocol.ServerCodec {
type ServerCodec struct {
}
func (sc *ServerCodec) NewRequestB(buf []byte) (protocol.ServerRequestCodec, error) {
return newServerRequestCodecB(buf)
}
func (sc *ServerCodec) NewNotificationB(method string, args []interface{}) ([]byte, error) {
params, err := convertParamsToStringArray(args)
if nil != err {
return nil, err
}
noti := &serverNotification{Method: method, Params: params}
res := &serverResponse{Version: Version, Result: noti}
return json.Marshal(res)
}
// NewRequest returns a ServerRequestCodec.
func (sc *ServerCodec) NewRequest(r io.Reader) (protocol.ServerRequestCodec, error) {
return newServerRequestCodec(r)

View File

@ -66,6 +66,31 @@ func newServerRequestCodec(r io.Reader) (protocol.ServerRequestCodec, error) {
return &ServerRequestCodec{req: req, err: err}, nil
}
// newRequestCodec returns a new ServerRequestCodec.
func newServerRequestCodecB(buf []byte) (protocol.ServerRequestCodec, error) {
req := &serverRequest{}
err := json.Unmarshal(buf, req)
if err != nil {
err = &Error{
Code: crp.E_PARSE,
Message: err.Error(),
Data: req,
}
}
if req.Version != Version {
err = &Error{
Code: crp.E_INVALID_REQ,
Message: "jsonrpc must be " + Version,
Data: req,
}
}
return &ServerRequestCodec{req: req, err: err}, nil
}
// ServerRequestCodec decodes and encodes a single request.
type ServerRequestCodec struct {
req *serverRequest
@ -161,3 +186,23 @@ func (src *ServerRequestCodec) writeServerResponse(w io.Writer, res *serverRespo
}
return nil
}
func (src *ServerRequestCodec) NewResponseB(reply interface{}) ([]byte, error) {
res := &serverResponse{Version: Version, Result: reply, ID: src.req.ID}
return src.newServerResponseB(res)
}
func (src *ServerRequestCodec) NewErrorB(status int, err error) ([]byte, error) {
jsonErr, ok := err.(*Error)
if !ok {
jsonErr = &Error{
Code: crp.E_SERVER,
Message: err.Error(),
}
}
res := &serverResponse{Version: Version, Error: jsonErr, ID: src.req.ID}
return src.newServerResponseB(res)
}
func (src *ServerRequestCodec) newServerResponseB(res *serverResponse) ([]byte, error) {
return json.Marshal(res)
}

View File

@ -8,6 +8,9 @@ import (
type ServerCodec interface {
NewRequest(r io.Reader) (ServerRequestCodec, error)
WriteNotification(w io.Writer, method string, args []interface{}) error
NewRequestB(buf []byte) (ServerRequestCodec, error)
NewNotificationB(method string, args []interface{}) ([]byte, error)
}
// ServerRequestCodec decodes a request and encodes a response using a specific
@ -15,6 +18,9 @@ type ServerCodec interface {
type ServerRequestCodec interface {
RegistryCodec
NewResponseB(reply interface{}) ([]byte, error)
NewErrorB(status int, err error) ([]byte, error)
WriteResponse(w io.Writer, reply interface{}) error
WriteError(w io.Writer, status int, err error) error
}

View File

@ -15,35 +15,62 @@ type ServletReadWriteCloseHandlers struct {
}
func (srwch *ServletReadWriteCloseHandlers) ReadRequest(servletCTX rpc.ServletContext, codec protocol.ServerCodec, conn interface{}) (protocol.ServerRequestCodec, error) {
var (
buf []byte
rErr error
)
soc := conn.(server.Socket)
return codec.NewRequest(soc)
buf, rErr = rpc.TCPReadData(soc)
if nil != rErr {
return nil, rErr
}
return codec.NewRequestB(buf)
}
func (srwch *ServletReadWriteCloseHandlers) WriteResponse(servletCTX rpc.ServletContext, conn interface{}, reqCodec protocol.ServerRequestCodec, result interface{}, err error) error {
var (
buf []byte
wErr error
)
soc := conn.(server.Socket)
if nil != err {
wErr = reqCodec.WriteError(soc, 500, err)
buf, wErr = reqCodec.NewErrorB(500, err)
} else {
wErr = reqCodec.WriteResponse(soc, result)
buf, wErr = reqCodec.NewResponseB(result)
}
if nil != wErr {
return wErr
}
return wErr
wErr = rpc.TCPWriteData(soc, buf)
if nil != wErr {
return wErr
}
return nil
}
func (srwch *ServletReadWriteCloseHandlers) WriteNotification(servletCTX rpc.ServletContext, conn interface{}, codec protocol.ServerCodec, method string, params []interface{}) error {
var (
buf []byte
wErr error
)
soc := conn.(server.Socket)
wErr = codec.WriteNotification(soc, method, params)
buf, wErr = codec.NewNotificationB(method, params)
if nil != wErr {
return wErr
}
return wErr
wErr = rpc.TCPWriteData(soc, buf)
if nil != wErr {
return wErr
}
return nil
}
func (srwch *ServletReadWriteCloseHandlers) Validate() {

View File

@ -20,7 +20,7 @@ type ServletReadWriteCloseHandlers struct {
func (srwch *ServletReadWriteCloseHandlers) ReadRequest(servletCTX rpc.ServletContext, codec protocol.ServerCodec, conn interface{}) (protocol.ServerRequestCodec, error) {
soc := conn.(cwf.Socket)
_, r, err := soc.NextReader()
_, buf, err := soc.ReadMessage()
if nil != err {
if websocket.IsUnexpectedCloseError(err) {
return nil, io.EOF
@ -28,17 +28,30 @@ func (srwch *ServletReadWriteCloseHandlers) ReadRequest(servletCTX rpc.ServletCo
return nil, err
}
return codec.NewRequest(r)
return codec.NewRequestB(buf)
}
func (srwch *ServletReadWriteCloseHandlers) WriteResponse(servletCTX rpc.ServletContext, conn interface{}, requestCodec protocol.ServerRequestCodec, result interface{}, err error) error {
if !requestCodec.HasResponse() {
return nil
}
var (
wc io.WriteCloser
buf []byte
wErr error
)
soc := conn.(cwf.Socket)
wc, wErr = soc.NextWriter(websocket.TextMessage)
if nil != err {
buf, wErr = requestCodec.NewErrorB(500, err)
} else {
buf, wErr = requestCodec.NewResponseB(result)
}
if nil != wErr {
return wErr
}
wErr = soc.WriteMessage(websocket.TextMessage, buf)
if nil != wErr {
if websocket.IsUnexpectedCloseError(wErr) {
return io.EOF
@ -46,27 +59,22 @@ func (srwch *ServletReadWriteCloseHandlers) WriteResponse(servletCTX rpc.Servlet
return wErr
}
defer func() {
wc.Close()
}()
if nil != err {
wErr = requestCodec.WriteError(wc, 500, err)
} else {
wErr = requestCodec.WriteResponse(wc, result)
}
return wErr
return nil
}
func (srwch *ServletReadWriteCloseHandlers) WriteNotification(servletCTX rpc.ServletContext, conn interface{}, codec protocol.ServerCodec, method string, params []interface{}) error {
var (
wc io.WriteCloser
buf []byte
wErr error
)
soc := conn.(cwf.Socket)
wc, wErr = soc.NextWriter(websocket.TextMessage)
buf, wErr = codec.NewNotificationB(method, params)
if nil != wErr {
return wErr
}
wErr = soc.WriteMessage(websocket.TextMessage, buf)
if nil != wErr {
if websocket.IsUnexpectedCloseError(wErr) {
return io.EOF
@ -74,13 +82,7 @@ func (srwch *ServletReadWriteCloseHandlers) WriteNotification(servletCTX rpc.Ser
return wErr
}
defer func() {
wc.Close()
}()
wErr = codec.WriteNotification(wc, method, params)
return wErr
return nil
}
func (srwch *ServletReadWriteCloseHandlers) Validate() {

48
tcp.go Normal file
View File

@ -0,0 +1,48 @@
package rpc
import (
"encoding/binary"
"io"
"net"
)
func TCPWriteData(conn net.Conn, buf []byte) error {
var wErr error
prefix := make([]byte, 4)
binary.BigEndian.PutUint32(prefix, uint32(len(buf)))
_, wErr = conn.Write(prefix)
if nil != wErr {
return wErr
}
_, wErr = conn.Write(buf)
if nil != wErr {
return wErr
}
return nil
}
func TCPReadData(conn net.Conn) ([]byte, error) {
var (
buf []byte
rErr error
)
prefix := make([]byte, 4)
_, rErr = io.ReadFull(conn, prefix)
if nil != rErr {
return nil, rErr
}
length := binary.BigEndian.Uint32(prefix)
buf = make([]byte, int(length))
_, rErr = io.ReadFull(conn, buf)
if nil != rErr {
return nil, rErr
}
return buf, nil
}