From 5a3f20551fe0e3c4c4b0a7b4ec56f1587d2fbee2 Mon Sep 17 00:00:00 2001 From: crusader Date: Tue, 29 Aug 2017 17:51:49 +0900 Subject: [PATCH] ing --- glide.yaml | 3 +++ server.go | 64 ++++++++++++++++++++++++++++++------------------------ 2 files changed, 39 insertions(+), 28 deletions(-) diff --git a/glide.yaml b/glide.yaml index 63272a1..88d38f8 100644 --- a/glide.yaml +++ b/glide.yaml @@ -14,3 +14,6 @@ import: - golang - package: github.com/satori/go.uuid version: v1.1.0 +- package: git.loafle.net/commons_go/util + subpackages: + - channel diff --git a/server.go b/server.go index 25c38cc..216aeb5 100644 --- a/server.go +++ b/server.go @@ -4,38 +4,40 @@ import ( "context" "log" "net/http" - "sync" + channelUtil "git.loafle.net/commons_go/util/channel" "git.loafle.net/overflow/overflow_gateway_websocket/websocket" "github.com/valyala/fasthttp" ) type () +type socketsChannelAction struct { + channelUtil.Action + s Socket +} + type Server interface { ListenAndServe(addr string) error HandleSocket(pattern string, o *SocketOptions) } type server struct { - _ctx context.Context - _option *ServerOptions - _upgrader *websocket.Upgrader - _handlers map[string]*SocketOptions - _sockets map[string]Socket - _socketsMtx sync.Mutex - _addSocketCh chan Socket - _removeSocketCh chan Socket + _ctx context.Context + _option *ServerOptions + _upgrader *websocket.Upgrader + _handlers map[string]*SocketOptions + _sockets map[string]Socket + _socketsCh chan socketsChannelAction } func NewServer(ctx context.Context, o *ServerOptions) Server { s := &server{ - _ctx: ctx, - _option: o.Validate(), - _handlers: make(map[string]*SocketOptions, 1), - _sockets: make(map[string]Socket, 100), - _addSocketCh: make(chan Socket), - _removeSocketCh: make(chan Socket), + _ctx: ctx, + _option: o.Validate(), + _handlers: make(map[string]*SocketOptions, 1), + _sockets: make(map[string]Socket, 100), + _socketsCh: make(chan socketsChannelAction), } s._upgrader = &websocket.Upgrader{ @@ -51,11 +53,19 @@ func NewServer(ctx context.Context, o *ServerOptions) Server { } func (s *server) addSocket(soc Socket) { - s._addSocketCh <- soc + ca := socketsChannelAction{ + s: soc, + } + ca.Type = channelUtil.ActionTypeCreate + s._socketsCh <- ca } func (s *server) removeSocket(soc Socket) { - s._removeSocketCh <- soc + ca := socketsChannelAction{ + s: soc, + } + ca.Type = channelUtil.ActionTypeDelete + s._socketsCh <- ca } func (s *server) onError(ctx *fasthttp.RequestCtx, status int, reason error) { @@ -98,17 +108,15 @@ func (s *server) listenHandler() { case <-s._ctx.Done(): log.Println("websocket server: Context Done") return - // Add new a socket - case soc := <-s._addSocketCh: - s._socketsMtx.Lock() - s._sockets[soc.ID()] = soc - s._socketsMtx.Unlock() - - // remove a socket - case soc := <-s._removeSocketCh: - s._socketsMtx.Lock() - delete(s._sockets, soc.ID()) - s._socketsMtx.Unlock() + case ca := <-s._socketsCh: + switch ca.Type { + case channelUtil.ActionTypeCreate: + s._sockets[ca.s.ID()] = ca.s + break + case channelUtil.ActionTypeDelete: + delete(s._sockets, ca.s.ID()) + break + } } } }