From 46d442f2c28c0e8c979e188bd670696c09ac4c29 Mon Sep 17 00:00:00 2001 From: crusader Date: Fri, 25 Aug 2017 18:52:05 +0900 Subject: [PATCH] ing --- server.go | 73 ++++++++++++++++++++++++++++++++++++++++++++++--------- socket.go | 7 +++++- 2 files changed, 68 insertions(+), 12 deletions(-) diff --git a/server.go b/server.go index b0bfee0..42e10b3 100644 --- a/server.go +++ b/server.go @@ -10,17 +10,28 @@ import ( type () +type sendRequest struct { + s Socket + m []byte +} + type Server interface { ListenAndServe(addr string) error HandleSocket(pattern string, o *SocketOptions) + Send(soc Socket, m []byte) + SendAll(m []byte) } type server struct { - _option *ServerOptions - _upgrader *websocket.Upgrader - _handlers map[string]*SocketOptions - _sockets map[string]Socket - _cMtx sync.Mutex + _option *ServerOptions + _upgrader *websocket.Upgrader + _handlers map[string]*SocketOptions + _sockets map[string]Socket + _sIdMtx sync.Mutex + _addSocketCh chan Socket + _removeSocketCh chan Socket + _sendCh chan sendRequest + _sendAllCh chan []byte } func NewServer(o *ServerOptions) Server { @@ -42,6 +53,14 @@ func NewServer(o *ServerOptions) Server { return s } +func (s *server) addSocket(soc Socket) { + s._addSocketCh <- soc +} + +func (s *server) removeSocket(soc Socket) { + s._removeSocketCh <- soc +} + func (s *server) onError(ctx *fasthttp.RequestCtx, status int, reason error) { ctx.Response.Header.Set("Sec-Websocket-Version", "13") ctx.Error(http.StatusText(status), status) @@ -50,8 +69,7 @@ func (s *server) onError(ctx *fasthttp.RequestCtx, status int, reason error) { } func (s *server) onDisconnected(soc Socket) { - delete(s._sockets, soc.ID()) - + s.removeSocket(soc) s._option.OnDisconnected(soc) } @@ -68,18 +86,49 @@ func (s *server) onConnection(ctx *fasthttp.RequestCtx) { s.onError(ctx, fasthttp.StatusInternalServerError, err) return } - s._cMtx.Lock() + s._sIdMtx.Lock() id := s._option.IDGenerator(ctx) + s._sIdMtx.Unlock() soc := NewSocket(id, path, co, conn) - s._sockets[id] = soc - s._cMtx.Unlock() - + s.addSocket(soc) s._option.OnConnection(soc) soc.run() }) } +func (s *server) listenHandler() { + for { + select { + // Add new a socket + case soc := <-s._addSocketCh: + s._sockets[soc.ID()] = soc + + // remove a socket + case soc := <-s._removeSocketCh: + delete(s._sockets, soc.ID()) + case sr := <-s._sendCh: + sr.s.Write(sr.m) + case m := <-s._sendAllCh: + for _, soc := range s._sockets { + soc.Write(m) + } + } + } +} + +func (s *server) Send(soc Socket, m []byte) { + r := sendRequest{ + s: soc, + m: m, + } + s._sendCh <- r +} + +func (s *server) SendAll(m []byte) { + s._sendAllCh <- m +} + func (s *server) HandleSocket(pattern string, o *SocketOptions) { o.onDisconnected = s.onDisconnected @@ -87,5 +136,7 @@ func (s *server) HandleSocket(pattern string, o *SocketOptions) { } func (s *server) ListenAndServe(addr string) error { + go s.listenHandler() + return fasthttp.ListenAndServe(addr, s.onConnection) } diff --git a/socket.go b/socket.go index 5d0443e..c1b4976 100644 --- a/socket.go +++ b/socket.go @@ -13,6 +13,7 @@ type Socket interface { ID() string Conn() *websocket.Conn Path() string + Write(m []byte) run() } @@ -52,6 +53,10 @@ func (soc *socket) Path() string { return soc.path } +func (soc *socket) Write(m []byte) { + soc.writeCh <- m +} + func (soc *socket) run() { hasReadTimeout := soc.o.ReadTimeout > 0 soc.conn.SetReadLimit(soc.o.MaxMessageSize) @@ -91,7 +96,7 @@ func (soc *socket) onMessage(messageType int, r io.Reader) { if nil == result { return } - soc.writeCh <- result + soc.Write(result) } func (soc *socket) listenWrite() {