From 00dff62f104cc6e2c5c1626fe79a8d25a2362feb Mon Sep 17 00:00:00 2001 From: crusader Date: Thu, 2 Nov 2017 15:39:30 +0900 Subject: [PATCH] ing --- client/call.go | 3 +- client/client.go | 50 +++++++++++++-------- gateway/gateway.go | 11 +++++ gateway/rpc_gateway_handler.go | 11 +++++ gateway/rpc_gateway_handlers.go | 20 +++++++++ gateway/server_handlers.go | 67 +++++++++++++++++++++++++++ protocol/json/client_notify.go | 17 +++---- protocol/json/server.go | 13 +++--- protocol/registry_codec.go | 2 +- registry.go | 41 +++++++++-------- server/rpc_server_handler.go | 2 +- server/rpc_server_handlers.go | 2 +- server/server_handlers.go | 2 +- service_map.go | 80 ++++++++++++++++++++++----------- 14 files changed, 232 insertions(+), 89 deletions(-) create mode 100644 gateway/gateway.go create mode 100644 gateway/rpc_gateway_handler.go create mode 100644 gateway/rpc_gateway_handlers.go create mode 100644 gateway/server_handlers.go diff --git a/client/call.go b/client/call.go index 511600d..a63b848 100644 --- a/client/call.go +++ b/client/call.go @@ -17,7 +17,8 @@ type CallState struct { Error error DoneChan chan *CallState - canceled uint32 + hasResponse bool + canceled uint32 } func (cs *CallState) done() { diff --git a/client/client.go b/client/client.go index 83bda92..4b3dd7b 100644 --- a/client/client.go +++ b/client/client.go @@ -24,9 +24,9 @@ type Client interface { Connect() error Close() - Notify(method string, args interface{}) error - Call(method string, args interface{}, result interface{}) error - CallTimeout(method string, args interface{}, result interface{}, timeout time.Duration) (err error) + Notify(method string, args ...interface{}) (err error) + Call(result interface{}, method string, args ...interface{}) error + CallTimeout(timeout time.Duration, result interface{}, method string, args ...interface{}) (err error) } type client struct { @@ -73,18 +73,28 @@ func (c *client) Close() { c.stopChan = nil } -func (c *client) Notify(method string, args interface{}) error { - _, err := c.send(method, args, nil, false, true) - return err -} - -func (c *client) Call(method string, args interface{}, result interface{}) error { - return c.CallTimeout(method, args, result, c.ch.GetRequestTimeout()) -} - -func (c *client) CallTimeout(method string, args interface{}, result interface{}, timeout time.Duration) (err error) { +func (c *client) Notify(method string, args ...interface{}) (err error) { var cs *CallState - if cs, err = c.send(method, args, result, true, true); nil != err { + if cs, err = c.send(true, false, nil, method, args...); nil != err { + return + } + + select { + case <-cs.DoneChan: + err = cs.Error + releaseCallState(cs) + } + + return +} + +func (c *client) Call(result interface{}, method string, args ...interface{}) error { + return c.CallTimeout(c.ch.GetRequestTimeout(), result, method, args...) +} + +func (c *client) CallTimeout(timeout time.Duration, result interface{}, method string, args ...interface{}) (err error) { + var cs *CallState + if cs, err = c.send(true, true, result, method, args...); nil != err { return } @@ -104,7 +114,7 @@ func (c *client) CallTimeout(method string, args interface{}, result interface{} return } -func (c *client) send(method string, args interface{}, result interface{}, hasResponse bool, usePool bool) (cs *CallState, err error) { +func (c *client) send(usePool bool, hasResponse bool, result interface{}, method string, args ...interface{}) (cs *CallState, err error) { if !hasResponse { usePool = true } @@ -115,13 +125,14 @@ func (c *client) send(method string, args interface{}, result interface{}, hasRe cs = &CallState{} } + cs.hasResponse = hasResponse cs.Method = method cs.Args = args + cs.DoneChan = make(chan *CallState, 1) if hasResponse { cs.ID = c.ch.GetRequestID() cs.Result = result - cs.DoneChan = make(chan *CallState, 1) } select { @@ -242,7 +253,7 @@ func (c *client) rpcWriter(stopChan <-chan struct{}, writerDone chan<- error) { continue } - if nil != cs.DoneChan { + if cs.hasResponse { c.pendingRequestsLock.Lock() n := len(c.pendingRequests) c.pendingRequests[cs.ID] = cs @@ -256,8 +267,9 @@ func (c *client) rpcWriter(stopChan <-chan struct{}, writerDone chan<- error) { } err = c.ch.GetCodec().Write(c.conn, cs.Method, cs.Args, cs.ID) - if nil == cs.DoneChan { - releaseCallState(cs) + if !cs.hasResponse { + cs.Error = err + cs.done() } if nil != err { err = fmt.Errorf("Client: Cannot send request to wire: [%s]", err) diff --git a/gateway/gateway.go b/gateway/gateway.go new file mode 100644 index 0000000..50715f9 --- /dev/null +++ b/gateway/gateway.go @@ -0,0 +1,11 @@ +package gateway + +import ( + "git.loafle.net/commons_go/server" +) + +func New(sh ServerHandler) server.Server { + s := server.New(sh) + + return s +} diff --git a/gateway/rpc_gateway_handler.go b/gateway/rpc_gateway_handler.go new file mode 100644 index 0000000..3e82d82 --- /dev/null +++ b/gateway/rpc_gateway_handler.go @@ -0,0 +1,11 @@ +package gateway + +import ( + "git.loafle.net/commons_go/rpc/protocol" + "git.loafle.net/commons_go/rpc/server" +) + +type RPCGetewayHandler interface { + server.RPCServerHandler + Handle(codecReq protocol.ServerCodecRequest) (interface{}, error) +} diff --git a/gateway/rpc_gateway_handlers.go b/gateway/rpc_gateway_handlers.go new file mode 100644 index 0000000..267f26f --- /dev/null +++ b/gateway/rpc_gateway_handlers.go @@ -0,0 +1,20 @@ +package gateway + +import ( + "errors" + + "git.loafle.net/commons_go/rpc/protocol" + "git.loafle.net/commons_go/rpc/server" +) + +type RPCGetewayHandlers struct { + server.RPCServerHandlers +} + +func (rpcGH *RPCGetewayHandlers) Handle(codecReq protocol.ServerCodecRequest) (result interface{}, err error) { + return nil, errors.New("RPC Gateway: Handler method[Handle] is not implement") +} + +func (rpcGH *RPCGetewayHandlers) Validate() { + rpcGH.RPCServerHandlers.Validate() +} diff --git a/gateway/server_handlers.go b/gateway/server_handlers.go new file mode 100644 index 0000000..c751185 --- /dev/null +++ b/gateway/server_handlers.go @@ -0,0 +1,67 @@ +package gateway + +import ( + "log" + "net" + + "git.loafle.net/commons_go/server" +) + +type ServerHandlers struct { + server.ServerHandlers + + RPCGetewayHandler RPCGetewayHandler +} + +func (sh *ServerHandlers) Handle(conn net.Conn, stopChan <-chan struct{}, doneChan chan<- struct{}) { + contentType := sh.RPCGetewayHandler.GetContentType(conn) + codec, err := sh.RPCGetewayHandler.GetCodec(contentType) + if nil != err { + log.Printf("RPC Handle: %v", err) + doneChan <- struct{}{} + return + } + + for { + sh.RPCGetewayHandler.OnPreRead(conn) + // Create a new codec request. + codecReq, errNew := codec.NewRequest(conn) + if nil != errNew { + if sh.IsClientDisconnect(errNew) { + doneChan <- struct{}{} + return + } + log.Printf("RPC Handle: %v", errNew) + doneChan <- struct{}{} + return + } + sh.RPCGetewayHandler.OnPostRead(conn) + + result, err := sh.RPCGetewayHandler.Handle(codecReq) + + if nil != err { + sh.RPCGetewayHandler.OnPreWriteError(conn, err) + codecReq.WriteError(conn, 400, err) + sh.RPCGetewayHandler.OnPostWriteError(conn, err) + } else { + sh.RPCGetewayHandler.OnPreWriteResult(conn, result) + codecReq.WriteResponse(conn, result) + sh.RPCGetewayHandler.OnPostWriteResult(conn, result) + } + + select { + case <-stopChan: + return + default: + } + } + +} + +func (sh *ServerHandlers) Validate() { + sh.ServerHandlers.Validate() + + if nil == sh.RPCGetewayHandler { + panic("RPCGetewayHandler must be specified.") + } +} diff --git a/protocol/json/client_notify.go b/protocol/json/client_notify.go index dbcd33d..d5ea9aa 100644 --- a/protocol/json/client_notify.go +++ b/protocol/json/client_notify.go @@ -47,22 +47,15 @@ func (ccn *ClientCodecNotify) Method() string { return ccn.notify.Method } -func (ccn *ClientCodecNotify) ReadParams(args interface{}) error { +func (ccn *ClientCodecNotify) ReadParams(args []interface{}) error { if ccn.err == nil && ccn.notify.Params != nil { // Note: if scr.request.Params is nil it's not an error, it's an optional member. // JSON params structured object. Unmarshal to the args object. if err := json.Unmarshal(*ccn.notify.Params, args); err != nil { - // Clearly JSON params is not a structured object, - // fallback and attempt an unmarshal with JSON params as - // array value and RPC params is struct. Unmarshal into - // array containing the request struct. - params := [1]interface{}{args} - if err = json.Unmarshal(*ccn.notify.Params, ¶ms); err != nil { - ccn.err = &Error{ - Code: E_INVALID_REQ, - Message: err.Error(), - Data: ccn.notify.Params, - } + ccn.err = &Error{ + Code: E_INVALID_REQ, + Message: err.Error(), + Data: ccn.notify.Params, } } } diff --git a/protocol/json/server.go b/protocol/json/server.go index 4e50388..bf0f729 100644 --- a/protocol/json/server.go +++ b/protocol/json/server.go @@ -160,7 +160,7 @@ func (scr *ServerCodecRequest) Method() string { // absence of expected names MAY result in an error being // generated. The names MUST match exactly, including // case, to the method's expected parameters. -func (scr *ServerCodecRequest) ReadParams(args interface{}) error { +func (scr *ServerCodecRequest) ReadParams(args []interface{}) error { if scr.err == nil && scr.request.Params != nil { // Note: if scr.request.Params is nil it's not an error, it's an optional member. // JSON params structured object. Unmarshal to the args object. @@ -169,13 +169,10 @@ func (scr *ServerCodecRequest) ReadParams(args interface{}) error { // fallback and attempt an unmarshal with JSON params as // array value and RPC params is struct. Unmarshal into // array containing the request struct. - params := [1]interface{}{args} - if err = json.Unmarshal(*scr.request.Params, ¶ms); err != nil { - scr.err = &Error{ - Code: E_INVALID_REQ, - Message: err.Error(), - Data: scr.request.Params, - } + scr.err = &Error{ + Code: E_INVALID_REQ, + Message: err.Error(), + Data: scr.request.Params, } } } diff --git a/protocol/registry_codec.go b/protocol/registry_codec.go index 4446be5..61d76ca 100644 --- a/protocol/registry_codec.go +++ b/protocol/registry_codec.go @@ -8,6 +8,6 @@ type RegistryCodec interface { // Reads the request and returns the RPC method name. Method() string // Reads the request filling the RPC method args. - ReadParams(interface{}) error + ReadParams([]interface{}) error Complete() } diff --git a/registry.go b/registry.go index f563b7e..53634d2 100644 --- a/registry.go +++ b/registry.go @@ -75,28 +75,31 @@ func (rr *rpcRegistry) Invoke(codec protocol.RegistryCodec) (result interface{}, return nil, errGet } // Decode the args. - args := reflect.New(methodSpec.argsType) - if errRead := codec.ReadParams(args.Interface()); errRead != nil { - return nil, errRead + + var in []reflect.Value + paramValues, paramInstances := methodSpec.getParams() + if nil != paramValues { + in = make([]reflect.Value, len(paramValues)+1) + if errRead := codec.ReadParams(paramInstances); errRead != nil { + return nil, errRead + } + for indexI := 0; indexI < len(paramValues); indexI++ { + in[indexI+1] = paramValues[indexI] + } + } else { + in = make([]reflect.Value, 1) } + in[0] = serviceSpec.rcvr + // Call the service method. - reply := reflect.New(methodSpec.replyType) - errValue := methodSpec.method.Func.Call([]reflect.Value{ - serviceSpec.rcvr, - args, - reply, - }) + returnValues := methodSpec.method.Func.Call(in) - // Cast the result to error if needed. - var errResult error - errInter := errValue[0].Interface() - if errInter != nil { - errResult = errInter.(error) + if nil != methodSpec.returnType { + result = returnValues[0].Interface() + err = returnValues[1].Interface().(error) + } else { + err = returnValues[0].Interface().(error) } - if errResult != nil { - return nil, errResult - } - - return reply.Interface(), nil + return } diff --git a/server/rpc_server_handler.go b/server/rpc_server_handler.go index 0e963b8..b0026d9 100644 --- a/server/rpc_server_handler.go +++ b/server/rpc_server_handler.go @@ -19,6 +19,6 @@ type RPCServerHandler interface { OnPreWriteError(w io.Writer, err error) OnPostWriteError(w io.Writer, err error) - getCodec(contentType string) (protocol.ServerCodec, error) + GetCodec(contentType string) (protocol.ServerCodec, error) invoke(codec protocol.RegistryCodec) (result interface{}, err error) } diff --git a/server/rpc_server_handlers.go b/server/rpc_server_handlers.go index 9981cbb..cb6a3ea 100644 --- a/server/rpc_server_handlers.go +++ b/server/rpc_server_handlers.go @@ -60,7 +60,7 @@ func (rpcSH *RPCServerHandlers) Validate() { } } -func (rpcSH *RPCServerHandlers) getCodec(contentType string) (protocol.ServerCodec, error) { +func (rpcSH *RPCServerHandlers) GetCodec(contentType string) (protocol.ServerCodec, error) { var codec protocol.ServerCodec if contentType == "" && len(rpcSH.codecs) == 1 { // If Content-Type is not set and only one codec has been registered, diff --git a/server/server_handlers.go b/server/server_handlers.go index 47d5c45..69e7565 100644 --- a/server/server_handlers.go +++ b/server/server_handlers.go @@ -15,7 +15,7 @@ type ServerHandlers struct { func (sh *ServerHandlers) Handle(conn net.Conn, stopChan <-chan struct{}, doneChan chan<- struct{}) { contentType := sh.RPCServerHandler.GetContentType(conn) - codec, err := sh.RPCServerHandler.getCodec(contentType) + codec, err := sh.RPCServerHandler.GetCodec(contentType) if nil != err { log.Printf("RPC Handle: %v", err) doneChan <- struct{}{} diff --git a/service_map.go b/service_map.go index 33cf571..e68f0d5 100644 --- a/service_map.go +++ b/service_map.go @@ -26,11 +26,27 @@ type service struct { } type serviceMethod struct { - method reflect.Method // receiver method - argsType reflect.Type // type of the request argument - replyType reflect.Type // type of the response argument + method reflect.Method // receiver method + paramTypes []reflect.Type // type of the request argument + returnType reflect.Type // type of the response argument } +func (sm *serviceMethod) getParams() (values []reflect.Value, instances []interface{}) { + if nil == sm.paramTypes || 0 == len(sm.paramTypes) { + return nil, nil + } + + pCount := len(sm.paramTypes) + values = make([]reflect.Value, pCount) + instances = make([]interface{}, pCount) + + for indexI := 0; indexI < pCount; indexI++ { + values[indexI] = reflect.New(sm.paramTypes[indexI]) + instances[indexI] = values[indexI].Interface() + } + + return +} // ---------------------------------------------------------------------------- // serviceMap @@ -63,6 +79,7 @@ func (m *serviceMap) register(rcvr interface{}, name string) error { } // Setup methods. +Loop: for i := 0; i < s.rcvrType.NumMethod(); i++ { method := s.rcvrType.Method(i) mtype := method.Type @@ -70,37 +87,48 @@ func (m *serviceMap) register(rcvr interface{}, name string) error { if method.PkgPath != "" { continue } - // Method needs four ins: receiver, *args, *reply. - if mtype.NumIn() != 3 { - continue + + var paramTypes []reflect.Type + + mCount := mtype.NumIn() + if 0 < mCount { + paramTypes = make([]reflect.Type, mCount) + for indexI := 0; indexI < mCount; indexI++ { + param := mtype.In(indexI) + if !isExportedOrBuiltin(param) { + continue Loop + } + paramTypes[indexI] = param.Elem() + } } - // First argument must be a pointer and must be exported. - args := mtype.In(1) - if args.Kind() != reflect.Ptr || !isExportedOrBuiltin(args) { - continue - } - // Second argument must be a pointer and must be exported. - reply := mtype.In(2) - if reply.Kind() != reflect.Ptr || !isExportedOrBuiltin(reply) { + + var returnType reflect.Type + switch mtype.NumOut() { + case 1: + if returnType := mtype.Out(0); returnType != typeOfError { + continue Loop + } + case 2: + if returnType := mtype.Out(0); !isExportedOrBuiltin(returnType) { + continue Loop + } + + if returnType := mtype.Out(1); returnType != typeOfError { + continue Loop + } + returnType = mtype.Out(1).Elem() + default: continue } - // Method needs one out: error. - if mtype.NumOut() != 1 { - continue - } - if returnType := mtype.Out(0); returnType != typeOfError { - continue - } s.methods[method.Name] = &serviceMethod{ - method: method, - argsType: args.Elem(), - replyType: reply.Elem(), + method: method, + paramTypes: paramTypes, + returnType: returnType, } } if len(s.methods) == 0 { - return fmt.Errorf("rpc: %q has no exported methods of suitable type", - s.name) + return fmt.Errorf("rpc: %q has no exported methods of suitable type", s.name) } // Add to the map. m.mutex.Lock()