This commit is contained in:
crusader 2017-12-01 16:48:40 +09:00
parent 8b1a5e3c74
commit 81d3fff70a
8 changed files with 188 additions and 152 deletions

View File

@ -399,7 +399,7 @@ func (c *client) handleNotification(resCodec protocol.ClientResponseCodec) error
return err return err
} }
if nil == c.ch.GetRPCRegistry() { if nil == c.ch.GetRPCInvoker() {
params, err := notiCodec.Params() params, err := notiCodec.Params()
if nil != err { if nil != err {
return err return err
@ -407,7 +407,7 @@ func (c *client) handleNotification(resCodec protocol.ClientResponseCodec) error
return fmt.Errorf("Client: Get Notification[method: %s, params: %v]. But RPC registry is not specified", notiCodec.Method(), params) return fmt.Errorf("Client: Get Notification[method: %s, params: %v]. But RPC registry is not specified", notiCodec.Method(), params)
} }
_, err = c.ch.GetRPCRegistry().Invoke(notiCodec) _, err = c.ch.GetRPCInvoker().Invoke(notiCodec)
return err return err
} }

View File

@ -3,8 +3,8 @@ package client
import ( import (
"time" "time"
"git.loafle.net/commons_go/rpc"
"git.loafle.net/commons_go/rpc/protocol" "git.loafle.net/commons_go/rpc/protocol"
crr "git.loafle.net/commons_go/rpc/registry"
cuc "git.loafle.net/commons_go/util/context" cuc "git.loafle.net/commons_go/util/context"
) )
@ -15,7 +15,7 @@ type ClientHandler interface {
Destroy(clientCTX ClientContext) Destroy(clientCTX ClientContext)
GetCodec() protocol.ClientCodec GetCodec() protocol.ClientCodec
GetRPCRegistry() rpc.Registry GetRPCInvoker() crr.RPCInvoker
GetRequestTimeout() time.Duration GetRequestTimeout() time.Duration
GetPendingRequests() int GetPendingRequests() int
GetRequestID() uint64 GetRequestID() uint64

View File

@ -6,14 +6,14 @@ import (
"git.loafle.net/commons_go/logging" "git.loafle.net/commons_go/logging"
"git.loafle.net/commons_go/rpc"
"git.loafle.net/commons_go/rpc/protocol" "git.loafle.net/commons_go/rpc/protocol"
crr "git.loafle.net/commons_go/rpc/registry"
cuc "git.loafle.net/commons_go/util/context" cuc "git.loafle.net/commons_go/util/context"
) )
type ClientHandlers struct { type ClientHandlers struct {
Codec protocol.ClientCodec Codec protocol.ClientCodec
RPCRegistry rpc.Registry RPCInvoker crr.RPCInvoker
// Maximum request time. // Maximum request time.
// Default value is DefaultRequestTimeout. // Default value is DefaultRequestTimeout.
@ -47,8 +47,8 @@ func (ch *ClientHandlers) GetCodec() protocol.ClientCodec {
return ch.Codec return ch.Codec
} }
func (ch *ClientHandlers) GetRPCRegistry() rpc.Registry { func (ch *ClientHandlers) GetRPCInvoker() crr.RPCInvoker {
return ch.RPCRegistry return ch.RPCInvoker
} }
func (ch *ClientHandlers) GetRequestTimeout() time.Duration { func (ch *ClientHandlers) GetRequestTimeout() time.Duration {

View File

@ -1,144 +1,10 @@
package registry package registry
import ( import (
"reflect"
"git.loafle.net/commons_go/rpc/protocol" "git.loafle.net/commons_go/rpc/protocol"
) )
// NewRPCRegistry returns a new RPC registry. type RPCInvoker interface {
func NewRPCRegistry() RPCRegistry {
return &rpcRegistry{
services: new(serviceMap),
}
}
type RPCRegistry interface {
Invoke(codec protocol.RegistryCodec) (result interface{}, err error)
RegisterService(receiver interface{}, name string) error
HasMethod(method string) bool HasMethod(method string) bool
Invoke(codec protocol.RegistryCodec) (result interface{}, err error)
} }
// RPCRegistry serves registered RPC services using registered codecs.
type rpcRegistry struct {
services *serviceMap
}
// RegisterService adds a new service to the server.
//
// The name parameter is optional: if empty it will be inferred from
// the receiver type name.
//
// Methods from the receiver will be extracted if these rules are satisfied:
//
// - The receiver is exported (begins with an upper case letter) or local
// (defined in the package registering the service).
// - The method name is exported.
// - The method has two arguments: *args, *reply.
// - All two arguments are pointers.
// - The first and second arguments are exported or local.
// - The method has return type error.
//
// All other methods are ignored.
func (rr *rpcRegistry) RegisterService(receiver interface{}, name string) error {
return rr.services.register(receiver, name)
}
// HasMethod returns true if the given method is registered.
//
// The method uses a dotted notation as in "Service.Method".
func (rr *rpcRegistry) HasMethod(method string) bool {
if _, _, err := rr.services.get(method); err == nil {
return true
}
return false
}
// Invoke execute a method.
//
// Codecs are defined to process a given serialization scheme, e.g., JSON or
// XML. A codec is chosen based on the "Content-Type" header from the request,
// excluding the charset definition.
func (rr *rpcRegistry) Invoke(codec protocol.RegistryCodec) (result interface{}, err error) {
serviceSpec, methodSpec, errGet := rr.services.get(codec.Method())
if errGet != nil {
return nil, errGet
}
// Decode the args.
var in []reflect.Value
pValues, pInstances := methodSpec.getParamValues()
if nil != pInstances && 0 < len(pInstances) {
if errRead := codec.ReadParams(pInstances); errRead != nil {
return nil, errRead
}
pCount := len(pInstances)
in = make([]reflect.Value, pCount+1)
for indexI := 0; indexI < pCount; indexI++ {
if pValues[indexI].Type().Kind() == reflect.Ptr && pValues[indexI].Type().Elem().Kind() != reflect.Struct {
in[indexI+1] = reflect.Indirect(pValues[indexI])
} else {
in[indexI+1] = pValues[indexI]
}
}
} else {
in = make([]reflect.Value, 1)
}
in[0] = serviceSpec.rcvrV
// Call the service method.
returnValues := methodSpec.method.Func.Call(in)
if nil != methodSpec.returnType {
result = returnValues[0].Interface()
if nil != returnValues[1].Interface() {
err = returnValues[1].Interface().(error)
}
} else {
if nil != returnValues[0].Interface() {
err = returnValues[0].Interface().(error)
}
}
return
}
// func convertValues(values []interface{}, types []reflect.Type) []reflect.Value {
// c := len(values)
// vs := make([]reflect.Value, c)
// for indexI := 0; indexI < c; indexI++ {
// vs[indexI] = convertValue(&values[indexI], types[indexI])
// }
// return vs
// }
// func convertValue(v *interface{}, t reflect.Type) reflect.Value {
// switch t.Kind() {
// case reflect.Bool:
// return reflect.ValueOf(*v).Convert(t)
// case reflect.Float32, reflect.Float64:
// return reflect.ValueOf(*v).Convert(t)
// case reflect.Int, reflect.Int32, reflect.Int64:
// return reflect.ValueOf(*v).Convert(t)
// case reflect.Interface:
// // When we see an interface, we make our own thing
// return reflect.ValueOf(*v).Convert(t)
// case reflect.Map:
// return reflect.ValueOf(*v).M
// return d.decodeMap(name, node, result)
// case reflect.Ptr:
// return d.decodePtr(name, node, result)
// case reflect.Slice:
// return d.decodeSlice(name, node, result)
// case reflect.String:
// return d.decodeString(name, node, result)
// case reflect.Struct:
// return d.decodeStruct(name, node, result)
// default:
// return &parser.PosError{
// Pos: node.Pos(),
// Err: fmt.Errorf("%s: unknown kind to decode into: %s", name, k.Kind()),
// }
// }
// }

18
registry/rpc_gateway.go Normal file
View File

@ -0,0 +1,18 @@
package registry
import (
"fmt"
"git.loafle.net/commons_go/rpc/protocol"
)
type RPCGateway struct {
}
func (rg *RPCGateway) HasMethod(method string) bool {
return false
}
func (rg *RPCGateway) Invoke(codec protocol.RegistryCodec) (result interface{}, err error) {
return nil, fmt.Errorf("RPC Gateway: Invoke is not implemented")
}

143
registry/rpc_registry.go Normal file
View File

@ -0,0 +1,143 @@
package registry
import (
"reflect"
"git.loafle.net/commons_go/rpc/protocol"
)
// NewRPCRegistry returns a new RPC registry.
func NewRPCRegistry() RPCRegistry {
return &rpcRegistry{
services: new(serviceMap),
}
}
type RPCRegistry interface {
RPCInvoker
RegisterService(receiver interface{}, name string) error
}
// RPCRegistry serves registered RPC services using registered codecs.
type rpcRegistry struct {
services *serviceMap
}
// RegisterService adds a new service to the server.
//
// The name parameter is optional: if empty it will be inferred from
// the receiver type name.
//
// Methods from the receiver will be extracted if these rules are satisfied:
//
// - The receiver is exported (begins with an upper case letter) or local
// (defined in the package registering the service).
// - The method name is exported.
// - The method has two arguments: *args, *reply.
// - All two arguments are pointers.
// - The first and second arguments are exported or local.
// - The method has return type error.
//
// All other methods are ignored.
func (rr *rpcRegistry) RegisterService(receiver interface{}, name string) error {
return rr.services.register(receiver, name)
}
// HasMethod returns true if the given method is registered.
//
// The method uses a dotted notation as in "Service.Method".
func (rr *rpcRegistry) HasMethod(method string) bool {
if _, _, err := rr.services.get(method); err == nil {
return true
}
return false
}
// Invoke execute a method.
//
// Codecs are defined to process a given serialization scheme, e.g., JSON or
// XML. A codec is chosen based on the "Content-Type" header from the request,
// excluding the charset definition.
func (rr *rpcRegistry) Invoke(codec protocol.RegistryCodec) (result interface{}, err error) {
serviceSpec, methodSpec, errGet := rr.services.get(codec.Method())
if errGet != nil {
return nil, errGet
}
// Decode the args.
var in []reflect.Value
pValues, pInstances := methodSpec.getParamValues()
if nil != pInstances && 0 < len(pInstances) {
if errRead := codec.ReadParams(pInstances); errRead != nil {
return nil, errRead
}
pCount := len(pInstances)
in = make([]reflect.Value, pCount+1)
for indexI := 0; indexI < pCount; indexI++ {
if pValues[indexI].Type().Kind() == reflect.Ptr && pValues[indexI].Type().Elem().Kind() != reflect.Struct {
in[indexI+1] = reflect.Indirect(pValues[indexI])
} else {
in[indexI+1] = pValues[indexI]
}
}
} else {
in = make([]reflect.Value, 1)
}
in[0] = serviceSpec.rcvrV
// Call the service method.
returnValues := methodSpec.method.Func.Call(in)
if nil != methodSpec.returnType {
result = returnValues[0].Interface()
if nil != returnValues[1].Interface() {
err = returnValues[1].Interface().(error)
}
} else {
if nil != returnValues[0].Interface() {
err = returnValues[0].Interface().(error)
}
}
return
}
// func convertValues(values []interface{}, types []reflect.Type) []reflect.Value {
// c := len(values)
// vs := make([]reflect.Value, c)
// for indexI := 0; indexI < c; indexI++ {
// vs[indexI] = convertValue(&values[indexI], types[indexI])
// }
// return vs
// }
// func convertValue(v *interface{}, t reflect.Type) reflect.Value {
// switch t.Kind() {
// case reflect.Bool:
// return reflect.ValueOf(*v).Convert(t)
// case reflect.Float32, reflect.Float64:
// return reflect.ValueOf(*v).Convert(t)
// case reflect.Int, reflect.Int32, reflect.Int64:
// return reflect.ValueOf(*v).Convert(t)
// case reflect.Interface:
// // When we see an interface, we make our own thing
// return reflect.ValueOf(*v).Convert(t)
// case reflect.Map:
// return reflect.ValueOf(*v).M
// return d.decodeMap(name, node, result)
// case reflect.Ptr:
// return d.decodePtr(name, node, result)
// case reflect.Slice:
// return d.decodeSlice(name, node, result)
// case reflect.String:
// return d.decodeString(name, node, result)
// case reflect.Struct:
// return d.decodeStruct(name, node, result)
// default:
// return &parser.PosError{
// Pos: node.Pos(),
// Err: fmt.Errorf("%s: unknown kind to decode into: %s", name, k.Kind()),
// }
// }
// }

View File

@ -1,7 +1,12 @@
package server package server
import "git.loafle.net/commons_go/rpc" import (
cr "git.loafle.net/commons_go/rpc"
crr "git.loafle.net/commons_go/rpc/registry"
)
type ServletHandler interface { type ServletHandler interface {
rpc.ServletHandler cr.ServletHandler
GetRPCInvoker() crr.RPCInvoker
} }

View File

@ -12,15 +12,15 @@ import (
type ServletHandlers struct { type ServletHandlers struct {
rpc.ServletHandlers rpc.ServletHandlers
RPCRegistry registry.RPCRegistry RPCInvoker registry.RPCInvoker
} }
func (sh *ServletHandlers) Invoke(servletCTX rpc.ServletContext, requestCodec protocol.RegistryCodec) (result interface{}, err error) { func (sh *ServletHandlers) Invoke(servletCTX rpc.ServletContext, requestCodec protocol.RegistryCodec) (result interface{}, err error) {
if !sh.RPCRegistry.HasMethod(requestCodec.Method()) { if !sh.RPCInvoker.HasMethod(requestCodec.Method()) {
return nil, fmt.Errorf("RPC Servlet Handler: Method[%s] is not exist", requestCodec.Method()) return nil, fmt.Errorf("RPC Servlet Handler: Method[%s] is not exist", requestCodec.Method())
} }
result, err = sh.RPCRegistry.Invoke(requestCodec) result, err = sh.RPCInvoker.Invoke(requestCodec)
if nil != err { if nil != err {
if params, pErr := requestCodec.Params(); nil != err { if params, pErr := requestCodec.Params(); nil != err {
logging.Logger().Error(fmt.Sprintf("RPC Servlet Handler: Read Param of Method[%s] returns error %v", requestCodec.Method(), pErr)) logging.Logger().Error(fmt.Sprintf("RPC Servlet Handler: Read Param of Method[%s] returns error %v", requestCodec.Method(), pErr))
@ -32,10 +32,14 @@ func (sh *ServletHandlers) Invoke(servletCTX rpc.ServletContext, requestCodec pr
return return
} }
func (sh *ServletHandlers) GetRPCInvoker() registry.RPCInvoker {
return sh.RPCInvoker
}
func (sh *ServletHandlers) Validate() { func (sh *ServletHandlers) Validate() {
sh.ServletHandlers.Validate() sh.ServletHandlers.Validate()
if nil == sh.RPCRegistry { if nil == sh.RPCInvoker {
logging.Logger().Panic(fmt.Sprintf("RPC Servlet Handler: RPC Registry must be specified")) logging.Logger().Panic(fmt.Sprintf("RPC Servlet Handler: RPC Invoker must be specified"))
} }
} }