rpc/registry.go

163 lines
4.4 KiB
Go
Raw Normal View History

2017-10-25 14:52:47 +00:00
package rpc
import (
"fmt"
"io"
"reflect"
"strings"
2017-10-26 07:21:35 +00:00
"git.loafle.net/commons_go/rpc/protocol"
2017-10-25 14:52:47 +00:00
)
/**
Network connection
Handshake(1..n)
Send request
HTTP
*/
2017-10-26 07:21:35 +00:00
// WriteHookFunc is a callback that is handed the response writer. The write
// helper in this file runs one such hook immediately before a response or
// error is written and another after the write has succeeded.
type WriteHookFunc func(w io.Writer)
2017-10-25 14:52:47 +00:00
// NewRPCRegistry returns a new RPC registry.
func NewRegistry() Registry {
return &rpcRegistry{
2017-10-26 07:21:35 +00:00
codecs: make(map[string]protocol.Codec),
2017-10-25 14:52:47 +00:00
services: new(serviceMap),
}
}
type Registry interface {
2017-10-26 07:21:35 +00:00
RegisterCodec(codec protocol.Codec, contentType string)
2017-10-25 14:52:47 +00:00
RegisterService(receiver interface{}, name string) error
HasMethod(method string) bool
2017-10-26 07:21:35 +00:00
Invoke(contentType string, reader io.Reader, writer io.Writer, beforeWrite WriteHookFunc, afterWrite WriteHookFunc) error
2017-10-25 14:52:47 +00:00
}
// RPCRegistry serves registered RPC services using registered codecs.
type rpcRegistry struct {
2017-10-26 07:21:35 +00:00
codecs map[string]protocol.Codec
2017-10-25 14:52:47 +00:00
services *serviceMap
}
// RegisterCodec adds a new codec to the server.
//
// Codecs are defined to process a given serialization scheme, e.g., JSON or
// XML. A codec is chosen based on the "Content-Type" header from the request,
// excluding the charset definition.
2017-10-26 07:21:35 +00:00
func (rr *rpcRegistry) RegisterCodec(codec protocol.Codec, contentType string) {
rr.codecs[strings.ToLower(contentType)] = codec
2017-10-25 14:52:47 +00:00
}
// RegisterService adds a new service to the server.
//
// The name parameter is optional: if empty it will be inferred from
// the receiver type name.
//
// Methods from the receiver will be extracted if these rules are satisfied:
//
// - The receiver is exported (begins with an upper case letter) or local
// (defined in the package registering the service).
// - The method name is exported.
// - The method has three arguments: *http.Request, *args, *reply.
// - All three arguments are pointers.
// - The second and third arguments are exported or local.
// - The method has return type error.
//
// All other methods are ignored.
2017-10-26 07:21:35 +00:00
func (rr *rpcRegistry) RegisterService(receiver interface{}, name string) error {
return rr.services.register(receiver, name)
2017-10-25 14:52:47 +00:00
}
// HasMethod returns true if the given method is registered.
//
// The method uses a dotted notation as in "Service.Method".
2017-10-26 07:21:35 +00:00
func (rr *rpcRegistry) HasMethod(method string) bool {
if _, _, err := rr.services.get(method); err == nil {
2017-10-25 14:52:47 +00:00
return true
}
return false
}
// Invoke execute a method.
//
// Codecs are defined to process a given serialization scheme, e.g., JSON or
// XML. A codec is chosen based on the "Content-Type" header from the request,
// excluding the charset definition.
2017-10-26 07:21:35 +00:00
func (rr *rpcRegistry) Invoke(contentType string, r io.Reader, w io.Writer, beforeWrite WriteHookFunc, afterWrite WriteHookFunc) error {
var codec protocol.Codec
if contentType == "" && len(rr.codecs) == 1 {
2017-10-25 14:52:47 +00:00
// If Content-Type is not set and only one codec has been registered,
// then default to that codec.
2017-10-26 07:21:35 +00:00
for _, c := range rr.codecs {
2017-10-25 14:52:47 +00:00
codec = c
}
2017-10-26 07:21:35 +00:00
} else if codec = rr.codecs[strings.ToLower(contentType)]; codec == nil {
2017-10-25 14:52:47 +00:00
return fmt.Errorf("Unrecognized Content-Type: %s", contentType)
}
2017-10-26 07:21:35 +00:00
2017-10-25 14:52:47 +00:00
// Create a new codec request.
2017-10-26 07:21:35 +00:00
codecReq := codec.NewRequest(r)
2017-10-25 14:52:47 +00:00
// Get service method to be called.
method, errMethod := codecReq.Method()
if errMethod != nil {
2017-10-26 07:21:35 +00:00
return write(codecReq, w, beforeWrite, afterWrite, nil, errMethod)
2017-10-25 14:52:47 +00:00
}
2017-10-26 07:21:35 +00:00
serviceSpec, methodSpec, errGet := rr.services.get(method)
2017-10-25 14:52:47 +00:00
if errGet != nil {
2017-10-26 07:21:35 +00:00
return write(codecReq, w, beforeWrite, afterWrite, nil, errGet)
2017-10-25 14:52:47 +00:00
}
// Decode the args.
args := reflect.New(methodSpec.argsType)
if errRead := codecReq.ReadRequest(args.Interface()); errRead != nil {
2017-10-26 07:21:35 +00:00
return write(codecReq, w, beforeWrite, afterWrite, nil, errRead)
2017-10-25 14:52:47 +00:00
}
// Call the service method.
reply := reflect.New(methodSpec.replyType)
errValue := methodSpec.method.Func.Call([]reflect.Value{
serviceSpec.rcvr,
reflect.ValueOf(r),
args,
reply,
})
// Cast the result to error if needed.
var errResult error
errInter := errValue[0].Interface()
if errInter != nil {
errResult = errInter.(error)
}
2017-10-26 07:21:35 +00:00
if errResult != nil {
return write(codecReq, w, beforeWrite, afterWrite, nil, errResult)
}
return write(codecReq, w, beforeWrite, afterWrite, reply.Interface(), nil)
}
func write(codecReq protocol.CodecRequest, w io.Writer, beforeWrite WriteHookFunc, afterWrite WriteHookFunc, result interface{}, err error) error {
if nil != beforeWrite {
beforeWrite(w)
}
var wErr error
if err == nil {
wErr = codecReq.WriteResponse(w, result)
2017-10-25 14:52:47 +00:00
} else {
2017-10-26 07:21:35 +00:00
wErr = codecReq.WriteError(w, 400, err)
}
if nil != wErr {
return wErr
}
if nil != afterWrite {
afterWrite(w)
2017-10-25 14:52:47 +00:00
}
return nil
}