rpc/protocol/json/client.go

147 lines
3.5 KiB
Go
Raw Normal View History

2017-10-26 07:21:35 +00:00
package json
import (
"encoding/json"
2017-10-31 09:25:44 +00:00
"fmt"
2017-10-26 07:21:35 +00:00
"io"
2017-10-31 09:25:44 +00:00
"sync"
2017-11-26 10:15:51 +00:00
"git.loafle.net/commons_go/rpc/codec"
2017-10-31 09:25:44 +00:00
"git.loafle.net/commons_go/rpc/protocol"
2017-10-26 07:21:35 +00:00
)
2017-11-26 10:15:51 +00:00
// clientMessage represents a JSON-RPC message sent to a client.
type clientMessage struct {
Version string `json:"jsonrpc"`
MessageType protocol.MessageType `json:"messageType"`
Message *json.RawMessage `json:"message"`
2017-10-26 07:21:35 +00:00
}
2017-10-31 09:25:44 +00:00
// ----------------------------------------------------------------------------
// Codec
// ----------------------------------------------------------------------------
// NewCustomClientCodec returns a new JSON Codec based on passed encoder selector.
2017-11-26 10:15:51 +00:00
func NewCustomClientCodec(codecSel codec.CodecSelector) protocol.ClientCodec {
return &ClientCodec{codecSel: codecSel}
2017-10-26 07:21:35 +00:00
}
2017-10-31 09:25:44 +00:00
// NewClientCodec returns a new JSON Codec.
2017-11-26 10:15:51 +00:00
func NewClientCodec() protocol.ClientCodec {
return NewCustomClientCodec(codec.DefaultCodecSelector)
2017-10-31 09:25:44 +00:00
}
// ClientCodec creates a ClientCodecRequest to process each request.
type ClientCodec struct {
2017-11-26 10:15:51 +00:00
codecSel codec.CodecSelector
2017-10-31 09:25:44 +00:00
}
2017-11-26 10:15:51 +00:00
func (cc *ClientCodec) WriteRequest(w io.Writer, method string, args interface{}, id interface{}) error {
2017-10-31 09:25:44 +00:00
req := retainClientRequest(method, args, id)
defer func() {
2017-11-26 10:15:51 +00:00
releaseClientRequest(req)
2017-10-31 09:25:44 +00:00
}()
2017-11-26 10:15:51 +00:00
encoder := json.NewEncoder(cc.codecSel.SelectByWriter(w).Encode(w))
2017-10-31 09:25:44 +00:00
if err := encoder.Encode(req); nil != err {
2017-10-26 07:21:35 +00:00
return err
}
2017-10-31 09:25:44 +00:00
return nil
}
2017-11-26 10:15:51 +00:00
// NewMessage returns a ClientMessageCodec.
func (cc *ClientCodec) NewMessage(r io.Reader) (protocol.ClientMessageCodec, error) {
return newClientMessageCodec(r, cc.codecSel.SelectByReader(r))
2017-10-31 09:25:44 +00:00
}
2017-11-26 10:15:51 +00:00
// newClientMessageCodec returns a new ClientMessageCodec.
func newClientMessageCodec(r io.Reader, codec codec.Codec) (protocol.ClientMessageCodec, error) {
msg := retainClientMessage()
err := json.NewDecoder(r).Decode(msg)
2017-10-31 09:25:44 +00:00
if err != nil {
2017-11-26 10:15:51 +00:00
if err == io.ErrUnexpectedEOF || err == io.EOF {
return nil, err
}
2017-10-31 09:25:44 +00:00
err = &Error{
Code: E_PARSE,
Message: err.Error(),
2017-11-26 10:15:51 +00:00
Data: msg,
2017-10-26 07:21:35 +00:00
}
}
2017-11-26 10:15:51 +00:00
if msg.Version != Version {
err = &Error{
Code: E_INVALID_REQ,
Message: "jsonrpc must be " + Version,
Data: msg,
2017-11-23 09:22:58 +00:00
}
}
2017-11-26 10:15:51 +00:00
return retainClientMessageCodec(msg, err, codec), nil
2017-10-31 09:25:44 +00:00
}
2017-11-26 10:15:51 +00:00
type ClientMessageCodec struct {
msg *clientMessage
err error
codec codec.Codec
2017-10-31 09:25:44 +00:00
}
2017-11-28 10:06:35 +00:00
func (ccm *ClientMessageCodec) MessageType() protocol.MessageType {
2017-11-26 10:15:51 +00:00
return ccm.msg.MessageType
2017-10-31 09:25:44 +00:00
}
2017-11-26 10:15:51 +00:00
func (ccm *ClientMessageCodec) MessageCodec() (interface{}, error) {
switch ccm.msg.MessageType {
case protocol.MessageTypeResponse:
return newClientResponseCodec(ccm.msg.Message, ccm.codec)
case protocol.MessageTypeNotification:
return newClientNotificationCodec(ccm.msg.Message, ccm.codec)
}
return nil, fmt.Errorf("RPC: Not supported message type[%v]", ccm.msg.MessageType)
2017-10-31 09:25:44 +00:00
}
2017-11-26 10:15:51 +00:00
func (ccm *ClientMessageCodec) Close() {
if nil != ccm.msg {
releaseClientMessage(ccm.msg)
2017-10-31 09:25:44 +00:00
}
2017-11-26 10:15:51 +00:00
releaseClientMessageCodec(ccm)
2017-10-31 09:25:44 +00:00
}
2017-11-26 10:15:51 +00:00
var clientMessagePool sync.Pool
2017-10-31 09:25:44 +00:00
2017-11-26 10:15:51 +00:00
func retainClientMessage() *clientMessage {
v := clientMessagePool.Get()
2017-10-31 09:25:44 +00:00
if v == nil {
2017-11-26 10:15:51 +00:00
return &clientMessage{}
2017-10-31 09:25:44 +00:00
}
2017-11-26 10:15:51 +00:00
return v.(*clientMessage)
2017-10-31 09:25:44 +00:00
}
2017-11-26 10:15:51 +00:00
func releaseClientMessage(cm *clientMessage) {
clientMessagePool.Put(cm)
2017-10-31 09:25:44 +00:00
}
2017-11-26 10:15:51 +00:00
var clientMessageCodecPool sync.Pool
2017-10-31 09:25:44 +00:00
2017-11-26 10:15:51 +00:00
func retainClientMessageCodec(msg *clientMessage, err error, codec codec.Codec) *ClientMessageCodec {
var ccm *ClientMessageCodec
v := clientMessageCodecPool.Get()
2017-10-31 09:25:44 +00:00
if v == nil {
2017-11-26 10:15:51 +00:00
ccm = &ClientMessageCodec{}
} else {
ccm = v.(*ClientMessageCodec)
2017-10-31 09:25:44 +00:00
}
2017-11-26 10:15:51 +00:00
ccm.msg = msg
ccm.err = err
ccm.codec = codec
return ccm
2017-10-31 09:25:44 +00:00
}
2017-11-26 10:15:51 +00:00
func releaseClientMessageCodec(cr *ClientMessageCodec) {
2017-10-31 09:25:44 +00:00
2017-11-26 10:15:51 +00:00
clientMessageCodecPool.Put(cr)
2017-10-26 07:21:35 +00:00
}