overflow_gateway_websocket/server/server_handlers.go
crusader f3ea7a8fed ing
2017-11-14 16:37:00 +09:00

123 lines
3.7 KiB
Go

package server
import (
"fmt"
"net"
"time"
"git.loafle.net/commons_go/logging"
cwf "git.loafle.net/commons_go/websocket_fasthttp"
"git.loafle.net/overflow/overflow_gateway_websocket/config"
"git.loafle.net/overflow/overflow_gateway_websocket/external"
"git.loafle.net/overflow/overflow_gateway_websocket/external/redis"
oogwisr "git.loafle.net/overflow/overflow_gateway_websocket/internal/server/rpc"
"git.loafle.net/overflow/overflow_gateway_websocket/rpc"
"git.loafle.net/overflow/overflow_gateway_websocket/servlet"
oos "git.loafle.net/overflow/overflow_subscriber"
oosr "git.loafle.net/overflow/overflow_subscriber/redis"
"github.com/valyala/fasthttp"
)
// ServerHandlers builds on the embedded cwf.ServerHandlers and additionally
// keeps a subscriber plus the subscriber handlers registered on this server.
type ServerHandlers struct {
cwf.ServerHandlers
// redisSubscriber is created from the Redis pool and started in OnStart,
// and stopped in OnStop.
redisSubscriber oos.Subscriber
// subscribers collects the handlers added through RegisterSubscriber;
// OnStart subscribes each of them to redisSubscriber.
subscribers []oos.SubscriberHandler
}
// Init is invoked before the server is started.
//
// It runs the embedded cwf.ServerHandlers.Init first and returns its error
// unchanged when that fails. Only after a successful embedded Init does it
// call external.ExternalInit.
//
// NOTE(review): the original comment ended mid-sentence ("If you override
// this method, must call ..."); presumably an overriding type is expected to
// call this implementation — confirm.
func (sh *ServerHandlers) Init() error {
	err := sh.ServerHandlers.Init()
	if err != nil {
		return err
	}

	external.ExternalInit()

	return nil
}
// Listen opens the listener the server accepts connections on, using the
// network and address taken from config.Config.Server. The listener and
// error are returned exactly as net.Listen reports them.
func (sh *ServerHandlers) Listen() (net.Listener, error) {
	network, addr := config.Config.Server.Network, config.Config.Server.Addr

	return net.Listen(network, addr)
}
// OnStart runs the embedded cwf.ServerHandlers.OnStart, then creates the
// subscriber from a connection taken out of redis.RedisPool and starts it.
// When the subscriber fails to start, the failure is reported through the
// logger's Panic and no handlers are subscribed. Otherwise every handler
// collected by RegisterSubscriber is subscribed, in registration order.
func (sh *ServerHandlers) OnStart() {
	sh.ServerHandlers.OnStart()

	sh.redisSubscriber = oosr.New(redis.RedisPool.Get())

	err := sh.redisSubscriber.Start()
	if err != nil {
		logging.Logger().Panic(fmt.Sprintf("Gateway Websocket: Redist Subscriber did not start %v", err))
		return
	}

	handlers := sh.subscribers
	for i := range handlers {
		sh.redisSubscriber.Subscribe(handlers[i])
	}
}
// OnError forwards a request error (the request context, the status and the
// reason) to the embedded cwf.ServerHandlers.
//
// The call has to be qualified with the embedded field: a plain sh.OnError
// resolves to this very method and would recurse until the stack overflows.
func (sh *ServerHandlers) OnError(ctx *fasthttp.RequestCtx, status int, reason error) {
	sh.ServerHandlers.OnError(ctx, status, reason)
}
// OnStop tears the server down in this order: stop the subscriber, call
// external.ExternalDestroy, then run the embedded cwf.ServerHandlers.OnStop.
func (sh *ServerHandlers) OnStop() {
	// redisSubscriber is only assigned in OnStart. Guard against a stop
	// without a completed start so the remaining teardown still runs
	// instead of panicking on a nil subscriber.
	if sh.redisSubscriber != nil {
		sh.redisSubscriber.Stop()
	}

	external.ExternalDestroy()
	sh.ServerHandlers.OnStop()
}
// RegisterServlet applies the socket settings configured for entryPath to
// servletHandler and registers it as the socket handler of that path.
//
// The settings come from config.Config.Servlets[entryPath]. The timeout and
// period values are multiplied by time.Second before they are stored on the
// handler. servletHandler must be a *servlet.ServletHandlers, because the
// settings are written to that concrete type's fields.
//
// A missing configuration entry or a handler of another concrete type is
// reported through the logger's Panic and nothing is registered.
func (sh *ServerHandlers) RegisterServlet(entryPath string, servletHandler servlet.ServletHandler) {
	cfg := config.Config.Servlets[entryPath]
	if cfg == nil {
		logging.Logger().Panic(fmt.Sprintf("Gateway Websocket: config of entry path[%s] is not exist", entryPath))
		return
	}

	// Checked assertion: an unchecked one would abort with an opaque runtime
	// error that names neither the entry path nor the offending type.
	servletH, ok := servletHandler.(*servlet.ServletHandlers)
	if !ok {
		logging.Logger().Panic(fmt.Sprintf("Gateway Websocket: servlet handler of entry path[%s] must be *servlet.ServletHandlers, got %T", entryPath, servletHandler))
		return
	}

	servletH.MaxMessageSize = cfg.Socket.MaxMessageSize
	servletH.WriteTimeout = cfg.Socket.WriteTimeout * time.Second
	servletH.ReadTimeout = cfg.Socket.ReadTimeout * time.Second
	servletH.PongTimeout = cfg.Socket.PongTimeout * time.Second
	servletH.PingTimeout = cfg.Socket.PingTimeout * time.Second
	servletH.PingPeriod = cfg.Socket.PingPeriod * time.Second
	// servletH.BinaryMessage = cfg.Socket.BinaryMessage

	sh.RegisterSocketHandler(entryPath, servletH)
}
// RegisterRPCGatewayServlet wraps rpcGatewayHandler in a servlet handler
// built by oogwisr.NewRPCGatewayServletHandler, applies the socket settings
// configured for entryPath to it, registers it as the socket handler of that
// path and returns it.
//
// The settings come from config.Config.Servlets[entryPath]; the timeout and
// period values are multiplied by time.Second before being stored. When no
// configuration exists for entryPath the problem is reported through the
// logger's Panic and nil is returned.
func (sh *ServerHandlers) RegisterRPCGatewayServlet(entryPath string, rpcGatewayHandler rpc.RPCGatewayHandler) servlet.ServletHandler {
	servletCfg := config.Config.Servlets[entryPath]
	if servletCfg == nil {
		logging.Logger().Panic(fmt.Sprintf("Gateway Websocket: config of entry path[%s] is not exist", entryPath))
		return nil
	}

	handler := oogwisr.NewRPCGatewayServletHandler(rpcGatewayHandler)

	// The socket settings live on the concrete type, not on the interface
	// returned by the constructor.
	concrete := handler.(*oogwisr.RPCGatewayServletHandlers)
	concrete.MaxMessageSize = servletCfg.Socket.MaxMessageSize
	concrete.WriteTimeout = servletCfg.Socket.WriteTimeout * time.Second
	concrete.ReadTimeout = servletCfg.Socket.ReadTimeout * time.Second
	concrete.PongTimeout = servletCfg.Socket.PongTimeout * time.Second
	concrete.PingTimeout = servletCfg.Socket.PingTimeout * time.Second
	concrete.PingPeriod = servletCfg.Socket.PingPeriod * time.Second

	sh.RegisterSocketHandler(entryPath, concrete)

	return handler
}
// RegisterSubscriber records subscriberHandler so that OnStart can subscribe
// it to the subscriber it creates. This method only stores the handler; it
// does not subscribe it itself.
func (sh *ServerHandlers) RegisterSubscriber(subscriberHandler oos.SubscriberHandler) {
	// append allocates the backing array when sh.subscribers is still nil,
	// so no explicit initialization is needed.
	sh.subscribers = append(sh.subscribers, subscriberHandler)
}
// Validate delegates to the embedded cwf.ServerHandlers.Validate and adds no
// checks of its own.
func (sh *ServerHandlers) Validate() {
sh.ServerHandlers.Validate()
}