This commit is contained in:
crusader 2017-08-29 17:51:49 +09:00
parent 57b5c8e656
commit 5a3f20551f
2 changed files with 39 additions and 28 deletions

View File

@ -14,3 +14,6 @@ import:
- golang
- package: github.com/satori/go.uuid
version: v1.1.0
- package: git.loafle.net/commons_go/util
subpackages:
- channel

View File

@ -4,38 +4,40 @@ import (
"context"
"log"
"net/http"
"sync"
channelUtil "git.loafle.net/commons_go/util/channel"
"git.loafle.net/overflow/overflow_gateway_websocket/websocket"
"github.com/valyala/fasthttp"
)
type ()
type socketsChannelAction struct {
channelUtil.Action
s Socket
}
type Server interface {
ListenAndServe(addr string) error
HandleSocket(pattern string, o *SocketOptions)
}
type server struct {
_ctx context.Context
_option *ServerOptions
_upgrader *websocket.Upgrader
_handlers map[string]*SocketOptions
_sockets map[string]Socket
_socketsMtx sync.Mutex
_addSocketCh chan Socket
_removeSocketCh chan Socket
_ctx context.Context
_option *ServerOptions
_upgrader *websocket.Upgrader
_handlers map[string]*SocketOptions
_sockets map[string]Socket
_socketsCh chan socketsChannelAction
}
func NewServer(ctx context.Context, o *ServerOptions) Server {
s := &server{
_ctx: ctx,
_option: o.Validate(),
_handlers: make(map[string]*SocketOptions, 1),
_sockets: make(map[string]Socket, 100),
_addSocketCh: make(chan Socket),
_removeSocketCh: make(chan Socket),
_ctx: ctx,
_option: o.Validate(),
_handlers: make(map[string]*SocketOptions, 1),
_sockets: make(map[string]Socket, 100),
_socketsCh: make(chan socketsChannelAction),
}
s._upgrader = &websocket.Upgrader{
@ -51,11 +53,19 @@ func NewServer(ctx context.Context, o *ServerOptions) Server {
}
func (s *server) addSocket(soc Socket) {
s._addSocketCh <- soc
ca := socketsChannelAction{
s: soc,
}
ca.Type = channelUtil.ActionTypeCreate
s._socketsCh <- ca
}
func (s *server) removeSocket(soc Socket) {
s._removeSocketCh <- soc
ca := socketsChannelAction{
s: soc,
}
ca.Type = channelUtil.ActionTypeDelete
s._socketsCh <- ca
}
func (s *server) onError(ctx *fasthttp.RequestCtx, status int, reason error) {
@ -98,17 +108,15 @@ func (s *server) listenHandler() {
case <-s._ctx.Done():
log.Println("websocket server: Context Done")
return
// Add new a socket
case soc := <-s._addSocketCh:
s._socketsMtx.Lock()
s._sockets[soc.ID()] = soc
s._socketsMtx.Unlock()
// remove a socket
case soc := <-s._removeSocketCh:
s._socketsMtx.Lock()
delete(s._sockets, soc.ID())
s._socketsMtx.Unlock()
case ca := <-s._socketsCh:
switch ca.Type {
case channelUtil.ActionTypeCreate:
s._sockets[ca.s.ID()] = ca.s
break
case channelUtil.ActionTypeDelete:
delete(s._sockets, ca.s.ID())
break
}
}
}
}