From 4542ab52e690a63e78d2af68e3fb3d77b603e117 Mon Sep 17 00:00:00 2001 From: crusader Date: Tue, 14 Nov 2017 01:50:22 +0900 Subject: [PATCH] ing --- config/config.go | 12 +-- config/grpc.go | 2 +- config/handler.go | 6 -- config/redis.go | 2 +- config/servlet.go | 6 ++ .../{ => rpc}/rpc_gateway_socket_handlers.go | 11 +- internal/server/rpc/socket.go | 28 +++++ internal/server/socket_handlers.go | 100 ++++++++++++++++++ server/server_handler.go | 5 +- server/server_handlers.go | 28 ++++- servlet/rpc_gateway_servlet_handler.go | 58 ++++++++++ servlet/rpc_gateway_servlet_handlers.go | 35 ++++++ servlet/rpc_servlet_handler.go | 5 - servlet/rpc_servlet_handlers.go | 98 ----------------- servlet/servlet_handlers.go | 58 +++++----- 15 files changed, 296 insertions(+), 158 deletions(-) delete mode 100644 config/handler.go create mode 100644 config/servlet.go rename internal/server/{ => rpc}/rpc_gateway_socket_handlers.go (85%) create mode 100644 internal/server/rpc/socket.go create mode 100644 servlet/rpc_gateway_servlet_handler.go create mode 100644 servlet/rpc_gateway_servlet_handlers.go delete mode 100644 servlet/rpc_servlet_handler.go delete mode 100644 servlet/rpc_servlet_handlers.go diff --git a/config/config.go b/config/config.go index 9ac5764..2e52862 100644 --- a/config/config.go +++ b/config/config.go @@ -3,10 +3,10 @@ package config var Config GatewayConfig type GatewayConfig struct { - Server Server `json:"server" yaml:"server" toml:"server"` - Auth Auth `json:"auth" yaml:"auth" toml:"auth"` - Websocket Websocket `json:"websocket" yaml:"websocket" toml:"websocket"` - GRPC GRPC `json:"gRPC" yaml:"gRPC" toml:"gRPC"` - Redis Redis `json:"redis" yaml:"redis" toml:"redis"` - Handlers map[string]Handler `json:"handlers" yaml:"handlers" toml:"handlers"` + Server *Server `json:"server" yaml:"server" toml:"server"` + Auth *Auth `json:"auth" yaml:"auth" toml:"auth"` + Websocket *Websocket `json:"websocket" yaml:"websocket" toml:"websocket"` + GRPC *GRPC `json:"gRPC" yaml:"gRPC" toml:"gRPC"` + Redis *Redis `json:"redis" yaml:"redis" toml:"redis"` + Servlets map[string]*Servlet `json:"servlets" yaml:"servlets" toml:"servlets"` } diff --git a/config/grpc.go b/config/grpc.go index 1424bc7..051d629 100644 --- a/config/grpc.go +++ b/config/grpc.go @@ -2,5 +2,5 @@ package config type GRPC struct { Server - Pool Pool `json:"pool" yaml:"pool" toml:"pool"` + Pool *Pool `json:"pool" yaml:"pool" toml:"pool"` } diff --git a/config/handler.go b/config/handler.go deleted file mode 100644 index 45c6998..0000000 --- a/config/handler.go +++ /dev/null @@ -1,6 +0,0 @@ -package config - -type Handler struct { - Entry string `json:"entry" yaml:"entry" toml:"entry"` - Socket Socket `json:"socket" yaml:"socket" toml:"socket"` -} diff --git a/config/redis.go b/config/redis.go index 0a52f07..39d96e0 100644 --- a/config/redis.go +++ b/config/redis.go @@ -2,5 +2,5 @@ package config type Redis struct { Server - Pool Pool `json:"pool" yaml:"pool" toml:"pool"` + Pool *Pool `json:"pool" yaml:"pool" toml:"pool"` } diff --git a/config/servlet.go b/config/servlet.go new file mode 100644 index 0000000..a2c6c92 --- /dev/null +++ b/config/servlet.go @@ -0,0 +1,6 @@ +package config + +type Servlet struct { + Entry string `json:"entry" yaml:"entry" toml:"entry"` + Socket *Socket `json:"socket" yaml:"socket" toml:"socket"` +} diff --git a/internal/server/rpc_gateway_socket_handlers.go b/internal/server/rpc/rpc_gateway_socket_handlers.go similarity index 85% rename from internal/server/rpc_gateway_socket_handlers.go rename to internal/server/rpc/rpc_gateway_socket_handlers.go index fe2c94d..592fc09 100644 --- a/internal/server/rpc_gateway_socket_handlers.go +++ b/internal/server/rpc/rpc_gateway_socket_handlers.go @@ -1,4 +1,4 @@ -package server +package rpc import ( "context" @@ -30,12 +30,13 @@ func (sh *RPCGatewaySocketHandlers) Handshake(ctx *fasthttp.RequestCtx) (id stri // OnConnect invoked when client is connected // If you override ths method, must call -func (sh *RPCGatewaySocketHandlers) OnConnect(soc *cwf.Socket) { - sh.SocketHandlers.OnConnect(soc) +func (sh *RPCGatewaySocketHandlers) OnConnect(soc cwf.Socket) cwf.Socket { + soc = sh.SocketHandlers.OnConnect(soc) + return newSocket(soc, "json") } -func (sh *RPCGatewaySocketHandlers) Handle(soc *cwf.Socket, stopChan <-chan struct{}, doneChan chan<- struct{}) { +func (sh *RPCGatewaySocketHandlers) Handle(soc cwf.Socket, stopChan <-chan struct{}, doneChan chan<- struct{}) { codec, err := sh.rpcServerHandler.GetCodec(sh.ContentType) if nil != err { log.Printf("RPC Handle: %v", err) @@ -83,7 +84,7 @@ func (sh *RPCGatewaySocketHandlers) Handle(soc *cwf.Socket, stopChan <-chan stru // OnDisconnect invoked when client is disconnected // If you override ths method, must call -func (sh *RPCGatewaySocketHandlers) OnDisconnect(soc *cwf.Socket) { +func (sh *RPCGatewaySocketHandlers) OnDisconnect(soc cwf.Socket) { sh.SocketHandlers.OnDisconnect(soc) } diff --git a/internal/server/rpc/socket.go b/internal/server/rpc/socket.go new file mode 100644 index 0000000..3fa4ab3 --- /dev/null +++ b/internal/server/rpc/socket.go @@ -0,0 +1,28 @@ +package rpc + +import ( + cwf "git.loafle.net/commons_go/websocket_fasthttp" +) + +func newSocket(soc cwf.Socket, contentType string) Socket { + newSoc := &socket{ + contentType: contentType, + } + newSoc.Socket = soc + + return newSoc +} + +type Socket interface { + cwf.Socket + GetContentType() string +} + +type socket struct { + cwf.Socket + contentType string +} + +func (s *socket) GetContentType() string { + return s.contentType +} diff --git a/internal/server/socket_handlers.go b/internal/server/socket_handlers.go index abb4e43..1b3637f 100644 --- a/internal/server/socket_handlers.go +++ b/internal/server/socket_handlers.go @@ -1 +1,101 @@ package server + +import ( + "context" + "log" + + "github.com/valyala/fasthttp" + + "git.loafle.net/commons_go/rpc/gateway" + "git.loafle.net/commons_go/server" + cwf "git.loafle.net/commons_go/websocket_fasthttp" + oogw "git.loafle.net/overflow/overflow_gateway_websocket" +) + +type SocketHandlers struct { + cwf.SocketHandlers +} + +func (sh *SocketHandlers) Init() error { + if err := sh.SocketHandlers.Init(); nil != err { + return err + } + + return nil +} + +func (sh *SocketHandlers) Handshake(ctx *fasthttp.RequestCtx) (id string, extensionsHeader *fasthttp.ResponseHeader) { + return "", nil +} + +// OnConnect invoked when client is connected +// If you override ths method, must call +func (sh *SocketHandlers) OnConnect(soc cwf.Socket) { + sh.SocketHandlers.OnConnect(soc) + +} + +func (sh *SocketHandlers) Handle(soc *cwf.Socket, stopChan <-chan struct{}, doneChan chan<- struct{}) { + codec, err := sh.rpcServerHandler.GetCodec(sh.ContentType) + if nil != err { + log.Printf("RPC Handle: %v", err) + doneChan <- struct{}{} + return + } + + var socConn *cwf.SocketConn + ctx := context.WithValue(context.Background(), oogw.ServletSocketKey, soc) + // conn.SetPongHandler(func(string) error { c.conn.SetReadDeadline(time.Now().Add(pongWait)); return nil }) + + for { + if socConn, err = soc.WaitRequest(); nil != err { + doneChan <- struct{}{} + return + } + + // // "git.loafle.net/commons_go/websocket_fasthttp/websocket" + // switch socConn.MessageType { + // case websocket.TextMessage: + // case websocket.BinaryMessage: + // } + + if err = gateway.Handle(ctx, sh.rpcServerHandler, codec, socConn, socConn); nil != err { + if server.IsClientDisconnect(err) { + doneChan <- struct{}{} + return + } + log.Printf("RPC: %v", err) + } + + if err = socConn.Close(); nil != err { + doneChan <- struct{}{} + return + } + + select { + case <-stopChan: + return + default: + } + } + +} + +// OnDisconnect invoked when client is disconnected +// If you override ths method, must call +func (sh *SocketHandlers) OnDisconnect(soc *cwf.Socket) { + + sh.SocketHandlers.OnDisconnect(soc) +} + +// Destroy invoked when server is stopped +// If you override ths method, must call +func (sh *SocketHandlers) Destroy() { + + sh.SocketHandlers.Destroy() +} + +func (sh *SocketHandlers) Validate() { + sh.SocketHandlers.Validate() + +} diff --git a/server/server_handler.go b/server/server_handler.go index 3b5982e..3610cda 100644 --- a/server/server_handler.go +++ b/server/server_handler.go @@ -9,6 +9,7 @@ import ( type ServerHandler interface { cwf.ServerHandler - RegisterServletHandler(servletHandler servlet.ServletHandler) - RegisterSubscriberHandler(subscriberHandler oos.SubscriberHandler) + RegisterServlet(entryPath string, servletHandler servlet.ServletHandler) + RegisterRPCGatewayServlet(entryPath string, servletHandler servlet.RPCGatewayServletHandler) + RegisterSubscriber(subscriberHandler oos.SubscriberHandler) } diff --git a/server/server_handlers.go b/server/server_handlers.go index c032e03..16df0ea 100644 --- a/server/server_handlers.go +++ b/server/server_handlers.go @@ -3,6 +3,7 @@ package server import ( "fmt" "net" + "time" "git.loafle.net/commons_go/logging" cwf "git.loafle.net/commons_go/websocket_fasthttp" @@ -64,11 +65,32 @@ func (sh *ServerHandlers) OnStop() { sh.ServerHandlers.OnStop() } -func (sh *ServerHandlers) RegisterServletHandler(servletHandler servlet.ServletHandler) { - sh.RegisterSocketHandler(servletHandler) +func (sh *ServerHandlers) RegisterServlet(entryPath string, servletHandler servlet.ServletHandler) { + cfg := config.Config.Servlets[entryPath] + if nil == cfg { + logging.Logger().Panic(fmt.Sprintf("Gateway Server: config of entry path[%s] is not exist", entryPath)) + return + } + + servletH := servletHandler.(*servlet.ServletHandlers) + + servletH.MaxMessageSize = cfg.Socket.MaxMessageSize + servletH.WriteTimeout = cfg.Socket.WriteTimeout * time.Second + servletH.ReadTimeout = cfg.Socket.ReadTimeout * time.Second + servletH.PongTimeout = cfg.Socket.PongTimeout * time.Second + servletH.PingTimeout = cfg.Socket.PingTimeout * time.Second + servletH.PingPeriod = cfg.Socket.PingPeriod * time.Second + // servletH.BinaryMessage = cfg.Socket.BinaryMessage + + sh.RegisterSocketHandler(entryPath, servletH) + } -func (sh *ServerHandlers) RegisterSubscriberHandler(subscriberHandler oos.SubscriberHandler) { +func (sh *ServerHandlers) RegisterRPCGatewayServlet(entryPath string, servletHandler servlet.RPCGatewayServletHandler) { + +} + +func (sh *ServerHandlers) RegisterSubscriber(subscriberHandler oos.SubscriberHandler) { if nil == sh.subscribers { sh.subscribers = make([]oos.SubscriberHandler, 0) } diff --git a/servlet/rpc_gateway_servlet_handler.go b/servlet/rpc_gateway_servlet_handler.go new file mode 100644 index 0000000..034d401 --- /dev/null +++ b/servlet/rpc_gateway_servlet_handler.go @@ -0,0 +1,58 @@ +package servlet + +import ( + cwf "git.loafle.net/commons_go/websocket_fasthttp" + "github.com/valyala/fasthttp" +) + +type RPCGatewayServletHandler interface { + // Init invoked when server is stated + // If you override ths method, must call + // + // func (sh *SocketHandler) Init() error { + // if err := sh.SocketHandlers.Init(); nil != err { + // return err + // } + // ... + // return nil + // } + Init() error + // Handshake do handshake client and server + // id is identity of client socket. if id is "", disallow connection + Handshake(ctx *fasthttp.RequestCtx) (id string, extensionsHeader *fasthttp.ResponseHeader) + // OnConnect invoked when client is connected + // If you override ths method, must call + // + // func (sh *SocketHandler) OnConnect(soc *cwf.Socket) { + // ... + // newSoc := .... + // return sh.SocketHandlers.OnConnect(newSoc) + // } + OnConnect(soc cwf.Socket) cwf.Socket + Handle(soc cwf.Socket, stopChan <-chan struct{}, doneChan chan<- struct{}) + // OnDisconnect invoked when client is disconnected + // If you override ths method, must call + // + // func (sh *SocketHandler) OnDisconnect(soc *cwf.Socket) { + // ... + // sh.SocketHandlers.OnDisconnect(soc) + // } + OnDisconnect(soc cwf.Socket) + // Destroy invoked when server is stopped + // If you override ths method, must call + // + // func (sh *SocketHandler) Destroy() { + // ... + // sh.SocketHandlers.Destroy() + // } + Destroy() + + // Validate is check handler value + // If you override ths method, must call + // + // func (sh *SocketHandlers) Validate() { + // sh.SocketHandlers.Validate() + // ... + // } + Validate() +} diff --git a/servlet/rpc_gateway_servlet_handlers.go b/servlet/rpc_gateway_servlet_handlers.go new file mode 100644 index 0000000..58440c9 --- /dev/null +++ b/servlet/rpc_gateway_servlet_handlers.go @@ -0,0 +1,35 @@ +package servlet + +import ( + cwf "git.loafle.net/commons_go/websocket_fasthttp" + "github.com/valyala/fasthttp" +) + +type RPCGatewayServletHandlers struct { +} + +func (sh *RPCGatewayServletHandlers) Init() error { + return nil +} + +func (sh *RPCGatewayServletHandlers) Handshake(ctx *fasthttp.RequestCtx) (id string, extensionsHeader *fasthttp.ResponseHeader) { + return "", nil +} + +func (sh *RPCGatewayServletHandlers) OnConnect(soc *cwf.Socket) { +} + +func (sh *RPCGatewayServletHandlers) Handle(soc *cwf.Socket, stopChan <-chan struct{}, doneChan chan<- struct{}) { + // no op +} + +func (sh *RPCGatewayServletHandlers) OnDisconnect(soc *cwf.Socket) { +} + +func (sh *RPCGatewayServletHandlers) Destroy() { + // no op +} + +func (sh *RPCGatewayServletHandlers) Validate() { + +} diff --git a/servlet/rpc_servlet_handler.go b/servlet/rpc_servlet_handler.go deleted file mode 100644 index d880e65..0000000 --- a/servlet/rpc_servlet_handler.go +++ /dev/null @@ -1,5 +0,0 @@ -package servlet - -type RPCServletHandler interface { - ServletHandler -} diff --git a/servlet/rpc_servlet_handlers.go b/servlet/rpc_servlet_handlers.go deleted file mode 100644 index 63cf8ad..0000000 --- a/servlet/rpc_servlet_handlers.go +++ /dev/null @@ -1,98 +0,0 @@ -package servlet - -import ( - "context" - "log" - - "git.loafle.net/commons_go/rpc/gateway" - "git.loafle.net/commons_go/server" - cwf "git.loafle.net/commons_go/websocket_fasthttp" - - oogw "git.loafle.net/overflow/overflow_gateway_websocket" -) - -type RPCServletHandlers struct { - ServletHandlers - - RPCHandler -} - -func (sh *RPCServletHandlers) Init() error { - if err := sh.ServletHandlers.Init(); nil != err { - return err - } - - return nil -} - -// OnConnect invoked when client is connected -// If you override ths method, must call -func (sh *RPCServletHandlers) OnConnect(soc *cwf.Socket) { - sh.ServletHandlers.OnConnect(soc) - -} - -func (sh *RPCServletHandlers) Handle(soc *cwf.Socket, stopChan <-chan struct{}, doneChan chan<- struct{}) { - codec, err := sh.rpcServerHandler.GetCodec(sh.ContentType) - if nil != err { - log.Printf("RPC Handle: %v", err) - doneChan <- struct{}{} - return - } - - var socConn *cwf.SocketConn - ctx := context.WithValue(context.Background(), oogw.ServletSocketKey, soc) - // conn.SetPongHandler(func(string) error { c.conn.SetReadDeadline(time.Now().Add(pongWait)); return nil }) - - for { - if socConn, err = soc.WaitRequest(); nil != err { - doneChan <- struct{}{} - return - } - - // // "git.loafle.net/commons_go/websocket_fasthttp/websocket" - // switch socConn.MessageType { - // case websocket.TextMessage: - // case websocket.BinaryMessage: - // } - - if err = gateway.Handle(ctx, sh.rpcServerHandler, codec, socConn, socConn); nil != err { - if server.IsClientDisconnect(err) { - doneChan <- struct{}{} - return - } - log.Printf("RPC: %v", err) - } - - if err = socConn.Close(); nil != err { - doneChan <- struct{}{} - return - } - - select { - case <-stopChan: - return - default: - } - } - -} - -// OnDisconnect invoked when client is disconnected -// If you override ths method, must call -func (sh *RPCServletHandlers) OnDisconnect(soc *cwf.Socket) { - - sh.ServletHandlers.OnDisconnect(soc) -} - -// Destroy invoked when server is stopped -// If you override ths method, must call -func (sh *ServletHandlers) Destroy() { - - sh.SocketHandlers.Destroy() -} - -func (sh *RPCServletHandlers) Validate() { - sh.ServletHandlers.Validate() - -} diff --git a/servlet/servlet_handlers.go b/servlet/servlet_handlers.go index a23994a..962e2de 100644 --- a/servlet/servlet_handlers.go +++ b/servlet/servlet_handlers.go @@ -1,50 +1,46 @@ package servlet import ( - "git.loafle.net/commons_go/logging" cwf "git.loafle.net/commons_go/websocket_fasthttp" ) type ServletHandlers struct { cwf.SocketHandlers - - // EntryPath is path of url (ex: /web) - EntryPath string } -func (sh *ServletHandlers) Init() error { - if err := sh.SocketHandlers.Init(); nil != err { - return err - } +// func (sh *ServletHandlers) Init() error { +// if err := sh.SocketHandlers.Init(); nil != err { +// return err +// } - return nil -} +// return nil +// } -// OnConnect invoked when client is connected -// If you override ths method, must call -func (sh *ServletHandlers) OnConnect(soc *cwf.Socket) { - sh.SocketHandlers.OnConnect(soc) +// // OnConnect invoked when client is connected +// // If you override ths method, must call +// func (sh *ServletHandlers) OnConnect(soc *cwf.Socket) { +// sh.SocketHandlers.OnConnect(soc) -} +// } -// OnDisconnect invoked when client is disconnected -// If you override ths method, must call -func (sh *ServletHandlers) OnDisconnect(soc *cwf.Socket) { +// // OnDisconnect invoked when client is disconnected +// // If you override ths method, must call +// func (sh *ServletHandlers) OnDisconnect(soc *cwf.Socket) { - sh.SocketHandlers.OnDisconnect(soc) -} +// sh.SocketHandlers.OnDisconnect(soc) +// } -// Destroy invoked when server is stopped -// If you override ths method, must call -func (sh *ServletHandlers) Destroy() { +// // Destroy invoked when server is stopped +// // If you override ths method, must call +// func (sh *ServletHandlers) Destroy() { - sh.SocketHandlers.Destroy() -} +// sh.SocketHandlers.Destroy() +// } -func (sh *ServletHandlers) Validate() { - sh.SocketHandlers.Validate() +// func (sh *ServletHandlers) Validate() { +// sh.SocketHandlers.Validate() - if "" == sh.EntryPath { - logging.Logger().Panic("Geteway Server: The path of entry must be specified") - } -} +// if "" == sh.EntryPath { +// logging.Logger().Panic("Geteway Server: The path of entry must be specified") +// } +// }