package json import ( "encoding/json" "io" "sync" "git.loafle.net/commons_go/rpc/encode" "git.loafle.net/commons_go/rpc/protocol" ) var null = json.RawMessage([]byte("null")) // ---------------------------------------------------------------------------- // Request and Response // ---------------------------------------------------------------------------- // serverRequest represents a JSON-RPC request received by the server. type serverRequest struct { // JSON-RPC protocol. Version string `json:"jsonrpc"` // A String containing the name of the method to be invoked. Method string `json:"method"` // A Structured value to pass as arguments to the method. Params *json.RawMessage `json:"params"` // The request id. MUST be a string, number or null. // Our implementation will not do type checking for id. // It will be copied as it is. ID *json.RawMessage `json:"id"` } // serverResponse represents a JSON-RPC response returned by the server. type serverResponse struct { // JSON-RPC protocol. Version string `json:"jsonrpc"` // The Object that was returned by the invoked method. This must be null // in case there was an error invoking the method. // As per spec the member will be omitted if there was an error. Result interface{} `json:"result,omitempty"` // An Error object if there was an error invoking the method. It must be // null if there was no error. // As per spec the member will be omitted if there was no error. Error *Error `json:"error,omitempty"` // This must be the same id as the request it is responding to. ID *json.RawMessage `json:"id"` } // ---------------------------------------------------------------------------- // Codec // ---------------------------------------------------------------------------- // NewCustomServerCodec returns a new JSON Codec based on passed encoder selector. func NewCustomServerCodec(encSel encode.EncoderSelector) *ServerCodec { return &ServerCodec{encSel: encSel} } // NewServerCodec returns a new JSON Codec. func NewServerCodec() *ServerCodec { return NewCustomServerCodec(encode.DefaultEncoderSelector) } // ServerCodec creates a ServerCodecRequest to process each request. type ServerCodec struct { encSel encode.EncoderSelector notifyMtx sync.Mutex notify clientRequest } // NewRequest returns a ServerCodecRequest. func (sc *ServerCodec) NewRequest(r io.Reader) (protocol.ServerCodecRequest, error) { return newServerCodecRequest(r, sc.encSel.SelectByReader(r)) } // WriteNotify send a notification from server to client. func (sc *ServerCodec) WriteNotify(w io.Writer, method string, args interface{}) error { sc.notifyMtx.Lock() sc.notify.Version = Version sc.notify.Method = method sc.notify.Params = args encoder := json.NewEncoder(sc.encSel.SelectByWriter(w).Encode(w)) err := encoder.Encode(&sc.notify) sc.notifyMtx.Unlock() // Not sure in which case will this happen. But seems harmless. if err != nil { return err } return nil } // ---------------------------------------------------------------------------- // ServerCodecRequest // ---------------------------------------------------------------------------- // newCodecRequest returns a new ServerCodecRequest. func newServerCodecRequest(r io.Reader, encoder encode.Encoder) (protocol.ServerCodecRequest, error) { // Decode the request body and check if RPC method is valid. req := retainServerRequest() err := json.NewDecoder(r).Decode(req) if err == io.ErrUnexpectedEOF || err == io.EOF { return nil, err } if err != nil { err = &Error{ Code: E_PARSE, Message: err.Error(), Data: req, } } if req.Version != Version { err = &Error{ Code: E_INVALID_REQ, Message: "jsonrpc must be " + Version, Data: req, } } return retainServerCodecRequest(req, err, encoder), nil } // CodecRequest decodes and encodes a single request. type ServerCodecRequest struct { request *serverRequest err error encoder encode.Encoder } // Complete is callback function that end of request. func (scr *ServerCodecRequest) Complete() { if nil != scr.request { releaseServerRequest(scr.request) } releaseServerCodecRequest(scr) } // Method returns the RPC method for the current request. // // The method uses a dotted notation as in "Service.Method". func (scr *ServerCodecRequest) Method() string { return scr.request.Method } // ReadRequest fills the request object for the RPC method. // // ReadRequest parses request parameters in two supported forms in // accordance with http://www.jsonrpc.org/specification#parameter_structures // // by-position: params MUST be an Array, containing the // values in the Server expected order. // // by-name: params MUST be an Object, with member names // that match the Server expected parameter names. The // absence of expected names MAY result in an error being // generated. The names MUST match exactly, including // case, to the method's expected parameters. func (scr *ServerCodecRequest) ReadParams(args []interface{}) error { if scr.err == nil && scr.request.Params != nil { // Note: if scr.request.Params is nil it's not an error, it's an optional member. // JSON params structured object. Unmarshal to the args object. // if err := json.Unmarshal(*scr.request.Params, &args); err != nil { // // Clearly JSON params is not a structured object, // // fallback and attempt an unmarshal with JSON params as // // array value and RPC params is struct. Unmarshal into // // array containing the request struct. // scr.err = &Error{ // Code: E_INVALID_REQ, // Message: err.Error(), // Data: scr.request.Params, // } // } raws := make([]json.RawMessage, len(args)) if err := json.Unmarshal(*scr.request.Params, &raws); err != nil { scr.err = &Error{ Code: E_INVALID_REQ, Message: err.Error(), Data: scr.request.Params, } return scr.err } for indexI := 0; indexI < len(args); indexI++ { raw := raws[indexI] arg := args[indexI] if err := json.Unmarshal(raw, &arg); err != nil { scr.err = &Error{ Code: E_INVALID_REQ, Message: err.Error(), Data: scr.request.Params, } return scr.err } } } return scr.err } func (scr *ServerCodecRequest) Params() ([]string, error) { if scr.err == nil && scr.request.Params != nil { var results []string if err := json.Unmarshal(*scr.request.Params, &results); err != nil { scr.err = &Error{ Code: E_INVALID_REQ, Message: err.Error(), Data: scr.request.Params, } return nil, scr.err } return results, nil } return nil, scr.err } // WriteResponse encodes the response and writes it to the ResponseWriter. func (scr *ServerCodecRequest) WriteResponse(w io.Writer, reply interface{}) error { res := retainServerResponse(Version, reply, nil, scr.request.ID) return scr.writeServerResponse(w, res) } // WriteError encodes the response and writes it to the ResponseWriter. func (scr *ServerCodecRequest) WriteError(w io.Writer, status int, err error) error { jsonErr, ok := err.(*Error) if !ok { jsonErr = &Error{ Code: E_SERVER, Message: err.Error(), } } res := retainServerResponse(Version, nil, jsonErr, scr.request.ID) return scr.writeServerResponse(w, res) } func (scr *ServerCodecRequest) writeServerResponse(w io.Writer, res *serverResponse) error { defer func() { if nil != res { releaseServerResponse(res) } }() // ID is null for notifications and they don't have a response. if scr.request.ID != nil { encoder := json.NewEncoder(scr.encoder.Encode(w)) err := encoder.Encode(res) // Not sure in which case will this happen. But seems harmless. if err != nil { return err } } return nil } type EmptyResponse struct { } var serverCodecRequestPool sync.Pool func retainServerCodecRequest(request *serverRequest, err error, encoder encode.Encoder) *ServerCodecRequest { var scr *ServerCodecRequest v := serverCodecRequestPool.Get() if v == nil { scr = &ServerCodecRequest{} } else { scr = v.(*ServerCodecRequest) } scr.request = request scr.err = err scr.encoder = encoder return scr } func releaseServerCodecRequest(scr *ServerCodecRequest) { scr.request = nil scr.err = nil scr.encoder = nil serverCodecRequestPool.Put(scr) } var serverRequestPool sync.Pool func retainServerRequest() *serverRequest { v := serverRequestPool.Get() if v == nil { return &serverRequest{} } return v.(*serverRequest) } func releaseServerRequest(sr *serverRequest) { sr.Method = "" sr.Params = nil sr.ID = nil serverRequestPool.Put(sr) } var serverResponsePool sync.Pool func retainServerResponse(version string, result interface{}, err *Error, id *json.RawMessage) *serverResponse { var sr *serverResponse v := serverResponsePool.Get() if v == nil { sr = &serverResponse{} } else { sr = v.(*serverResponse) } sr.Version = version sr.Result = result sr.Error = err sr.ID = id return sr } func releaseServerResponse(sr *serverResponse) { sr.Version = "" sr.Result = nil sr.Error = nil sr.ID = nil serverResponsePool.Put(sr) }