overflow_gateway_websocket/server/server_handlers.go
crusader ecd5ad53bf ing
2017-11-28 02:12:26 +09:00

148 lines
4.6 KiB
Go

package server
import (
"fmt"
"net"
"time"
"git.loafle.net/commons_go/logging"
cwf "git.loafle.net/commons_go/websocket_fasthttp"
"git.loafle.net/overflow/overflow_gateway_websocket/config"
"git.loafle.net/overflow/overflow_gateway_websocket/external"
"git.loafle.net/overflow/overflow_gateway_websocket/external/redis"
oogwisr "git.loafle.net/overflow/overflow_gateway_websocket/internal/server/rpc"
"git.loafle.net/overflow/overflow_gateway_websocket/rpc"
"git.loafle.net/overflow/overflow_gateway_websocket/servlet"
oos "git.loafle.net/overflow/overflow_subscriber"
oosr "git.loafle.net/overflow/overflow_subscriber/redis"
"github.com/valyala/fasthttp"
)
func NewServerHandler() ServerHandler {
if nil == config.Config.Server {
logging.Logger().Panic(fmt.Sprintf("Gateway Websocket: Config of server is not initialized"))
}
sh := &ServerHandlers{}
sh.Name = config.Config.Server.Name
sh.Concurrency = config.Config.Server.Concurrency
sh.MaxStopWaitTime = config.Config.Server.MaxStopWaitTime
sh.HandshakeTimeout = config.Config.Websocket.HandshakeTimeout
sh.ReadBufferSize = config.Config.Websocket.ReadBufferSize
sh.WriteBufferSize = config.Config.Websocket.WriteBufferSize
sh.EnableCompression = config.Config.Websocket.EnableCompression
return sh
}
type ServerHandlers struct {
cwf.ServerHandlers
redisSubscriber oos.Subscriber
subscribers []oos.SubscriberHandler
}
func (sh *ServerHandlers) ServerContext() ServerContext {
serverCTX := sh.ServerHandlers.ServerContext()
return newServerContext(serverCTX)
}
// Init invoked before the server is started
// If you override ths method, must call
func (sh *ServerHandlers) Init(serverCTX cwf.ServerContext) error {
if err := sh.ServerHandlers.Init(serverCTX); nil != err {
return err
}
external.ExternalInit()
return nil
}
func (sh *ServerHandlers) Listen(serverCTX cwf.ServerContext) (net.Listener, error) {
return net.Listen(config.Config.Server.Network, config.Config.Server.Addr)
}
func (sh *ServerHandlers) OnStart(serverCTX cwf.ServerContext) {
sh.ServerHandlers.OnStart(serverCTX)
sh.redisSubscriber = oosr.New(redis.RedisPool.Get())
if err := sh.redisSubscriber.Start(); nil != err {
logging.Logger().Panic(fmt.Sprintf("Gateway Websocket: Redist Subscriber did not start %v", err))
return
}
for _, subscriber := range sh.subscribers {
sh.redisSubscriber.Subscribe(subscriber)
}
}
func (sh *ServerHandlers) OnError(serverCTX cwf.ServerContext, ctx *fasthttp.RequestCtx, status int, reason error) {
sh.ServerHandlers.OnError(serverCTX, ctx, status, reason)
}
func (sh *ServerHandlers) OnStop(serverCTX cwf.ServerContext) {
sh.redisSubscriber.Stop()
external.ExternalDestroy()
sh.ServerHandlers.OnStop(serverCTX)
}
func (sh *ServerHandlers) RegisterServlet(servletName string, servletHandler servlet.ServletHandler) {
cfg := config.Config.Servlets[servletName]
if nil == cfg {
logging.Logger().Panic(fmt.Sprintf("Gateway Websocket: config of servlet[%s] is not exist", servletName))
return
}
servletH := servletHandler.(*servlet.ServletHandlers)
servletH.MaxMessageSize = cfg.Socket.MaxMessageSize
servletH.WriteTimeout = cfg.Socket.WriteTimeout * time.Second
servletH.ReadTimeout = cfg.Socket.ReadTimeout * time.Second
servletH.PongTimeout = cfg.Socket.PongTimeout * time.Second
servletH.PingTimeout = cfg.Socket.PingTimeout * time.Second
servletH.PingPeriod = cfg.Socket.PingPeriod * time.Second
// servletH.BinaryMessage = cfg.Socket.BinaryMessage
sh.RegisterSocketHandler(cfg.Entry, servletH)
}
func (sh *ServerHandlers) RegisterRPCGatewayServlet(servletName string, gatewayRPCHandler rpc.GatewayRPCHandler) servlet.ServletHandler {
cfg := config.Config.Servlets[servletName]
if nil == cfg {
logging.Logger().Panic(fmt.Sprintf("Gateway Websocket: config of servlet[%s] is not exist", servletName))
return nil
}
rpcSH := oogwisr.NewRPCGatewayServletHandler(gatewayRPCHandler)
rpcSHs := rpcSH.(*oogwisr.RPCGatewayServletHandlers)
rpcSHs.MaxMessageSize = cfg.Socket.MaxMessageSize
rpcSHs.WriteTimeout = cfg.Socket.WriteTimeout * time.Second
rpcSHs.ReadTimeout = cfg.Socket.ReadTimeout * time.Second
rpcSHs.PongTimeout = cfg.Socket.PongTimeout * time.Second
rpcSHs.PingTimeout = cfg.Socket.PingTimeout * time.Second
rpcSHs.PingPeriod = cfg.Socket.PingPeriod * time.Second
sh.RegisterSocketHandler(cfg.Entry, rpcSHs)
return rpcSH
}
func (sh *ServerHandlers) RegisterSubscriber(subscriberHandler oos.SubscriberHandler) {
if nil == sh.subscribers {
sh.subscribers = make([]oos.SubscriberHandler, 0)
}
sh.subscribers = append(sh.subscribers, subscriberHandler)
}
func (sh *ServerHandlers) Validate() {
sh.ServerHandlers.Validate()
}