// deprecated_overflow_gateway.../server.go
// crusader 6a1e7aa3e2 ing
// 2017-08-30 15:21:03 +09:00
//
// 143 lines
// 3.1 KiB
// Go

package overflow_gateway_websocket
import (
"context"
"net/http"
"go.uber.org/zap"
"git.loafle.net/commons_go/logging"
channelUtil "git.loafle.net/commons_go/util/channel"
"git.loafle.net/overflow/overflow_gateway_websocket/websocket"
"github.com/valyala/fasthttp"
)
// Empty type group; it declares nothing.
type ()
// socketsChannelAction is the message sent over server._socketsCh to ask the
// listenHandler goroutine to add a socket to, or remove it from, the registry.
type socketsChannelAction struct {
// Embedded action header; its Type field selects create vs. delete.
channelUtil.Action
// s is the socket the action applies to.
s Socket
}
// Server is the interface returned by NewServer for running the websocket
// gateway.
type Server interface {
// ListenAndServe serves requests on addr and returns the listener's error.
ListenAndServe(addr string) error
// HandleSocket registers the socket options o under pattern. The server
// implementation in this file looks handlers up by exact request path.
HandleSocket(pattern string, o *SocketOptions)
}
// server is the Server implementation built by NewServer.
type server struct {
// _ctx is the context given to NewServer; its cancellation stops listenHandler.
_ctx context.Context
// _logger is obtained from logging.WithContext(_ctx).
_logger *zap.Logger
// _option holds the validated server options and user callbacks.
_option *ServerOptions
// _upgrader upgrades incoming requests to websocket connections.
_upgrader *websocket.Upgrader
// _handlers maps a request path to the options registered via HandleSocket.
_handlers map[string]*SocketOptions
// _sockets is the registry of sockets keyed by Socket.ID(); it is written
// only by listenHandler.
_sockets map[string]Socket
// _socketsCh is the unbuffered channel carrying registry add/remove requests
// to listenHandler.
_socketsCh chan socketsChannelAction
}
// NewServer builds a websocket gateway server bound to ctx.
//
// The options o are passed through o.Validate() and the result is used both
// for the server itself and to configure the websocket upgrader. The returned
// server has empty handler and socket registries; register handlers with
// HandleSocket and start it with ListenAndServe.
func NewServer(ctx context.Context, o *ServerOptions) Server {
	logger := logging.WithContext(ctx)
	opts := o.Validate()

	srv := &server{
		_ctx:       ctx,
		_logger:    logger,
		_option:    opts,
		_handlers:  make(map[string]*SocketOptions, 1),
		_sockets:   make(map[string]Socket, 100),
		_socketsCh: make(chan socketsChannelAction),
	}

	// The upgrader is wired up after srv exists because its Error hook is a
	// method of the server.
	srv._upgrader = &websocket.Upgrader{
		HandshakeTimeout:  opts.HandshakeTimeout,
		ReadBufferSize:    opts.ReadBufferSize,
		WriteBufferSize:   opts.WriteBufferSize,
		CheckOrigin:       opts.OnCheckOrigin,
		Error:             srv.onError,
		EnableCompression: opts.EnableCompression,
	}

	return srv
}
// addSocket asks the listenHandler goroutine to register soc in the socket
// registry by posting a create action on s._socketsCh.
//
// The channel is unbuffered and listenHandler stops receiving once s._ctx is
// done, so the send is raced against s._ctx.Done(). Without that, a caller
// arriving after cancellation would block forever.
func (s *server) addSocket(soc Socket) {
	ca := socketsChannelAction{s: soc}
	ca.Type = channelUtil.ActionTypeCreate

	select {
	case s._socketsCh <- ca:
	case <-s._ctx.Done():
		// The registry goroutine has stopped (or is stopping); drop the request.
	}
}
// removeSocket asks the listenHandler goroutine to drop soc from the socket
// registry by posting a delete action on s._socketsCh.
//
// The channel is unbuffered and listenHandler stops receiving once s._ctx is
// done, so the send is raced against s._ctx.Done(). Without that, a socket
// disconnecting after cancellation would block its goroutine forever.
func (s *server) removeSocket(soc Socket) {
	ca := socketsChannelAction{s: soc}
	ca.Type = channelUtil.ActionTypeDelete

	select {
	case s._socketsCh <- ca:
	case <-s._ctx.Done():
		// The registry goroutine has stopped (or is stopping); drop the request.
	}
}
// onError writes an error response for a failed request and then forwards the
// failure to the user-supplied OnError callback.
//
// The response carries the Sec-Websocket-Version header set to "13" and a
// body/status derived from the HTTP status code. reason may be nil.
func (s *server) onError(ctx *fasthttp.RequestCtx, status int, reason error) {
	const versionHeader, supportedVersion = "Sec-Websocket-Version", "13"
	ctx.Response.Header.Set(versionHeader, supportedVersion)

	message := http.StatusText(status)
	ctx.Error(message, status)

	s._option.OnError(ctx, status, reason)
}
// onDisconnected is the disconnect hook that HandleSocket installs on every
// registered SocketOptions. It unregisters soc and then notifies the
// user-supplied OnDisconnected callback.
func (s *server) onDisconnected(soc Socket) {
	// Queue the registry removal before user code hears about the disconnect.
	s.removeSocket(soc)

	notify := s._option.OnDisconnected
	notify(soc)
}
// onConnection is the fasthttp request handler passed to ListenAndServe.
//
// It looks up the socket options registered for the exact request path and
// answers 404 through onError when there are none. Otherwise it hands the
// request to the upgrader; on a successful upgrade a Socket is created,
// registered, announced to the OnConnection callback and then run.
func (s *server) onConnection(ctx *fasthttp.RequestCtx) {
	path := string(ctx.Path())

	opts, registered := s._handlers[path]
	if !registered {
		s.onError(ctx, fasthttp.StatusNotFound, nil)
		return
	}

	// upgraded is invoked by the upgrader with the resulting connection or error.
	upgraded := func(conn *websocket.Conn, err error) {
		if err != nil {
			s.onError(ctx, fasthttp.StatusInternalServerError, err)
			return
		}

		socketID := s._option.IDGenerator(ctx)
		soc := NewSocket(s._ctx, socketID, path, opts, conn)

		s.addSocket(soc)
		s._option.OnConnection(soc)
		soc.run()
	}

	s._upgrader.Upgrade(ctx, nil, upgraded)
}
// listenHandler is the single goroutine that owns writes to s._sockets.
//
// It loops until s._ctx is done, applying each action received on
// s._socketsCh: a create action stores the socket under its ID, a delete
// action removes that ID. Actions of any other type are ignored.
func (s *server) listenHandler() {
	for {
		select {
		case <-s._ctx.Done():
			s._logger.Info("websocket server: Context Done")
			return
		case ca := <-s._socketsCh:
			// Go switch cases do not fall through, so no explicit break is needed.
			switch ca.Type {
			case channelUtil.ActionTypeCreate:
				s._sockets[ca.s.ID()] = ca.s
			case channelUtil.ActionTypeDelete:
				delete(s._sockets, ca.s.ID())
			}
		}
	}
}
// Sockets returns the server's socket registry, keyed by socket ID.
//
// The returned map is the live internal map, not a copy; listenHandler
// modifies it from its own goroutine.
// NOTE(review): callers iterating it while the server runs may race with
// those writes — confirm how this accessor is used.
func (s *server) Sockets() map[string]Socket {
	registry := s._sockets
	return registry
}
// HandleSocket registers the socket options o for requests whose path equals
// pattern, replacing any earlier registration for the same pattern.
//
// The server's own disconnect hook is installed on o before validation, so
// sockets created from these options get unregistered when they disconnect.
func (s *server) HandleSocket(pattern string, o *SocketOptions) {
	o.onDisconnected = s.onDisconnected

	validated := o.Validate()
	s._handlers[pattern] = validated
}
// ListenAndServe starts the socket-registry goroutine and then serves
// requests on addr with fasthttp, routing every request through onConnection.
// It returns whatever error fasthttp.ListenAndServe returns.
func (s *server) ListenAndServe(addr string) error {
	// Start the registry goroutine before serving so addSocket/removeSocket
	// have a receiver.
	go s.listenHandler()

	err := fasthttp.ListenAndServe(addr, s.onConnection)
	return err
}