package server import ( "fmt" "net" "time" "git.loafle.net/commons_go/logging" cwf "git.loafle.net/commons_go/websocket_fasthttp" "git.loafle.net/overflow/overflow_gateway_websocket/config" "git.loafle.net/overflow/overflow_gateway_websocket/external" "git.loafle.net/overflow/overflow_gateway_websocket/external/redis" oogwisr "git.loafle.net/overflow/overflow_gateway_websocket/internal/server/rpc" "git.loafle.net/overflow/overflow_gateway_websocket/rpc" "git.loafle.net/overflow/overflow_gateway_websocket/servlet" oos "git.loafle.net/overflow/overflow_subscriber" oosr "git.loafle.net/overflow/overflow_subscriber/redis" "github.com/valyala/fasthttp" ) func NewServerHandler() ServerHandler { if nil == config.Config.Server { logging.Logger().Panic(fmt.Sprintf("Gateway Websocket: Config of server is not initialized")) } sh := &ServerHandlers{} sh.Name = config.Config.Server.Name sh.Concurrency = config.Config.Server.Concurrency sh.MaxStopWaitTime = config.Config.Server.MaxStopWaitTime sh.HandshakeTimeout = config.Config.Websocket.HandshakeTimeout sh.ReadBufferSize = config.Config.Websocket.ReadBufferSize sh.WriteBufferSize = config.Config.Websocket.WriteBufferSize sh.EnableCompression = config.Config.Websocket.EnableCompression return sh } type ServerHandlers struct { cwf.ServerHandlers redisSubscriber oos.Subscriber subscribers []oos.SubscriberHandler } func (sh *ServerHandlers) ServerContext() ServerContext { serverCTX := sh.ServerHandlers.ServerContext() return newServerContext(serverCTX) } // Init invoked before the server is started // If you override ths method, must call func (sh *ServerHandlers) Init(serverCTX cwf.ServerContext) error { if err := sh.ServerHandlers.Init(serverCTX); nil != err { return err } external.ExternalInit() return nil } func (sh *ServerHandlers) Listen(serverCTX cwf.ServerContext) (net.Listener, error) { return net.Listen(config.Config.Server.Network, config.Config.Server.Addr) } func (sh *ServerHandlers) OnStart(serverCTX cwf.ServerContext) { sh.ServerHandlers.OnStart(serverCTX) sh.redisSubscriber = oosr.New(redis.RedisPool.Get()) if err := sh.redisSubscriber.Start(); nil != err { logging.Logger().Panic(fmt.Sprintf("Gateway Websocket: Redist Subscriber did not start %v", err)) return } for _, subscriber := range sh.subscribers { sh.redisSubscriber.Subscribe(subscriber) } } func (sh *ServerHandlers) OnError(serverCTX cwf.ServerContext, ctx *fasthttp.RequestCtx, status int, reason error) { sh.ServerHandlers.OnError(serverCTX, ctx, status, reason) } func (sh *ServerHandlers) OnStop(serverCTX cwf.ServerContext) { sh.redisSubscriber.Stop() external.ExternalDestroy() sh.ServerHandlers.OnStop(serverCTX) } func (sh *ServerHandlers) RegisterServlet(servletName string, servletHandler servlet.ServletHandler) { cfg := config.Config.Servlets[servletName] if nil == cfg { logging.Logger().Panic(fmt.Sprintf("Gateway Websocket: config of servlet[%s] is not exist", servletName)) return } servletH := servletHandler.(*servlet.ServletHandlers) servletH.MaxMessageSize = cfg.Socket.MaxMessageSize servletH.WriteTimeout = cfg.Socket.WriteTimeout * time.Second servletH.ReadTimeout = cfg.Socket.ReadTimeout * time.Second servletH.PongTimeout = cfg.Socket.PongTimeout * time.Second servletH.PingTimeout = cfg.Socket.PingTimeout * time.Second servletH.PingPeriod = cfg.Socket.PingPeriod * time.Second // servletH.BinaryMessage = cfg.Socket.BinaryMessage sh.RegisterSocketHandler(cfg.Entry, servletH) } func (sh *ServerHandlers) RegisterRPCGatewayServlet(servletName string, gatewayRPCHandler rpc.GatewayRPCHandler) servlet.ServletHandler { cfg := config.Config.Servlets[servletName] if nil == cfg { logging.Logger().Panic(fmt.Sprintf("Gateway Websocket: config of servlet[%s] is not exist", servletName)) return nil } rpcSH := oogwisr.NewRPCGatewayServletHandler(gatewayRPCHandler) rpcSHs := rpcSH.(*oogwisr.RPCGatewayServletHandlers) rpcSHs.MaxMessageSize = cfg.Socket.MaxMessageSize rpcSHs.WriteTimeout = cfg.Socket.WriteTimeout * time.Second rpcSHs.ReadTimeout = cfg.Socket.ReadTimeout * time.Second rpcSHs.PongTimeout = cfg.Socket.PongTimeout * time.Second rpcSHs.PingTimeout = cfg.Socket.PingTimeout * time.Second rpcSHs.PingPeriod = cfg.Socket.PingPeriod * time.Second sh.RegisterSocketHandler(cfg.Entry, rpcSHs) return rpcSH } func (sh *ServerHandlers) RegisterSubscriber(subscriberHandler oos.SubscriberHandler) { if nil == sh.subscribers { sh.subscribers = make([]oos.SubscriberHandler, 0) } sh.subscribers = append(sh.subscribers, subscriberHandler) } func (sh *ServerHandlers) Validate() { sh.ServerHandlers.Validate() }