commit 267d8ac0e1404c8518d214d44caee75efc04dc98 Author: crusader Date: Wed Oct 25 23:52:47 2017 +0900 ing diff --git a/.DS_Store b/.DS_Store new file mode 100644 index 0000000..7492502 Binary files /dev/null and b/.DS_Store differ diff --git a/.vscode/launch.json b/.vscode/launch.json new file mode 100644 index 0000000..2ca2b1d --- /dev/null +++ b/.vscode/launch.json @@ -0,0 +1,32 @@ +{ + "version": "0.2.0", + "configurations": [ + { + "name": "Debug", + "type": "go", + "request": "launch", + "mode": "debug", + "remotePath": "", + "port": 2345, + "host": "127.0.0.1", + "program": "${workspaceRoot}/main.go", + "env": {}, + "args": [], + "showLog": true + }, + { + "name": "File Debug", + "type": "go", + "request": "launch", + "mode": "debug", + "remotePath": "", + "port": 2345, + "host": "127.0.0.1", + "program": "${fileDirname}", + "env": {}, + "args": [], + "showLog": true + } + + ] +} \ No newline at end of file diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000..20af2f6 --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,3 @@ +// Place your settings in this file to overwrite default and user settings. +{ +} \ No newline at end of file diff --git a/adapter/http/handler.go b/adapter/http/handler.go new file mode 100644 index 0000000..0e8e2a5 --- /dev/null +++ b/adapter/http/handler.go @@ -0,0 +1,83 @@ +package http + +import ( + "fmt" + "net/http" + "reflect" + "strings" +) + +type HTTPAdapter struct { +} + +// ServeHTTP +func ServeHTTP(w http.ResponseWriter, r *http.Request) { + if r.Method != "POST" { + WriteError(w, 405, "rpc: POST method required, received "+r.Method) + return + } + contentType := r.Header.Get("Content-Type") + idx := strings.Index(contentType, ";") + if idx != -1 { + contentType = contentType[:idx] + } + var codec Codec + if contentType == "" && len(s.codecs) == 1 { + // If Content-Type is not set and only one codec has been registered, + // then default to that codec. + for _, c := range s.codecs { + codec = c + } + } else if codec = s.codecs[strings.ToLower(contentType)]; codec == nil { + WriteError(w, 415, "rpc: unrecognized Content-Type: "+contentType) + return + } + // Create a new codec request. + codecReq := codec.NewRequest(r) + // Get service method to be called. + method, errMethod := codecReq.Method() + if errMethod != nil { + codecReq.WriteError(w, 400, errMethod) + return + } + serviceSpec, methodSpec, errGet := s.services.get(method) + if errGet != nil { + codecReq.WriteError(w, 400, errGet) + return + } + // Decode the args. + args := reflect.New(methodSpec.argsType) + if errRead := codecReq.ReadRequest(args.Interface()); errRead != nil { + codecReq.WriteError(w, 400, errRead) + return + } + // Call the service method. + reply := reflect.New(methodSpec.replyType) + errValue := methodSpec.method.Func.Call([]reflect.Value{ + serviceSpec.rcvr, + reflect.ValueOf(r), + args, + reply, + }) + // Cast the result to error if needed. + var errResult error + errInter := errValue[0].Interface() + if errInter != nil { + errResult = errInter.(error) + } + // Prevents Internet Explorer from MIME-sniffing a response away + // from the declared content-type + w.Header().Set("x-content-type-options", "nosniff") + // Encode the response. + if errResult == nil { + codecReq.WriteResponse(w, reply.Interface()) + } else { + codecReq.WriteError(w, 400, errResult) + } +} + +func WriteError(w http.ResponseWriter, status int, msg string) { + w.WriteHeader(status) + w.Header().Set("Content-Type", "text/plain; charset=utf-8") + fmt.Fprint(w, msg) +} diff --git a/codec/codec.go b/codec/codec.go new file mode 100644 index 0000000..29e00f2 --- /dev/null +++ b/codec/codec.go @@ -0,0 +1,27 @@ +package codec + +import ( + "io" +) + +// ---------------------------------------------------------------------------- +// Codec +// ---------------------------------------------------------------------------- + +// Codec creates a CodecRequest to process each request. +type Codec interface { + NewRequest(r io.Reader) (CodecRequest, bool) +} + +// CodecRequest decodes a request and encodes a response using a specific +// serialization scheme. +type CodecRequest interface { + // Reads the request and returns the RPC method name. + Method() (string, error) + // Reads the request filling the RPC method args. + ReadRequest(interface{}) error + // Writes the response using the RPC method reply. + WriteResponse(io.Writer, interface{}) + // Writes an error produced by the server. + WriteError(w io.Writer, status int, err error) +} diff --git a/codec/json/error.go b/codec/json/error.go new file mode 100644 index 0000000..9ebbc86 --- /dev/null +++ b/codec/json/error.go @@ -0,0 +1,34 @@ +package json + +import ( + "errors" +) + +type ErrorCode int + +const ( + E_PARSE ErrorCode = -32700 + E_INVALID_REQ ErrorCode = -32600 + E_NO_METHOD ErrorCode = -32601 + E_BAD_PARAMS ErrorCode = -32602 + E_INTERNAL ErrorCode = -32603 + E_SERVER ErrorCode = -32000 +) + +var ErrNullResult = errors.New("result is null") + +type Error struct { + // A Number that indicates the error type that occurred. + Code ErrorCode `json:"code"` /* required */ + + // A String providing a short description of the error. + // The message SHOULD be limited to a concise single sentence. + Message string `json:"message"` /* required */ + + // A Primitive or Structured value that contains additional information about the error. + Data interface{} `json:"data"` /* optional */ +} + +func (e *Error) Error() string { + return e.Message +} diff --git a/codec/json/server.go b/codec/json/server.go new file mode 100644 index 0000000..f55a370 --- /dev/null +++ b/codec/json/server.go @@ -0,0 +1,195 @@ +package json + +import ( + "encoding/json" + "net/http" +) + +var null = json.RawMessage([]byte("null")) +var Version = "2.0" + +// ---------------------------------------------------------------------------- +// Request and Response +// ---------------------------------------------------------------------------- + +// serverRequest represents a JSON-RPC request received by the server. +type serverRequest struct { + // JSON-RPC protocol. + Version string `json:"jsonrpc"` + + // A String containing the name of the method to be invoked. + Method string `json:"method"` + + // A Structured value to pass as arguments to the method. + Params *json.RawMessage `json:"params"` + + // The request id. MUST be a string, number or null. + // Our implementation will not do type checking for id. + // It will be copied as it is. + ID *json.RawMessage `json:"id"` +} + +// serverResponse represents a JSON-RPC response returned by the server. +type serverResponse struct { + // JSON-RPC protocol. + Version string `json:"jsonrpc"` + + // The Object that was returned by the invoked method. This must be null + // in case there was an error invoking the method. + // As per spec the member will be omitted if there was an error. + Result interface{} `json:"result,omitempty"` + + // An Error object if there was an error invoking the method. It must be + // null if there was no error. + // As per spec the member will be omitted if there was no error. + Error *Error `json:"error,omitempty"` + + // This must be the same id as the request it is responding to. + ID *json.RawMessage `json:"id"` +} + +// ---------------------------------------------------------------------------- +// Codec +// ---------------------------------------------------------------------------- + +// NewcustomCodec returns a new JSON Codec based on passed encoder selector. +func NewCustomCodec(encSel rpc.EncoderSelector) *Codec { + return &Codec{encSel: encSel} +} + +// NewCodec returns a new JSON Codec. +func NewCodec() *Codec { + return NewCustomCodec(rpc.DefaultEncoderSelector) +} + +// Codec creates a CodecRequest to process each request. +type Codec struct { + encSel rpc.EncoderSelector +} + +// NewRequest returns a CodecRequest. +func (c *Codec) NewRequest(r *http.Request) rpc.CodecRequest { + return newCodecRequest(r, c.encSel.Select(r)) +} + +// ---------------------------------------------------------------------------- +// CodecRequest +// ---------------------------------------------------------------------------- + +// newCodecRequest returns a new CodecRequest. +func newCodecRequest(r *http.Request, encoder rpc.Encoder) rpc.CodecRequest { + // Decode the request body and check if RPC method is valid. + req := new(serverRequest) + err := json.NewDecoder(r.Body).Decode(req) + if err != nil { + err = &Error{ + Code: E_PARSE, + Message: err.Error(), + Data: req, + } + } + if req.Version != Version { + err = &Error{ + Code: E_INVALID_REQ, + Message: "jsonrpc must be " + Version, + Data: req, + } + } + r.Body.Close() + return &CodecRequest{request: req, err: err, encoder: encoder} +} + +// CodecRequest decodes and encodes a single request. +type CodecRequest struct { + request *serverRequest + err error + encoder rpc.Encoder +} + +// Method returns the RPC method for the current request. +// +// The method uses a dotted notation as in "Service.Method". +func (c *CodecRequest) Method() (string, error) { + if c.err == nil { + return c.request.Method, nil + } + return "", c.err +} + +// ReadRequest fills the request object for the RPC method. +// +// ReadRequest parses request parameters in two supported forms in +// accordance with http://www.jsonrpc.org/specification#parameter_structures +// +// by-position: params MUST be an Array, containing the +// values in the Server expected order. +// +// by-name: params MUST be an Object, with member names +// that match the Server expected parameter names. The +// absence of expected names MAY result in an error being +// generated. The names MUST match exactly, including +// case, to the method's expected parameters. +func (c *CodecRequest) ReadRequest(args interface{}) error { + if c.err == nil && c.request.Params != nil { + // Note: if c.request.Params is nil it's not an error, it's an optional member. + // JSON params structured object. Unmarshal to the args object. + if err := json.Unmarshal(*c.request.Params, args); err != nil { + // Clearly JSON params is not a structured object, + // fallback and attempt an unmarshal with JSON params as + // array value and RPC params is struct. Unmarshal into + // array containing the request struct. + params := [1]interface{}{args} + if err = json.Unmarshal(*c.request.Params, ¶ms); err != nil { + c.err = &Error{ + Code: E_INVALID_REQ, + Message: err.Error(), + Data: c.request.Params, + } + } + } + } + return c.err +} + +// WriteResponse encodes the response and writes it to the ResponseWriter. +func (c *CodecRequest) WriteResponse(w http.ResponseWriter, reply interface{}) { + res := &serverResponse{ + Version: Version, + Result: reply, + Id: c.request.Id, + } + c.writeServerResponse(w, res) +} + +func (c *CodecRequest) WriteError(w http.ResponseWriter, status int, err error) { + jsonErr, ok := err.(*Error) + if !ok { + jsonErr = &Error{ + Code: E_SERVER, + Message: err.Error(), + } + } + res := &serverResponse{ + Version: Version, + Error: jsonErr, + Id: c.request.Id, + } + c.writeServerResponse(w, res) +} + +func (c *CodecRequest) writeServerResponse(w http.ResponseWriter, res *serverResponse) { + // Id is null for notifications and they don't have a response. + if c.request.Id != nil { + w.Header().Set("Content-Type", "application/json; charset=utf-8") + encoder := json.NewEncoder(c.encoder.Encode(w)) + err := encoder.Encode(res) + + // Not sure in which case will this happen. But seems harmless. + if err != nil { + rpc.WriteError(w, 400, err.Error()) + } + } +} + +type EmptyResponse struct { +} diff --git a/encoder_selector.go b/encoder_selector.go new file mode 100644 index 0000000..ceea276 --- /dev/null +++ b/encoder_selector.go @@ -0,0 +1,38 @@ +package rpc + +import ( + "io" + "net/http" +) + +// Encoder interface contains the encoder for http response. +// Eg. gzip, flate compressions. +type Encoder interface { + Encode(w http.ResponseWriter) io.Writer +} + +type encoder struct { +} + +func (_ *encoder) Encode(w http.ResponseWriter) io.Writer { + return w +} + +var DefaultEncoder = &encoder{} + +// EncoderSelector interface provides a way to select encoder using the http +// request. Typically people can use this to check HEADER of the request and +// figure out client capabilities. +// Eg. "Accept-Encoding" tells about supported compressions. +type EncoderSelector interface { + Select(r *http.Request) Encoder +} + +type encoderSelector struct { +} + +func (_ *encoderSelector) Select(_ *http.Request) Encoder { + return DefaultEncoder +} + +var DefaultEncoderSelector = &encoderSelector{} diff --git a/registry.go b/registry.go new file mode 100644 index 0000000..df93320 --- /dev/null +++ b/registry.go @@ -0,0 +1,142 @@ +package rpc + +import ( + "fmt" + "io" + "reflect" + "strings" + + "git.loafle.net/commons_go/rpc/codec" +) + +/** + +Network connection + Handshake(1..n) + Send request + HTTP + + +*/ + +// NewRPCRegistry returns a new RPC registry. +func NewRegistry() Registry { + return &rpcRegistry{ + codecs: make(map[string]codec.Codec), + services: new(serviceMap), + } +} + +type Registry interface { + RegisterCodec(codec codec.Codec, contentType string) + RegisterService(receiver interface{}, name string) error + HasMethod(method string) bool + Invoke(contentType string, reader io.Reader, writer io.Writer) error +} + +// RPCRegistry serves registered RPC services using registered codecs. +type rpcRegistry struct { + codecs map[string]codec.Codec + services *serviceMap +} + +// RegisterCodec adds a new codec to the server. +// +// Codecs are defined to process a given serialization scheme, e.g., JSON or +// XML. A codec is chosen based on the "Content-Type" header from the request, +// excluding the charset definition. +func (r *rpcRegistry) RegisterCodec(codec codec.Codec, contentType string) { + r.codecs[strings.ToLower(contentType)] = codec +} + +// RegisterService adds a new service to the server. +// +// The name parameter is optional: if empty it will be inferred from +// the receiver type name. +// +// Methods from the receiver will be extracted if these rules are satisfied: +// +// - The receiver is exported (begins with an upper case letter) or local +// (defined in the package registering the service). +// - The method name is exported. +// - The method has three arguments: *http.Request, *args, *reply. +// - All three arguments are pointers. +// - The second and third arguments are exported or local. +// - The method has return type error. +// +// All other methods are ignored. +func (r *rpcRegistry) RegisterService(receiver interface{}, name string) error { + return r.services.register(receiver, name) +} + +// HasMethod returns true if the given method is registered. +// +// The method uses a dotted notation as in "Service.Method". +func (r *rpcRegistry) HasMethod(method string) bool { + if _, _, err := r.services.get(method); err == nil { + return true + } + return false +} + +// Invoke execute a method. +// +// Codecs are defined to process a given serialization scheme, e.g., JSON or +// XML. A codec is chosen based on the "Content-Type" header from the request, +// excluding the charset definition. +func (r *rpcRegistry) Invoke(contentType string, reader io.Reader, writer io.Writer) error { + var codec codec.Codec + if contentType == "" && len(r.codecs) == 1 { + // If Content-Type is not set and only one codec has been registered, + // then default to that codec. + for _, c := range r.codecs { + codec = c + } + } else if codec = r.codecs[strings.ToLower(contentType)]; codec == nil { + return fmt.Errorf("Unrecognized Content-Type: %s", contentType) + } + // Create a new codec request. + codecReq, hasResponse := codec.NewRequest(reader) + // Get service method to be called. + method, errMethod := codecReq.Method() + if errMethod != nil { + return errMethod + } + serviceSpec, methodSpec, errGet := r.services.get(method) + if errGet != nil { + return errGet + } + // Decode the args. + args := reflect.New(methodSpec.argsType) + if errRead := codecReq.ReadRequest(args.Interface()); errRead != nil { + return errRead + } + // Call the service method. + reply := reflect.New(methodSpec.replyType) + errValue := methodSpec.method.Func.Call([]reflect.Value{ + serviceSpec.rcvr, + reflect.ValueOf(r), + args, + reply, + }) + + if !hasResponse { + return nil + } + + // Cast the result to error if needed. + var errResult error + errInter := errValue[0].Interface() + if errInter != nil { + errResult = errInter.(error) + } + + // Encode the response. + if errResult == nil { + codecReq.WriteResponse(writer, reply.Interface()) + } else { + codecReq.WriteError(writer, 400, errResult) + } + + return nil +} diff --git a/service_map.go b/service_map.go new file mode 100644 index 0000000..f05d2ed --- /dev/null +++ b/service_map.go @@ -0,0 +1,159 @@ +package rpc + +import ( + "fmt" + "net/http" + "reflect" + "strings" + "sync" + "unicode" + "unicode/utf8" +) + +var ( + // Precompute the reflect.Type of error and http.Request + typeOfError = reflect.TypeOf((*error)(nil)).Elem() + typeOfRequest = reflect.TypeOf((*http.Request)(nil)).Elem() +) + +// ---------------------------------------------------------------------------- +// service +// ---------------------------------------------------------------------------- + +type service struct { + name string // name of service + rcvr reflect.Value // receiver of methods for the service + rcvrType reflect.Type // type of the receiver + methods map[string]*serviceMethod // registered methods +} + +type serviceMethod struct { + method reflect.Method // receiver method + argsType reflect.Type // type of the request argument + replyType reflect.Type // type of the response argument +} + +// ---------------------------------------------------------------------------- +// serviceMap +// ---------------------------------------------------------------------------- + +// serviceMap is a registry for services. +type serviceMap struct { + mutex sync.Mutex + services map[string]*service +} + +// register adds a new service using reflection to extract its methods. +func (m *serviceMap) register(rcvr interface{}, name string) error { + // Setup service. + s := &service{ + name: name, + rcvr: reflect.ValueOf(rcvr), + rcvrType: reflect.TypeOf(rcvr), + methods: make(map[string]*serviceMethod), + } + if name == "" { + s.name = reflect.Indirect(s.rcvr).Type().Name() + if !isExported(s.name) { + return fmt.Errorf("rpc: type %q is not exported", s.name) + } + } + if s.name == "" { + return fmt.Errorf("rpc: no service name for type %q", + s.rcvrType.String()) + } + // Setup methods. + for i := 0; i < s.rcvrType.NumMethod(); i++ { + method := s.rcvrType.Method(i) + mtype := method.Type + // Method must be exported. + if method.PkgPath != "" { + continue + } + // Method needs four ins: receiver, *http.Request, *args, *reply. + if mtype.NumIn() != 4 { + continue + } + // First argument must be a pointer and must be http.Request. + reqType := mtype.In(1) + if reqType.Kind() != reflect.Ptr || reqType.Elem() != typeOfRequest { + continue + } + // Second argument must be a pointer and must be exported. + args := mtype.In(2) + if args.Kind() != reflect.Ptr || !isExportedOrBuiltin(args) { + continue + } + // Third argument must be a pointer and must be exported. + reply := mtype.In(3) + if reply.Kind() != reflect.Ptr || !isExportedOrBuiltin(reply) { + continue + } + // Method needs one out: error. + if mtype.NumOut() != 1 { + continue + } + if returnType := mtype.Out(0); returnType != typeOfError { + continue + } + s.methods[method.Name] = &serviceMethod{ + method: method, + argsType: args.Elem(), + replyType: reply.Elem(), + } + } + if len(s.methods) == 0 { + return fmt.Errorf("rpc: %q has no exported methods of suitable type", + s.name) + } + // Add to the map. + m.mutex.Lock() + defer m.mutex.Unlock() + if m.services == nil { + m.services = make(map[string]*service) + } else if _, ok := m.services[s.name]; ok { + return fmt.Errorf("rpc: service already defined: %q", s.name) + } + m.services[s.name] = s + return nil +} + +// get returns a registered service given a method name. +// +// The method name uses a dotted notation as in "Service.Method". +func (m *serviceMap) get(method string) (*service, *serviceMethod, error) { + parts := strings.Split(method, ".") + if len(parts) != 2 { + err := fmt.Errorf("rpc: service/method request ill-formed: %q", method) + return nil, nil, err + } + m.mutex.Lock() + service := m.services[parts[0]] + m.mutex.Unlock() + if service == nil { + err := fmt.Errorf("rpc: can't find service %q", method) + return nil, nil, err + } + serviceMethod := service.methods[parts[1]] + if serviceMethod == nil { + err := fmt.Errorf("rpc: can't find method %q", method) + return nil, nil, err + } + return service, serviceMethod, nil +} + +// isExported returns true of a string is an exported (upper case) name. +func isExported(name string) bool { + rune, _ := utf8.DecodeRuneInString(name) + return unicode.IsUpper(rune) +} + +// isExportedOrBuiltin returns true if a type is exported or a builtin. +func isExportedOrBuiltin(t reflect.Type) bool { + for t.Kind() == reflect.Ptr { + t = t.Elem() + } + // PkgPath will be non-empty even for an exported type, + // so we need to check the type name as well. + return isExported(t.Name()) || t.PkgPath() == "" +}