deprecated_overflow_gateway.../server.go

145 lines
3.0 KiB
Go
Raw Normal View History

2017-08-18 11:30:29 +00:00
package overflow_gateway_websocket
2017-08-21 10:23:45 +00:00
import (
2017-08-29 07:04:21 +00:00
"context"
2017-08-25 02:51:27 +00:00
"net/http"
2017-08-21 10:23:45 +00:00
2017-08-31 10:08:14 +00:00
"go.uber.org/zap"
"git.loafle.net/commons_go/logging"
2017-08-29 08:51:49 +00:00
channelUtil "git.loafle.net/commons_go/util/channel"
2017-08-24 09:24:38 +00:00
"git.loafle.net/overflow/overflow_gateway_websocket/websocket"
"github.com/valyala/fasthttp"
2017-08-18 11:30:29 +00:00
)
2017-08-24 09:24:38 +00:00
// Empty type declaration group: it declares no types.
type ()
2017-08-29 08:51:49 +00:00
// socketsChannelAction is the message carried by the server's _socketsCh
// channel. Senders set Type (reached through the embedded channelUtil.Action)
// to say which registry operation is wanted, and s to the socket it applies to.
type socketsChannelAction struct {
	channelUtil.Action

	// s is the socket to add to or remove from the registry.
	s Socket
}
2017-08-18 11:30:29 +00:00
type Server interface {
2017-08-24 09:24:38 +00:00
ListenAndServe(addr string) error
2017-08-31 10:08:14 +00:00
HandleSocket(pattern string, o SocketHandler)
2017-08-18 11:30:29 +00:00
}
type server struct {
2017-08-29 08:51:49 +00:00
_ctx context.Context
2017-08-31 10:08:14 +00:00
_logger *zap.Logger
2017-08-31 08:58:29 +00:00
_sh ServerHandler
2017-08-29 08:51:49 +00:00
_upgrader *websocket.Upgrader
2017-08-31 10:08:14 +00:00
_handlers map[string]SocketHandler
2017-08-29 08:51:49 +00:00
_sockets map[string]Socket
_socketsCh chan socketsChannelAction
2017-08-18 11:30:29 +00:00
}
2017-08-31 08:58:29 +00:00
func NewServer(ctx context.Context, sh ServerHandler) Server {
sh.Validate()
2017-08-18 11:30:29 +00:00
s := &server{
2017-08-29 08:51:49 +00:00
_ctx: ctx,
2017-08-31 10:08:14 +00:00
_logger: logging.WithContext(ctx),
2017-08-31 08:58:29 +00:00
_sh: sh,
2017-08-31 10:08:14 +00:00
_handlers: make(map[string]SocketHandler, 1),
2017-08-29 08:51:49 +00:00
_sockets: make(map[string]Socket, 100),
_socketsCh: make(chan socketsChannelAction),
2017-08-21 10:23:45 +00:00
}
2017-08-24 10:42:40 +00:00
s._upgrader = &websocket.Upgrader{
2017-08-31 08:58:29 +00:00
HandshakeTimeout: s._sh.GetHandshakeTimeout(),
ReadBufferSize: s._sh.GetReadBufferSize(),
WriteBufferSize: s._sh.GetWriteBufferSize(),
CheckOrigin: s._sh.OnCheckOrigin,
2017-08-25 05:40:04 +00:00
Error: s.onError,
2017-08-31 08:58:29 +00:00
EnableCompression: s._sh.GetEnableCompression(),
2017-08-24 10:42:40 +00:00
}
2017-08-21 10:23:45 +00:00
2017-08-18 11:30:29 +00:00
return s
}
2017-08-25 09:52:05 +00:00
func (s *server) addSocket(soc Socket) {
2017-08-29 08:51:49 +00:00
ca := socketsChannelAction{
s: soc,
}
ca.Type = channelUtil.ActionTypeCreate
s._socketsCh <- ca
2017-08-25 09:52:05 +00:00
}
func (s *server) removeSocket(soc Socket) {
2017-08-29 08:51:49 +00:00
ca := socketsChannelAction{
s: soc,
}
ca.Type = channelUtil.ActionTypeDelete
s._socketsCh <- ca
2017-08-25 09:52:05 +00:00
}
2017-08-25 05:40:04 +00:00
func (s *server) onError(ctx *fasthttp.RequestCtx, status int, reason error) {
2017-08-25 07:40:17 +00:00
ctx.Response.Header.Set("Sec-Websocket-Version", "13")
ctx.Error(http.StatusText(status), status)
2017-08-25 05:40:04 +00:00
2017-08-31 08:58:29 +00:00
s._sh.OnError(ctx, status, reason)
2017-08-25 05:40:04 +00:00
}
2017-08-25 08:13:21 +00:00
func (s *server) onDisconnected(soc Socket) {
2017-08-25 09:52:05 +00:00
s.removeSocket(soc)
2017-08-31 08:58:29 +00:00
s._sh.OnDisconnected(soc)
2017-08-18 11:30:29 +00:00
}
2017-08-24 10:42:40 +00:00
func (s *server) onConnection(ctx *fasthttp.RequestCtx) {
2017-08-24 14:50:14 +00:00
path := string(ctx.Path())
2017-08-25 02:57:12 +00:00
co, ok := s._handlers[path]
if !ok {
2017-08-25 07:40:17 +00:00
s.onError(ctx, fasthttp.StatusNotFound, nil)
2017-08-25 02:57:12 +00:00
return
}
2017-08-24 14:50:14 +00:00
2017-08-24 10:42:40 +00:00
s._upgrader.Upgrade(ctx, nil, func(conn *websocket.Conn, err error) {
2017-08-24 09:24:38 +00:00
if err != nil {
2017-08-25 07:40:17 +00:00
s.onError(ctx, fasthttp.StatusInternalServerError, err)
2017-08-24 09:24:38 +00:00
return
}
2017-08-31 08:58:29 +00:00
id := s._sh.OnIDGenerate(ctx)
2017-08-30 06:21:03 +00:00
soc := NewSocket(s._ctx, id, path, co, conn)
2017-08-25 09:52:05 +00:00
s.addSocket(soc)
2017-08-31 08:58:29 +00:00
s._sh.OnConnection(soc)
2017-08-25 06:10:00 +00:00
2017-08-25 08:13:21 +00:00
soc.run()
2017-08-24 09:24:38 +00:00
})
}
2017-08-18 11:30:29 +00:00
2017-08-25 09:52:05 +00:00
func (s *server) listenHandler() {
for {
select {
2017-08-29 07:04:21 +00:00
case <-s._ctx.Done():
return
2017-08-29 08:51:49 +00:00
case ca := <-s._socketsCh:
switch ca.Type {
case channelUtil.ActionTypeCreate:
s._sockets[ca.s.ID()] = ca.s
break
case channelUtil.ActionTypeDelete:
delete(s._sockets, ca.s.ID())
break
}
2017-08-25 09:52:05 +00:00
}
}
}
2017-08-25 10:09:34 +00:00
func (s *server) Sockets() map[string]Socket {
return s._sockets
2017-08-25 09:52:05 +00:00
}
2017-08-31 10:08:14 +00:00
func (s *server) HandleSocket(pattern string, soch SocketHandler) {
2017-08-31 11:19:59 +00:00
soch.setOnDisconnected(s.onDisconnected)
2017-08-25 02:33:00 +00:00
2017-08-31 10:08:14 +00:00
soch.Validate()
s._handlers[pattern] = soch
2017-08-24 11:29:22 +00:00
}
2017-08-24 09:24:38 +00:00
func (s *server) ListenAndServe(addr string) error {
2017-08-25 09:52:05 +00:00
go s.listenHandler()
2017-08-24 10:42:40 +00:00
return fasthttp.ListenAndServe(addr, s.onConnection)
2017-08-18 11:30:29 +00:00
}