diff --git a/client/client.go b/client/client.go index 74a18f2..0a31a76 100644 --- a/client/client.go +++ b/client/client.go @@ -399,7 +399,7 @@ func (c *client) handleNotification(resCodec protocol.ClientResponseCodec) error return err } - if nil == c.ch.GetRPCRegistry() { + if nil == c.ch.GetRPCInvoker() { params, err := notiCodec.Params() if nil != err { return err @@ -407,7 +407,7 @@ func (c *client) handleNotification(resCodec protocol.ClientResponseCodec) error return fmt.Errorf("Client: Get Notification[method: %s, params: %v]. But RPC registry is not specified", notiCodec.Method(), params) } - _, err = c.ch.GetRPCRegistry().Invoke(notiCodec) + _, err = c.ch.GetRPCInvoker().Invoke(notiCodec) return err } diff --git a/client/client_handler.go b/client/client_handler.go index 3fe94a8..5ba59f9 100644 --- a/client/client_handler.go +++ b/client/client_handler.go @@ -3,8 +3,8 @@ package client import ( "time" - "git.loafle.net/commons_go/rpc" "git.loafle.net/commons_go/rpc/protocol" + crr "git.loafle.net/commons_go/rpc/registry" cuc "git.loafle.net/commons_go/util/context" ) @@ -15,7 +15,7 @@ type ClientHandler interface { Destroy(clientCTX ClientContext) GetCodec() protocol.ClientCodec - GetRPCRegistry() rpc.Registry + GetRPCInvoker() crr.RPCInvoker GetRequestTimeout() time.Duration GetPendingRequests() int GetRequestID() uint64 diff --git a/client/client_handlers.go b/client/client_handlers.go index bfcc73d..da29d24 100644 --- a/client/client_handlers.go +++ b/client/client_handlers.go @@ -6,14 +6,14 @@ import ( "git.loafle.net/commons_go/logging" - "git.loafle.net/commons_go/rpc" "git.loafle.net/commons_go/rpc/protocol" + crr "git.loafle.net/commons_go/rpc/registry" cuc "git.loafle.net/commons_go/util/context" ) type ClientHandlers struct { - Codec protocol.ClientCodec - RPCRegistry rpc.Registry + Codec protocol.ClientCodec + RPCInvoker crr.RPCInvoker // Maximum request time. // Default value is DefaultRequestTimeout. @@ -47,8 +47,8 @@ func (ch *ClientHandlers) GetCodec() protocol.ClientCodec { return ch.Codec } -func (ch *ClientHandlers) GetRPCRegistry() rpc.Registry { - return ch.RPCRegistry +func (ch *ClientHandlers) GetRPCInvoker() crr.RPCInvoker { + return ch.RPCInvoker } func (ch *ClientHandlers) GetRequestTimeout() time.Duration { diff --git a/registry/registry.go b/registry/registry.go index 4d146f2..c63b7dc 100644 --- a/registry/registry.go +++ b/registry/registry.go @@ -1,144 +1,10 @@ package registry import ( - "reflect" - "git.loafle.net/commons_go/rpc/protocol" ) -// NewRPCRegistry returns a new RPC registry. -func NewRPCRegistry() RPCRegistry { - return &rpcRegistry{ - services: new(serviceMap), - } -} - -type RPCRegistry interface { - Invoke(codec protocol.RegistryCodec) (result interface{}, err error) - RegisterService(receiver interface{}, name string) error +type RPCInvoker interface { HasMethod(method string) bool + Invoke(codec protocol.RegistryCodec) (result interface{}, err error) } - -// RPCRegistry serves registered RPC services using registered codecs. -type rpcRegistry struct { - services *serviceMap -} - -// RegisterService adds a new service to the server. -// -// The name parameter is optional: if empty it will be inferred from -// the receiver type name. -// -// Methods from the receiver will be extracted if these rules are satisfied: -// -// - The receiver is exported (begins with an upper case letter) or local -// (defined in the package registering the service). -// - The method name is exported. -// - The method has two arguments: *args, *reply. -// - All two arguments are pointers. -// - The first and second arguments are exported or local. -// - The method has return type error. -// -// All other methods are ignored. -func (rr *rpcRegistry) RegisterService(receiver interface{}, name string) error { - return rr.services.register(receiver, name) -} - -// HasMethod returns true if the given method is registered. -// -// The method uses a dotted notation as in "Service.Method". -func (rr *rpcRegistry) HasMethod(method string) bool { - if _, _, err := rr.services.get(method); err == nil { - return true - } - return false -} - -// Invoke execute a method. -// -// Codecs are defined to process a given serialization scheme, e.g., JSON or -// XML. A codec is chosen based on the "Content-Type" header from the request, -// excluding the charset definition. -func (rr *rpcRegistry) Invoke(codec protocol.RegistryCodec) (result interface{}, err error) { - serviceSpec, methodSpec, errGet := rr.services.get(codec.Method()) - if errGet != nil { - return nil, errGet - } - // Decode the args. - - var in []reflect.Value - pValues, pInstances := methodSpec.getParamValues() - - if nil != pInstances && 0 < len(pInstances) { - if errRead := codec.ReadParams(pInstances); errRead != nil { - return nil, errRead - } - pCount := len(pInstances) - in = make([]reflect.Value, pCount+1) - for indexI := 0; indexI < pCount; indexI++ { - if pValues[indexI].Type().Kind() == reflect.Ptr && pValues[indexI].Type().Elem().Kind() != reflect.Struct { - in[indexI+1] = reflect.Indirect(pValues[indexI]) - } else { - in[indexI+1] = pValues[indexI] - } - } - } else { - in = make([]reflect.Value, 1) - } - in[0] = serviceSpec.rcvrV - - // Call the service method. - returnValues := methodSpec.method.Func.Call(in) - - if nil != methodSpec.returnType { - result = returnValues[0].Interface() - if nil != returnValues[1].Interface() { - err = returnValues[1].Interface().(error) - } - } else { - if nil != returnValues[0].Interface() { - err = returnValues[0].Interface().(error) - } - } - - return -} - -// func convertValues(values []interface{}, types []reflect.Type) []reflect.Value { -// c := len(values) -// vs := make([]reflect.Value, c) -// for indexI := 0; indexI < c; indexI++ { -// vs[indexI] = convertValue(&values[indexI], types[indexI]) -// } -// return vs -// } - -// func convertValue(v *interface{}, t reflect.Type) reflect.Value { -// switch t.Kind() { -// case reflect.Bool: -// return reflect.ValueOf(*v).Convert(t) -// case reflect.Float32, reflect.Float64: -// return reflect.ValueOf(*v).Convert(t) -// case reflect.Int, reflect.Int32, reflect.Int64: -// return reflect.ValueOf(*v).Convert(t) -// case reflect.Interface: -// // When we see an interface, we make our own thing -// return reflect.ValueOf(*v).Convert(t) -// case reflect.Map: -// return reflect.ValueOf(*v).M -// return d.decodeMap(name, node, result) -// case reflect.Ptr: -// return d.decodePtr(name, node, result) -// case reflect.Slice: -// return d.decodeSlice(name, node, result) -// case reflect.String: -// return d.decodeString(name, node, result) -// case reflect.Struct: -// return d.decodeStruct(name, node, result) -// default: -// return &parser.PosError{ -// Pos: node.Pos(), -// Err: fmt.Errorf("%s: unknown kind to decode into: %s", name, k.Kind()), -// } -// } -// } diff --git a/registry/rpc_gateway.go b/registry/rpc_gateway.go new file mode 100644 index 0000000..7e28373 --- /dev/null +++ b/registry/rpc_gateway.go @@ -0,0 +1,18 @@ +package registry + +import ( + "fmt" + + "git.loafle.net/commons_go/rpc/protocol" +) + +type RPCGateway struct { +} + +func (rg *RPCGateway) HasMethod(method string) bool { + return false +} + +func (rg *RPCGateway) Invoke(codec protocol.RegistryCodec) (result interface{}, err error) { + return nil, fmt.Errorf("RPC Gateway: Invoke is not implemented") +} diff --git a/registry/rpc_registry.go b/registry/rpc_registry.go new file mode 100644 index 0000000..29a9b6b --- /dev/null +++ b/registry/rpc_registry.go @@ -0,0 +1,143 @@ +package registry + +import ( + "reflect" + + "git.loafle.net/commons_go/rpc/protocol" +) + +// NewRPCRegistry returns a new RPC registry. +func NewRPCRegistry() RPCRegistry { + return &rpcRegistry{ + services: new(serviceMap), + } +} + +type RPCRegistry interface { + RPCInvoker + RegisterService(receiver interface{}, name string) error +} + +// RPCRegistry serves registered RPC services using registered codecs. +type rpcRegistry struct { + services *serviceMap +} + +// RegisterService adds a new service to the server. +// +// The name parameter is optional: if empty it will be inferred from +// the receiver type name. +// +// Methods from the receiver will be extracted if these rules are satisfied: +// +// - The receiver is exported (begins with an upper case letter) or local +// (defined in the package registering the service). +// - The method name is exported. +// - The method has two arguments: *args, *reply. +// - All two arguments are pointers. +// - The first and second arguments are exported or local. +// - The method has return type error. +// +// All other methods are ignored. +func (rr *rpcRegistry) RegisterService(receiver interface{}, name string) error { + return rr.services.register(receiver, name) +} + +// HasMethod returns true if the given method is registered. +// +// The method uses a dotted notation as in "Service.Method". +func (rr *rpcRegistry) HasMethod(method string) bool { + if _, _, err := rr.services.get(method); err == nil { + return true + } + return false +} + +// Invoke execute a method. +// +// Codecs are defined to process a given serialization scheme, e.g., JSON or +// XML. A codec is chosen based on the "Content-Type" header from the request, +// excluding the charset definition. +func (rr *rpcRegistry) Invoke(codec protocol.RegistryCodec) (result interface{}, err error) { + serviceSpec, methodSpec, errGet := rr.services.get(codec.Method()) + if errGet != nil { + return nil, errGet + } + // Decode the args. + + var in []reflect.Value + pValues, pInstances := methodSpec.getParamValues() + + if nil != pInstances && 0 < len(pInstances) { + if errRead := codec.ReadParams(pInstances); errRead != nil { + return nil, errRead + } + pCount := len(pInstances) + in = make([]reflect.Value, pCount+1) + for indexI := 0; indexI < pCount; indexI++ { + if pValues[indexI].Type().Kind() == reflect.Ptr && pValues[indexI].Type().Elem().Kind() != reflect.Struct { + in[indexI+1] = reflect.Indirect(pValues[indexI]) + } else { + in[indexI+1] = pValues[indexI] + } + } + } else { + in = make([]reflect.Value, 1) + } + in[0] = serviceSpec.rcvrV + + // Call the service method. + returnValues := methodSpec.method.Func.Call(in) + + if nil != methodSpec.returnType { + result = returnValues[0].Interface() + if nil != returnValues[1].Interface() { + err = returnValues[1].Interface().(error) + } + } else { + if nil != returnValues[0].Interface() { + err = returnValues[0].Interface().(error) + } + } + + return +} + +// func convertValues(values []interface{}, types []reflect.Type) []reflect.Value { +// c := len(values) +// vs := make([]reflect.Value, c) +// for indexI := 0; indexI < c; indexI++ { +// vs[indexI] = convertValue(&values[indexI], types[indexI]) +// } +// return vs +// } + +// func convertValue(v *interface{}, t reflect.Type) reflect.Value { +// switch t.Kind() { +// case reflect.Bool: +// return reflect.ValueOf(*v).Convert(t) +// case reflect.Float32, reflect.Float64: +// return reflect.ValueOf(*v).Convert(t) +// case reflect.Int, reflect.Int32, reflect.Int64: +// return reflect.ValueOf(*v).Convert(t) +// case reflect.Interface: +// // When we see an interface, we make our own thing +// return reflect.ValueOf(*v).Convert(t) +// case reflect.Map: +// return reflect.ValueOf(*v).M +// return d.decodeMap(name, node, result) +// case reflect.Ptr: +// return d.decodePtr(name, node, result) +// case reflect.Slice: +// return d.decodeSlice(name, node, result) +// case reflect.String: +// return d.decodeString(name, node, result) +// case reflect.Struct: +// return d.decodeStruct(name, node, result) +// default: +// return &parser.PosError{ +// Pos: node.Pos(), +// Err: fmt.Errorf("%s: unknown kind to decode into: %s", name, k.Kind()), +// } +// } +// } diff --git a/server/servlet_handler.go b/server/servlet_handler.go index 05b9903..c413961 100644 --- a/server/servlet_handler.go +++ b/server/servlet_handler.go @@ -1,7 +1,12 @@ package server -import "git.loafle.net/commons_go/rpc" +import ( + cr "git.loafle.net/commons_go/rpc" + crr "git.loafle.net/commons_go/rpc/registry" +) type ServletHandler interface { - rpc.ServletHandler + cr.ServletHandler + + GetRPCInvoker() crr.RPCInvoker } diff --git a/server/servlet_handlers.go b/server/servlet_handlers.go index 351e7a4..87577e2 100644 --- a/server/servlet_handlers.go +++ b/server/servlet_handlers.go @@ -12,15 +12,15 @@ import ( type ServletHandlers struct { rpc.ServletHandlers - RPCRegistry registry.RPCRegistry + RPCInvoker registry.RPCInvoker } func (sh *ServletHandlers) Invoke(servletCTX rpc.ServletContext, requestCodec protocol.RegistryCodec) (result interface{}, err error) { - if !sh.RPCRegistry.HasMethod(requestCodec.Method()) { + if !sh.RPCInvoker.HasMethod(requestCodec.Method()) { return nil, fmt.Errorf("RPC Servlet Handler: Method[%s] is not exist", requestCodec.Method()) } - result, err = sh.RPCRegistry.Invoke(requestCodec) + result, err = sh.RPCInvoker.Invoke(requestCodec) if nil != err { if params, pErr := requestCodec.Params(); nil != err { logging.Logger().Error(fmt.Sprintf("RPC Servlet Handler: Read Param of Method[%s] returns error %v", requestCodec.Method(), pErr)) @@ -32,10 +32,14 @@ func (sh *ServletHandlers) Invoke(servletCTX rpc.ServletContext, requestCodec pr return } +func (sh *ServletHandlers) GetRPCInvoker() registry.RPCInvoker { + return sh.RPCInvoker +} + func (sh *ServletHandlers) Validate() { sh.ServletHandlers.Validate() - if nil == sh.RPCRegistry { - logging.Logger().Panic(fmt.Sprintf("RPC Servlet Handler: RPC Registry must be specified")) + if nil == sh.RPCInvoker { + logging.Logger().Panic(fmt.Sprintf("RPC Servlet Handler: RPC Invoker must be specified")) } }