rpc/protocol/json/server.go
crusader e868bf48ee ing
2017-11-03 16:58:40 +09:00

294 lines
7.9 KiB
Go

package json
import (
"encoding/json"
"io"
"sync"
"git.loafle.net/commons_go/rpc/encode"
"git.loafle.net/commons_go/rpc/protocol"
)
var null = json.RawMessage([]byte("null"))
// ----------------------------------------------------------------------------
// Request and Response
// ----------------------------------------------------------------------------
// serverRequest represents a JSON-RPC request received by the server.
type serverRequest struct {
// JSON-RPC protocol.
Version string `json:"jsonrpc"`
// A String containing the name of the method to be invoked.
Method string `json:"method"`
// A Structured value to pass as arguments to the method.
Params *json.RawMessage `json:"params"`
// The request id. MUST be a string, number or null.
// Our implementation will not do type checking for id.
// It will be copied as it is.
ID *json.RawMessage `json:"id"`
}
// serverResponse represents a JSON-RPC response returned by the server.
type serverResponse struct {
// JSON-RPC protocol.
Version string `json:"jsonrpc"`
// The Object that was returned by the invoked method. This must be null
// in case there was an error invoking the method.
// As per spec the member will be omitted if there was an error.
Result interface{} `json:"result,omitempty"`
// An Error object if there was an error invoking the method. It must be
// null if there was no error.
// As per spec the member will be omitted if there was no error.
Error *Error `json:"error,omitempty"`
// This must be the same id as the request it is responding to.
ID *json.RawMessage `json:"id"`
}
// ----------------------------------------------------------------------------
// Codec
// ----------------------------------------------------------------------------
// NewCustomServerCodec returns a new JSON Codec based on passed encoder selector.
func NewCustomServerCodec(encSel encode.EncoderSelector) *ServerCodec {
return &ServerCodec{encSel: encSel}
}
// NewServerCodec returns a new JSON Codec.
func NewServerCodec() *ServerCodec {
return NewCustomServerCodec(encode.DefaultEncoderSelector)
}
// ServerCodec creates a ServerCodecRequest to process each request.
type ServerCodec struct {
encSel encode.EncoderSelector
notifyMtx sync.Mutex
notify clientRequest
}
// NewRequest returns a ServerCodecRequest.
func (sc *ServerCodec) NewRequest(r io.Reader) (protocol.ServerCodecRequest, error) {
return newServerCodecRequest(r, sc.encSel.SelectByReader(r))
}
// WriteNotify send a notification from server to client.
func (sc *ServerCodec) WriteNotify(w io.Writer, method string, args interface{}) error {
sc.notifyMtx.Lock()
sc.notify.Version = Version
sc.notify.Method = method
sc.notify.Params = args
encoder := json.NewEncoder(sc.encSel.SelectByWriter(w).Encode(w))
err := encoder.Encode(&sc.notify)
sc.notifyMtx.Unlock()
// Not sure in which case will this happen. But seems harmless.
if err != nil {
return err
}
return nil
}
// ----------------------------------------------------------------------------
// ServerCodecRequest
// ----------------------------------------------------------------------------
// newCodecRequest returns a new ServerCodecRequest.
func newServerCodecRequest(r io.Reader, encoder encode.Encoder) (protocol.ServerCodecRequest, error) {
// Decode the request body and check if RPC method is valid.
req := retainServerRequest()
err := json.NewDecoder(r).Decode(req)
if err == io.ErrUnexpectedEOF || err == io.EOF {
return nil, err
}
if err != nil {
err = &Error{
Code: E_PARSE,
Message: err.Error(),
Data: req,
}
}
if req.Version != Version {
err = &Error{
Code: E_INVALID_REQ,
Message: "jsonrpc must be " + Version,
Data: req,
}
}
return retainServerCodecRequest(req, err, encoder), nil
}
// CodecRequest decodes and encodes a single request.
type ServerCodecRequest struct {
request *serverRequest
err error
encoder encode.Encoder
}
// Complete is callback function that end of request.
func (scr *ServerCodecRequest) Complete() {
if nil != scr.request {
releaseServerRequest(scr.request)
}
releaseServerCodecRequest(scr)
}
// Method returns the RPC method for the current request.
//
// The method uses a dotted notation as in "Service.Method".
func (scr *ServerCodecRequest) Method() string {
return scr.request.Method
}
// ReadRequest fills the request object for the RPC method.
//
// ReadRequest parses request parameters in two supported forms in
// accordance with http://www.jsonrpc.org/specification#parameter_structures
//
// by-position: params MUST be an Array, containing the
// values in the Server expected order.
//
// by-name: params MUST be an Object, with member names
// that match the Server expected parameter names. The
// absence of expected names MAY result in an error being
// generated. The names MUST match exactly, including
// case, to the method's expected parameters.
func (scr *ServerCodecRequest) ReadParams(args []interface{}) error {
if scr.err == nil && scr.request.Params != nil {
// Note: if scr.request.Params is nil it's not an error, it's an optional member.
// JSON params structured object. Unmarshal to the args object.
if err := json.Unmarshal(*scr.request.Params, &args); err != nil {
// Clearly JSON params is not a structured object,
// fallback and attempt an unmarshal with JSON params as
// array value and RPC params is struct. Unmarshal into
// array containing the request struct.
scr.err = &Error{
Code: E_INVALID_REQ,
Message: err.Error(),
Data: scr.request.Params,
}
}
}
return scr.err
}
// WriteResponse encodes the response and writes it to the ResponseWriter.
func (scr *ServerCodecRequest) WriteResponse(w io.Writer, reply interface{}) error {
res := retainServerResponse(Version, reply, nil, scr.request.ID)
return scr.writeServerResponse(w, res)
}
// WriteError encodes the response and writes it to the ResponseWriter.
func (scr *ServerCodecRequest) WriteError(w io.Writer, status int, err error) error {
jsonErr, ok := err.(*Error)
if !ok {
jsonErr = &Error{
Code: E_SERVER,
Message: err.Error(),
}
}
res := retainServerResponse(Version, nil, jsonErr, scr.request.ID)
return scr.writeServerResponse(w, res)
}
func (scr *ServerCodecRequest) writeServerResponse(w io.Writer, res *serverResponse) error {
defer func() {
if nil != res {
releaseServerResponse(res)
}
}()
// ID is null for notifications and they don't have a response.
if scr.request.ID != nil {
encoder := json.NewEncoder(scr.encoder.Encode(w))
err := encoder.Encode(res)
// Not sure in which case will this happen. But seems harmless.
if err != nil {
return err
}
}
return nil
}
type EmptyResponse struct {
}
var serverCodecRequestPool sync.Pool
func retainServerCodecRequest(request *serverRequest, err error, encoder encode.Encoder) *ServerCodecRequest {
var scr *ServerCodecRequest
v := serverCodecRequestPool.Get()
if v == nil {
scr = &ServerCodecRequest{}
} else {
scr = v.(*ServerCodecRequest)
}
scr.request = request
scr.err = err
scr.encoder = encoder
return scr
}
func releaseServerCodecRequest(scr *ServerCodecRequest) {
scr.request = nil
scr.err = nil
scr.encoder = nil
serverCodecRequestPool.Put(scr)
}
var serverRequestPool sync.Pool
func retainServerRequest() *serverRequest {
v := serverRequestPool.Get()
if v == nil {
return &serverRequest{}
}
return v.(*serverRequest)
}
func releaseServerRequest(sr *serverRequest) {
sr.Method = ""
sr.Params = nil
sr.ID = nil
serverRequestPool.Put(sr)
}
var serverResponsePool sync.Pool
func retainServerResponse(version string, result interface{}, err *Error, id *json.RawMessage) *serverResponse {
var sr *serverResponse
v := serverResponsePool.Get()
if v == nil {
sr = &serverResponse{}
} else {
sr = v.(*serverResponse)
}
sr.Version = version
sr.Result = result
sr.Error = err
sr.ID = id
return sr
}
func releaseServerResponse(sr *serverResponse) {
sr.Version = ""
sr.Result = nil
sr.Error = nil
sr.ID = nil
serverResponsePool.Put(sr)
}