196 lines
4.4 KiB
Go
196 lines
4.4 KiB
Go
package websocket
|
|
|
|
import (
|
|
"fmt"
|
|
"log"
|
|
"net/http"
|
|
"sync"
|
|
"time"
|
|
|
|
gWebsocket "github.com/gorilla/websocket"
|
|
)
|
|
|
|
// ClientStatus is the connection state of a client.
type ClientStatus uint8

// Connection states. Numbering starts at 1, so the zero value of
// ClientStatus corresponds to neither state.
const (
	// CONNECTED marks a client whose connection is established.
	CONNECTED ClientStatus = 1
	// DISCONNECTED marks a client that has been torn down.
	DISCONNECTED ClientStatus = 2
)
|
|
|
|
type (
|
|
// OnDisconnectFunc is callback function that used when client is disconnected
|
|
OnDisconnectFunc func(Client)
|
|
// OnErrorFunc is callback function that used when error occurred
|
|
OnErrorFunc func(string)
|
|
// OnMessageFunc is callback function that receives messages from client
|
|
OnMessageFunc func([]byte)
|
|
// OnFunc is callback function that particular event which fires when a message to this event received
|
|
OnFunc interface{}
|
|
)
|
|
|
|
// Client is interface
|
|
type Client interface {
|
|
ID() string
|
|
HTTPRequest() *http.Request
|
|
Conn() Connection
|
|
Disconnect() error
|
|
OnMessage(OnMessageFunc)
|
|
OnError(OnErrorFunc)
|
|
OnDisconnect(OnDisconnectFunc)
|
|
On(string, OnFunc)
|
|
initialize() error
|
|
destroy() error
|
|
}
|
|
|
|
type client struct {
|
|
id string
|
|
status ClientStatus
|
|
messageType int
|
|
server *server
|
|
httpRequest *http.Request
|
|
conn Connection
|
|
pingTicker *time.Ticker
|
|
writeMTX sync.Mutex
|
|
onMessageListeners []OnMessageFunc
|
|
onErrorListeners []OnErrorFunc
|
|
onDisconnectListeners []OnDisconnectFunc
|
|
onListeners map[string][]OnFunc
|
|
}
|
|
|
|
var _ Client = &client{}
|
|
|
|
func newClient(s *server, r *http.Request, conn Connection, clientID string) Client {
|
|
c := &client{
|
|
id: clientID,
|
|
status: CONNECTED,
|
|
messageType: gWebsocket.TextMessage,
|
|
server: s,
|
|
httpRequest: r,
|
|
conn: conn,
|
|
onMessageListeners: make([]OnMessageFunc, 0),
|
|
onErrorListeners: make([]OnErrorFunc, 0),
|
|
onDisconnectListeners: make([]OnDisconnectFunc, 0),
|
|
onListeners: make(map[string][]OnFunc),
|
|
}
|
|
|
|
if s.options.BinaryMessage {
|
|
c.messageType = gWebsocket.BinaryMessage
|
|
}
|
|
|
|
return c
|
|
}
|
|
|
|
func (c *client) ID() string {
|
|
return c.id
|
|
}
|
|
|
|
func (c *client) HTTPRequest() *http.Request {
|
|
return c.httpRequest
|
|
}
|
|
|
|
func (c *client) Conn() Connection {
|
|
return c.conn
|
|
}
|
|
|
|
func (c *client) Disconnect() error {
|
|
return c.server.Disconnect(c.ID())
|
|
}
|
|
|
|
func (c *client) OnDisconnect(cb OnDisconnectFunc) {
|
|
c.onDisconnectListeners = append(c.onDisconnectListeners, cb)
|
|
}
|
|
|
|
func (c *client) OnError(cb OnErrorFunc) {
|
|
c.onErrorListeners = append(c.onErrorListeners, cb)
|
|
}
|
|
|
|
func (c *client) OnMessage(cb OnMessageFunc) {
|
|
c.onMessageListeners = append(c.onMessageListeners, cb)
|
|
}
|
|
|
|
func (c *client) On(event string, cb OnFunc) {
|
|
if c.onListeners[event] == nil {
|
|
c.onListeners[event] = make([]OnFunc, 0)
|
|
}
|
|
|
|
c.onListeners[event] = append(c.onListeners[event], cb)
|
|
}
|
|
|
|
func (c *client) initialize() error {
|
|
c.startPing()
|
|
}
|
|
|
|
func (c *client) destroy() error {
|
|
c.status = DISCONNECTED
|
|
|
|
for i := range c.onDisconnectListeners {
|
|
c.onDisconnectListeners[i](c)
|
|
}
|
|
|
|
return c.conn.Close()
|
|
}
|
|
|
|
func (c *client) startPing() {
|
|
c.pingTicker = time.NewTicker(c.server.options.PingPeriod)
|
|
go func() {
|
|
defer c.pingTicker.Stop()
|
|
for {
|
|
<-c.pingTicker.C
|
|
if err := c.conn.WriteControl(gWebsocket.PingMessage, []byte{}, time.Now().Add(c.server.options.PingTimeout)); err != nil {
|
|
log.Println("ping:", err)
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
|
|
func (c *client) startReading() {
|
|
hasReadTimeout := c.server.options.ReadTimeout > 0
|
|
c.conn.SetReadLimit(c.server.options.MaxMessageSize)
|
|
c.conn.SetPongHandler(func(message string) error {
|
|
if hasReadTimeout {
|
|
c.conn.SetReadDeadline(time.Now().Add(c.server.options.PongTimeout))
|
|
}
|
|
|
|
return nil
|
|
})
|
|
|
|
defer func() {
|
|
c.Disconnect()
|
|
}()
|
|
|
|
for {
|
|
if hasReadTimeout {
|
|
c.conn.SetReadDeadline(time.Now().Add(c.server.options.ReadTimeout))
|
|
}
|
|
messageType, data, err := c.conn.ReadMessage()
|
|
if err != nil {
|
|
if gWebsocket.IsUnexpectedCloseError(err, gWebsocket.CloseGoingAway) {
|
|
c.EmitError(err.Error())
|
|
}
|
|
break
|
|
} else {
|
|
c.onMessageReceived(messageType, data)
|
|
}
|
|
|
|
}
|
|
}
|
|
|
|
func (c *client) onMessageReceived(messageType int, data []byte) {
|
|
}
|
|
|
|
func (c *client) write(messageType int, data []byte) {
|
|
c.writeMTX.Lock()
|
|
if writeTimeout := c.server.options.WriteTimeout; writeTimeout > 0 {
|
|
// set the write deadline based on the configuration
|
|
err := c.conn.SetWriteDeadline(time.Now().Add(writeTimeout))
|
|
log.Println(fmt.Errorf("%v", err))
|
|
}
|
|
|
|
err := c.conn.WriteMessage(messageType, data)
|
|
c.writeMTX.Unlock()
|
|
|
|
if nil != err {
|
|
c.Disconnect()
|
|
}
|
|
}
|