diff --git a/client.go b/client.go deleted file mode 100644 index 355791b..0000000 --- a/client.go +++ /dev/null @@ -1,99 +0,0 @@ -package overflow_gateway_websocket - -import ( - "fmt" - "io" - "log" - "sync" - "time" - - "git.loafle.net/overflow/overflow_gateway_websocket/websocket" -) - -type Client interface { - ID() string - Conn() *websocket.Conn - Path() string - run() -} - -type client struct { - id string - o *ClientOptions - conn *websocket.Conn - path string - messageType int - writeMTX sync.Mutex -} - -func NewClient(id string, path string, o *ClientOptions, conn *websocket.Conn) Client { - c := &client{ - id: id, - o: o, - conn: conn, - path: path, - messageType: websocket.TextMessage, - } - - return c -} - -func (c *client) ID() string { - return c.id -} - -func (c *client) Conn() *websocket.Conn { - return c.conn -} - -func (c *client) Path() string { - return c.path -} - -func (c *client) run() { - hasReadTimeout := c.o.ReadTimeout > 0 - c.conn.SetReadLimit(c.o.MaxMessageSize) - - defer func() { - c.o.onDisconnected(c) - }() - - for { - if hasReadTimeout { - c.conn.SetReadDeadline(time.Now().Add(c.o.ReadTimeout)) - } - - // messageType, data, err := c.conn.ReadMessage() - messageType, r, err := c.conn.NextReader() - if err != nil { - if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway) { - //c.fireError(err) - } - break - } else { - c.onMessage(messageType, r) - } - - } -} - -func (c *client) onMessage(messageType int, r io.Reader) { - result := c.o.Handler.OnMessage(c, messageType, r) - if nil == result { - return - } - - c.writeMTX.Lock() - if writeTimeout := c.o.WriteTimeout; writeTimeout > 0 { - err := c.conn.SetWriteDeadline(time.Now().Add(writeTimeout)) - log.Println(fmt.Errorf("%v", err)) - return - } - - err := c.conn.WriteMessage(c.messageType, result) - - c.writeMTX.Unlock() - - if nil != err { - } -} diff --git a/hadler.go b/hadler.go index edabc7b..b8b4759 100644 --- a/hadler.go +++ b/hadler.go @@ -3,5 +3,5 @@ package overflow_gateway_websocket import "io" type MessageHandler interface { - OnMessage(c Client, messageType int, r io.Reader) []byte + OnMessage(soc Socket, messageType int, r io.Reader) []byte } diff --git a/protocol/jsonrpc/handler.go b/protocol/jsonrpc/handler.go index bc42349..7b75421 100644 --- a/protocol/jsonrpc/handler.go +++ b/protocol/jsonrpc/handler.go @@ -25,7 +25,7 @@ func NewHandler(o *Options) MessageHandler { return h } -func (h *messageHandler) OnMessage(c gws.Client, messageType int, r io.Reader) []byte { +func (h *messageHandler) OnMessage(soc gws.Socket, messageType int, r io.Reader) []byte { var err error req := &ServerRequest{} err = json.NewDecoder(r).Decode(req) @@ -35,17 +35,17 @@ func (h *messageHandler) OnMessage(c gws.Client, messageType int, r io.Reader) [ var result []byte if nil != req.Id { - result = h.onRequest(c, req) + result = h.onRequest(soc, req) } else { - h.onNotify(c, req) + h.onNotify(soc, req) } return result } -func (h *messageHandler) onRequest(c gws.Client, req *ServerRequest) []byte { +func (h *messageHandler) onRequest(soc gws.Socket, req *ServerRequest) []byte { var err error - result, err := h.o.OnRequest(c, req.Method, req.Params) + result, err := h.o.OnRequest(soc, req.Method, req.Params) res := &ServerResponse{ Id: req.Id, @@ -61,8 +61,8 @@ func (h *messageHandler) onRequest(c gws.Client, req *ServerRequest) []byte { return j } -func (h *messageHandler) onNotify(c gws.Client, req *ServerRequest) { - err := h.o.OnNotify(c, req.Method, req.Params) +func (h *messageHandler) onNotify(soc gws.Socket, req *ServerRequest) { + err := h.o.OnNotify(soc, req.Method, req.Params) if nil != err { log.Println(fmt.Errorf("%v", err)) } diff --git a/protocol/jsonrpc/options.go b/protocol/jsonrpc/options.go index c6c0c72..da33262 100644 --- a/protocol/jsonrpc/options.go +++ b/protocol/jsonrpc/options.go @@ -5,8 +5,8 @@ import ( ) type ( - OnRequestFunc func(c gws.Client, method string, params interface{}) (interface{}, error) - OnNotifyFunc func(c gws.Client, method string, params interface{}) error + OnRequestFunc func(soc gws.Socket, method string, params interface{}) (interface{}, error) + OnNotifyFunc func(soc gws.Socket, method string, params interface{}) error ) // ClientOptions is configuration of the websocket server @@ -18,13 +18,13 @@ type Options struct { // Validate validates the configuration func (o *Options) Validate() *Options { if o.OnRequest == nil { - o.OnRequest = func(c gws.Client, method string, params interface{}) (interface{}, error) { + o.OnRequest = func(soc gws.Socket, method string, params interface{}) (interface{}, error) { return nil, nil } } if o.OnNotify == nil { - o.OnNotify = func(c gws.Client, method string, params interface{}) error { + o.OnNotify = func(soc gws.Socket, method string, params interface{}) error { return nil } } diff --git a/server.go b/server.go index 1ae02c2..b0bfee0 100644 --- a/server.go +++ b/server.go @@ -12,22 +12,22 @@ type () type Server interface { ListenAndServe(addr string) error - HandleClient(pattern string, o *ClientOptions) + HandleSocket(pattern string, o *SocketOptions) } type server struct { _option *ServerOptions _upgrader *websocket.Upgrader - _handlers map[string]*ClientOptions - _clients map[string]Client + _handlers map[string]*SocketOptions + _sockets map[string]Socket _cMtx sync.Mutex } func NewServer(o *ServerOptions) Server { s := &server{ _option: o.Validate(), - _handlers: make(map[string]*ClientOptions, 1), - _clients: make(map[string]Client, 100), + _handlers: make(map[string]*SocketOptions, 1), + _sockets: make(map[string]Socket, 100), } s._upgrader = &websocket.Upgrader{ @@ -49,10 +49,10 @@ func (s *server) onError(ctx *fasthttp.RequestCtx, status int, reason error) { s._option.OnError(ctx, status, reason) } -func (s *server) onDisconnected(c Client) { - delete(s._clients, c.ID()) +func (s *server) onDisconnected(soc Socket) { + delete(s._sockets, soc.ID()) - s._option.OnDisconnected(c) + s._option.OnDisconnected(soc) } func (s *server) onConnection(ctx *fasthttp.RequestCtx) { @@ -69,18 +69,18 @@ func (s *server) onConnection(ctx *fasthttp.RequestCtx) { return } s._cMtx.Lock() - cid := s._option.IDGenerator(ctx) - c := NewClient(cid, path, co, conn) - s._clients[cid] = c + id := s._option.IDGenerator(ctx) + soc := NewSocket(id, path, co, conn) + s._sockets[id] = soc s._cMtx.Unlock() - s._option.OnConnection(c) + s._option.OnConnection(soc) - c.run() + soc.run() }) } -func (s *server) HandleClient(pattern string, o *ClientOptions) { +func (s *server) HandleSocket(pattern string, o *SocketOptions) { o.onDisconnected = s.onDisconnected s._handlers[pattern] = o.Validate() diff --git a/server_options.go b/server_options.go index 8b29372..5569540 100644 --- a/server_options.go +++ b/server_options.go @@ -8,8 +8,8 @@ import ( ) type ( - OnConnectionFunc func(c Client) - OnDisconnectedFunc func(c Client) + OnConnectionFunc func(soc Socket) + OnDisconnectedFunc func(soc Socket) ) const ( @@ -53,12 +53,12 @@ func (o *ServerOptions) Validate() *ServerOptions { } if o.OnConnection == nil { - o.OnConnection = func(c Client) { + o.OnConnection = func(soc Socket) { } } if o.OnDisconnected == nil { - o.OnDisconnected = func(c Client) { + o.OnDisconnected = func(soc Socket) { } } diff --git a/socket.go b/socket.go new file mode 100644 index 0000000..b8cdd40 --- /dev/null +++ b/socket.go @@ -0,0 +1,99 @@ +package overflow_gateway_websocket + +import ( + "fmt" + "io" + "log" + "sync" + "time" + + "git.loafle.net/overflow/overflow_gateway_websocket/websocket" +) + +type Socket interface { + ID() string + Conn() *websocket.Conn + Path() string + run() +} + +type socket struct { + id string + o *SocketOptions + conn *websocket.Conn + path string + messageType int + writeMTX sync.Mutex +} + +func NewSocket(id string, path string, o *SocketOptions, conn *websocket.Conn) Socket { + c := &socket{ + id: id, + o: o, + conn: conn, + path: path, + messageType: websocket.TextMessage, + } + + return c +} + +func (soc *socket) ID() string { + return soc.id +} + +func (soc *socket) Conn() *websocket.Conn { + return soc.conn +} + +func (soc *socket) Path() string { + return soc.path +} + +func (soc *socket) run() { + hasReadTimeout := soc.o.ReadTimeout > 0 + soc.conn.SetReadLimit(soc.o.MaxMessageSize) + + defer func() { + soc.o.onDisconnected(soc) + }() + + for { + if hasReadTimeout { + soc.conn.SetReadDeadline(time.Now().Add(soc.o.ReadTimeout)) + } + + // messageType, data, err := c.conn.ReadMessage() + messageType, r, err := soc.conn.NextReader() + if err != nil { + if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway) { + //c.fireError(err) + } + break + } else { + soc.onMessage(messageType, r) + } + + } +} + +func (soc *socket) onMessage(messageType int, r io.Reader) { + result := soc.o.Handler.OnMessage(soc, messageType, r) + if nil == result { + return + } + + soc.writeMTX.Lock() + if writeTimeout := soc.o.WriteTimeout; writeTimeout > 0 { + err := soc.conn.SetWriteDeadline(time.Now().Add(writeTimeout)) + log.Println(fmt.Errorf("%v", err)) + return + } + + err := soc.conn.WriteMessage(soc.messageType, result) + + soc.writeMTX.Unlock() + + if nil != err { + } +} diff --git a/client_options.go b/socket_options.go similarity index 89% rename from client_options.go rename to socket_options.go index 2743223..a2ed657 100644 --- a/client_options.go +++ b/socket_options.go @@ -26,8 +26,8 @@ const ( DefaultMaxMessageSize = 1024 ) -// ClientOptions is configuration of the websocket server -type ClientOptions struct { +// SocketOptions is configuration of the websocket server +type SocketOptions struct { onDisconnected OnDisconnectedFunc Handler MessageHandler @@ -42,7 +42,7 @@ type ClientOptions struct { } // Validate validates the configuration -func (o *ClientOptions) Validate() *ClientOptions { +func (o *SocketOptions) Validate() *SocketOptions { if nil == o.Handler { log.Fatalf("Message Handler must specified.\n") return nil @@ -69,7 +69,7 @@ func (o *ClientOptions) Validate() *ClientOptions { } if o.onDisconnected == nil { - o.onDisconnected = func(c Client) { + o.onDisconnected = func(soc Socket) { } }