This commit is contained in:
crusader 2017-08-25 18:52:05 +09:00
parent 51cb76d861
commit 46d442f2c2
2 changed files with 68 additions and 12 deletions

View File

@ -10,17 +10,28 @@ import (
type ()
type sendRequest struct {
s Socket
m []byte
}
type Server interface {
ListenAndServe(addr string) error
HandleSocket(pattern string, o *SocketOptions)
Send(soc Socket, m []byte)
SendAll(m []byte)
}
type server struct {
_option *ServerOptions
_upgrader *websocket.Upgrader
_handlers map[string]*SocketOptions
_sockets map[string]Socket
_cMtx sync.Mutex
_option *ServerOptions
_upgrader *websocket.Upgrader
_handlers map[string]*SocketOptions
_sockets map[string]Socket
_sIdMtx sync.Mutex
_addSocketCh chan Socket
_removeSocketCh chan Socket
_sendCh chan sendRequest
_sendAllCh chan []byte
}
func NewServer(o *ServerOptions) Server {
@ -42,6 +53,14 @@ func NewServer(o *ServerOptions) Server {
return s
}
func (s *server) addSocket(soc Socket) {
s._addSocketCh <- soc
}
func (s *server) removeSocket(soc Socket) {
s._removeSocketCh <- soc
}
func (s *server) onError(ctx *fasthttp.RequestCtx, status int, reason error) {
ctx.Response.Header.Set("Sec-Websocket-Version", "13")
ctx.Error(http.StatusText(status), status)
@ -50,8 +69,7 @@ func (s *server) onError(ctx *fasthttp.RequestCtx, status int, reason error) {
}
func (s *server) onDisconnected(soc Socket) {
delete(s._sockets, soc.ID())
s.removeSocket(soc)
s._option.OnDisconnected(soc)
}
@ -68,18 +86,49 @@ func (s *server) onConnection(ctx *fasthttp.RequestCtx) {
s.onError(ctx, fasthttp.StatusInternalServerError, err)
return
}
s._cMtx.Lock()
s._sIdMtx.Lock()
id := s._option.IDGenerator(ctx)
s._sIdMtx.Unlock()
soc := NewSocket(id, path, co, conn)
s._sockets[id] = soc
s._cMtx.Unlock()
s.addSocket(soc)
s._option.OnConnection(soc)
soc.run()
})
}
func (s *server) listenHandler() {
for {
select {
// Add new a socket
case soc := <-s._addSocketCh:
s._sockets[soc.ID()] = soc
// remove a socket
case soc := <-s._removeSocketCh:
delete(s._sockets, soc.ID())
case sr := <-s._sendCh:
sr.s.Write(sr.m)
case m := <-s._sendAllCh:
for _, soc := range s._sockets {
soc.Write(m)
}
}
}
}
func (s *server) Send(soc Socket, m []byte) {
r := sendRequest{
s: soc,
m: m,
}
s._sendCh <- r
}
func (s *server) SendAll(m []byte) {
s._sendAllCh <- m
}
func (s *server) HandleSocket(pattern string, o *SocketOptions) {
o.onDisconnected = s.onDisconnected
@ -87,5 +136,7 @@ func (s *server) HandleSocket(pattern string, o *SocketOptions) {
}
func (s *server) ListenAndServe(addr string) error {
go s.listenHandler()
return fasthttp.ListenAndServe(addr, s.onConnection)
}

View File

@ -13,6 +13,7 @@ type Socket interface {
ID() string
Conn() *websocket.Conn
Path() string
Write(m []byte)
run()
}
@ -52,6 +53,10 @@ func (soc *socket) Path() string {
return soc.path
}
func (soc *socket) Write(m []byte) {
soc.writeCh <- m
}
func (soc *socket) run() {
hasReadTimeout := soc.o.ReadTimeout > 0
soc.conn.SetReadLimit(soc.o.MaxMessageSize)
@ -91,7 +96,7 @@ func (soc *socket) onMessage(messageType int, r io.Reader) {
if nil == result {
return
}
soc.writeCh <- result
soc.Write(result)
}
func (soc *socket) listenWrite() {