From 8299f95f4cdd0e959b015a5b54455560dc0eab38 Mon Sep 17 00:00:00 2001 From: crusader Date: Thu, 31 Aug 2017 19:08:14 +0900 Subject: [PATCH] ing --- protocol/jsonrpc/handler.go | 72 ---------------------- protocol/jsonrpc/jsonrpc.go | 77 +++++++++++++++++++++++ protocol/jsonrpc/jsonrpc_handler.go | 11 ++++ protocol/jsonrpc/jsonrpc_handlers.go | 22 +++++++ protocol/jsonrpc/options.go | 33 ---------- hadler.go => protocol_handler.go | 2 +- server.go | 18 ++++-- server_handlers.go | 11 ++++ server_options.go | 82 ------------------------- server_options2.go | 56 ----------------- socket.go | 18 +++--- socket_handler.go | 17 +++++ socket_options.go => socket_handlers.go | 47 +++++++++----- 13 files changed, 190 insertions(+), 276 deletions(-) delete mode 100644 protocol/jsonrpc/handler.go create mode 100644 protocol/jsonrpc/jsonrpc.go create mode 100644 protocol/jsonrpc/jsonrpc_handler.go create mode 100644 protocol/jsonrpc/jsonrpc_handlers.go delete mode 100644 protocol/jsonrpc/options.go rename hadler.go => protocol_handler.go (77%) delete mode 100644 server_options.go delete mode 100644 server_options2.go create mode 100644 socket_handler.go rename socket_options.go => socket_handlers.go (62%) diff --git a/protocol/jsonrpc/handler.go b/protocol/jsonrpc/handler.go deleted file mode 100644 index 4f17cc3..0000000 --- a/protocol/jsonrpc/handler.go +++ /dev/null @@ -1,72 +0,0 @@ -package jsonrpc - -import ( - "context" - "encoding/json" - "io" - "log" - - gws "git.loafle.net/overflow/overflow_gateway_websocket" -) - -type MessageHandler interface { - gws.MessageHandler -} - -type messageHandler struct { - ctx context.Context - o *Options -} - -func NewHandler(ctx context.Context, o *Options) MessageHandler { - h := &messageHandler{ - ctx: ctx, - o: o, - } - - return h -} - -func (h *messageHandler) OnMessage(soc gws.Socket, messageType int, r io.Reader) []byte { - var err error - req := &ServerRequest{} - err = json.NewDecoder(r).Decode(req) - if err != nil { - } - - var result []byte - - if nil != req.Id { - result = h.onRequest(soc, req) - } else { - h.onNotify(soc, req) - } - - return result -} - -func (h *messageHandler) onRequest(soc gws.Socket, req *ServerRequest) []byte { - var err error - result, err := h.o.OnRequest(soc, req.Method, req.Params) - - res := &ServerResponse{ - Protocol: req.Protocol, - Result: result, - Error: err, - Id: req.Id, - } - - j, err := json.Marshal(res) - if nil != err { - log.Printf("JSON RPC error: %v", err) - } - - return j -} - -func (h *messageHandler) onNotify(soc gws.Socket, req *ServerRequest) { - err := h.o.OnNotify(soc, req.Method, req.Params) - if nil != err { - log.Printf("JSON RPC error: %v", err) - } -} diff --git a/protocol/jsonrpc/jsonrpc.go b/protocol/jsonrpc/jsonrpc.go new file mode 100644 index 0000000..70d35af --- /dev/null +++ b/protocol/jsonrpc/jsonrpc.go @@ -0,0 +1,77 @@ +package jsonrpc + +import ( + "context" + "encoding/json" + "fmt" + "io" + + "go.uber.org/zap" + + "git.loafle.net/commons_go/logging" + ogw "git.loafle.net/overflow/overflow_gateway_websocket" +) + +type JSONRpc interface { + ogw.ProtocolHandler +} + +type jsonRpc struct { + ctx context.Context + logger *zap.Logger + jh JSONRpcHandler +} + +func New(ctx context.Context, jh JSONRpcHandler) JSONRpc { + jr := &jsonRpc{ + ctx: ctx, + logger: logging.WithContext(ctx), + jh: jh, + } + + return jr +} + +func (jr *jsonRpc) OnMessage(soc ogw.Socket, messageType int, r io.Reader) []byte { + var err error + req := &ServerRequest{} + err = json.NewDecoder(r).Decode(req) + if err != nil { + } + + var result []byte + + if nil != req.Id { + result = jr.onRequest(soc, req) + } else { + jr.onNotify(soc, req) + } + + return result +} + +func (jr *jsonRpc) onRequest(soc ogw.Socket, req *ServerRequest) []byte { + var err error + result, err := jr.jh.OnRequest(soc, req.Method, req.Params) + + res := &ServerResponse{ + Protocol: req.Protocol, + Result: result, + Error: err, + Id: req.Id, + } + + j, err := json.Marshal(res) + if nil != err { + jr.logger.Error(fmt.Sprintf("JSON RPC error: %v", err)) + } + + return j +} + +func (jr *jsonRpc) onNotify(soc ogw.Socket, req *ServerRequest) { + err := jr.jh.OnNotify(soc, req.Method, req.Params) + if nil != err { + jr.logger.Error(fmt.Sprintf("JSON RPC error: %v", err)) + } +} diff --git a/protocol/jsonrpc/jsonrpc_handler.go b/protocol/jsonrpc/jsonrpc_handler.go new file mode 100644 index 0000000..660d495 --- /dev/null +++ b/protocol/jsonrpc/jsonrpc_handler.go @@ -0,0 +1,11 @@ +package jsonrpc + +import ( + ogw "git.loafle.net/overflow/overflow_gateway_websocket" +) + +// ClientOptions is configuration of the websocket server +type JSONRpcHandler interface { + OnRequest(soc ogw.Socket, method string, params []string) (interface{}, error) + OnNotify(soc ogw.Socket, method string, params []string) error +} diff --git a/protocol/jsonrpc/jsonrpc_handlers.go b/protocol/jsonrpc/jsonrpc_handlers.go new file mode 100644 index 0000000..0de2deb --- /dev/null +++ b/protocol/jsonrpc/jsonrpc_handlers.go @@ -0,0 +1,22 @@ +package jsonrpc + +import ( + ogw "git.loafle.net/overflow/overflow_gateway_websocket" +) + +// SocketOptions is configuration of the websocket server +type JSONRpcHandlers struct { +} + +func (jh *JSONRpcHandlers) OnRequest(soc ogw.Socket, method string, params []string) (interface{}, error) { + return nil, nil +} + +func (jh *JSONRpcHandlers) OnNotify(soc ogw.Socket, method string, params []string) error { + return nil +} + +// Validate validates the configuration +func (js *JSONRpcHandlers) Validate() { + +} diff --git a/protocol/jsonrpc/options.go b/protocol/jsonrpc/options.go deleted file mode 100644 index e03aa23..0000000 --- a/protocol/jsonrpc/options.go +++ /dev/null @@ -1,33 +0,0 @@ -package jsonrpc - -import ( - gws "git.loafle.net/overflow/overflow_gateway_websocket" -) - -type ( - OnRequestFunc func(soc gws.Socket, method string, params []string) (interface{}, error) - OnNotifyFunc func(soc gws.Socket, method string, params []string) error -) - -// ClientOptions is configuration of the websocket server -type Options struct { - OnRequest OnRequestFunc - OnNotify OnNotifyFunc -} - -// Validate validates the configuration -func (o *Options) Validate() *Options { - if o.OnRequest == nil { - o.OnRequest = func(soc gws.Socket, method string, params []string) (interface{}, error) { - return nil, nil - } - } - - if o.OnNotify == nil { - o.OnNotify = func(soc gws.Socket, method string, params []string) error { - return nil - } - } - - return o -} diff --git a/hadler.go b/protocol_handler.go similarity index 77% rename from hadler.go rename to protocol_handler.go index b8b4759..fe4ac63 100644 --- a/hadler.go +++ b/protocol_handler.go @@ -2,6 +2,6 @@ package overflow_gateway_websocket import "io" -type MessageHandler interface { +type ProtocolHandler interface { OnMessage(soc Socket, messageType int, r io.Reader) []byte } diff --git a/server.go b/server.go index 0326b7e..73f18ec 100644 --- a/server.go +++ b/server.go @@ -4,6 +4,9 @@ import ( "context" "net/http" + "go.uber.org/zap" + + "git.loafle.net/commons_go/logging" channelUtil "git.loafle.net/commons_go/util/channel" "git.loafle.net/overflow/overflow_gateway_websocket/websocket" "github.com/valyala/fasthttp" @@ -18,14 +21,15 @@ type socketsChannelAction struct { type Server interface { ListenAndServe(addr string) error - HandleSocket(pattern string, o *SocketOptions) + HandleSocket(pattern string, o SocketHandler) } type server struct { _ctx context.Context + _logger *zap.Logger _sh ServerHandler _upgrader *websocket.Upgrader - _handlers map[string]*SocketOptions + _handlers map[string]SocketHandler _sockets map[string]Socket _socketsCh chan socketsChannelAction } @@ -35,8 +39,9 @@ func NewServer(ctx context.Context, sh ServerHandler) Server { s := &server{ _ctx: ctx, + _logger: logging.WithContext(ctx), _sh: sh, - _handlers: make(map[string]*SocketOptions, 1), + _handlers: make(map[string]SocketHandler, 1), _sockets: make(map[string]Socket, 100), _socketsCh: make(chan socketsChannelAction), } @@ -125,10 +130,11 @@ func (s *server) Sockets() map[string]Socket { return s._sockets } -func (s *server) HandleSocket(pattern string, o *SocketOptions) { - o.onDisconnected = s.onDisconnected +func (s *server) HandleSocket(pattern string, soch SocketHandler) { + soch.(*SocketHandlers).onDisconnected = s.onDisconnected - s._handlers[pattern] = o.Validate() + soch.Validate() + s._handlers[pattern] = soch } func (s *server) ListenAndServe(addr string) error { diff --git a/server_handlers.go b/server_handlers.go index 25810e8..ece1231 100644 --- a/server_handlers.go +++ b/server_handlers.go @@ -7,6 +7,17 @@ import ( "github.com/valyala/fasthttp" ) +const ( + // DefaultHandshakeTimeout is default value of websocket handshake Timeout + DefaultHandshakeTimeout = 0 + // DefaultReadBufferSize is default value of Read Buffer Size + DefaultReadBufferSize = 4096 + // DefaultWriteBufferSize is default value of Write Buffer Size + DefaultWriteBufferSize = 4096 + // DefaultEnableCompression is default value of support compression + DefaultEnableCompression = false +) + // ServerHandlers is implementation of the Server handler interface type ServerHandlers struct { HandshakeTimeout time.Duration diff --git a/server_options.go b/server_options.go deleted file mode 100644 index 0422caf..0000000 --- a/server_options.go +++ /dev/null @@ -1,82 +0,0 @@ -package overflow_gateway_websocket - -import ( - "time" - - uuid "github.com/satori/go.uuid" - "github.com/valyala/fasthttp" -) - -type ( - OnConnectionFunc func(soc Socket) - OnDisconnectedFunc func(soc Socket) - OnCheckOriginFunc func(ctx *fasthttp.RequestCtx) bool -) - -const ( - // DefaultHandshakeTimeout is default value of websocket handshake Timeout - DefaultHandshakeTimeout = 0 - // DefaultReadBufferSize is default value of Read Buffer Size - DefaultReadBufferSize = 4096 - // DefaultWriteBufferSize is default value of Write Buffer Size - DefaultWriteBufferSize = 4096 - // DefaultEnableCompression is default value of support compression - DefaultEnableCompression = false -) - -var ( - // DefaultIDGenerator returns the UUID of the client - DefaultIDGenerator = func(ctx *fasthttp.RequestCtx) string { return uuid.NewV4().String() } -) - -// ServerOptions is configuration of the websocket server -type ServerOptions struct { - OnConnection OnConnectionFunc - OnDisconnected OnDisconnectedFunc - OnCheckOrigin OnCheckOriginFunc - OnError func(ctx *fasthttp.RequestCtx, status int, reason error) - IDGenerator func(ctx *fasthttp.RequestCtx) string - - HandshakeTimeout time.Duration - ReadBufferSize int - WriteBufferSize int - EnableCompression bool -} - -// Validate validates the configuration -func (o *ServerOptions) Validate() *ServerOptions { - if o.ReadBufferSize <= 0 { - o.ReadBufferSize = DefaultReadBufferSize - } - - if o.WriteBufferSize <= 0 { - o.WriteBufferSize = DefaultWriteBufferSize - } - - if o.OnConnection == nil { - o.OnConnection = func(soc Socket) { - } - } - - if o.OnDisconnected == nil { - o.OnDisconnected = func(soc Socket) { - } - } - - if o.OnError == nil { - o.OnError = func(ctx *fasthttp.RequestCtx, status int, reason error) { - } - } - - if o.OnCheckOrigin == nil { - o.OnCheckOrigin = func(ctx *fasthttp.RequestCtx) bool { - return true - } - } - - if o.IDGenerator == nil { - o.IDGenerator = DefaultIDGenerator - } - - return o -} diff --git a/server_options2.go b/server_options2.go deleted file mode 100644 index 08becba..0000000 --- a/server_options2.go +++ /dev/null @@ -1,56 +0,0 @@ -package overflow_gateway_websocket - -import ( - "time" - - uuid "github.com/satori/go.uuid" - "github.com/valyala/fasthttp" -) - -// ServerOptions is configuration of the websocket server -type ServerOptions2 struct { - HandshakeTimeout time.Duration - ReadBufferSize int - WriteBufferSize int - EnableCompression bool -} - -func (o *ServerOptions2) GetHandshakeTimeout() time.Duration { - return o.HandshakeTimeout -} -func (o *ServerOptions2) GetReadBufferSize() int { - return o.ReadBufferSize -} -func (o *ServerOptions2) GetWriteBufferSize() int { - return o.WriteBufferSize -} -func (o *ServerOptions2) GetEnableCompression() bool { - return o.EnableCompression -} - -func (o *ServerOptions2) OnConnection(soc Socket) { - -} -func (o *ServerOptions2) OnDisconnected(soc Socket) { - -} -func (o *ServerOptions2) OnCheckOrigin(ctx *fasthttp.RequestCtx) bool { - return true -} -func (o *ServerOptions2) OnError(ctx *fasthttp.RequestCtx, status int, reason error) { - -} -func (o *ServerOptions2) OnIDGenerate(ctx *fasthttp.RequestCtx) string { - return uuid.NewV4().String() -} - -// Validate validates the configuration -func (o *ServerOptions2) Validate() { - if o.ReadBufferSize <= 0 { - o.ReadBufferSize = DefaultReadBufferSize - } - - if o.WriteBufferSize <= 0 { - o.WriteBufferSize = DefaultWriteBufferSize - } -} diff --git a/socket.go b/socket.go index 20f73c1..26f9db9 100644 --- a/socket.go +++ b/socket.go @@ -20,7 +20,7 @@ type Socket interface { type socket struct { ctx context.Context id string - o *SocketOptions + sh SocketHandler conn *websocket.Conn path string messageType int @@ -28,11 +28,11 @@ type socket struct { disconnectCh chan bool } -func NewSocket(ctx context.Context, id string, path string, o *SocketOptions, conn *websocket.Conn) Socket { +func NewSocket(ctx context.Context, id string, path string, sh SocketHandler, conn *websocket.Conn) Socket { c := &socket{ ctx: ctx, id: id, - o: o, + sh: sh, conn: conn, path: path, writeCh: make(chan []byte), @@ -60,8 +60,8 @@ func (soc *socket) Write(m []byte) { } func (soc *socket) run() { - hasReadTimeout := soc.o.ReadTimeout > 0 - soc.conn.SetReadLimit(soc.o.MaxMessageSize) + hasReadTimeout := soc.sh.GetReadTimeout() > 0 + soc.conn.SetReadLimit(soc.sh.GetMaxMessageSize()) defer func() { soc.onDisconnected() @@ -71,7 +71,7 @@ func (soc *socket) run() { for { if hasReadTimeout { - soc.conn.SetReadDeadline(time.Now().Add(soc.o.ReadTimeout)) + soc.conn.SetReadDeadline(time.Now().Add(soc.sh.GetReadTimeout())) } // messageType, data, err := c.conn.ReadMessage() @@ -90,11 +90,11 @@ func (soc *socket) run() { func (soc *socket) onDisconnected() { soc.disconnectCh <- true - soc.o.onDisconnected(soc) + soc.sh.(*SocketHandlers).onDisconnected(soc) } func (soc *socket) onMessage(messageType int, r io.Reader) { - result := soc.o.Handler.OnMessage(soc, messageType, r) + result := soc.sh.GetProtocolHandler().OnMessage(soc, messageType, r) if nil == result { return } @@ -106,7 +106,7 @@ func (soc *socket) listenWrite() { select { // send message to the client case w := <-soc.writeCh: - if writeTimeout := soc.o.WriteTimeout; writeTimeout > 0 { + if writeTimeout := soc.sh.GetWriteTimeout(); writeTimeout > 0 { err := soc.conn.SetWriteDeadline(time.Now().Add(writeTimeout)) log.Printf("Socket timeout: %v", err) } diff --git a/socket_handler.go b/socket_handler.go new file mode 100644 index 0000000..1761974 --- /dev/null +++ b/socket_handler.go @@ -0,0 +1,17 @@ +package overflow_gateway_websocket + +import "time" + +// SocketHandler is configuration of the websocket server +type SocketHandler interface { + GetMaxMessageSize() int64 + GetWriteTimeout() time.Duration + GetReadTimeout() time.Duration + GetPongTimeout() time.Duration + GetPingTimeout() time.Duration + GetPingPeriod() time.Duration + IsBinaryMessage() bool + GetProtocolHandler() ProtocolHandler + + Validate() +} diff --git a/socket_options.go b/socket_handlers.go similarity index 62% rename from socket_options.go rename to socket_handlers.go index a2ed657..86ee9fe 100644 --- a/socket_options.go +++ b/socket_handlers.go @@ -3,12 +3,6 @@ package overflow_gateway_websocket import ( "log" "time" - - "github.com/valyala/fasthttp" -) - -type ( - OnErrorFunc func(ctx *fasthttp.RequestCtx, status int, reason error) ) const ( @@ -27,10 +21,10 @@ const ( ) // SocketOptions is configuration of the websocket server -type SocketOptions struct { - onDisconnected OnDisconnectedFunc +type SocketHandlers struct { + onDisconnected func(soc Socket) - Handler MessageHandler + Handler ProtocolHandler MaxMessageSize int64 WriteTimeout time.Duration @@ -41,11 +35,36 @@ type SocketOptions struct { BinaryMessage bool } +func (sh *SocketHandlers) GetMaxMessageSize() int64 { + return sh.MaxMessageSize +} +func (sh *SocketHandlers) GetWriteTimeout() time.Duration { + return sh.WriteTimeout +} +func (sh *SocketHandlers) GetReadTimeout() time.Duration { + return sh.ReadTimeout +} +func (sh *SocketHandlers) GetPongTimeout() time.Duration { + return sh.PongTimeout +} +func (sh *SocketHandlers) GetPingTimeout() time.Duration { + return sh.PingTimeout +} +func (sh *SocketHandlers) GetPingPeriod() time.Duration { + return sh.PingPeriod +} +func (sh *SocketHandlers) IsBinaryMessage() bool { + return sh.BinaryMessage +} + +func (sh *SocketHandlers) GetProtocolHandler() ProtocolHandler { + return sh.Handler +} + // Validate validates the configuration -func (o *SocketOptions) Validate() *SocketOptions { +func (o *SocketHandlers) Validate() { if nil == o.Handler { log.Fatalf("Message Handler must specified.\n") - return nil } if o.WriteTimeout < 0 { @@ -68,10 +87,4 @@ func (o *SocketOptions) Validate() *SocketOptions { o.MaxMessageSize = DefaultMaxMessageSize } - if o.onDisconnected == nil { - o.onDisconnected = func(soc Socket) { - } - } - - return o }