147 lines
3.5 KiB
Go
147 lines
3.5 KiB
Go
package json
|
|
|
|
import (
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"sync"
|
|
|
|
"git.loafle.net/commons_go/rpc/codec"
|
|
"git.loafle.net/commons_go/rpc/protocol"
|
|
)
|
|
|
|
// clientMessage represents a JSON-RPC message sent to a client.
|
|
type clientMessage struct {
|
|
Version string `json:"jsonrpc"`
|
|
MessageType protocol.MessageType `json:"messageType"`
|
|
Message *json.RawMessage `json:"message"`
|
|
}
|
|
|
|
// ----------------------------------------------------------------------------
|
|
// Codec
|
|
// ----------------------------------------------------------------------------
|
|
|
|
// NewCustomClientCodec returns a new JSON Codec based on passed encoder selector.
|
|
func NewCustomClientCodec(codecSel codec.CodecSelector) protocol.ClientCodec {
|
|
return &ClientCodec{codecSel: codecSel}
|
|
}
|
|
|
|
// NewClientCodec returns a new JSON Codec.
|
|
func NewClientCodec() protocol.ClientCodec {
|
|
return NewCustomClientCodec(codec.DefaultCodecSelector)
|
|
}
|
|
|
|
// ClientCodec creates a ClientCodecRequest to process each request.
|
|
type ClientCodec struct {
|
|
codecSel codec.CodecSelector
|
|
}
|
|
|
|
func (cc *ClientCodec) WriteRequest(w io.Writer, method string, args interface{}, id interface{}) error {
|
|
req := retainClientRequest(method, args, id)
|
|
defer func() {
|
|
releaseClientRequest(req)
|
|
}()
|
|
|
|
encoder := json.NewEncoder(cc.codecSel.SelectByWriter(w).Encode(w))
|
|
if err := encoder.Encode(req); nil != err {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// NewMessage returns a ClientMessageCodec.
|
|
func (cc *ClientCodec) NewMessage(r io.Reader) (protocol.ClientMessageCodec, error) {
|
|
return newClientMessageCodec(r, cc.codecSel.SelectByReader(r))
|
|
}
|
|
|
|
// newClientMessageCodec returns a new ClientMessageCodec.
|
|
func newClientMessageCodec(r io.Reader, codec codec.Codec) (protocol.ClientMessageCodec, error) {
|
|
msg := retainClientMessage()
|
|
err := json.NewDecoder(r).Decode(msg)
|
|
if err != nil {
|
|
if err == io.ErrUnexpectedEOF || err == io.EOF {
|
|
return nil, err
|
|
}
|
|
err = &Error{
|
|
Code: E_PARSE,
|
|
Message: err.Error(),
|
|
Data: msg,
|
|
}
|
|
}
|
|
if msg.Version != Version {
|
|
err = &Error{
|
|
Code: E_INVALID_REQ,
|
|
Message: "jsonrpc must be " + Version,
|
|
Data: msg,
|
|
}
|
|
}
|
|
|
|
return retainClientMessageCodec(msg, err, codec), nil
|
|
}
|
|
|
|
type ClientMessageCodec struct {
|
|
msg *clientMessage
|
|
err error
|
|
codec codec.Codec
|
|
}
|
|
|
|
func (ccm *ClientMessageCodec) MessageType() protocol.MessageType {
|
|
return ccm.msg.MessageType
|
|
}
|
|
|
|
func (ccm *ClientMessageCodec) MessageCodec() (interface{}, error) {
|
|
switch ccm.msg.MessageType {
|
|
case protocol.MessageTypeResponse:
|
|
return newClientResponseCodec(ccm.msg.Message, ccm.codec)
|
|
case protocol.MessageTypeNotification:
|
|
return newClientNotificationCodec(ccm.msg.Message, ccm.codec)
|
|
}
|
|
return nil, fmt.Errorf("RPC: Not supported message type[%v]", ccm.msg.MessageType)
|
|
}
|
|
|
|
func (ccm *ClientMessageCodec) Close() {
|
|
if nil != ccm.msg {
|
|
releaseClientMessage(ccm.msg)
|
|
}
|
|
|
|
releaseClientMessageCodec(ccm)
|
|
}
|
|
|
|
var clientMessagePool sync.Pool
|
|
|
|
func retainClientMessage() *clientMessage {
|
|
v := clientMessagePool.Get()
|
|
if v == nil {
|
|
return &clientMessage{}
|
|
}
|
|
return v.(*clientMessage)
|
|
}
|
|
|
|
func releaseClientMessage(cm *clientMessage) {
|
|
clientMessagePool.Put(cm)
|
|
}
|
|
|
|
var clientMessageCodecPool sync.Pool
|
|
|
|
func retainClientMessageCodec(msg *clientMessage, err error, codec codec.Codec) *ClientMessageCodec {
|
|
var ccm *ClientMessageCodec
|
|
v := clientMessageCodecPool.Get()
|
|
if v == nil {
|
|
ccm = &ClientMessageCodec{}
|
|
} else {
|
|
ccm = v.(*ClientMessageCodec)
|
|
}
|
|
|
|
ccm.msg = msg
|
|
ccm.err = err
|
|
ccm.codec = codec
|
|
|
|
return ccm
|
|
}
|
|
|
|
func releaseClientMessageCodec(cr *ClientMessageCodec) {
|
|
|
|
clientMessageCodecPool.Put(cr)
|
|
}
|