From 4a99cee3bcdc9cb6b026b427ba400bbde62046d8 Mon Sep 17 00:00:00 2001 From: crusader Date: Fri, 25 Aug 2017 14:40:04 +0900 Subject: [PATCH] ing --- client.go | 47 ++------------- client_options.go | 20 ++++--- hadler.go | 7 +++ protocol/jsonrpc/handler.go | 69 +++++++++++++++++++++++ protocol/jsonrpc/options.go | 33 +++++++++++ protocol/{ => jsonrpc}/server_error.go | 2 +- protocol/{ => jsonrpc}/server_request.go | 2 +- protocol/{ => jsonrpc}/server_response.go | 2 +- server.go | 7 ++- 9 files changed, 135 insertions(+), 54 deletions(-) create mode 100644 hadler.go create mode 100644 protocol/jsonrpc/handler.go create mode 100644 protocol/jsonrpc/options.go rename protocol/{ => jsonrpc}/server_error.go (96%) rename protocol/{ => jsonrpc}/server_request.go (93%) rename protocol/{ => jsonrpc}/server_response.go (91%) diff --git a/client.go b/client.go index cef60f6..81728b5 100644 --- a/client.go +++ b/client.go @@ -1,15 +1,12 @@ package overflow_gateway_websocket import ( - "encoding/json" "fmt" "io" "log" "sync" "time" - "git.loafle.net/overflow/overflow_gateway_websocket/protocol" - "git.loafle.net/overflow/overflow_gateway_websocket/websocket" ) @@ -58,7 +55,7 @@ func (c *client) run() { hasReadTimeout := c.o.ReadTimeout > 0 c.conn.SetReadLimit(c.o.MaxMessageSize) defer func() { - c.o.OnDisconnected(c) + c.o.onDisconnected(c) }() for { @@ -81,27 +78,9 @@ func (c *client) run() { } func (c *client) onMessage(messageType int, r io.Reader) { - var err error - req := &protocol.ServerRequest{} - err = json.NewDecoder(r).Decode(req) - if err != nil { - } - - if nil != req.Id { - c.onRequest(req) - } else { - c.onNotify(req) - } -} - -func (c *client) onRequest(req *protocol.ServerRequest) { - var err error - result, err := c.o.OnRequest(c, req.Method, req.Params) - - res := &protocol.ServerResponse{ - Id: req.Id, - Result: result, - Error: err, + result := c.o.Handler.OnMessage(c, messageType, r) + if nil == result { + return } c.writeMTX.Lock() @@ -111,26 +90,10 @@ func (c *client) onRequest(req *protocol.ServerRequest) { return } - j, err := json.Marshal(res) - if nil != err { - log.Println(fmt.Errorf("%v", err)) - } - - err = c.conn.WriteMessage(c.messageType, j) + err := c.conn.WriteMessage(c.messageType, result) c.writeMTX.Unlock() if nil != err { } } - -func (c *client) onNotify(req *protocol.ServerRequest) { - err := c.o.OnNotify(c, req.Method, req.Params) - if nil != err { - log.Println(fmt.Errorf("%v", err)) - } -} - -func (c *client) sendError() { - -} diff --git a/client_options.go b/client_options.go index f4988a4..01b2d19 100644 --- a/client_options.go +++ b/client_options.go @@ -1,15 +1,14 @@ package overflow_gateway_websocket import ( + "log" "time" "github.com/valyala/fasthttp" ) type ( - OnRequestFunc func(c Client, method string, params interface{}) (interface{}, error) - OnNotifyFunc func(c Client, method string, params interface{}) error - OnErrorFunc func(ctx *fasthttp.RequestCtx, status int, reason error) + OnErrorFunc func(ctx *fasthttp.RequestCtx, status int, reason error) ) const ( @@ -29,9 +28,9 @@ const ( // ClientOptions is configuration of the websocket server type ClientOptions struct { - OnRequest OnRequestFunc - OnNotify OnNotifyFunc - OnDisconnected OnDisconnectedFunc + onDisconnected OnDisconnectedFunc + + Handler MessageHandler MaxMessageSize int64 WriteTimeout time.Duration @@ -44,6 +43,11 @@ type ClientOptions struct { // Validate validates the configuration func (o *ClientOptions) Validate() *ClientOptions { + if nil == o.Handler { + log.Fatalf("Message Handler must specified.\n") + return nil + } + if o.WriteTimeout < 0 { o.WriteTimeout = DefaultWriteTimeout } @@ -76,8 +80,8 @@ func (o *ClientOptions) Validate() *ClientOptions { } } - if o.OnDisconnected == nil { - o.OnDisconnected = func(c Client) { + if o.onDisconnected == nil { + o.onDisconnected = func(c Client) { } } diff --git a/hadler.go b/hadler.go new file mode 100644 index 0000000..edabc7b --- /dev/null +++ b/hadler.go @@ -0,0 +1,7 @@ +package overflow_gateway_websocket + +import "io" + +type MessageHandler interface { + OnMessage(c Client, messageType int, r io.Reader) []byte +} diff --git a/protocol/jsonrpc/handler.go b/protocol/jsonrpc/handler.go new file mode 100644 index 0000000..bc42349 --- /dev/null +++ b/protocol/jsonrpc/handler.go @@ -0,0 +1,69 @@ +package jsonrpc + +import ( + "encoding/json" + "fmt" + "io" + "log" + + gws "git.loafle.net/overflow/overflow_gateway_websocket" +) + +type MessageHandler interface { + gws.MessageHandler +} + +type messageHandler struct { + o *Options +} + +func NewHandler(o *Options) MessageHandler { + h := &messageHandler{ + o: o, + } + + return h +} + +func (h *messageHandler) OnMessage(c gws.Client, messageType int, r io.Reader) []byte { + var err error + req := &ServerRequest{} + err = json.NewDecoder(r).Decode(req) + if err != nil { + } + + var result []byte + + if nil != req.Id { + result = h.onRequest(c, req) + } else { + h.onNotify(c, req) + } + + return result +} + +func (h *messageHandler) onRequest(c gws.Client, req *ServerRequest) []byte { + var err error + result, err := h.o.OnRequest(c, req.Method, req.Params) + + res := &ServerResponse{ + Id: req.Id, + Result: result, + Error: err, + } + + j, err := json.Marshal(res) + if nil != err { + log.Println(fmt.Errorf("%v", err)) + } + + return j +} + +func (h *messageHandler) onNotify(c gws.Client, req *ServerRequest) { + err := h.o.OnNotify(c, req.Method, req.Params) + if nil != err { + log.Println(fmt.Errorf("%v", err)) + } +} diff --git a/protocol/jsonrpc/options.go b/protocol/jsonrpc/options.go new file mode 100644 index 0000000..c6c0c72 --- /dev/null +++ b/protocol/jsonrpc/options.go @@ -0,0 +1,33 @@ +package jsonrpc + +import ( + gws "git.loafle.net/overflow/overflow_gateway_websocket" +) + +type ( + OnRequestFunc func(c gws.Client, method string, params interface{}) (interface{}, error) + OnNotifyFunc func(c gws.Client, method string, params interface{}) error +) + +// ClientOptions is configuration of the websocket server +type Options struct { + OnRequest OnRequestFunc + OnNotify OnNotifyFunc +} + +// Validate validates the configuration +func (o *Options) Validate() *Options { + if o.OnRequest == nil { + o.OnRequest = func(c gws.Client, method string, params interface{}) (interface{}, error) { + return nil, nil + } + } + + if o.OnNotify == nil { + o.OnNotify = func(c gws.Client, method string, params interface{}) error { + return nil + } + } + + return o +} diff --git a/protocol/server_error.go b/protocol/jsonrpc/server_error.go similarity index 96% rename from protocol/server_error.go rename to protocol/jsonrpc/server_error.go index eeb17db..6a2a6ab 100644 --- a/protocol/server_error.go +++ b/protocol/jsonrpc/server_error.go @@ -1,4 +1,4 @@ -package protocol +package jsonrpc type ServerErrorCode int diff --git a/protocol/server_request.go b/protocol/jsonrpc/server_request.go similarity index 93% rename from protocol/server_request.go rename to protocol/jsonrpc/server_request.go index f69a2ee..6878f3c 100644 --- a/protocol/server_request.go +++ b/protocol/jsonrpc/server_request.go @@ -1,4 +1,4 @@ -package protocol +package jsonrpc import ( "encoding/json" diff --git a/protocol/server_response.go b/protocol/jsonrpc/server_response.go similarity index 91% rename from protocol/server_response.go rename to protocol/jsonrpc/server_response.go index 73d9dfb..d02d783 100644 --- a/protocol/server_response.go +++ b/protocol/jsonrpc/server_response.go @@ -1,4 +1,4 @@ -package protocol +package jsonrpc import ( "encoding/json" diff --git a/server.go b/server.go index 2e26ae8..62aa4a0 100644 --- a/server.go +++ b/server.go @@ -36,7 +36,7 @@ func NewServer(o *ServerOptions) Server { ReadBufferSize: s._option.ReadBufferSize, WriteBufferSize: s._option.WriteBufferSize, CheckOrigin: s._option.OnCheckOrigin, - Error: s._option.OnError, + Error: s.onError, EnableCompression: s._option.EnableCompression, } @@ -47,6 +47,10 @@ func (s *server) onPush(cb OnPushFunc) { } +func (s *server) onError(ctx *fasthttp.RequestCtx, status int, reason error) { + +} + func (s *server) onDisconnected(c Client) { delete(s._clients, c.ID()) @@ -80,6 +84,7 @@ func (s *server) onConnection(ctx *fasthttp.RequestCtx) { } func (s *server) HandleClient(pattern string, o *ClientOptions) { + o.onDisconnected = s.onDisconnected s._handlers[pattern] = o.Validate() }