This commit is contained in:
crusader 2017-11-14 12:03:16 +09:00
parent 4542ab52e6
commit 2f70541870
3 changed files with 45 additions and 14 deletions

View File

@ -12,11 +12,11 @@ import (
oogw "git.loafle.net/overflow/overflow_gateway_websocket" oogw "git.loafle.net/overflow/overflow_gateway_websocket"
) )
type RPCGatewaySocketHandlers struct { type RPCGatewayServletHandlers struct {
cwf.SocketHandlers oogw.ServerHandlers
} }
func (sh *RPCGatewaySocketHandlers) Init() error { func (sh *RPCGatewayServletHandlers) Init() error {
if err := sh.SocketHandlers.Init(); nil != err { if err := sh.SocketHandlers.Init(); nil != err {
return err return err
} }
@ -24,19 +24,19 @@ func (sh *RPCGatewaySocketHandlers) Init() error {
return nil return nil
} }
func (sh *RPCGatewaySocketHandlers) Handshake(ctx *fasthttp.RequestCtx) (id string, extensionsHeader *fasthttp.ResponseHeader) { func (sh *RPCGatewayServletHandlers) Handshake(ctx *fasthttp.RequestCtx) (id string, extensionsHeader *fasthttp.ResponseHeader) {
return "", nil return "", nil
} }
// OnConnect invoked when client is connected // OnConnect invoked when client is connected
// If you override ths method, must call // If you override ths method, must call
func (sh *RPCGatewaySocketHandlers) OnConnect(soc cwf.Socket) cwf.Socket { func (sh *RPCGatewayServletHandlers) OnConnect(soc cwf.Socket) cwf.Socket {
soc = sh.SocketHandlers.OnConnect(soc) soc = sh.SocketHandlers.OnConnect(soc)
return newSocket(soc, "json") return newSocket(soc, "json")
} }
func (sh *RPCGatewaySocketHandlers) Handle(soc cwf.Socket, stopChan <-chan struct{}, doneChan chan<- struct{}) { func (sh *RPCGatewayServletHandlers) Handle(soc cwf.Socket, stopChan <-chan struct{}, doneChan chan<- struct{}) {
codec, err := sh.rpcServerHandler.GetCodec(sh.ContentType) codec, err := sh.rpcServerHandler.GetCodec(sh.ContentType)
if nil != err { if nil != err {
log.Printf("RPC Handle: %v", err) log.Printf("RPC Handle: %v", err)
@ -84,19 +84,19 @@ func (sh *RPCGatewaySocketHandlers) Handle(soc cwf.Socket, stopChan <-chan struc
// OnDisconnect invoked when client is disconnected // OnDisconnect invoked when client is disconnected
// If you override ths method, must call // If you override ths method, must call
func (sh *RPCGatewaySocketHandlers) OnDisconnect(soc cwf.Socket) { func (sh *RPCGatewayServletHandlers) OnDisconnect(soc cwf.Socket) {
sh.SocketHandlers.OnDisconnect(soc) sh.SocketHandlers.OnDisconnect(soc)
} }
// Destroy invoked when server is stopped // Destroy invoked when server is stopped
// If you override ths method, must call // If you override ths method, must call
func (sh *RPCGatewaySocketHandlers) Destroy() { func (sh *RPCGatewayServletHandlers) Destroy() {
sh.SocketHandlers.Destroy() sh.SocketHandlers.Destroy()
} }
func (sh *RPCGatewaySocketHandlers) Validate() { func (sh *RPCGatewayServletHandlers) Validate() {
sh.SocketHandlers.Validate() sh.SocketHandlers.Validate()
} }

View File

@ -1,13 +1,28 @@
package server package server
import ( import (
"flag" "fmt"
cc "git.loafle.net/commons_go/config" "git.loafle.net/commons_go/logging"
cwf "git.loafle.net/commons_go/websocket_fasthttp"
"git.loafle.net/overflow/overflow_gateway_websocket/config" "git.loafle.net/overflow/overflow_gateway_websocket/config"
) )
func New(sh ServerHandler) cwf.Server {
func New() { if nil == config.Config.Server {
logging.Logger().Panic(fmt.Sprintf("Gateway Websocket: Config of server is not initialized"))
}
sh.Name = config.Config.Server.Name
sh.Concurrency = config.Config.Server.Concurrency
sh.MaxStopWaitTime = config.Config.Server.MaxStopWaitTime
sh.HandshakeTimeout = config.Config.Websocket.HandshakeTimeout
sh.ReadBufferSize = config.Config.Websocket.ReadBufferSize
sh.WriteBufferSize = config.Config.Websocket.WriteBufferSize
sh.EnableCompression = config.Config.Websocket.EnableCompression
s := cwf.New(sh)
return s
} }

View File

@ -10,6 +10,7 @@ import (
"git.loafle.net/overflow/overflow_gateway_websocket/config" "git.loafle.net/overflow/overflow_gateway_websocket/config"
"git.loafle.net/overflow/overflow_gateway_websocket/external" "git.loafle.net/overflow/overflow_gateway_websocket/external"
"git.loafle.net/overflow/overflow_gateway_websocket/external/redis" "git.loafle.net/overflow/overflow_gateway_websocket/external/redis"
"git.loafle.net/overflow/overflow_gateway_websocket/internal/server/rpc"
"git.loafle.net/overflow/overflow_gateway_websocket/servlet" "git.loafle.net/overflow/overflow_gateway_websocket/servlet"
oos "git.loafle.net/overflow/overflow_subscriber" oos "git.loafle.net/overflow/overflow_subscriber"
oosr "git.loafle.net/overflow/overflow_subscriber/redis" oosr "git.loafle.net/overflow/overflow_subscriber/redis"
@ -44,7 +45,7 @@ func (sh *ServerHandlers) OnStart() {
sh.redisSubscriber = oosr.New(redis.RedisPool.Get()) sh.redisSubscriber = oosr.New(redis.RedisPool.Get())
if err := sh.redisSubscriber.Start(); nil != err { if err := sh.redisSubscriber.Start(); nil != err {
logging.Logger().Error(fmt.Sprintf("App: Redist Subscriber did not start %v", err)) logging.Logger().Panic(fmt.Sprintf("Gateway Websocket: Redist Subscriber did not start %v", err))
return return
} }
@ -68,7 +69,7 @@ func (sh *ServerHandlers) OnStop() {
func (sh *ServerHandlers) RegisterServlet(entryPath string, servletHandler servlet.ServletHandler) { func (sh *ServerHandlers) RegisterServlet(entryPath string, servletHandler servlet.ServletHandler) {
cfg := config.Config.Servlets[entryPath] cfg := config.Config.Servlets[entryPath]
if nil == cfg { if nil == cfg {
logging.Logger().Panic(fmt.Sprintf("Gateway Server: config of entry path[%s] is not exist", entryPath)) logging.Logger().Panic(fmt.Sprintf("Gateway Websocket: config of entry path[%s] is not exist", entryPath))
return return
} }
@ -87,7 +88,22 @@ func (sh *ServerHandlers) RegisterServlet(entryPath string, servletHandler servl
} }
func (sh *ServerHandlers) RegisterRPCGatewayServlet(entryPath string, servletHandler servlet.RPCGatewayServletHandler) { func (sh *ServerHandlers) RegisterRPCGatewayServlet(entryPath string, servletHandler servlet.RPCGatewayServletHandler) {
cfg := config.Config.Servlets[entryPath]
if nil == cfg {
logging.Logger().Panic(fmt.Sprintf("Gateway Websocket: config of entry path[%s] is not exist", entryPath))
return
}
rpcSH := &rpc.RPCGatewayServletHandlers{}
rpcSH.MaxMessageSize = cfg.Socket.MaxMessageSize
rpcSH.WriteTimeout = cfg.Socket.WriteTimeout * time.Second
rpcSH.ReadTimeout = cfg.Socket.ReadTimeout * time.Second
rpcSH.PongTimeout = cfg.Socket.PongTimeout * time.Second
rpcSH.PingTimeout = cfg.Socket.PingTimeout * time.Second
rpcSH.PingPeriod = cfg.Socket.PingPeriod * time.Second
sh.RegisterSocketHandler(entryPath, rpcSH)
} }
func (sh *ServerHandlers) RegisterSubscriber(subscriberHandler oos.SubscriberHandler) { func (sh *ServerHandlers) RegisterSubscriber(subscriberHandler oos.SubscriberHandler) {