diff --git a/.vscode/settings.json b/.vscode/settings.json index 20af2f6..60835be 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -1,3 +1,4 @@ // Place your settings in this file to overwrite default and user settings. { + "java.configuration.updateBuildConfiguration": "automatic" } \ No newline at end of file diff --git a/internal/server/rpc/Untitled-1 b/internal/server/rpc/Untitled-1 new file mode 100644 index 0000000..b620f24 --- /dev/null +++ b/internal/server/rpc/Untitled-1 @@ -0,0 +1,42 @@ + codec, err := sh.gwRPCSH.GetCodec(soc.Context().GetAttribute("contentType").(string)) + if nil != err { + log.Printf("RPC Handle: %v", err) + doneChan <- struct{}{} + return + } + + var socConn *cwf.SocketConn + ctx := context.WithValue(context.Background(), oogw.ServletSocketKey, rpcSoc) + // conn.SetPongHandler(func(string) error { c.conn.SetReadDeadline(time.Now().Add(pongWait)); return nil }) + + for { + if socConn, err = rpcSoc.WaitRequest(); nil != err { + doneChan <- struct{}{} + return + } + + // // "git.loafle.net/commons_go/websocket_fasthttp/websocket" + // switch socConn.MessageType { + // case websocket.TextMessage: + // case websocket.BinaryMessage: + // } + + if err = gateway.Handle(ctx, sh.gwRPCSH, codec, socConn, socConn); nil != err { + if server.IsClientDisconnect(err) { + doneChan <- struct{}{} + return + } + log.Printf("RPC: %v", err) + } + + if err = socConn.Close(); nil != err { + doneChan <- struct{}{} + return + } + + select { + case <-stopChan: + return + default: + } + } diff --git a/internal/server/rpc/gateway_rpc_servlet_handlers.go b/internal/server/rpc/gateway_rpc_servlet_handlers.go index 0a94188..fc419e6 100644 --- a/internal/server/rpc/gateway_rpc_servlet_handlers.go +++ b/internal/server/rpc/gateway_rpc_servlet_handlers.go @@ -1,29 +1,46 @@ package rpc import ( + "context" "fmt" "git.loafle.net/commons_go/logging" + "git.loafle.net/commons_go/rpc" crcwf "git.loafle.net/commons_go/rpc/connection/websocket/fasthttp" "git.loafle.net/commons_go/rpc/gateway" "git.loafle.net/commons_go/rpc/protocol" + "git.loafle.net/commons_go/rpc/protocol/json" + oogw "git.loafle.net/overflow/overflow_gateway_websocket" + "git.loafle.net/overflow/overflow_gateway_websocket/external/grpc" + "google.golang.org/grpc/metadata" ) +func newGatewayRPCServletHandler() GatewayRPCServletHandler { + sh := &GatewayRPCServletHandlers{} + sh.RegisterCodec(json.NewServerCodec(), json.Name) + return sh +} + type GatewayRPCServletHandlers struct { gateway.ServletHandlers crcwf.ServletHandlers } -func (sh *GatewayRPCServletHandlers) Invoke(requestCodec protocol.RegistryCodec) (result interface{}, err error) { +func (sh *GatewayRPCServletHandlers) Invoke(servletCTX rpc.ServletContext, requestCodec protocol.RegistryCodec) (result interface{}, err error) { if !sh.RPCRegistry.HasMethod(requestCodec.Method()) { return nil, fmt.Errorf("RPC Servlet Handler: Method[%s] is not exist", requestCodec.Method()) } - result, err = sh.RPCRegistry.Invoke(requestCodec) - if nil != err { + md := metadata.Pairs(oogw.GRPCUserIDKey, soc.ID()) + grpcCTX := metadata.NewOutgoingContext(context.Background(), md) + r, err := grpc.Exec(grpcCTX, requestCodec.Method(), params) + if err != nil { + return nil, err } + return r, nil + return } diff --git a/internal/server/rpc/rpc_gateway_server_handlers.go b/internal/server/rpc/rpc_gateway_server_handlers.go deleted file mode 100644 index 06a8d6e..0000000 --- a/internal/server/rpc/rpc_gateway_server_handlers.go +++ /dev/null @@ -1,70 +0,0 @@ -package rpc - -import ( - "context" - "io" - - "google.golang.org/grpc/metadata" - - "git.loafle.net/commons_go/rpc/gateway" - "git.loafle.net/commons_go/rpc/protocol/json" - oogw "git.loafle.net/overflow/overflow_gateway_websocket" - "git.loafle.net/overflow/overflow_gateway_websocket/external/grpc" -) - -func newRPCGatewayServerHandler() RPCGatewayServerHandler { - sh := &RPCGatewayServerHandlers{} - sh.RegisterCodec(json.NewServerCodec(), json.Name) - return sh -} - -type RPCGatewayServerHandler interface { - gateway.ServerHandler -} - -type RPCGatewayServerHandlers struct { - gateway.ServerHandlers -} - -func (sh *RPCGatewayServerHandlers) OnPreRead(r io.Reader) { - // no op -} - -func (sh *RPCGatewayServerHandlers) OnPostRead(r io.Reader) { - // no op -} - -func (sh *RPCGatewayServerHandlers) OnPreWriteResult(w io.Writer, result interface{}) { - // no op -} - -func (sh *RPCGatewayServerHandlers) OnPostWriteResult(w io.Writer, result interface{}) { - // no op -} - -func (sh *RPCGatewayServerHandlers) OnPreWriteError(w io.Writer, err error) { - // no op -} - -func (sh *RPCGatewayServerHandlers) OnPostWriteError(w io.Writer, err error) { - // no op -} - -func (sh *RPCGatewayServerHandlers) Invoke(ctx context.Context, method string, params []string) (result interface{}, err error) { - soc := ctx.Value(oogw.ServletSocketKey).(Socket) - - md := metadata.Pairs(oogw.GRPCUserIDKey, soc.ID()) - grpcCTX := metadata.NewOutgoingContext(context.Background(), md) - - r, err := grpc.Exec(grpcCTX, method, params) - if err != nil { - return nil, err - } - - return r, nil - -} - -func (sh *RPCGatewayServerHandlers) Validate() { - sh.ServerHandlers.Validate() -} diff --git a/internal/server/rpc/rpc_gateway_servlet_context.go b/internal/server/rpc/rpc_gateway_servlet_context.go new file mode 100644 index 0000000..3351880 --- /dev/null +++ b/internal/server/rpc/rpc_gateway_servlet_context.go @@ -0,0 +1,21 @@ +package rpc + +import ( + cwf "git.loafle.net/commons_go/websocket_fasthttp" + "git.loafle.net/overflow/overflow_gateway_websocket/servlet" +) + +type ServletContext interface { + servlet.ServletContext +} + +func newServletContext(socketCTX cwf.SocketContext) SocketContext { + sCTX := &servletContext{} + sCTX.ServletContext = servlet.NewServletContext(socketCTX) + + return sCTX +} + +type servletContext struct { + servlet.ServletContext +} diff --git a/internal/server/rpc/rpc_gateway_servlet_handler.go b/internal/server/rpc/rpc_gateway_servlet_handler.go new file mode 100644 index 0000000..b03d707 --- /dev/null +++ b/internal/server/rpc/rpc_gateway_servlet_handler.go @@ -0,0 +1,9 @@ +package rpc + +import ( + "git.loafle.net/overflow/overflow_gateway_websocket/servlet" +) + +type RPCGatewayServletHandler interface { + servlet.ServletHandler +} diff --git a/internal/server/rpc/rpc_gateway_servlet_handlers.go b/internal/server/rpc/rpc_gateway_servlet_handlers.go index b3b0cf8..3ea3799 100644 --- a/internal/server/rpc/rpc_gateway_servlet_handlers.go +++ b/internal/server/rpc/rpc_gateway_servlet_handlers.go @@ -1,110 +1,78 @@ package rpc import ( - "context" - "log" + "sync" "github.com/valyala/fasthttp" - "git.loafle.net/commons_go/rpc/gateway" + cRPC "git.loafle.net/commons_go/rpc" "git.loafle.net/commons_go/rpc/protocol/json" - "git.loafle.net/commons_go/server" cwf "git.loafle.net/commons_go/websocket_fasthttp" - oogw "git.loafle.net/overflow/overflow_gateway_websocket" "git.loafle.net/overflow/overflow_gateway_websocket/rpc" "git.loafle.net/overflow/overflow_gateway_websocket/servlet" ) -func NewRPCGatewayServletHandler(rpcGH rpc.RPCGatewayHandler) RPCGatewayServletHandler { - rpcSH := newRPCGatewayServerHandler() +func NewRPCGatewayServletHandler(gwRPCHandler rpc.GatewayRPCHandler) RPCGatewayServletHandler { + gwRPCSH := newGatewayRPCServletHandler() + sh := &RPCGatewayServletHandlers{ - rpcSH: rpcSH, - rpcGH: rpcGH, + gwRPCSH: gwRPCSH, + gwRPCHandler: gwRPCHandler, } return sh } -type RPCGatewayServletHandler interface { - servlet.ServletHandler -} - type RPCGatewayServletHandlers struct { servlet.ServletHandlers - rpcSH RPCGatewayServerHandler - rpcGH rpc.RPCGatewayHandler + gwRPCSH GatewayRPCServletHandler + gwRPCHandler rpc.GatewayRPCHandler } -func (sh *RPCGatewayServletHandlers) Init() error { - if err := sh.ServletHandlers.Init(); nil != err { +func (sh *RPCGatewayServletHandlers) Init(serverCTX cwf.ServerContext) error { + if err := sh.ServletHandlers.Init(serverCTX); nil != err { return err } - if err := sh.rpcGH.Init(); nil != err { + if err := sh.gwRPCHandler.Init(serverCTX); nil != err { return err } return nil } -func (sh *RPCGatewayServletHandlers) Handshake(ctx *fasthttp.RequestCtx) (id string, extensionsHeader *fasthttp.ResponseHeader) { - return sh.rpcGH.Handshake(ctx) +func (sh *RPCGatewayServletHandlers) Handshake(serverCTX cwf.ServerContext, ctx *fasthttp.RequestCtx) (id string, extensionsHeader *fasthttp.ResponseHeader) { + return sh.gwRPCHandler.Handshake(serverCTX, ctx) +} + +func (sh *SocketHandlers) SocketContext(serverCTX cwf.ServerContext) cwf.SocketContext { + socketCTX := sh.SocketContext(serverCTX) + + return newServletContext(socketCTX) } // OnConnect invoked when client is connected // If you override ths method, must call -func (sh *RPCGatewayServletHandlers) OnConnect(soc cwf.Socket) cwf.Socket { - soc = sh.ServletHandlers.OnConnect(soc) +func (sh *RPCGatewayServletHandlers) OnConnect(soc cwf.Socket) { + sh.ServletHandlers.OnConnect(soc) - return newSocket(soc, json.Name) + soc.Context().SetAttribute(cRPC.ContentTypeKey, json.Name) } func (sh *RPCGatewayServletHandlers) Handle(soc cwf.Socket, stopChan <-chan struct{}, doneChan chan<- struct{}) { - rpcSoc := soc.(Socket) - codec, err := sh.rpcSH.GetCodec(rpcSoc.GetContentType()) - if nil != err { - log.Printf("RPC Handle: %v", err) + rpcServlet := retainRPCServlet(sh.gwRPCSH) + + if err := rpcServlet.Start(soc.Context(), soc, soc); nil != err { doneChan <- struct{}{} return } - var socConn *cwf.SocketConn - ctx := context.WithValue(context.Background(), oogw.ServletSocketKey, rpcSoc) - // conn.SetPongHandler(func(string) error { c.conn.SetReadDeadline(time.Now().Add(pongWait)); return nil }) - - for { - if socConn, err = rpcSoc.WaitRequest(); nil != err { - doneChan <- struct{}{} - return - } - - // // "git.loafle.net/commons_go/websocket_fasthttp/websocket" - // switch socConn.MessageType { - // case websocket.TextMessage: - // case websocket.BinaryMessage: - // } - - if err = gateway.Handle(ctx, sh.rpcSH, codec, socConn, socConn); nil != err { - if server.IsClientDisconnect(err) { - doneChan <- struct{}{} - return - } - log.Printf("RPC: %v", err) - } - - if err = socConn.Close(); nil != err { - doneChan <- struct{}{} - return - } - - select { - case <-stopChan: - return - default: - } + select { + case <-stopChan: + rpcServlet.Stop() + return } - } // OnDisconnect invoked when client is disconnected @@ -124,5 +92,20 @@ func (sh *RPCGatewayServletHandlers) Destroy() { func (sh *RPCGatewayServletHandlers) Validate() { sh.ServletHandlers.Validate() - sh.rpcGH.Validate() + sh.gwRPCHandler.Validate() +} + +var rpcServletPool sync.Pool + +func retainRPCServlet(sh GatewayRPCServletHandler) cRPC.Servlet { + v := rpcServletPool.Get() + if v == nil { + return cRPC.NewServlet(sh) + } + return v.(cRPC.Servlet) +} + +func releaseRPCServlet(s cRPC.Servlet) { + + rpcServletPool.Put(s) } diff --git a/internal/server/rpc/socket.go b/internal/server/rpc/socket.go deleted file mode 100644 index 3fa4ab3..0000000 --- a/internal/server/rpc/socket.go +++ /dev/null @@ -1,28 +0,0 @@ -package rpc - -import ( - cwf "git.loafle.net/commons_go/websocket_fasthttp" -) - -func newSocket(soc cwf.Socket, contentType string) Socket { - newSoc := &socket{ - contentType: contentType, - } - newSoc.Socket = soc - - return newSoc -} - -type Socket interface { - cwf.Socket - GetContentType() string -} - -type socket struct { - cwf.Socket - contentType string -} - -func (s *socket) GetContentType() string { - return s.contentType -} diff --git a/rpc/gateway_rpc_handler.go b/rpc/gateway_rpc_handler.go new file mode 100644 index 0000000..fd18bf4 --- /dev/null +++ b/rpc/gateway_rpc_handler.go @@ -0,0 +1,14 @@ +package rpc + +import ( + "github.com/valyala/fasthttp" + + cwf "git.loafle.net/commons_go/websocket_fasthttp" +) + +type GatewayRPCHandler interface { + Init(serverCTX cwf.ServerContext) error + Handshake(serverCTX cwf.ServerContext, ctx *fasthttp.RequestCtx) (id string, extensionsHeader *fasthttp.ResponseHeader) + + Validate() +} diff --git a/rpc/gateway_rpc_handlers.go b/rpc/gateway_rpc_handlers.go new file mode 100644 index 0000000..0982051 --- /dev/null +++ b/rpc/gateway_rpc_handlers.go @@ -0,0 +1,21 @@ +package rpc + +import ( + cwf "git.loafle.net/commons_go/websocket_fasthttp" + "github.com/valyala/fasthttp" +) + +type GatewayRPCHandlers struct { +} + +func (sh *GatewayRPCHandlers) Init(serverCTX cwf.ServerContext) error { + return nil +} + +func (sh *GatewayRPCHandlers) Handshake(serverCTX cwf.ServerContext, ctx *fasthttp.RequestCtx) (id string, extensionsHeader *fasthttp.ResponseHeader) { + return "", nil +} + +func (sh *GatewayRPCHandlers) Validate() { + +} diff --git a/rpc/rpc_gateway_handler.go b/rpc/rpc_gateway_handler.go deleted file mode 100644 index bd3af04..0000000 --- a/rpc/rpc_gateway_handler.go +++ /dev/null @@ -1,10 +0,0 @@ -package rpc - -import "github.com/valyala/fasthttp" - -type RPCGatewayHandler interface { - Init() error - Handshake(ctx *fasthttp.RequestCtx) (id string, extensionsHeader *fasthttp.ResponseHeader) - - Validate() -} diff --git a/rpc/rpc_gateway_handlers.go b/rpc/rpc_gateway_handlers.go deleted file mode 100644 index 1c65399..0000000 --- a/rpc/rpc_gateway_handlers.go +++ /dev/null @@ -1,18 +0,0 @@ -package rpc - -import "github.com/valyala/fasthttp" - -type RPCGatewayHandlers struct { -} - -func (sh *RPCGatewayHandlers) Init() error { - return nil -} - -func (sh *RPCGatewayHandlers) Handshake(ctx *fasthttp.RequestCtx) (id string, extensionsHeader *fasthttp.ResponseHeader) { - return "", nil -} - -func (sh *RPCGatewayHandlers) Validate() { - -} diff --git a/server/server.go b/server/server.go index 0b16f7d..c2c7660 100644 --- a/server/server.go +++ b/server/server.go @@ -4,8 +4,8 @@ import ( cwf "git.loafle.net/commons_go/websocket_fasthttp" ) -func New(serverCTX ServerContext, sh ServerHandler) Server { - s := cwf.New(serverCTX, sh) +func New(sh ServerHandler) Server { + s := cwf.New(sh) return s } diff --git a/server/server_context.go b/server/server_context.go index c31c054..b5d930b 100644 --- a/server/server_context.go +++ b/server/server_context.go @@ -1,7 +1,6 @@ package server import ( - cuc "git.loafle.net/commons_go/util/context" cwf "git.loafle.net/commons_go/websocket_fasthttp" ) @@ -13,9 +12,9 @@ type serverContext struct { cwf.ServerContext } -func NewServerContext(parent cuc.Context) ServerContext { +func NewServerContext(serverCTX cwf.ServerContext) ServerContext { sCTX := &serverContext{} - sCTX.ServerContext = cwf.NewServerContext(parent) + sCTX.ServerContext = serverCTX return sCTX } diff --git a/server/server_handler.go b/server/server_handler.go index fc65b7e..27935db 100644 --- a/server/server_handler.go +++ b/server/server_handler.go @@ -11,6 +11,6 @@ type ServerHandler interface { cwf.ServerHandler RegisterServlet(servletName string, servletHandler servlet.ServletHandler) - RegisterRPCGatewayServlet(servletName string, rpcGatewayHandler rpc.RPCGatewayHandler) servlet.ServletHandler + RegisterRPCGatewayServlet(servletName string, gatewayRPCHandler rpc.GatewayRPCHandler) servlet.ServletHandler RegisterSubscriber(subscriberHandler oos.SubscriberHandler) } diff --git a/server/server_handlers.go b/server/server_handlers.go index 605db84..710dea2 100644 --- a/server/server_handlers.go +++ b/server/server_handlers.go @@ -44,10 +44,16 @@ type ServerHandlers struct { subscribers []oos.SubscriberHandler } +func (sh *ServerHandlers) ServerContext() ServerContext { + serverCTX := sh.ServerHandlers.ServerContext() + + return newServerContext(serverCTX) +} + // Init invoked before the server is started // If you override ths method, must call -func (sh *ServerHandlers) Init(serverCTX ServerContext) error { - if err := sh.ServerHandlers.Init(); nil != err { +func (sh *ServerHandlers) Init(serverCTX cwf.ServerContext) error { + if err := sh.ServerHandlers.Init(serverCTX); nil != err { return err } @@ -56,12 +62,12 @@ func (sh *ServerHandlers) Init(serverCTX ServerContext) error { return nil } -func (sh *ServerHandlers) Listen(serverCTX ServerContext) (net.Listener, error) { +func (sh *ServerHandlers) Listen(serverCTX cwf.ServerContext) (net.Listener, error) { return net.Listen(config.Config.Server.Network, config.Config.Server.Addr) } -func (sh *ServerHandlers) OnStart(serverCTX ServerContext) { - sh.ServerHandlers.OnStart() +func (sh *ServerHandlers) OnStart(serverCTX cwf.ServerContext) { + sh.ServerHandlers.OnStart(serverCTX) sh.redisSubscriber = oosr.New(redis.RedisPool.Get()) if err := sh.redisSubscriber.Start(); nil != err { @@ -74,16 +80,16 @@ func (sh *ServerHandlers) OnStart(serverCTX ServerContext) { } } -func (sh *ServerHandlers) OnError(serverCTX ServerContext, ctx *fasthttp.RequestCtx, status int, reason error) { - sh.OnError(ctx, status, reason) +func (sh *ServerHandlers) OnError(serverCTX cwf.ServerContext, ctx *fasthttp.RequestCtx, status int, reason error) { + sh.ServerHandlers.OnError(serverCTX, ctx, status, reason) } -func (sh *ServerHandlers) OnStop(serverCTX ServerContext) { +func (sh *ServerHandlers) OnStop(serverCTX cwf.ServerContext) { sh.redisSubscriber.Stop() external.ExternalDestroy() - sh.ServerHandlers.OnStop() + sh.ServerHandlers.OnStop(serverCTX) } func (sh *ServerHandlers) RegisterServlet(servletName string, servletHandler servlet.ServletHandler) { @@ -107,14 +113,14 @@ func (sh *ServerHandlers) RegisterServlet(servletName string, servletHandler ser } -func (sh *ServerHandlers) RegisterRPCGatewayServlet(servletName string, rpcGatewayHandler rpc.RPCGatewayHandler) servlet.ServletHandler { +func (sh *ServerHandlers) RegisterRPCGatewayServlet(servletName string, gatewayRPCHandler rpc.GatewayRPCHandler) servlet.ServletHandler { cfg := config.Config.Servlets[servletName] if nil == cfg { logging.Logger().Panic(fmt.Sprintf("Gateway Websocket: config of servlet[%s] is not exist", servletName)) return nil } - rpcSH := oogwisr.NewRPCGatewayServletHandler(rpcGatewayHandler) + rpcSH := oogwisr.NewRPCGatewayServletHandler(gatewayRPCHandler) rpcSHs := rpcSH.(*oogwisr.RPCGatewayServletHandlers) rpcSHs.MaxMessageSize = cfg.Socket.MaxMessageSize diff --git a/servlet/servlet_context.go b/servlet/servlet_context.go new file mode 100644 index 0000000..b4aa2e3 --- /dev/null +++ b/servlet/servlet_context.go @@ -0,0 +1,20 @@ +package servlet + +import ( + cwf "git.loafle.net/commons_go/websocket_fasthttp" +) + +type ServletContext interface { + cwf.SocketContext +} + +func NewServletContext(socketCTX cwf.SocketContext) SocketContext { + sCTX := &servletContext{} + sCTX.SocketContext = socketCTX + + return sCTX +} + +type servletContext struct { + cwf.SocketContext +} diff --git a/servlet/servlet_handlers.go b/servlet/servlet_handlers.go index 4040899..2f110a7 100644 --- a/servlet/servlet_handlers.go +++ b/servlet/servlet_handlers.go @@ -7,41 +7,3 @@ import ( type ServletHandlers struct { cwf.SocketHandlers } - -// func (sh *ServletHandlers) Init() error { -// if err := sh.SocketHandlers.Init(); nil != err { -// return err -// } - -// return nil -// } - -// // OnConnect invoked when client is connected -// // If you override ths method, must call -// func (sh *ServletHandlers) OnConnect(soc cwf.Socket) cwf.Socket { -// soc = sh.SocketHandlers.OnConnect(soc) -// newSoc := ... -// return newSoc -// } - -// // OnDisconnect invoked when client is disconnected -// // If you override ths method, must call -// func (sh *ServletHandlers) OnDisconnect(soc cwf.Socket) { -// ... -// sh.SocketHandlers.OnDisconnect(soc) -// } - -// // Destroy invoked when server is stopped -// // If you override ths method, must call -// func (sh *ServletHandlers) Destroy() { - -// sh.SocketHandlers.Destroy() -// } - -// func (sh *ServletHandlers) Validate() { -// sh.SocketHandlers.Validate() - -// if "" == sh.EntryPath { -// logging.Logger().Panic("Geteway Server: The path of entry must be specified") -// } -// }