package json import ( "encoding/json" "fmt" "io" "sync" "git.loafle.net/commons_go/rpc/codec" "git.loafle.net/commons_go/rpc/protocol" ) // clientMessage represents a JSON-RPC message sent to a client. type clientMessage struct { Version string `json:"jsonrpc"` MessageType protocol.MessageType `json:"messageType"` Message *json.RawMessage `json:"message"` } // ---------------------------------------------------------------------------- // Codec // ---------------------------------------------------------------------------- // NewCustomClientCodec returns a new JSON Codec based on passed encoder selector. func NewCustomClientCodec(codecSel codec.CodecSelector) protocol.ClientCodec { return &ClientCodec{codecSel: codecSel} } // NewClientCodec returns a new JSON Codec. func NewClientCodec() protocol.ClientCodec { return NewCustomClientCodec(codec.DefaultCodecSelector) } // ClientCodec creates a ClientCodecRequest to process each request. type ClientCodec struct { codecSel codec.CodecSelector } func (cc *ClientCodec) WriteRequest(w io.Writer, method string, args interface{}, id interface{}) error { req := retainClientRequest(method, args, id) defer func() { releaseClientRequest(req) }() encoder := json.NewEncoder(cc.codecSel.SelectByWriter(w).Encode(w)) if err := encoder.Encode(req); nil != err { return err } return nil } // NewMessage returns a ClientMessageCodec. func (cc *ClientCodec) NewMessage(r io.Reader) (protocol.ClientMessageCodec, error) { return newClientMessageCodec(r, cc.codecSel.SelectByReader(r)) } // newClientMessageCodec returns a new ClientMessageCodec. func newClientMessageCodec(r io.Reader, codec codec.Codec) (protocol.ClientMessageCodec, error) { msg := retainClientMessage() err := json.NewDecoder(r).Decode(msg) if err != nil { if err == io.ErrUnexpectedEOF || err == io.EOF { return nil, err } err = &Error{ Code: E_PARSE, Message: err.Error(), Data: msg, } } if msg.Version != Version { err = &Error{ Code: E_INVALID_REQ, Message: "jsonrpc must be " + Version, Data: msg, } } return retainClientMessageCodec(msg, err, codec), nil } type ClientMessageCodec struct { msg *clientMessage err error codec codec.Codec } func (ccm *ClientMessageCodec) MessageType() protocol.MessageType { return ccm.msg.MessageType } func (ccm *ClientMessageCodec) MessageCodec() (interface{}, error) { switch ccm.msg.MessageType { case protocol.MessageTypeResponse: return newClientResponseCodec(ccm.msg.Message, ccm.codec) case protocol.MessageTypeNotification: return newClientNotificationCodec(ccm.msg.Message, ccm.codec) } return nil, fmt.Errorf("RPC: Not supported message type[%v]", ccm.msg.MessageType) } func (ccm *ClientMessageCodec) Close() { if nil != ccm.msg { releaseClientMessage(ccm.msg) } releaseClientMessageCodec(ccm) } var clientMessagePool sync.Pool func retainClientMessage() *clientMessage { v := clientMessagePool.Get() if v == nil { return &clientMessage{} } return v.(*clientMessage) } func releaseClientMessage(cm *clientMessage) { clientMessagePool.Put(cm) } var clientMessageCodecPool sync.Pool func retainClientMessageCodec(msg *clientMessage, err error, codec codec.Codec) *ClientMessageCodec { var ccm *ClientMessageCodec v := clientMessageCodecPool.Get() if v == nil { ccm = &ClientMessageCodec{} } else { ccm = v.(*ClientMessageCodec) } ccm.msg = msg ccm.err = err ccm.codec = codec return ccm } func releaseClientMessageCodec(cr *ClientMessageCodec) { clientMessageCodecPool.Put(cr) }