This commit is contained in:
crusader 2017-11-02 15:39:30 +09:00
parent c6be50ff2c
commit 00dff62f10
14 changed files with 232 additions and 89 deletions

View File

@ -17,6 +17,7 @@ type CallState struct {
Error error
DoneChan chan *CallState
hasResponse bool
canceled uint32
}

View File

@ -24,9 +24,9 @@ type Client interface {
Connect() error
Close()
Notify(method string, args interface{}) error
Call(method string, args interface{}, result interface{}) error
CallTimeout(method string, args interface{}, result interface{}, timeout time.Duration) (err error)
Notify(method string, args ...interface{}) (err error)
Call(result interface{}, method string, args ...interface{}) error
CallTimeout(timeout time.Duration, result interface{}, method string, args ...interface{}) (err error)
}
type client struct {
@ -73,18 +73,28 @@ func (c *client) Close() {
c.stopChan = nil
}
func (c *client) Notify(method string, args interface{}) error {
_, err := c.send(method, args, nil, false, true)
return err
}
func (c *client) Call(method string, args interface{}, result interface{}) error {
return c.CallTimeout(method, args, result, c.ch.GetRequestTimeout())
}
func (c *client) CallTimeout(method string, args interface{}, result interface{}, timeout time.Duration) (err error) {
func (c *client) Notify(method string, args ...interface{}) (err error) {
var cs *CallState
if cs, err = c.send(method, args, result, true, true); nil != err {
if cs, err = c.send(true, false, nil, method, args...); nil != err {
return
}
select {
case <-cs.DoneChan:
err = cs.Error
releaseCallState(cs)
}
return
}
func (c *client) Call(result interface{}, method string, args ...interface{}) error {
return c.CallTimeout(c.ch.GetRequestTimeout(), result, method, args...)
}
func (c *client) CallTimeout(timeout time.Duration, result interface{}, method string, args ...interface{}) (err error) {
var cs *CallState
if cs, err = c.send(true, true, result, method, args...); nil != err {
return
}
@ -104,7 +114,7 @@ func (c *client) CallTimeout(method string, args interface{}, result interface{}
return
}
func (c *client) send(method string, args interface{}, result interface{}, hasResponse bool, usePool bool) (cs *CallState, err error) {
func (c *client) send(usePool bool, hasResponse bool, result interface{}, method string, args ...interface{}) (cs *CallState, err error) {
if !hasResponse {
usePool = true
}
@ -115,13 +125,14 @@ func (c *client) send(method string, args interface{}, result interface{}, hasRe
cs = &CallState{}
}
cs.hasResponse = hasResponse
cs.Method = method
cs.Args = args
cs.DoneChan = make(chan *CallState, 1)
if hasResponse {
cs.ID = c.ch.GetRequestID()
cs.Result = result
cs.DoneChan = make(chan *CallState, 1)
}
select {
@ -242,7 +253,7 @@ func (c *client) rpcWriter(stopChan <-chan struct{}, writerDone chan<- error) {
continue
}
if nil != cs.DoneChan {
if cs.hasResponse {
c.pendingRequestsLock.Lock()
n := len(c.pendingRequests)
c.pendingRequests[cs.ID] = cs
@ -256,8 +267,9 @@ func (c *client) rpcWriter(stopChan <-chan struct{}, writerDone chan<- error) {
}
err = c.ch.GetCodec().Write(c.conn, cs.Method, cs.Args, cs.ID)
if nil == cs.DoneChan {
releaseCallState(cs)
if !cs.hasResponse {
cs.Error = err
cs.done()
}
if nil != err {
err = fmt.Errorf("Client: Cannot send request to wire: [%s]", err)

11
gateway/gateway.go Normal file
View File

@ -0,0 +1,11 @@
package gateway
import (
"git.loafle.net/commons_go/server"
)
func New(sh ServerHandler) server.Server {
s := server.New(sh)
return s
}

View File

@ -0,0 +1,11 @@
package gateway
import (
"git.loafle.net/commons_go/rpc/protocol"
"git.loafle.net/commons_go/rpc/server"
)
type RPCGetewayHandler interface {
server.RPCServerHandler
Handle(codecReq protocol.ServerCodecRequest) (interface{}, error)
}

View File

@ -0,0 +1,20 @@
package gateway
import (
"errors"
"git.loafle.net/commons_go/rpc/protocol"
"git.loafle.net/commons_go/rpc/server"
)
type RPCGetewayHandlers struct {
server.RPCServerHandlers
}
func (rpcGH *RPCGetewayHandlers) Handle(codecReq protocol.ServerCodecRequest) (result interface{}, err error) {
return nil, errors.New("RPC Gateway: Handler method[Handle] is not implement")
}
func (rpcGH *RPCGetewayHandlers) Validate() {
rpcGH.RPCServerHandlers.Validate()
}

View File

@ -0,0 +1,67 @@
package gateway
import (
"log"
"net"
"git.loafle.net/commons_go/server"
)
type ServerHandlers struct {
server.ServerHandlers
RPCGetewayHandler RPCGetewayHandler
}
func (sh *ServerHandlers) Handle(conn net.Conn, stopChan <-chan struct{}, doneChan chan<- struct{}) {
contentType := sh.RPCGetewayHandler.GetContentType(conn)
codec, err := sh.RPCGetewayHandler.GetCodec(contentType)
if nil != err {
log.Printf("RPC Handle: %v", err)
doneChan <- struct{}{}
return
}
for {
sh.RPCGetewayHandler.OnPreRead(conn)
// Create a new codec request.
codecReq, errNew := codec.NewRequest(conn)
if nil != errNew {
if sh.IsClientDisconnect(errNew) {
doneChan <- struct{}{}
return
}
log.Printf("RPC Handle: %v", errNew)
doneChan <- struct{}{}
return
}
sh.RPCGetewayHandler.OnPostRead(conn)
result, err := sh.RPCGetewayHandler.Handle(codecReq)
if nil != err {
sh.RPCGetewayHandler.OnPreWriteError(conn, err)
codecReq.WriteError(conn, 400, err)
sh.RPCGetewayHandler.OnPostWriteError(conn, err)
} else {
sh.RPCGetewayHandler.OnPreWriteResult(conn, result)
codecReq.WriteResponse(conn, result)
sh.RPCGetewayHandler.OnPostWriteResult(conn, result)
}
select {
case <-stopChan:
return
default:
}
}
}
func (sh *ServerHandlers) Validate() {
sh.ServerHandlers.Validate()
if nil == sh.RPCGetewayHandler {
panic("RPCGetewayHandler must be specified.")
}
}

View File

@ -47,17 +47,11 @@ func (ccn *ClientCodecNotify) Method() string {
return ccn.notify.Method
}
func (ccn *ClientCodecNotify) ReadParams(args interface{}) error {
func (ccn *ClientCodecNotify) ReadParams(args []interface{}) error {
if ccn.err == nil && ccn.notify.Params != nil {
// Note: if scr.request.Params is nil it's not an error, it's an optional member.
// JSON params structured object. Unmarshal to the args object.
if err := json.Unmarshal(*ccn.notify.Params, args); err != nil {
// Clearly JSON params is not a structured object,
// fallback and attempt an unmarshal with JSON params as
// array value and RPC params is struct. Unmarshal into
// array containing the request struct.
params := [1]interface{}{args}
if err = json.Unmarshal(*ccn.notify.Params, &params); err != nil {
ccn.err = &Error{
Code: E_INVALID_REQ,
Message: err.Error(),
@ -65,7 +59,6 @@ func (ccn *ClientCodecNotify) ReadParams(args interface{}) error {
}
}
}
}
return ccn.err
}

View File

@ -160,7 +160,7 @@ func (scr *ServerCodecRequest) Method() string {
// absence of expected names MAY result in an error being
// generated. The names MUST match exactly, including
// case, to the method's expected parameters.
func (scr *ServerCodecRequest) ReadParams(args interface{}) error {
func (scr *ServerCodecRequest) ReadParams(args []interface{}) error {
if scr.err == nil && scr.request.Params != nil {
// Note: if scr.request.Params is nil it's not an error, it's an optional member.
// JSON params structured object. Unmarshal to the args object.
@ -169,8 +169,6 @@ func (scr *ServerCodecRequest) ReadParams(args interface{}) error {
// fallback and attempt an unmarshal with JSON params as
// array value and RPC params is struct. Unmarshal into
// array containing the request struct.
params := [1]interface{}{args}
if err = json.Unmarshal(*scr.request.Params, &params); err != nil {
scr.err = &Error{
Code: E_INVALID_REQ,
Message: err.Error(),
@ -178,7 +176,6 @@ func (scr *ServerCodecRequest) ReadParams(args interface{}) error {
}
}
}
}
return scr.err
}

View File

@ -8,6 +8,6 @@ type RegistryCodec interface {
// Reads the request and returns the RPC method name.
Method() string
// Reads the request filling the RPC method args.
ReadParams(interface{}) error
ReadParams([]interface{}) error
Complete()
}

View File

@ -75,28 +75,31 @@ func (rr *rpcRegistry) Invoke(codec protocol.RegistryCodec) (result interface{},
return nil, errGet
}
// Decode the args.
args := reflect.New(methodSpec.argsType)
if errRead := codec.ReadParams(args.Interface()); errRead != nil {
var in []reflect.Value
paramValues, paramInstances := methodSpec.getParams()
if nil != paramValues {
in = make([]reflect.Value, len(paramValues)+1)
if errRead := codec.ReadParams(paramInstances); errRead != nil {
return nil, errRead
}
for indexI := 0; indexI < len(paramValues); indexI++ {
in[indexI+1] = paramValues[indexI]
}
} else {
in = make([]reflect.Value, 1)
}
in[0] = serviceSpec.rcvr
// Call the service method.
reply := reflect.New(methodSpec.replyType)
errValue := methodSpec.method.Func.Call([]reflect.Value{
serviceSpec.rcvr,
args,
reply,
})
returnValues := methodSpec.method.Func.Call(in)
// Cast the result to error if needed.
var errResult error
errInter := errValue[0].Interface()
if errInter != nil {
errResult = errInter.(error)
if nil != methodSpec.returnType {
result = returnValues[0].Interface()
err = returnValues[1].Interface().(error)
} else {
err = returnValues[0].Interface().(error)
}
if errResult != nil {
return nil, errResult
}
return reply.Interface(), nil
return
}

View File

@ -19,6 +19,6 @@ type RPCServerHandler interface {
OnPreWriteError(w io.Writer, err error)
OnPostWriteError(w io.Writer, err error)
getCodec(contentType string) (protocol.ServerCodec, error)
GetCodec(contentType string) (protocol.ServerCodec, error)
invoke(codec protocol.RegistryCodec) (result interface{}, err error)
}

View File

@ -60,7 +60,7 @@ func (rpcSH *RPCServerHandlers) Validate() {
}
}
func (rpcSH *RPCServerHandlers) getCodec(contentType string) (protocol.ServerCodec, error) {
func (rpcSH *RPCServerHandlers) GetCodec(contentType string) (protocol.ServerCodec, error) {
var codec protocol.ServerCodec
if contentType == "" && len(rpcSH.codecs) == 1 {
// If Content-Type is not set and only one codec has been registered,

View File

@ -15,7 +15,7 @@ type ServerHandlers struct {
func (sh *ServerHandlers) Handle(conn net.Conn, stopChan <-chan struct{}, doneChan chan<- struct{}) {
contentType := sh.RPCServerHandler.GetContentType(conn)
codec, err := sh.RPCServerHandler.getCodec(contentType)
codec, err := sh.RPCServerHandler.GetCodec(contentType)
if nil != err {
log.Printf("RPC Handle: %v", err)
doneChan <- struct{}{}

View File

@ -27,10 +27,26 @@ type service struct {
type serviceMethod struct {
method reflect.Method // receiver method
argsType reflect.Type // type of the request argument
replyType reflect.Type // type of the response argument
paramTypes []reflect.Type // type of the request argument
returnType reflect.Type // type of the response argument
}
func (sm *serviceMethod) getParams() (values []reflect.Value, instances []interface{}) {
if nil == sm.paramTypes || 0 == len(sm.paramTypes) {
return nil, nil
}
pCount := len(sm.paramTypes)
values = make([]reflect.Value, pCount)
instances = make([]interface{}, pCount)
for indexI := 0; indexI < pCount; indexI++ {
values[indexI] = reflect.New(sm.paramTypes[indexI])
instances[indexI] = values[indexI].Interface()
}
return
}
// ----------------------------------------------------------------------------
// serviceMap
@ -63,6 +79,7 @@ func (m *serviceMap) register(rcvr interface{}, name string) error {
}
// Setup methods.
Loop:
for i := 0; i < s.rcvrType.NumMethod(); i++ {
method := s.rcvrType.Method(i)
mtype := method.Type
@ -70,37 +87,48 @@ func (m *serviceMap) register(rcvr interface{}, name string) error {
if method.PkgPath != "" {
continue
}
// Method needs four ins: receiver, *args, *reply.
if mtype.NumIn() != 3 {
continue
var paramTypes []reflect.Type
mCount := mtype.NumIn()
if 0 < mCount {
paramTypes = make([]reflect.Type, mCount)
for indexI := 0; indexI < mCount; indexI++ {
param := mtype.In(indexI)
if !isExportedOrBuiltin(param) {
continue Loop
}
// First argument must be a pointer and must be exported.
args := mtype.In(1)
if args.Kind() != reflect.Ptr || !isExportedOrBuiltin(args) {
continue
paramTypes[indexI] = param.Elem()
}
// Second argument must be a pointer and must be exported.
reply := mtype.In(2)
if reply.Kind() != reflect.Ptr || !isExportedOrBuiltin(reply) {
}
var returnType reflect.Type
switch mtype.NumOut() {
case 1:
if returnType := mtype.Out(0); returnType != typeOfError {
continue Loop
}
case 2:
if returnType := mtype.Out(0); !isExportedOrBuiltin(returnType) {
continue Loop
}
if returnType := mtype.Out(1); returnType != typeOfError {
continue Loop
}
returnType = mtype.Out(1).Elem()
default:
continue
}
// Method needs one out: error.
if mtype.NumOut() != 1 {
continue
}
if returnType := mtype.Out(0); returnType != typeOfError {
continue
}
s.methods[method.Name] = &serviceMethod{
method: method,
argsType: args.Elem(),
replyType: reply.Elem(),
paramTypes: paramTypes,
returnType: returnType,
}
}
if len(s.methods) == 0 {
return fmt.Errorf("rpc: %q has no exported methods of suitable type",
s.name)
return fmt.Errorf("rpc: %q has no exported methods of suitable type", s.name)
}
// Add to the map.
m.mutex.Lock()