overflow_gateway_websocket/server/server_handlers.go

148 lines
4.6 KiB
Go
Raw Normal View History

2017-11-10 13:24:10 +00:00
package server
import (
"fmt"
"net"
2017-11-13 16:50:22 +00:00
"time"
2017-11-10 13:24:10 +00:00
"git.loafle.net/commons_go/logging"
cwf "git.loafle.net/commons_go/websocket_fasthttp"
"git.loafle.net/overflow/overflow_gateway_websocket/config"
"git.loafle.net/overflow/overflow_gateway_websocket/external"
"git.loafle.net/overflow/overflow_gateway_websocket/external/redis"
2017-11-14 06:27:18 +00:00
oogwisr "git.loafle.net/overflow/overflow_gateway_websocket/internal/server/rpc"
"git.loafle.net/overflow/overflow_gateway_websocket/rpc"
2017-11-10 13:24:10 +00:00
"git.loafle.net/overflow/overflow_gateway_websocket/servlet"
oos "git.loafle.net/overflow/overflow_subscriber"
oosr "git.loafle.net/overflow/overflow_subscriber/redis"
"github.com/valyala/fasthttp"
)
2017-11-14 08:23:12 +00:00
// NewServerHandler builds a ServerHandlers value populated from the global
// configuration and returns it as a ServerHandler.
//
// Name, Concurrency and MaxStopWaitTime are copied from config.Config.Server;
// the handshake timeout, buffer sizes and compression flag are copied from
// config.Config.Websocket. A nil config.Config.Server is reported through
// logging.Logger().Panic.
//
// NOTE(review): only config.Config.Server is checked here; config.Config.Websocket
// is read without a guard — confirm it is always initialized before this call.
func NewServerHandler() ServerHandler {
	if config.Config.Server == nil {
		logging.Logger().Panic("Gateway Websocket: Config of server is not initialized")
	}

	handlers := &ServerHandlers{}

	// Server-level settings.
	serverCfg := config.Config.Server
	handlers.Name = serverCfg.Name
	handlers.Concurrency = serverCfg.Concurrency
	handlers.MaxStopWaitTime = serverCfg.MaxStopWaitTime

	// Websocket upgrade settings.
	wsCfg := config.Config.Websocket
	handlers.HandshakeTimeout = wsCfg.HandshakeTimeout
	handlers.ReadBufferSize = wsCfg.ReadBufferSize
	handlers.WriteBufferSize = wsCfg.WriteBufferSize
	handlers.EnableCompression = wsCfg.EnableCompression

	return handlers
}
2017-11-10 13:24:10 +00:00
// ServerHandlers is the gateway's server handler. It embeds
// cwf.ServerHandlers and adds the subscriber state used by OnStart,
// OnStop and RegisterSubscriber.
type ServerHandlers struct {
	cwf.ServerHandlers

	// redisSubscriber is created and started in OnStart and stopped in OnStop.
	redisSubscriber oos.Subscriber
	// subscribers collects the handlers added through RegisterSubscriber;
	// OnStart subscribes each of them on redisSubscriber.
	subscribers []oos.SubscriberHandler
}
2017-11-28 10:48:58 +00:00
func (sh *ServerHandlers) ServerContext() cwf.ServerContext {
2017-11-27 17:12:26 +00:00
serverCTX := sh.ServerHandlers.ServerContext()
return newServerContext(serverCTX)
}
2017-11-10 13:24:10 +00:00
// Init invoked before the server is started
// If you override ths method, must call
2017-11-27 17:12:26 +00:00
func (sh *ServerHandlers) Init(serverCTX cwf.ServerContext) error {
if err := sh.ServerHandlers.Init(serverCTX); nil != err {
2017-11-10 13:24:10 +00:00
return err
}
external.ExternalInit()
return nil
}
2017-11-27 17:12:26 +00:00
func (sh *ServerHandlers) Listen(serverCTX cwf.ServerContext) (net.Listener, error) {
2017-11-10 13:24:10 +00:00
return net.Listen(config.Config.Server.Network, config.Config.Server.Addr)
}
2017-11-27 17:12:26 +00:00
func (sh *ServerHandlers) OnStart(serverCTX cwf.ServerContext) {
sh.ServerHandlers.OnStart(serverCTX)
2017-11-10 13:24:10 +00:00
sh.redisSubscriber = oosr.New(redis.RedisPool.Get())
if err := sh.redisSubscriber.Start(); nil != err {
2017-11-14 03:03:16 +00:00
logging.Logger().Panic(fmt.Sprintf("Gateway Websocket: Redist Subscriber did not start %v", err))
2017-11-10 13:24:10 +00:00
return
}
for _, subscriber := range sh.subscribers {
sh.redisSubscriber.Subscribe(subscriber)
}
}
2017-11-27 17:12:26 +00:00
func (sh *ServerHandlers) OnError(serverCTX cwf.ServerContext, ctx *fasthttp.RequestCtx, status int, reason error) {
sh.ServerHandlers.OnError(serverCTX, ctx, status, reason)
2017-11-10 13:24:10 +00:00
}
2017-11-27 17:12:26 +00:00
func (sh *ServerHandlers) OnStop(serverCTX cwf.ServerContext) {
2017-11-10 13:24:10 +00:00
sh.redisSubscriber.Stop()
external.ExternalDestroy()
2017-11-27 17:12:26 +00:00
sh.ServerHandlers.OnStop(serverCTX)
2017-11-10 13:24:10 +00:00
}
2017-11-14 07:47:29 +00:00
func (sh *ServerHandlers) RegisterServlet(servletName string, servletHandler servlet.ServletHandler) {
cfg := config.Config.Servlets[servletName]
2017-11-13 16:50:22 +00:00
if nil == cfg {
2017-11-14 07:47:29 +00:00
logging.Logger().Panic(fmt.Sprintf("Gateway Websocket: config of servlet[%s] is not exist", servletName))
2017-11-13 16:50:22 +00:00
return
}
servletH := servletHandler.(*servlet.ServletHandlers)
servletH.MaxMessageSize = cfg.Socket.MaxMessageSize
servletH.WriteTimeout = cfg.Socket.WriteTimeout * time.Second
servletH.ReadTimeout = cfg.Socket.ReadTimeout * time.Second
servletH.PongTimeout = cfg.Socket.PongTimeout * time.Second
servletH.PingTimeout = cfg.Socket.PingTimeout * time.Second
servletH.PingPeriod = cfg.Socket.PingPeriod * time.Second
// servletH.BinaryMessage = cfg.Socket.BinaryMessage
2017-11-14 07:47:29 +00:00
sh.RegisterSocketHandler(cfg.Entry, servletH)
2017-11-13 16:50:22 +00:00
}
2017-11-27 17:12:26 +00:00
func (sh *ServerHandlers) RegisterRPCGatewayServlet(servletName string, gatewayRPCHandler rpc.GatewayRPCHandler) servlet.ServletHandler {
2017-11-14 07:47:29 +00:00
cfg := config.Config.Servlets[servletName]
2017-11-14 03:03:16 +00:00
if nil == cfg {
2017-11-14 07:47:29 +00:00
logging.Logger().Panic(fmt.Sprintf("Gateway Websocket: config of servlet[%s] is not exist", servletName))
2017-11-14 07:37:00 +00:00
return nil
2017-11-14 03:03:16 +00:00
}
2017-11-27 17:12:26 +00:00
rpcSH := oogwisr.NewRPCGatewayServletHandler(gatewayRPCHandler)
2017-11-14 06:27:18 +00:00
rpcSHs := rpcSH.(*oogwisr.RPCGatewayServletHandlers)
2017-11-14 03:03:16 +00:00
2017-11-14 05:57:03 +00:00
rpcSHs.MaxMessageSize = cfg.Socket.MaxMessageSize
rpcSHs.WriteTimeout = cfg.Socket.WriteTimeout * time.Second
rpcSHs.ReadTimeout = cfg.Socket.ReadTimeout * time.Second
rpcSHs.PongTimeout = cfg.Socket.PongTimeout * time.Second
rpcSHs.PingTimeout = cfg.Socket.PingTimeout * time.Second
rpcSHs.PingPeriod = cfg.Socket.PingPeriod * time.Second
2017-11-13 16:50:22 +00:00
2017-11-14 07:47:29 +00:00
sh.RegisterSocketHandler(cfg.Entry, rpcSHs)
2017-11-14 07:37:00 +00:00
return rpcSH
2017-11-10 13:24:10 +00:00
}
2017-11-13 16:50:22 +00:00
func (sh *ServerHandlers) RegisterSubscriber(subscriberHandler oos.SubscriberHandler) {
2017-11-10 13:24:10 +00:00
if nil == sh.subscribers {
sh.subscribers = make([]oos.SubscriberHandler, 0)
}
sh.subscribers = append(sh.subscribers, subscriberHandler)
}
// Validate delegates to the embedded cwf.ServerHandlers.Validate; it adds no
// checks of its own.
func (sh *ServerHandlers) Validate() {
	sh.ServerHandlers.Validate()
}