143 lines
3.8 KiB
Go
143 lines
3.8 KiB
Go
|
package rpc
|
||
|
|
||
|
import (
|
||
|
"fmt"
|
||
|
"io"
|
||
|
"reflect"
|
||
|
"strings"
|
||
|
|
||
|
"git.loafle.net/commons_go/rpc/codec"
|
||
|
)
|
||
|
|
||
|
/**
|
||
|
|
||
|
Network connection
|
||
|
Handshake(1..n)
|
||
|
Send request
|
||
|
HTTP
|
||
|
|
||
|
|
||
|
*/
|
||
|
|
||
|
// NewRPCRegistry returns a new RPC registry.
|
||
|
func NewRegistry() Registry {
|
||
|
return &rpcRegistry{
|
||
|
codecs: make(map[string]codec.Codec),
|
||
|
services: new(serviceMap),
|
||
|
}
|
||
|
}
|
||
|
|
||
|
type Registry interface {
|
||
|
RegisterCodec(codec codec.Codec, contentType string)
|
||
|
RegisterService(receiver interface{}, name string) error
|
||
|
HasMethod(method string) bool
|
||
|
Invoke(contentType string, reader io.Reader, writer io.Writer) error
|
||
|
}
|
||
|
|
||
|
// RPCRegistry serves registered RPC services using registered codecs.
|
||
|
type rpcRegistry struct {
|
||
|
codecs map[string]codec.Codec
|
||
|
services *serviceMap
|
||
|
}
|
||
|
|
||
|
// RegisterCodec adds a new codec to the server.
|
||
|
//
|
||
|
// Codecs are defined to process a given serialization scheme, e.g., JSON or
|
||
|
// XML. A codec is chosen based on the "Content-Type" header from the request,
|
||
|
// excluding the charset definition.
|
||
|
func (r *rpcRegistry) RegisterCodec(codec codec.Codec, contentType string) {
|
||
|
r.codecs[strings.ToLower(contentType)] = codec
|
||
|
}
|
||
|
|
||
|
// RegisterService adds a new service to the server.
|
||
|
//
|
||
|
// The name parameter is optional: if empty it will be inferred from
|
||
|
// the receiver type name.
|
||
|
//
|
||
|
// Methods from the receiver will be extracted if these rules are satisfied:
|
||
|
//
|
||
|
// - The receiver is exported (begins with an upper case letter) or local
|
||
|
// (defined in the package registering the service).
|
||
|
// - The method name is exported.
|
||
|
// - The method has three arguments: *http.Request, *args, *reply.
|
||
|
// - All three arguments are pointers.
|
||
|
// - The second and third arguments are exported or local.
|
||
|
// - The method has return type error.
|
||
|
//
|
||
|
// All other methods are ignored.
|
||
|
func (r *rpcRegistry) RegisterService(receiver interface{}, name string) error {
|
||
|
return r.services.register(receiver, name)
|
||
|
}
|
||
|
|
||
|
// HasMethod returns true if the given method is registered.
|
||
|
//
|
||
|
// The method uses a dotted notation as in "Service.Method".
|
||
|
func (r *rpcRegistry) HasMethod(method string) bool {
|
||
|
if _, _, err := r.services.get(method); err == nil {
|
||
|
return true
|
||
|
}
|
||
|
return false
|
||
|
}
|
||
|
|
||
|
// Invoke execute a method.
|
||
|
//
|
||
|
// Codecs are defined to process a given serialization scheme, e.g., JSON or
|
||
|
// XML. A codec is chosen based on the "Content-Type" header from the request,
|
||
|
// excluding the charset definition.
|
||
|
func (r *rpcRegistry) Invoke(contentType string, reader io.Reader, writer io.Writer) error {
|
||
|
var codec codec.Codec
|
||
|
if contentType == "" && len(r.codecs) == 1 {
|
||
|
// If Content-Type is not set and only one codec has been registered,
|
||
|
// then default to that codec.
|
||
|
for _, c := range r.codecs {
|
||
|
codec = c
|
||
|
}
|
||
|
} else if codec = r.codecs[strings.ToLower(contentType)]; codec == nil {
|
||
|
return fmt.Errorf("Unrecognized Content-Type: %s", contentType)
|
||
|
}
|
||
|
// Create a new codec request.
|
||
|
codecReq, hasResponse := codec.NewRequest(reader)
|
||
|
// Get service method to be called.
|
||
|
method, errMethod := codecReq.Method()
|
||
|
if errMethod != nil {
|
||
|
return errMethod
|
||
|
}
|
||
|
serviceSpec, methodSpec, errGet := r.services.get(method)
|
||
|
if errGet != nil {
|
||
|
return errGet
|
||
|
}
|
||
|
// Decode the args.
|
||
|
args := reflect.New(methodSpec.argsType)
|
||
|
if errRead := codecReq.ReadRequest(args.Interface()); errRead != nil {
|
||
|
return errRead
|
||
|
}
|
||
|
// Call the service method.
|
||
|
reply := reflect.New(methodSpec.replyType)
|
||
|
errValue := methodSpec.method.Func.Call([]reflect.Value{
|
||
|
serviceSpec.rcvr,
|
||
|
reflect.ValueOf(r),
|
||
|
args,
|
||
|
reply,
|
||
|
})
|
||
|
|
||
|
if !hasResponse {
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// Cast the result to error if needed.
|
||
|
var errResult error
|
||
|
errInter := errValue[0].Interface()
|
||
|
if errInter != nil {
|
||
|
errResult = errInter.(error)
|
||
|
}
|
||
|
|
||
|
// Encode the response.
|
||
|
if errResult == nil {
|
||
|
codecReq.WriteResponse(writer, reply.Interface())
|
||
|
} else {
|
||
|
codecReq.WriteError(writer, 400, errResult)
|
||
|
}
|
||
|
|
||
|
return nil
|
||
|
}
|