From 8c19c1a88704ce9161f2fbb7340162dfcc892697 Mon Sep 17 00:00:00 2001 From: crusader Date: Tue, 14 Nov 2017 14:57:03 +0900 Subject: [PATCH] ing --- constants.go | 8 ++- .../server/rpc/rpc_gateway_server_handlers.go | 69 +++++++++++++++++++ ...ers.go => rpc_gateway_servlet_handlers.go} | 37 +++++++--- server/server_handlers.go | 17 ++--- servlet/servlet_handlers.go | 11 +-- 5 files changed, 116 insertions(+), 26 deletions(-) create mode 100644 internal/server/rpc/rpc_gateway_server_handlers.go rename internal/server/rpc/{rpc_gateway_socket_handlers.go => rpc_gateway_servlet_handlers.go} (71%) diff --git a/constants.go b/constants.go index 51b5b46..5925672 100644 --- a/constants.go +++ b/constants.go @@ -1,15 +1,17 @@ package overflow_gateway_websocket import ( - "git.loafle.net/commons_go/util" + cuc "git.loafle.net/commons_go/util/context" ) var ( - ServletSocketKey = util.ContextKey("ServletSocket") - ServletContentTypeKey = util.ContextKey("ServletContentType") + ServletSocketKey = cuc.ContextKey("ServletSocket") + ServletContentTypeKey = cuc.ContextKey("ServletContentType") ) const ( ConfigPathFlagName = "config-dir" ConfigFileName = "config.json" + + GRPCUserIDKey = "GRPCUserID" ) diff --git a/internal/server/rpc/rpc_gateway_server_handlers.go b/internal/server/rpc/rpc_gateway_server_handlers.go new file mode 100644 index 0000000..71078f2 --- /dev/null +++ b/internal/server/rpc/rpc_gateway_server_handlers.go @@ -0,0 +1,69 @@ +package rpc + +import ( + "context" + "io" + + "google.golang.org/grpc/metadata" + + "git.loafle.net/commons_go/rpc/gateway" + oogw "git.loafle.net/overflow/overflow_gateway_websocket" + "git.loafle.net/overflow/overflow_gateway_websocket/external/grpc" +) + +func newRPCGatewayServerHandler() RPCGatewayServerHandler { + sh := &RPCGatewayServerHandlers{} + + return sh +} + +type RPCGatewayServerHandler interface { + gateway.ServerHandler +} + +type RPCGatewayServerHandlers struct { + gateway.ServerHandlers +} + +func (sh *RPCGatewayServerHandlers) OnPreRead(r io.Reader) { + // no op +} + +func (sh *RPCGatewayServerHandlers) OnPostRead(r io.Reader) { + // no op +} + +func (sh *RPCGatewayServerHandlers) OnPreWriteResult(w io.Writer, result interface{}) { + // no op +} + +func (sh *RPCGatewayServerHandlers) OnPostWriteResult(w io.Writer, result interface{}) { + // no op +} + +func (sh *RPCGatewayServerHandlers) OnPreWriteError(w io.Writer, err error) { + // no op +} + +func (sh *RPCGatewayServerHandlers) OnPostWriteError(w io.Writer, err error) { + // no op +} + +func (sh *RPCGatewayServerHandlers) Invoke(ctx context.Context, method string, params []string) (result interface{}, err error) { + soc := ctx.Value(oogw.ServletSocketKey).(Socket) + + md := metadata.Pairs(oogw.GRPCUserIDKey, soc.ID()) + grpcCTX := metadata.NewOutgoingContext(context.Background(), md) + + r, err := grpc.Exec(grpcCTX, method, params) + if err != nil { + return nil, err + } + + return r, nil + +} + +func (sh *RPCGatewayServerHandlers) Validate() { + sh.ServerHandlers.Validate() +} diff --git a/internal/server/rpc/rpc_gateway_socket_handlers.go b/internal/server/rpc/rpc_gateway_servlet_handlers.go similarity index 71% rename from internal/server/rpc/rpc_gateway_socket_handlers.go rename to internal/server/rpc/rpc_gateway_servlet_handlers.go index 3c8c8c6..6aeb8a7 100644 --- a/internal/server/rpc/rpc_gateway_socket_handlers.go +++ b/internal/server/rpc/rpc_gateway_servlet_handlers.go @@ -10,14 +10,30 @@ import ( "git.loafle.net/commons_go/server" cwf "git.loafle.net/commons_go/websocket_fasthttp" oogw "git.loafle.net/overflow/overflow_gateway_websocket" + "git.loafle.net/overflow/overflow_gateway_websocket/servlet" ) +func NewRPCGatewayServletHandler() RPCGatewayServletHandler { + rpcSH := newRPCGatewayServerHandler() + sh := &RPCGatewayServletHandlers{ + rpcSH: rpcSH, + } + + return sh +} + +type RPCGatewayServletHandler interface { + servlet.ServletHandler +} + type RPCGatewayServletHandlers struct { - oogw.ServerHandlers + servlet.ServletHandlers + + rpcSH RPCGatewayServerHandler } func (sh *RPCGatewayServletHandlers) Init() error { - if err := sh.SocketHandlers.Init(); nil != err { + if err := sh.ServletHandlers.Init(); nil != err { return err } @@ -31,13 +47,14 @@ func (sh *RPCGatewayServletHandlers) Handshake(ctx *fasthttp.RequestCtx) (id str // OnConnect invoked when client is connected // If you override ths method, must call func (sh *RPCGatewayServletHandlers) OnConnect(soc cwf.Socket) cwf.Socket { - soc = sh.SocketHandlers.OnConnect(soc) + soc = sh.ServletHandlers.OnConnect(soc) return newSocket(soc, "json") } func (sh *RPCGatewayServletHandlers) Handle(soc cwf.Socket, stopChan <-chan struct{}, doneChan chan<- struct{}) { - codec, err := sh.rpcServerHandler.GetCodec(sh.ContentType) + rpcSoc := soc.(Socket) + codec, err := sh.rpcSH.GetCodec(rpcSoc.GetContentType()) if nil != err { log.Printf("RPC Handle: %v", err) doneChan <- struct{}{} @@ -45,11 +62,11 @@ func (sh *RPCGatewayServletHandlers) Handle(soc cwf.Socket, stopChan <-chan stru } var socConn *cwf.SocketConn - ctx := context.WithValue(context.Background(), oogw.ServletSocketKey, soc) + ctx := context.WithValue(context.Background(), oogw.ServletSocketKey, rpcSoc) // conn.SetPongHandler(func(string) error { c.conn.SetReadDeadline(time.Now().Add(pongWait)); return nil }) for { - if socConn, err = soc.WaitRequest(); nil != err { + if socConn, err = rpcSoc.WaitRequest(); nil != err { doneChan <- struct{}{} return } @@ -60,7 +77,7 @@ func (sh *RPCGatewayServletHandlers) Handle(soc cwf.Socket, stopChan <-chan stru // case websocket.BinaryMessage: // } - if err = gateway.Handle(ctx, sh.rpcServerHandler, codec, socConn, socConn); nil != err { + if err = gateway.Handle(ctx, sh.rpcSH, codec, socConn, socConn); nil != err { if server.IsClientDisconnect(err) { doneChan <- struct{}{} return @@ -86,17 +103,17 @@ func (sh *RPCGatewayServletHandlers) Handle(soc cwf.Socket, stopChan <-chan stru // If you override ths method, must call func (sh *RPCGatewayServletHandlers) OnDisconnect(soc cwf.Socket) { - sh.SocketHandlers.OnDisconnect(soc) + sh.ServletHandlers.OnDisconnect(soc) } // Destroy invoked when server is stopped // If you override ths method, must call func (sh *RPCGatewayServletHandlers) Destroy() { - sh.SocketHandlers.Destroy() + sh.ServletHandlers.Destroy() } func (sh *RPCGatewayServletHandlers) Validate() { - sh.SocketHandlers.Validate() + sh.ServletHandlers.Validate() } diff --git a/server/server_handlers.go b/server/server_handlers.go index 7093ed7..c16ee6c 100644 --- a/server/server_handlers.go +++ b/server/server_handlers.go @@ -94,16 +94,17 @@ func (sh *ServerHandlers) RegisterRPCGatewayServlet(entryPath string, servletHan return } - rpcSH := &rpc.RPCGatewayServletHandlers{} + rpcSH := rpc.NewRPCGatewayServletHandler() + rpcSHs := rpcSH.(*rpc.RPCGatewayServletHandlers) - rpcSH.MaxMessageSize = cfg.Socket.MaxMessageSize - rpcSH.WriteTimeout = cfg.Socket.WriteTimeout * time.Second - rpcSH.ReadTimeout = cfg.Socket.ReadTimeout * time.Second - rpcSH.PongTimeout = cfg.Socket.PongTimeout * time.Second - rpcSH.PingTimeout = cfg.Socket.PingTimeout * time.Second - rpcSH.PingPeriod = cfg.Socket.PingPeriod * time.Second + rpcSHs.MaxMessageSize = cfg.Socket.MaxMessageSize + rpcSHs.WriteTimeout = cfg.Socket.WriteTimeout * time.Second + rpcSHs.ReadTimeout = cfg.Socket.ReadTimeout * time.Second + rpcSHs.PongTimeout = cfg.Socket.PongTimeout * time.Second + rpcSHs.PingTimeout = cfg.Socket.PingTimeout * time.Second + rpcSHs.PingPeriod = cfg.Socket.PingPeriod * time.Second - sh.RegisterSocketHandler(entryPath, rpcSH) + sh.RegisterSocketHandler(entryPath, rpcSHs) } func (sh *ServerHandlers) RegisterSubscriber(subscriberHandler oos.SubscriberHandler) { diff --git a/servlet/servlet_handlers.go b/servlet/servlet_handlers.go index 962e2de..4040899 100644 --- a/servlet/servlet_handlers.go +++ b/servlet/servlet_handlers.go @@ -18,15 +18,16 @@ type ServletHandlers struct { // // OnConnect invoked when client is connected // // If you override ths method, must call -// func (sh *ServletHandlers) OnConnect(soc *cwf.Socket) { -// sh.SocketHandlers.OnConnect(soc) - +// func (sh *ServletHandlers) OnConnect(soc cwf.Socket) cwf.Socket { +// soc = sh.SocketHandlers.OnConnect(soc) +// newSoc := ... +// return newSoc // } // // OnDisconnect invoked when client is disconnected // // If you override ths method, must call -// func (sh *ServletHandlers) OnDisconnect(soc *cwf.Socket) { - +// func (sh *ServletHandlers) OnDisconnect(soc cwf.Socket) { +// ... // sh.SocketHandlers.OnDisconnect(soc) // }