commit 173576b085a1b83e616da8f814cb428e3361b592 Author: crusader Date: Wed Aug 22 18:04:25 2018 +0900 project initialized diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..3733e36 --- /dev/null +++ b/.gitignore @@ -0,0 +1,68 @@ +# Created by .ignore support plugin (hsz.mobi) +### JetBrains template +# Covers JetBrains IDEs: IntelliJ, RubyMine, PhpStorm, AppCode, PyCharm, CLion, Android Studio and Webstorm +# Reference: https://intellij-support.jetbrains.com/hc/en-us/articles/206544839 + +# User-specific stuff: +.idea/**/workspace.xml +.idea/**/tasks.xml +.idea/dictionaries + +# Sensitive or high-churn files: +.idea/**/dataSources/ +.idea/**/dataSources.ids +.idea/**/dataSources.xml +.idea/**/dataSources.local.xml +.idea/**/sqlDataSources.xml +.idea/**/dynamic.xml +.idea/**/uiDesigner.xml + +# Gradle: +.idea/**/gradle.xml +.idea/**/libraries + +# Mongo Explorer plugin: +.idea/**/mongoSettings.xml + +## File-based project format: +*.iws + +## Plugin-specific files: + +# IntelliJ +/out/ + +# mpeltonen/sbt-idea plugin +.idea_modules/ + +# JIRA plugin +atlassian-ide-plugin.xml + +# Crashlytics plugin (for Android Studio and IntelliJ) +com_crashlytics_export_strings.xml +crashlytics.properties +crashlytics-build.properties +fabric.properties +### Go template +# Binaries for programs and plugins +*.exe +*.dll +*.so +*.dylib + +# Test binary, build with `go test -c` +*.test + +# Output of the go coverage tool, specifically when used with LiteIDE +*.out + +# Project-local glide cache, RE: https://github.com/Masterminds/glide/issues/736 +.glide/ +.idea/ +*.iml + +vendor/ +glide.lock +.DS_Store +dist/ +debug diff --git a/.vscode/launch.json b/.vscode/launch.json new file mode 100644 index 0000000..2ca2b1d --- /dev/null +++ b/.vscode/launch.json @@ -0,0 +1,32 @@ +{ + "version": "0.2.0", + "configurations": [ + { + "name": "Debug", + "type": "go", + "request": "launch", + "mode": "debug", + "remotePath": "", + "port": 2345, + "host": "127.0.0.1", + "program": "${workspaceRoot}/main.go", + "env": {}, + "args": [], + "showLog": true + }, + { + "name": "File Debug", + "type": "go", + "request": "launch", + "mode": "debug", + "remotePath": "", + "port": 2345, + "host": "127.0.0.1", + "program": "${fileDirname}", + "env": {}, + "args": [], + "showLog": true + } + + ] +} \ No newline at end of file diff --git a/Gopkg.lock b/Gopkg.lock new file mode 100644 index 0000000..4705ea6 --- /dev/null +++ b/Gopkg.lock @@ -0,0 +1,79 @@ +# This file is autogenerated, do not edit; changes may be undone by the next 'dep ensure'. + + +[[projects]] + branch = "master" + name = "git.loafle.net/overflow/config-go" + packages = ["."] + revision = "c4066fa55db336afc618d64156f703f59138f791" + +[[projects]] + branch = "master" + name = "git.loafle.net/overflow/log-go" + packages = ["."] + revision = "db46df21338e4bf16f20bfe3bf6a4ea5aa222fc8" + +[[projects]] + branch = "master" + name = "git.loafle.net/overflow/server-go" + packages = [ + ".", + "socket", + "socket/client" + ] + revision = "e78f2357407a58e94460caf8bac5c0d44e9f41b1" + +[[projects]] + branch = "master" + name = "git.loafle.net/overflow/util-go" + packages = [ + "ctx", + "encoding/json", + "reflect", + "service" + ] + revision = "01cc315e25b38331c0a366d8025600ed4e297e8e" + +[[projects]] + name = "github.com/BurntSushi/toml" + packages = ["."] + revision = "b26d9c308763d68093482582cea63d69be07a0f0" + version = "v0.3.0" + +[[projects]] + name = "go.uber.org/atomic" + packages = ["."] + revision = "1ea20fb1cbb1cc08cbd0d913a96dead89aa18289" + version = "v1.3.2" + +[[projects]] + name = "go.uber.org/multierr" + packages = ["."] + revision = "3c4937480c32f4c13a875a1829af76c98ca3d40a" + version = "v1.1.0" + +[[projects]] + name = "go.uber.org/zap" + packages = [ + ".", + "buffer", + "internal/bufferpool", + "internal/color", + "internal/exit", + "zapcore" + ] + revision = "ff33455a0e382e8a81d14dd7c922020b6b5e7982" + version = "v1.9.1" + +[[projects]] + name = "gopkg.in/yaml.v2" + packages = ["."] + revision = "5420a8b6744d3b0345ab293f6fcba19c978f1183" + version = "v2.2.1" + +[solve-meta] + analyzer-name = "dep" + analyzer-version = 1 + inputs-digest = "91c3d85882cc402d6069d612723fa6102b724541b1bdbc1a0ada8702f0c31a59" + solver-name = "gps-cdcl" + solver-version = 1 diff --git a/Gopkg.toml b/Gopkg.toml new file mode 100644 index 0000000..00807f2 --- /dev/null +++ b/Gopkg.toml @@ -0,0 +1,42 @@ +# Gopkg.toml example +# +# Refer to https://github.com/golang/dep/blob/master/docs/Gopkg.toml.md +# for detailed Gopkg.toml documentation. +# +# required = ["github.com/user/thing/cmd/thing"] +# ignored = ["github.com/user/project/pkgX", "bitbucket.org/user/project/pkgA/pkgY"] +# +# [[constraint]] +# name = "github.com/user/project" +# version = "1.0.0" +# +# [[constraint]] +# name = "github.com/user/project2" +# branch = "dev" +# source = "github.com/myfork/project2" +# +# [[override]] +# name = "github.com/x/y" +# version = "2.4.0" +# +# [prune] +# non-go = false +# go-tests = true +# unused-packages = true + + +[[constraint]] + branch = "master" + name = "git.loafle.net/overflow/log-go" + +[[constraint]] + branch = "master" + name = "git.loafle.net/overflow/server-go" + +[[constraint]] + branch = "master" + name = "git.loafle.net/overflow/util-go" + +[prune] + go-tests = true + unused-packages = true diff --git a/client/client-handler.go b/client/client-handler.go new file mode 100644 index 0000000..805445b --- /dev/null +++ b/client/client-handler.go @@ -0,0 +1,72 @@ +package client + +import ( + "fmt" + "sync/atomic" + "time" + + "git.loafle.net/overflow/rpc-go" + "git.loafle.net/overflow/rpc-go/protocol" + "git.loafle.net/overflow/rpc-go/registry" + ossc "git.loafle.net/overflow/server-go/socket/client" +) + +type ClientHandler interface { + ossc.ClientHandler + + GetRPCCodec() protocol.ClientCodec + GetRPCInvoker() registry.RPCInvoker + GetPendingRequestCount() int + GetRequestTimeout() time.Duration +} + +type ClientHandlers struct { + ossc.ClientHandlers + + RPCCodec protocol.ClientCodec `json:"-"` + RPCInvoker registry.RPCInvoker `json:"-"` + PendingRequestCount int `json:"pendingRequests,omitempty"` + RequestTimeout time.Duration `json:"requestTimeout,omitempty"` + + validated atomic.Value +} + +func (ch *ClientHandlers) GetRPCCodec() protocol.ClientCodec { + return ch.RPCCodec +} +func (ch *ClientHandlers) GetRPCInvoker() registry.RPCInvoker { + return ch.RPCInvoker +} +func (ch *ClientHandlers) GetPendingRequestCount() int { + return ch.PendingRequestCount +} +func (ch *ClientHandlers) GetRequestTimeout() time.Duration { + return ch.RequestTimeout +} + +func (ch *ClientHandlers) Validate() error { + if err := ch.ClientHandlers.Validate(); nil != err { + return err + } + + if nil != ch.validated.Load() { + return nil + } + ch.validated.Store(true) + + if 0 >= ch.PendingRequestCount { + ch.PendingRequestCount = rpc.DefaultPendingRequestCount + } + + if ch.RequestTimeout <= 0 { + ch.RequestTimeout = rpc.DefaultRequestTimeout + } else { + ch.RequestTimeout = ch.RequestTimeout * time.Second + } + + if nil == ch.RPCCodec { + return fmt.Errorf("RPCCodec is not valid") + } + + return nil +} diff --git a/client/client.go b/client/client.go new file mode 100644 index 0000000..47fd5a1 --- /dev/null +++ b/client/client.go @@ -0,0 +1,342 @@ +package client + +import ( + "context" + "fmt" + "reflect" + "sync" + "time" + + "go.uber.org/zap" + + olog "git.loafle.net/overflow/log-go" + "git.loafle.net/overflow/rpc-go/protocol" + css "git.loafle.net/overflow/server-go/socket" + cssc "git.loafle.net/overflow/server-go/socket/client" +) + +var uint64Type = reflect.TypeOf(uint64(0)) + +type Client struct { + ClientHandler ClientHandler + + ctx cssc.ClientCtx + stopChan chan struct{} + stopWg sync.WaitGroup + + requestID uint64 + requestQueueChan chan *requestState + + pendingRequests sync.Map +} + +func (c *Client) Start() error { + if c.stopChan != nil { + return fmt.Errorf("%s already running. Stop it before starting it again", c.logHeader()) + } + + if nil == c.ClientHandler { + return fmt.Errorf("%s ClientHandler must be specified", c.logHeader()) + } + if err := c.ClientHandler.Validate(); nil != err { + return fmt.Errorf("%s validate error %v", c.logHeader(), err) + } + + c.ctx = c.ClientHandler.ClientCtx() + if nil == c.ctx { + return fmt.Errorf("%s ServerCtx is nil", c.logHeader()) + } + + if err := c.ClientHandler.Init(c.ctx); nil != err { + return fmt.Errorf("%s Init error %v", c.logHeader(), err) + } + + readChan, writeChan, err := c.ClientHandler.GetConnector().Connect() + if nil != err { + return err + } + + c.requestQueueChan = make(chan *requestState, c.ClientHandler.GetPendingRequestCount()) + c.stopChan = make(chan struct{}) + + c.stopWg.Add(1) + go c.handleClient(readChan, writeChan) + + return nil +} + +func (c *Client) Stop(ctx context.Context) error { + if c.stopChan == nil { + return fmt.Errorf("%s must be started before stopping it", c.logHeader()) + } + close(c.stopChan) + c.stopWg.Wait() + + c.ClientHandler.Destroy(c.ctx) + + c.stopChan = nil + + return nil +} + +func (c *Client) logHeader() string { + return fmt.Sprintf("RPC Client[%s]:", c.ClientHandler.GetName()) +} + +func (c *Client) Send(method string, params ...interface{}) error { + _, err := c.internalSend(false, nil, method, params...) + + return err +} + +func (c *Client) Call(result interface{}, method string, params ...interface{}) error { + return c.CallTimeout(c.ClientHandler.GetRequestTimeout(), result, method, params...) +} + +func (c *Client) CallTimeout(timeout time.Duration, result interface{}, method string, params ...interface{}) error { + rs, err := c.internalSend(true, result, method, params...) + if nil != err { + return err + } + + t := retainTimer(timeout) + defer func() { + releaseRequestState(rs) + releaseTimer(t) + }() + + select { + case <-rs.doneChan: + if nil != rs.clientError { + return rs.clientError + } + result = rs.result + return nil + case <-t.C: + rs.cancel() + return newError(method, params, fmt.Errorf("%s Timeout", c.logHeader())) + } +} + +func (c *Client) getRequestID() uint64 { + c.requestID++ + return c.requestID +} + +func (c *Client) internalSend(hasResponse bool, result interface{}, method string, params ...interface{}) (*requestState, error) { + rs := retainRequestState() + + rs.method = method + rs.params = params + if hasResponse { + rs.id = c.getRequestID() + rs.result = result + rs.doneChan = make(chan *requestState, 1) + } + + select { + case c.requestQueueChan <- rs: + return rs, nil + default: + if !hasResponse { + releaseRequestState(rs) + return nil, newError(method, params, fmt.Errorf("%s Request Queue overflow", c.logHeader())) + } + select { + case oldRS := <-c.requestQueueChan: + if nil != oldRS.doneChan { + oldRS.setError(fmt.Errorf("%s Request Queue overflow", c.logHeader())) + oldRS.done() + } else { + releaseRequestState(oldRS) + } + default: + } + select { + case c.requestQueueChan <- rs: + return rs, nil + default: + releaseRequestState(rs) + return nil, newError(method, params, fmt.Errorf("%s Request Queue overflow", c.logHeader())) + } + } +} + +func (c *Client) handleClient(readChan <-chan css.SocketMessage, writeChan chan<- css.SocketMessage) { + defer func() { + + if err := c.ClientHandler.GetConnector().Disconnect(); nil != err { + olog.Logger().Warn(err.Error()) + } + + c.ClientHandler.OnStop(c.ctx) + olog.Logger().Info(fmt.Sprintf("%s Stopped", c.logHeader())) + c.stopWg.Done() + }() + + if err := c.ClientHandler.OnStart(c.ctx); nil != err { + olog.Logger().Error(err.Error()) + return + } + + stopChan := make(chan struct{}) + sendDoneChan := make(chan error) + receiveDoneChan := make(chan error) + + go c.handleSend(stopChan, sendDoneChan, writeChan) + go c.handleReceive(stopChan, receiveDoneChan, readChan) + + olog.Logger().Info(fmt.Sprintf("%s Started", c.logHeader())) + + select { + case <-sendDoneChan: + close(stopChan) + <-receiveDoneChan + case <-receiveDoneChan: + close(stopChan) + <-sendDoneChan + case <-c.stopChan: + close(stopChan) + <-sendDoneChan + <-receiveDoneChan + } +} + +func (c *Client) handleSend(stopChan <-chan struct{}, doneChan chan<- error, writeChan chan<- css.SocketMessage) { + var ( + rs *requestState + id interface{} + messageType int + message []byte + err error + ok bool + ) + + defer func() { + doneChan <- err + }() + +LOOP: + for { + select { + case rs, ok = <-c.requestQueueChan: + if !ok { + return + } + if rs.isCanceled() { + if nil != rs.doneChan { + rs.done() + } else { + releaseRequestState(rs) + } + continue LOOP + } + + id = nil + if 0 < rs.id { + id = rs.id + } + messageType, message, err = c.ClientHandler.GetRPCCodec().NewRequest(rs.method, rs.params, id) + if nil != err { + rs.setError(err) + rs.done() + continue LOOP + } + + select { + case writeChan <- css.MakeSocketMessage(messageType, message): + default: + rs.setError(fmt.Errorf("%s cannot send request", c.logHeader())) + rs.done() + continue LOOP + } + + if 0 < rs.id { + c.pendingRequests.Store(rs.id, rs) + } + case <-stopChan: + return + } + } +} + +func (c *Client) handleReceive(stopChan <-chan struct{}, doneChan chan<- error, readChan <-chan css.SocketMessage) { + var ( + socketMessage css.SocketMessage + messageType int + message []byte + err error + ok bool + ) + + defer func() { + doneChan <- err + }() + +LOOP: + for { + select { + case socketMessage, ok = <-readChan: + if !ok { + return + } + + messageType, message = socketMessage() + + resCodec, err := c.ClientHandler.GetRPCCodec().NewResponse(messageType, message) + if nil != err { + olog.Logger().Debug(err.Error()) + continue LOOP + } + + if resCodec.IsNotification() { + // notification + notiCodec, err := resCodec.Notification() + if nil != err { + olog.Logger().Warn(fmt.Sprintf("%s notification error %v", c.logHeader()), zap.Error(err)) + continue LOOP + } + + go c.handleNotification(notiCodec) + } else { + // response + go c.handleResponse(resCodec) + } + case <-stopChan: + return + } + } + +} + +func (c *Client) handleResponse(resCodec protocol.ClientResponseCodec) { + id := reflect.ValueOf(resCodec.ID()).Convert(uint64Type).Uint() + _rs, ok := c.pendingRequests.Load(id) + if !ok { + olog.Logger().Warn(fmt.Sprintf("%s unexpected ID=[%d] obtained from server", c.logHeader()), zap.Uint64("id", id)) + return + } + rs := _rs.(*requestState) + if nil != resCodec.Error() { + rs.setError(resCodec.Error()) + } else { + err := resCodec.Result(rs.result) + if nil != err { + rs.setError(err) + } + } + + rs.done() +} + +func (c *Client) handleNotification(notiCodec protocol.ClientNotificationCodec) { + if nil == c.ClientHandler.GetRPCInvoker() { + olog.Logger().Warn(fmt.Sprintf("%s received notification method[%s] but RPC Invoker is not exist", c.logHeader(), notiCodec.Method())) + return + } + + _, err := c.ClientHandler.GetRPCInvoker().Invoke(notiCodec) + if nil != err { + olog.Logger().Error(fmt.Sprintf("%s invoking of notification method[%s] has been failed %v", c.logHeader(), notiCodec.Method(), err)) + } +} diff --git a/client/error.go b/client/error.go new file mode 100644 index 0000000..01eb38c --- /dev/null +++ b/client/error.go @@ -0,0 +1,23 @@ +package client + +func newError(method string, params []interface{}, err error) *Error { + return &Error{ + Method: method, + Params: params, + Err: err, + } +} + +type Error struct { + Method string + Params []interface{} + + Err error +} + +func (e *Error) Error() string { + if nil == e.Err { + return "" + } + return e.Err.Error() +} diff --git a/client/request-state.go b/client/request-state.go new file mode 100644 index 0000000..2e89d8c --- /dev/null +++ b/client/request-state.go @@ -0,0 +1,96 @@ +package client + +import ( + "sync" + "sync/atomic" + "time" +) + +type requestState struct { + id uint64 + method string + params []interface{} + result interface{} + clientError *Error + doneChan chan *requestState + + canceled atomic.Value +} + +func (rs *requestState) done() { + select { + case rs.doneChan <- rs: + default: + } +} + +func (rs *requestState) cancel() { + rs.canceled.Store(true) +} + +func (rs *requestState) isCanceled() bool { + v := rs.canceled.Load() + if nil == v { + return false + } + vv := v.(bool) + + return vv +} + +func (rs *requestState) setError(err error) { + if nil == err { + return + } + rs.clientError = newError(rs.method, rs.params, err) +} + +var requestStatePool sync.Pool + +func retainRequestState() *requestState { + v := requestStatePool.Get() + if v == nil { + return &requestState{} + } + return v.(*requestState) +} + +func releaseRequestState(rs *requestState) { + rs.id = 0 + rs.method = "" + rs.params = nil + rs.result = nil + rs.clientError = nil + rs.doneChan = nil + rs.canceled.Store(false) + + requestStatePool.Put(rs) +} + +var timerPool sync.Pool + +func retainTimer(timeout time.Duration) *time.Timer { + tv := timerPool.Get() + if tv == nil { + return time.NewTimer(timeout) + } + + t := tv.(*time.Timer) + if t.Reset(timeout) { + panic("Client: Active timer trapped into retainTimer()") + } + return t +} + +func releaseTimer(t *time.Timer) { + if !t.Stop() { + // Collect possibly added time from the channel + // if timer has been stopped and nobody collected its' value. + select { + case <-t.C: + default: + } + } + + timerPool.Put(t) +} diff --git a/codec/codec.go b/codec/codec.go new file mode 100644 index 0000000..6a0ed7f --- /dev/null +++ b/codec/codec.go @@ -0,0 +1,44 @@ +package codec + +import ( + css "git.loafle.net/overflow/server-go/socket" +) + +type Codec interface { + Encode(message []byte) ([]byte, error) + Decode(message []byte) ([]byte, error) +} + +type codec struct { +} + +func (_ *codec) Encode(message []byte) ([]byte, error) { + return message, nil +} + +func (_ *codec) Decode(message []byte) ([]byte, error) { + return message, nil +} + +var DefaultCodec = &codec{} + +type CodecSelector interface { + Encode(message []byte) (messageType int, encMessage []byte, err error) + Decode(messageType int, message []byte) (decMessage []byte, err error) +} + +type codecSelector struct { +} + +func (_ *codecSelector) Encode(message []byte) (messageType int, encMessage []byte, err error) { + messageType = css.TextMessage + encMessage, err = DefaultCodec.Encode(message) + return +} + +func (_ *codecSelector) Decode(messageType int, message []byte) (decMessage []byte, err error) { + decMessage, err = DefaultCodec.Decode(message) + return +} + +var DefaultCodecSelector = &codecSelector{} diff --git a/codec/compression_codec.go b/codec/compression_codec.go new file mode 100644 index 0000000..e81c125 --- /dev/null +++ b/codec/compression_codec.go @@ -0,0 +1,83 @@ +package codec + +import ( + "bytes" + "compress/gzip" + "io/ioutil" + + css "git.loafle.net/overflow/server-go/socket" +) + +type GZipCodec struct { +} + +func (_ *GZipCodec) Encode(message []byte) ([]byte, error) { + var b bytes.Buffer + gWriter, err := gzip.NewWriterLevel(&b, gzip.BestSpeed) + if nil != err { + return nil, err + } + + if _, err := gWriter.Write(message); nil != err { + return nil, err + } + if err := gWriter.Flush(); nil != err { + return nil, err + } + if err := gWriter.Close(); nil != err { + return nil, err + } + + return b.Bytes(), nil +} + +func (_ *GZipCodec) Decode(message []byte) ([]byte, error) { + gReader, err := gzip.NewReader(bytes.NewBuffer(message)) + if nil != err { + return nil, err + } + defer gReader.Close() + + b, err := ioutil.ReadAll(gReader) + if nil != err { + return nil, err + } + + return b, nil +} + +var gZipCodec = &GZipCodec{} + +func NewCompressionCodecSelector(threshold int) CodecSelector { + return &compressionCodecSelector{ + threshold: threshold, + } +} + +type compressionCodecSelector struct { + threshold int +} + +func (cs *compressionCodecSelector) Encode(message []byte) (messageType int, encMessage []byte, err error) { + switch { + case cs.threshold < len(message): + messageType = css.BinaryMessage + encMessage, err = gZipCodec.Encode(message) + default: + messageType = css.TextMessage + encMessage, err = DefaultCodec.Encode(message) + } + + return +} + +func (cs *compressionCodecSelector) Decode(messageType int, message []byte) (decMessage []byte, err error) { + switch messageType { + case css.BinaryMessage: + decMessage, err = gZipCodec.Decode(message) + default: + decMessage, err = DefaultCodec.Decode(message) + } + + return +} diff --git a/const.go b/const.go new file mode 100644 index 0000000..c35b8ac --- /dev/null +++ b/const.go @@ -0,0 +1,8 @@ +package rpc + +import "time" + +const ( + DefaultPendingRequestCount = 1024 + DefaultRequestTimeout = 20 * time.Second +) diff --git a/protocol/client_codec.go b/protocol/client_codec.go new file mode 100644 index 0000000..a200664 --- /dev/null +++ b/protocol/client_codec.go @@ -0,0 +1,19 @@ +package protocol + +// ClientCodec creates a ClientCodecRequest to process each request. +type ClientCodec interface { + NewRequest(method string, args []interface{}, id interface{}) (messageType int, message []byte, err error) + NewResponse(messageType int, message []byte) (ClientResponseCodec, error) +} + +type ClientResponseCodec interface { + IsNotification() bool + Notification() (ClientNotificationCodec, error) + Result(result interface{}) error + Error() *Error + ID() interface{} +} + +type ClientNotificationCodec interface { + RegistryCodec +} diff --git a/protocol/error.go b/protocol/error.go new file mode 100644 index 0000000..8ece1da --- /dev/null +++ b/protocol/error.go @@ -0,0 +1,35 @@ +package protocol + +import ( + "errors" + "fmt" +) + +type ErrorCode int + +const ( + E_PARSE ErrorCode = -32700 + E_INVALID_REQ ErrorCode = -32600 + E_NO_METHOD ErrorCode = -32601 + E_BAD_PARAMS ErrorCode = -32602 + E_INTERNAL ErrorCode = -32603 + E_SERVER ErrorCode = -32000 +) + +var ErrNullResult = errors.New("result is null") + +type Error struct { + // A Number that indicates the error type that occurred. + Code ErrorCode `json:"code"` /* required */ + + // A String providing a short description of the error. + // The message SHOULD be limited to a concise single sentence. + Message string `json:"message"` /* required */ + + // A Primitive or Structured value that contains additional information about the error. + Data interface{} `json:"data"` /* optional */ +} + +func (e Error) Error() string { + return fmt.Sprintf("Code[%d] Message[%s] Data[%v]", e.Code, e.Message, e.Data) +} diff --git a/protocol/json/client.go b/protocol/json/client.go new file mode 100644 index 0000000..0d2f5e8 --- /dev/null +++ b/protocol/json/client.go @@ -0,0 +1,59 @@ +package json + +import ( + "encoding/json" + + "git.loafle.net/overflow/rpc-go/codec" + "git.loafle.net/overflow/rpc-go/protocol" +) + +// ---------------------------------------------------------------------------- +// Codec +// ---------------------------------------------------------------------------- + +// NewClientCodec returns a new JSON Codec. +func NewCustomClientCodec(codecSelector codec.CodecSelector) protocol.ClientCodec { + return &ClientCodec{ + codecSelector: codecSelector, + } +} + +// NewClientCodec returns a new JSON Codec. +func NewClientCodec() protocol.ClientCodec { + return NewCustomClientCodec(codec.DefaultCodecSelector) +} + +// ClientCodec creates a ClientCodecRequest to process each request. +type ClientCodec struct { + codecSelector codec.CodecSelector +} + +func (cc *ClientCodec) NewRequest(method string, args []interface{}, id interface{}) (messageType int, message []byte, err error) { + params, err := convertParamsToStringArray(args) + if nil != err { + return 0, nil, err + } + + req := &clientRequest{ + Version: Version, + Method: method, + Params: params, + ID: id, + } + + buf, err := json.Marshal(req) + if nil != err { + return 0, nil, err + } + + return cc.codecSelector.Encode(buf) +} + +func (cc *ClientCodec) NewResponse(messageType int, message []byte) (protocol.ClientResponseCodec, error) { + buf, err := cc.codecSelector.Decode(messageType, message) + if nil != err { + return nil, err + } + + return newClientResponseCodec(buf) +} diff --git a/protocol/json/client_notification.go b/protocol/json/client_notification.go new file mode 100644 index 0000000..d0f3b53 --- /dev/null +++ b/protocol/json/client_notification.go @@ -0,0 +1,53 @@ +package json + +import ( + "encoding/json" + "fmt" + + cuej "git.loafle.net/overflow/util-go/encoding/json" +) + +// ---------------------------------------------------------------------------- +// ClientNotificationCodec +// ---------------------------------------------------------------------------- +// clientRequest represents a JSON-RPC notification sent to a client. +type clientNotification struct { + // A String containing the name of the method to be invoked. + Method string `json:"method"` + // Object to pass as request parameter to the method. + Params *json.RawMessage `json:"params,omitempty"` +} + +// ClientNotificationCodec decodes and encodes a single notification. +type ClientNotificationCodec struct { + noti *clientNotification +} + +func (cnc *ClientNotificationCodec) Method() string { + return cnc.noti.Method +} + +func (cnc *ClientNotificationCodec) ReadParams(args []interface{}) error { + if nil != cnc.noti.Params { + // Note: if scr.request.Params is nil it's not an error, it's an optional member. + // JSON params structured object. Unmarshal to the args object. + if err := cuej.SetValueWithJSONStringArrayBytes(*cnc.noti.Params, args); nil != err { + return err + } + return nil + } + return fmt.Errorf("There is not params") +} + +func (cnc *ClientNotificationCodec) Params() ([]string, error) { + if nil != cnc.noti.Params { + var values []string + + if err := json.Unmarshal(*cnc.noti.Params, &values); err != nil { + return nil, err + } + + return values, nil + } + return nil, fmt.Errorf("There is not params") +} diff --git a/protocol/json/client_request.go b/protocol/json/client_request.go new file mode 100644 index 0000000..52c3828 --- /dev/null +++ b/protocol/json/client_request.go @@ -0,0 +1,21 @@ +package json + +// ---------------------------------------------------------------------------- +// Request and Response +// ---------------------------------------------------------------------------- + +// clientRequest represents a JSON-RPC request sent by a client. +type clientRequest struct { + // JSON-RPC protocol. + Version string `json:"jsonrpc"` + + // A String containing the name of the method to be invoked. + Method string `json:"method"` + + // Object to pass as request parameter to the method. + Params interface{} `json:"params"` + + // The request id. This can be of any type. It is used to match the + // response with the request that it is replying to. + ID interface{} `json:"id"` +} diff --git a/protocol/json/client_response.go b/protocol/json/client_response.go new file mode 100644 index 0000000..a47d3f8 --- /dev/null +++ b/protocol/json/client_response.go @@ -0,0 +1,80 @@ +package json + +import ( + "encoding/json" + "fmt" + + "git.loafle.net/overflow/rpc-go/protocol" +) + +// ---------------------------------------------------------------------------- +// ClientResponseCodec +// ---------------------------------------------------------------------------- +// clientResponse represents a JSON-RPC response returned to a client. +type clientResponse struct { + Version string `json:"jsonrpc"` + Result *json.RawMessage `json:"result,omitempty"` + Error *protocol.Error `json:"error,omitempty"` + ID interface{} `json:"id,omitempty"` +} + +// ClientResponseCodec decodes and encodes a single request. +type ClientResponseCodec struct { + res *clientResponse +} + +func (crc *ClientResponseCodec) ID() interface{} { + return crc.res.ID +} + +func (crc *ClientResponseCodec) Result(result interface{}) error { + if nil == crc.res.Error && nil != crc.res.Result { + if err := json.Unmarshal(*crc.res.Result, &result); nil != err { + return err + } + return nil + } + + return fmt.Errorf("There is no result") +} + +func (crc *ClientResponseCodec) Error() *protocol.Error { + return crc.res.Error +} + +func (crc *ClientResponseCodec) IsNotification() bool { + if nil == crc.res.ID && nil != crc.res.Result { + return true + } + return false +} + +func (crc *ClientResponseCodec) Notification() (protocol.ClientNotificationCodec, error) { + if nil != crc.res.ID || nil == crc.res.Result { + return nil, fmt.Errorf("This is not notification") + } + + noti := &clientNotification{} + err := json.Unmarshal(*crc.res.Result, noti) + if nil != err { + return nil, err + } + + return &ClientNotificationCodec{noti: noti}, nil +} + +// newClientMessageCodec returns a new ClientMessageCodec. +func newClientResponseCodec(buf []byte) (protocol.ClientResponseCodec, error) { + + res := &clientResponse{} + err := json.Unmarshal(buf, res) + + if err != nil { + return nil, fmt.Errorf("Cannot unmarshal response [%s] err: %v", string(buf), err) + } + if res.Version != Version { + return nil, fmt.Errorf("The protocol version of response[%s] is not %s", string(buf), Version) + } + + return &ClientResponseCodec{res: res}, nil +} diff --git a/protocol/json/constants.go b/protocol/json/constants.go new file mode 100644 index 0000000..b5e4742 --- /dev/null +++ b/protocol/json/constants.go @@ -0,0 +1,6 @@ +package json + +const ( + Name = "jsonrpc" + Version = "2.0" +) diff --git a/protocol/json/server.go b/protocol/json/server.go new file mode 100644 index 0000000..800d0ce --- /dev/null +++ b/protocol/json/server.go @@ -0,0 +1,77 @@ +package json + +import ( + "encoding/json" + + "git.loafle.net/overflow/rpc-go/codec" + "git.loafle.net/overflow/rpc-go/protocol" +) + +var null = json.RawMessage([]byte("null")) + +// ---------------------------------------------------------------------------- +// Codec +// ---------------------------------------------------------------------------- + +// NewCustomServerCodec returns a new JSON Codec. +func NewCustomServerCodec(codecSelector codec.CodecSelector) protocol.ServerCodec { + return &ServerCodec{ + codecSelector: codecSelector, + } +} + +// NewServerCodec returns a new JSON Codec. +func NewServerCodec() protocol.ServerCodec { + return NewCustomServerCodec(codec.DefaultCodecSelector) +} + +// ServerCodec creates a ServerRequestCodec to process each request. +type ServerCodec struct { + codecSelector codec.CodecSelector +} + +func (sc *ServerCodec) NewRequest(messageType int, message []byte) (protocol.ServerRequestCodec, error) { + buf, err := sc.codecSelector.Decode(messageType, message) + if nil != err { + return nil, err + } + return newServerRequestCodec(sc.codecSelector, buf) +} + +// func (sc *ServerCodec) NewRequestWithString(method string, params []string, id interface{}) (protocol.ServerRequestCodec, error) { +// req := &clientRequest{ +// Version: Version, +// Method: method, +// Params: params, +// ID: id, +// } + +// buf, err := json.Marshal(req) +// if nil != err { +// return nil, err +// } + +// return sc.NewRequest(buf) +// } + +func (sc *ServerCodec) NewNotification(method string, args []interface{}) (messageType int, message []byte, err error) { + params, err := convertParamsToStringArray(args) + if nil != err { + return 0, nil, err + } + + noti := &serverNotification{Method: method, Params: params} + res := &serverResponse{Version: Version, Result: noti} + + buf, err := json.Marshal(res) + if nil != err { + return 0, nil, err + } + + messageType, message, err = sc.codecSelector.Encode(buf) + if nil != err { + return 0, nil, err + } + + return +} diff --git a/protocol/json/server_notification.go b/protocol/json/server_notification.go new file mode 100644 index 0000000..124670a --- /dev/null +++ b/protocol/json/server_notification.go @@ -0,0 +1,10 @@ +package json + +// clientRequest represents a JSON-RPC request sent by a client. +type serverNotification struct { + // A String containing the name of the method to be invoked. + Method string `json:"method"` + + // Object to pass as request parameter to the method. + Params interface{} `json:"params"` +} diff --git a/protocol/json/server_request.go b/protocol/json/server_request.go new file mode 100644 index 0000000..2d0d7c1 --- /dev/null +++ b/protocol/json/server_request.go @@ -0,0 +1,173 @@ +package json + +import ( + "encoding/json" + "strings" + + "git.loafle.net/overflow/rpc-go/codec" + "git.loafle.net/overflow/rpc-go/protocol" + cuej "git.loafle.net/overflow/util-go/encoding/json" +) + +// ---------------------------------------------------------------------------- +// Request +// ---------------------------------------------------------------------------- + +// serverRequest represents a JSON-RPC request received by the server. +type serverRequest struct { + // JSON-RPC protocol. + Version string `json:"jsonrpc"` + + // A String containing the name of the method to be invoked. + Method string `json:"method"` + + // A Structured value to pass as arguments to the method. + Params *json.RawMessage `json:"params,omitempty"` + + // The request id. MUST be a string, number or null. + // Our implementation will not do type checking for id. + // It will be copied as it is. + ID *json.RawMessage `json:"id,omitempty"` +} + +// ---------------------------------------------------------------------------- +// ServerRequestCodec +// ---------------------------------------------------------------------------- + +// newRequestCodec returns a new ServerRequestCodec. +func newServerRequestCodec(codecSelector codec.CodecSelector, buf []byte) (protocol.ServerRequestCodec, error) { + + req := &serverRequest{} + err := json.Unmarshal(buf, req) + + if err != nil { + err = &protocol.Error{ + Code: protocol.E_PARSE, + Message: err.Error(), + Data: req, + } + } + + if req.Version != Version { + err = &protocol.Error{ + Code: protocol.E_INVALID_REQ, + Message: "jsonrpc must be " + Version, + Data: req, + } + } + + return &ServerRequestCodec{codecSelector: codecSelector, req: req, err: err}, nil +} + +// ServerRequestCodec decodes and encodes a single request. +type ServerRequestCodec struct { + codecSelector codec.CodecSelector + req *serverRequest + err error +} + +func (src *ServerRequestCodec) HasResponse() bool { + return src.req.ID != nil +} + +// Method returns the RPC method for the current request. +// +// The method uses a dotted notation as in "Service.Method". +func (src *ServerRequestCodec) Method() string { + return src.req.Method +} + +// ReadRequest fills the request object for the RPC method. +// +// ReadRequest parses request parameters in two supported forms in +// accordance with http://www.jsonrpc.org/specification#parameter_structures +// +// by-position: params MUST be an Array, containing the +// values in the Server expected order. +// +// by-name: params MUST be an Object, with member names +// that match the Server expected parameter names. The +// absence of expected names MAY result in an error being +// generated. The names MUST match exactly, including +// case, to the method's expected parameters. +func (src *ServerRequestCodec) ReadParams(args []interface{}) error { + if src.err == nil && src.req.Params != nil { + // Note: if scr.request.Params is nil it's not an error, it's an optional member. + // JSON params structured object. Unmarshal to the args object. + if err := cuej.SetValueWithJSONStringArrayBytes(*src.req.Params, args); nil != err { + src.err = &protocol.Error{ + Code: protocol.E_BAD_PARAMS, + Message: err.Error(), + Data: src.req.Params, + } + return src.err + } + return nil + } + return src.err +} + +func (src *ServerRequestCodec) Params() ([]string, error) { + if src.err == nil && src.req.Params != nil { + var values []string + + if err := json.Unmarshal(*src.req.Params, &values); err != nil { + src.err = &protocol.Error{ + Code: protocol.E_BAD_PARAMS, + Message: err.Error(), + Data: src.req.Params, + } + return nil, src.err + } + + return values, nil + } + return nil, src.err +} + +func (src *ServerRequestCodec) encodeResponse(res *serverResponse) (messageType int, message []byte, err error) { + buf, err := json.Marshal(res) + if nil != err { + return 0, nil, err + } + + return src.codecSelector.Encode(buf) +} + +func (src *ServerRequestCodec) NewResponse(reply interface{}, replyErr error) (messageType int, message []byte, err error) { + res := src.newServerResponse(reply, replyErr) + return src.encodeResponse(res) +} + +func (src *ServerRequestCodec) NewResponseWithString(reply string, replyErr error) (messageType int, message []byte, err error) { + res := src.newServerResponse(nil, replyErr) + + r := strings.TrimSpace(reply) + if nil == err && "" != r { + var i interface{} + mErr := json.Unmarshal([]byte(r), &i) + if nil != mErr { + return 0, nil, mErr + } + + res.Result = i + } + return src.encodeResponse(res) +} + +func (src *ServerRequestCodec) newServerResponse(reply interface{}, err error) *serverResponse { + res := &serverResponse{Version: Version, Result: reply, ID: src.req.ID} + + if nil != err { + jsonErr, ok := err.(*protocol.Error) + if !ok { + jsonErr = &protocol.Error{ + Code: protocol.E_SERVER, + Message: err.Error(), + } + } + res.Error = jsonErr + } + + return res +} diff --git a/protocol/json/server_response.go b/protocol/json/server_response.go new file mode 100644 index 0000000..be01dc5 --- /dev/null +++ b/protocol/json/server_response.go @@ -0,0 +1,29 @@ +package json + +import ( + "encoding/json" + + "git.loafle.net/overflow/rpc-go/protocol" +) + +// ---------------------------------------------------------------------------- +// Response +// ---------------------------------------------------------------------------- + +// serverResponse represents a JSON-RPC response returned by the server. +type serverResponse struct { + // JSON-RPC protocol. + Version string `json:"jsonrpc"` + // The Object that was returned by the invoked method. This must be null + // in case there was an error invoking the method. + // As per spec the member will be omitted if there was an error. + Result interface{} `json:"result,omitempty"` + + // An Error object if there was an error invoking the method. It must be + // null if there was no error. + // As per spec the member will be omitted if there was no error. + Error *protocol.Error `json:"error,omitempty"` + + // This must be the same id as the request it is responding to. + ID *json.RawMessage `json:"id,omitempty"` +} diff --git a/protocol/json/util.go b/protocol/json/util.go new file mode 100644 index 0000000..267f33b --- /dev/null +++ b/protocol/json/util.go @@ -0,0 +1,46 @@ +package json + +import ( + "encoding/json" + "fmt" + "reflect" + + cur "git.loafle.net/overflow/util-go/reflect" +) + +func convertParamsToStringArray(params []interface{}) ([]string, error) { + var values []string + if nil == params || 0 == len(params) { + return values, nil + } + + for i, param := range params { + t := reflect.TypeOf(param) + switch t.Kind() { + case reflect.String: + values = append(values, param.(string)) + case reflect.Array, reflect.Slice, reflect.Map, reflect.Struct: + b, err := json.Marshal(param) + if nil != err { + return nil, err + } + values = append(values, string(b)) + case reflect.Ptr: + if t.Elem().Kind() != reflect.Struct { + return nil, fmt.Errorf("Pointer of param[%d] is permitted only Struct type", i) + } + b, err := json.Marshal(param) + if nil != err { + return nil, err + } + values = append(values, string(b)) + default: + s, err := cur.ConvertToString(param) + if nil != err { + return nil, fmt.Errorf("String conversion of param[%d] has been failed [%v]", i, err) + } + values = append(values, s) + } + } + return values, nil +} diff --git a/protocol/registry_codec.go b/protocol/registry_codec.go new file mode 100644 index 0000000..b0ecc68 --- /dev/null +++ b/protocol/registry_codec.go @@ -0,0 +1,13 @@ +package protocol + +// ---------------------------------------------------------------------------- +// Codec +// ---------------------------------------------------------------------------- +// RegistryCodec creates a RegistryCodecRequest to process each request. +type RegistryCodec interface { + // Reads the request and returns the RPC method name. + Method() string + // Reads the request filling the RPC method args. + ReadParams(args []interface{}) error + Params() ([]string, error) +} diff --git a/protocol/server_codec.go b/protocol/server_codec.go new file mode 100644 index 0000000..ad75e41 --- /dev/null +++ b/protocol/server_codec.go @@ -0,0 +1,18 @@ +package protocol + +// ServerCodec creates a ServerRequestCodec to process each request. +type ServerCodec interface { + NewRequest(messageType int, message []byte) (ServerRequestCodec, error) + // NewRequestWithString(method string, params []string, id interface{}) (ServerRequestCodec, error) + NewNotification(method string, args []interface{}) (messageType int, message []byte, err error) +} + +// ServerRequestCodec decodes a request and encodes a response using a specific +// serialization scheme. +type ServerRequestCodec interface { + RegistryCodec + + HasResponse() bool + NewResponse(reply interface{}, replyErr error) (messageType int, message []byte, err error) + NewResponseWithString(reply string, replyErr error) (messageType int, message []byte, err error) +} diff --git a/registry/registry.go b/registry/registry.go new file mode 100644 index 0000000..83f7313 --- /dev/null +++ b/registry/registry.go @@ -0,0 +1,10 @@ +package registry + +import ( + "git.loafle.net/overflow/rpc-go/protocol" +) + +type RPCInvoker interface { + HasMethod(method string) bool + Invoke(codec protocol.RegistryCodec, leadingParams ...interface{}) (result interface{}, err error) +} diff --git a/registry/rpc-registry.go b/registry/rpc-registry.go new file mode 100644 index 0000000..eada2db --- /dev/null +++ b/registry/rpc-registry.go @@ -0,0 +1,188 @@ +package registry + +import ( + "reflect" + + "git.loafle.net/overflow/rpc-go/protocol" + ous "git.loafle.net/overflow/util-go/service" +) + +// NewRPCRegistry returns a new RPC registry. +func NewRPCRegistry() RPCRegistry { + return &rpcRegistry{ + serviceRegistry: new(ous.Registry), + } +} + +type RPCRegistry interface { + RPCInvoker + GetService(name string) interface{} + RegisterService(receiver interface{}, name string) error + RegisterServices(receivers ...interface{}) error + RegisterServiceMap(keysAndValues map[string]interface{}) error +} + +// RPCRegistry serves registered RPC services using registered codecs. +type rpcRegistry struct { + serviceRegistry *ous.Registry +} + +// RegisterService adds a new service to the server. +// +// The name parameter is optional: if empty it will be inferred from +// the receiver type name. +// +// Methods from the receiver will be extracted if these rules are satisfied: +// +// - The receiver is exported (begins with an upper case letter) or local +// (defined in the package registering the service). +// - The method name is exported. +// - The method has two arguments: *args, *reply. +// - All two arguments are pointers. +// - The first and second arguments are exported or local. +// - The method has return type error. +// +// All other methods are ignored. +func (rr *rpcRegistry) RegisterService(receiver interface{}, name string) error { + return rr.serviceRegistry.Register(receiver, name) +} + +func (rr *rpcRegistry) RegisterServices(receivers ...interface{}) error { + if nil == receivers || 0 == len(receivers) { + return nil + } + + for _, receiver := range receivers { + if err := rr.serviceRegistry.Register(receiver, ""); nil != err { + return err + } + } + return nil +} + +func (rr *rpcRegistry) RegisterServiceMap(keysAndValues map[string]interface{}) error { + if nil == keysAndValues || 0 == len(keysAndValues) { + return nil + } + + for name, receiver := range keysAndValues { + if err := rr.serviceRegistry.Register(receiver, name); nil != err { + return err + } + } + return nil +} + +func (rr *rpcRegistry) GetService(name string) interface{} { + return rr.serviceRegistry.GetService(name) +} + +// HasMethod returns true if the given method is registered. +// +// The method uses a dotted notation as in "Service.Method". +func (rr *rpcRegistry) HasMethod(method string) bool { + if _, _, err := rr.serviceRegistry.Get(method); err == nil { + return true + } + return false +} + +// Invoke execute a method. +// +// Codecs are defined to process a given serialization scheme, e.g., JSON or +// XML. A codec is chosen based on the "Content-Type" header from the request, +// excluding the charset definition. +func (rr *rpcRegistry) Invoke(codec protocol.RegistryCodec, leadingParams ...interface{}) (result interface{}, err error) { + serviceSpec, methodSpec, errGet := rr.serviceRegistry.Get(codec.Method()) + if nil != errGet { + return nil, errGet + } + // Decode the args. + + var in []reflect.Value + pValues, pInstances := methodSpec.ParamValues() + + lParamLen := 0 + if nil != leadingParams { + lParamLen = len(leadingParams) + } + + if nil != pInstances && 0 < len(pInstances) { + if errRead := codec.ReadParams(pInstances[lParamLen:]); errRead != nil { + return nil, errRead + } + + pCount := len(pInstances) + in = make([]reflect.Value, pCount+1) + + for indexI := lParamLen; indexI < pCount; indexI++ { + if pValues[indexI].Type().Kind() == reflect.Ptr && pValues[indexI].Type().Elem().Kind() != reflect.Struct { + in[indexI+1] = reflect.Indirect(pValues[indexI]) + } else { + in[indexI+1] = pValues[indexI] + } + } + } else { + in = make([]reflect.Value, 1) + } + in[0] = serviceSpec.ReceiverValue() + + for indexI := 0; indexI < lParamLen; indexI++ { + in[indexI+1] = reflect.ValueOf(leadingParams[indexI]) + } + + // Call the service method. + returnValues := methodSpec.Call(in) + + if nil != methodSpec.ReturnType() { + result = returnValues[0].Interface() + if nil != returnValues[1].Interface() { + err = returnValues[1].Interface().(error) + } + } else { + if nil != returnValues[0].Interface() { + err = returnValues[0].Interface().(error) + } + } + + return +} + +// func convertValues(values []interface{}, types []reflect.Type) []reflect.Value { +// c := len(values) +// vs := make([]reflect.Value, c) +// for indexI := 0; indexI < c; indexI++ { +// vs[indexI] = convertValue(&values[indexI], types[indexI]) +// } +// return vs +// } + +// func convertValue(v *interface{}, t reflect.Type) reflect.Value { +// switch t.Kind() { +// case reflect.Bool: +// return reflect.ValueOf(*v).Convert(t) +// case reflect.Float32, reflect.Float64: +// return reflect.ValueOf(*v).Convert(t) +// case reflect.Int, reflect.Int32, reflect.Int64: +// return reflect.ValueOf(*v).Convert(t) +// case reflect.Interface: +// // When we see an interface, we make our own thing +// return reflect.ValueOf(*v).Convert(t) +// case reflect.Map: +// return reflect.ValueOf(*v).M +// return d.decodeMap(name, node, result) +// case reflect.Ptr: +// return d.decodePtr(name, node, result) +// case reflect.Slice: +// return d.decodeSlice(name, node, result) +// case reflect.String: +// return d.decodeString(name, node, result) +// case reflect.Struct: +// return d.decodeStruct(name, node, result) +// default: +// return &parser.PosError{ +// Pos: node.Pos(), +// Err: fmt.Errorf("%s: unknown kind to decode into: %s", name, k.Kind()), +// } +// } +// }