This commit is contained in:
crusader 2017-11-28 02:12:26 +09:00
parent b2bfe5bb2d
commit ecd5ad53bf
18 changed files with 216 additions and 247 deletions

View File

@ -1,3 +1,4 @@
// Place your settings in this file to overwrite default and user settings.
{
"java.configuration.updateBuildConfiguration": "automatic"
}

View File

@ -0,0 +1,42 @@
codec, err := sh.gwRPCSH.GetCodec(soc.Context().GetAttribute("contentType").(string))
if nil != err {
log.Printf("RPC Handle: %v", err)
doneChan <- struct{}{}
return
}
var socConn *cwf.SocketConn
ctx := context.WithValue(context.Background(), oogw.ServletSocketKey, rpcSoc)
// conn.SetPongHandler(func(string) error { c.conn.SetReadDeadline(time.Now().Add(pongWait)); return nil })
for {
if socConn, err = rpcSoc.WaitRequest(); nil != err {
doneChan <- struct{}{}
return
}
// // "git.loafle.net/commons_go/websocket_fasthttp/websocket"
// switch socConn.MessageType {
// case websocket.TextMessage:
// case websocket.BinaryMessage:
// }
if err = gateway.Handle(ctx, sh.gwRPCSH, codec, socConn, socConn); nil != err {
if server.IsClientDisconnect(err) {
doneChan <- struct{}{}
return
}
log.Printf("RPC: %v", err)
}
if err = socConn.Close(); nil != err {
doneChan <- struct{}{}
return
}
select {
case <-stopChan:
return
default:
}
}

View File

@ -1,29 +1,46 @@
package rpc
import (
"context"
"fmt"
"git.loafle.net/commons_go/logging"
"git.loafle.net/commons_go/rpc"
crcwf "git.loafle.net/commons_go/rpc/connection/websocket/fasthttp"
"git.loafle.net/commons_go/rpc/gateway"
"git.loafle.net/commons_go/rpc/protocol"
"git.loafle.net/commons_go/rpc/protocol/json"
oogw "git.loafle.net/overflow/overflow_gateway_websocket"
"git.loafle.net/overflow/overflow_gateway_websocket/external/grpc"
"google.golang.org/grpc/metadata"
)
func newGatewayRPCServletHandler() GatewayRPCServletHandler {
sh := &GatewayRPCServletHandlers{}
sh.RegisterCodec(json.NewServerCodec(), json.Name)
return sh
}
type GatewayRPCServletHandlers struct {
gateway.ServletHandlers
crcwf.ServletHandlers
}
func (sh *GatewayRPCServletHandlers) Invoke(requestCodec protocol.RegistryCodec) (result interface{}, err error) {
func (sh *GatewayRPCServletHandlers) Invoke(servletCTX rpc.ServletContext, requestCodec protocol.RegistryCodec) (result interface{}, err error) {
if !sh.RPCRegistry.HasMethod(requestCodec.Method()) {
return nil, fmt.Errorf("RPC Servlet Handler: Method[%s] is not exist", requestCodec.Method())
}
result, err = sh.RPCRegistry.Invoke(requestCodec)
if nil != err {
md := metadata.Pairs(oogw.GRPCUserIDKey, soc.ID())
grpcCTX := metadata.NewOutgoingContext(context.Background(), md)
r, err := grpc.Exec(grpcCTX, requestCodec.Method(), params)
if err != nil {
return nil, err
}
return r, nil
return
}

View File

@ -1,70 +0,0 @@
package rpc
import (
"context"
"io"
"google.golang.org/grpc/metadata"
"git.loafle.net/commons_go/rpc/gateway"
"git.loafle.net/commons_go/rpc/protocol/json"
oogw "git.loafle.net/overflow/overflow_gateway_websocket"
"git.loafle.net/overflow/overflow_gateway_websocket/external/grpc"
)
func newRPCGatewayServerHandler() RPCGatewayServerHandler {
sh := &RPCGatewayServerHandlers{}
sh.RegisterCodec(json.NewServerCodec(), json.Name)
return sh
}
type RPCGatewayServerHandler interface {
gateway.ServerHandler
}
type RPCGatewayServerHandlers struct {
gateway.ServerHandlers
}
func (sh *RPCGatewayServerHandlers) OnPreRead(r io.Reader) {
// no op
}
func (sh *RPCGatewayServerHandlers) OnPostRead(r io.Reader) {
// no op
}
func (sh *RPCGatewayServerHandlers) OnPreWriteResult(w io.Writer, result interface{}) {
// no op
}
func (sh *RPCGatewayServerHandlers) OnPostWriteResult(w io.Writer, result interface{}) {
// no op
}
func (sh *RPCGatewayServerHandlers) OnPreWriteError(w io.Writer, err error) {
// no op
}
func (sh *RPCGatewayServerHandlers) OnPostWriteError(w io.Writer, err error) {
// no op
}
func (sh *RPCGatewayServerHandlers) Invoke(ctx context.Context, method string, params []string) (result interface{}, err error) {
soc := ctx.Value(oogw.ServletSocketKey).(Socket)
md := metadata.Pairs(oogw.GRPCUserIDKey, soc.ID())
grpcCTX := metadata.NewOutgoingContext(context.Background(), md)
r, err := grpc.Exec(grpcCTX, method, params)
if err != nil {
return nil, err
}
return r, nil
}
func (sh *RPCGatewayServerHandlers) Validate() {
sh.ServerHandlers.Validate()
}

View File

@ -0,0 +1,21 @@
package rpc
import (
cwf "git.loafle.net/commons_go/websocket_fasthttp"
"git.loafle.net/overflow/overflow_gateway_websocket/servlet"
)
type ServletContext interface {
servlet.ServletContext
}
func newServletContext(socketCTX cwf.SocketContext) SocketContext {
sCTX := &servletContext{}
sCTX.ServletContext = servlet.NewServletContext(socketCTX)
return sCTX
}
type servletContext struct {
servlet.ServletContext
}

View File

@ -0,0 +1,9 @@
package rpc
import (
"git.loafle.net/overflow/overflow_gateway_websocket/servlet"
)
type RPCGatewayServletHandler interface {
servlet.ServletHandler
}

View File

@ -1,110 +1,78 @@
package rpc
import (
"context"
"log"
"sync"
"github.com/valyala/fasthttp"
"git.loafle.net/commons_go/rpc/gateway"
cRPC "git.loafle.net/commons_go/rpc"
"git.loafle.net/commons_go/rpc/protocol/json"
"git.loafle.net/commons_go/server"
cwf "git.loafle.net/commons_go/websocket_fasthttp"
oogw "git.loafle.net/overflow/overflow_gateway_websocket"
"git.loafle.net/overflow/overflow_gateway_websocket/rpc"
"git.loafle.net/overflow/overflow_gateway_websocket/servlet"
)
func NewRPCGatewayServletHandler(rpcGH rpc.RPCGatewayHandler) RPCGatewayServletHandler {
rpcSH := newRPCGatewayServerHandler()
func NewRPCGatewayServletHandler(gwRPCHandler rpc.GatewayRPCHandler) RPCGatewayServletHandler {
gwRPCSH := newGatewayRPCServletHandler()
sh := &RPCGatewayServletHandlers{
rpcSH: rpcSH,
rpcGH: rpcGH,
gwRPCSH: gwRPCSH,
gwRPCHandler: gwRPCHandler,
}
return sh
}
type RPCGatewayServletHandler interface {
servlet.ServletHandler
}
type RPCGatewayServletHandlers struct {
servlet.ServletHandlers
rpcSH RPCGatewayServerHandler
rpcGH rpc.RPCGatewayHandler
gwRPCSH GatewayRPCServletHandler
gwRPCHandler rpc.GatewayRPCHandler
}
func (sh *RPCGatewayServletHandlers) Init() error {
if err := sh.ServletHandlers.Init(); nil != err {
func (sh *RPCGatewayServletHandlers) Init(serverCTX cwf.ServerContext) error {
if err := sh.ServletHandlers.Init(serverCTX); nil != err {
return err
}
if err := sh.rpcGH.Init(); nil != err {
if err := sh.gwRPCHandler.Init(serverCTX); nil != err {
return err
}
return nil
}
func (sh *RPCGatewayServletHandlers) Handshake(ctx *fasthttp.RequestCtx) (id string, extensionsHeader *fasthttp.ResponseHeader) {
return sh.rpcGH.Handshake(ctx)
func (sh *RPCGatewayServletHandlers) Handshake(serverCTX cwf.ServerContext, ctx *fasthttp.RequestCtx) (id string, extensionsHeader *fasthttp.ResponseHeader) {
return sh.gwRPCHandler.Handshake(serverCTX, ctx)
}
func (sh *SocketHandlers) SocketContext(serverCTX cwf.ServerContext) cwf.SocketContext {
socketCTX := sh.SocketContext(serverCTX)
return newServletContext(socketCTX)
}
// OnConnect invoked when client is connected
// If you override ths method, must call
func (sh *RPCGatewayServletHandlers) OnConnect(soc cwf.Socket) cwf.Socket {
soc = sh.ServletHandlers.OnConnect(soc)
func (sh *RPCGatewayServletHandlers) OnConnect(soc cwf.Socket) {
sh.ServletHandlers.OnConnect(soc)
return newSocket(soc, json.Name)
soc.Context().SetAttribute(cRPC.ContentTypeKey, json.Name)
}
func (sh *RPCGatewayServletHandlers) Handle(soc cwf.Socket, stopChan <-chan struct{}, doneChan chan<- struct{}) {
rpcSoc := soc.(Socket)
codec, err := sh.rpcSH.GetCodec(rpcSoc.GetContentType())
if nil != err {
log.Printf("RPC Handle: %v", err)
rpcServlet := retainRPCServlet(sh.gwRPCSH)
if err := rpcServlet.Start(soc.Context(), soc, soc); nil != err {
doneChan <- struct{}{}
return
}
var socConn *cwf.SocketConn
ctx := context.WithValue(context.Background(), oogw.ServletSocketKey, rpcSoc)
// conn.SetPongHandler(func(string) error { c.conn.SetReadDeadline(time.Now().Add(pongWait)); return nil })
for {
if socConn, err = rpcSoc.WaitRequest(); nil != err {
doneChan <- struct{}{}
return
}
// // "git.loafle.net/commons_go/websocket_fasthttp/websocket"
// switch socConn.MessageType {
// case websocket.TextMessage:
// case websocket.BinaryMessage:
// }
if err = gateway.Handle(ctx, sh.rpcSH, codec, socConn, socConn); nil != err {
if server.IsClientDisconnect(err) {
doneChan <- struct{}{}
return
}
log.Printf("RPC: %v", err)
}
if err = socConn.Close(); nil != err {
doneChan <- struct{}{}
return
}
select {
case <-stopChan:
return
default:
}
select {
case <-stopChan:
rpcServlet.Stop()
return
}
}
// OnDisconnect invoked when client is disconnected
@ -124,5 +92,20 @@ func (sh *RPCGatewayServletHandlers) Destroy() {
func (sh *RPCGatewayServletHandlers) Validate() {
sh.ServletHandlers.Validate()
sh.rpcGH.Validate()
sh.gwRPCHandler.Validate()
}
var rpcServletPool sync.Pool
func retainRPCServlet(sh GatewayRPCServletHandler) cRPC.Servlet {
v := rpcServletPool.Get()
if v == nil {
return cRPC.NewServlet(sh)
}
return v.(cRPC.Servlet)
}
func releaseRPCServlet(s cRPC.Servlet) {
rpcServletPool.Put(s)
}

View File

@ -1,28 +0,0 @@
package rpc
import (
cwf "git.loafle.net/commons_go/websocket_fasthttp"
)
func newSocket(soc cwf.Socket, contentType string) Socket {
newSoc := &socket{
contentType: contentType,
}
newSoc.Socket = soc
return newSoc
}
type Socket interface {
cwf.Socket
GetContentType() string
}
type socket struct {
cwf.Socket
contentType string
}
func (s *socket) GetContentType() string {
return s.contentType
}

View File

@ -0,0 +1,14 @@
package rpc
import (
"github.com/valyala/fasthttp"
cwf "git.loafle.net/commons_go/websocket_fasthttp"
)
type GatewayRPCHandler interface {
Init(serverCTX cwf.ServerContext) error
Handshake(serverCTX cwf.ServerContext, ctx *fasthttp.RequestCtx) (id string, extensionsHeader *fasthttp.ResponseHeader)
Validate()
}

View File

@ -0,0 +1,21 @@
package rpc
import (
cwf "git.loafle.net/commons_go/websocket_fasthttp"
"github.com/valyala/fasthttp"
)
type GatewayRPCHandlers struct {
}
func (sh *GatewayRPCHandlers) Init(serverCTX cwf.ServerContext) error {
return nil
}
func (sh *GatewayRPCHandlers) Handshake(serverCTX cwf.ServerContext, ctx *fasthttp.RequestCtx) (id string, extensionsHeader *fasthttp.ResponseHeader) {
return "", nil
}
func (sh *GatewayRPCHandlers) Validate() {
}

View File

@ -1,10 +0,0 @@
package rpc
import "github.com/valyala/fasthttp"
type RPCGatewayHandler interface {
Init() error
Handshake(ctx *fasthttp.RequestCtx) (id string, extensionsHeader *fasthttp.ResponseHeader)
Validate()
}

View File

@ -1,18 +0,0 @@
package rpc
import "github.com/valyala/fasthttp"
type RPCGatewayHandlers struct {
}
func (sh *RPCGatewayHandlers) Init() error {
return nil
}
func (sh *RPCGatewayHandlers) Handshake(ctx *fasthttp.RequestCtx) (id string, extensionsHeader *fasthttp.ResponseHeader) {
return "", nil
}
func (sh *RPCGatewayHandlers) Validate() {
}

View File

@ -4,8 +4,8 @@ import (
cwf "git.loafle.net/commons_go/websocket_fasthttp"
)
func New(serverCTX ServerContext, sh ServerHandler) Server {
s := cwf.New(serverCTX, sh)
func New(sh ServerHandler) Server {
s := cwf.New(sh)
return s
}

View File

@ -1,7 +1,6 @@
package server
import (
cuc "git.loafle.net/commons_go/util/context"
cwf "git.loafle.net/commons_go/websocket_fasthttp"
)
@ -13,9 +12,9 @@ type serverContext struct {
cwf.ServerContext
}
func NewServerContext(parent cuc.Context) ServerContext {
func NewServerContext(serverCTX cwf.ServerContext) ServerContext {
sCTX := &serverContext{}
sCTX.ServerContext = cwf.NewServerContext(parent)
sCTX.ServerContext = serverCTX
return sCTX
}

View File

@ -11,6 +11,6 @@ type ServerHandler interface {
cwf.ServerHandler
RegisterServlet(servletName string, servletHandler servlet.ServletHandler)
RegisterRPCGatewayServlet(servletName string, rpcGatewayHandler rpc.RPCGatewayHandler) servlet.ServletHandler
RegisterRPCGatewayServlet(servletName string, gatewayRPCHandler rpc.GatewayRPCHandler) servlet.ServletHandler
RegisterSubscriber(subscriberHandler oos.SubscriberHandler)
}

View File

@ -44,10 +44,16 @@ type ServerHandlers struct {
subscribers []oos.SubscriberHandler
}
func (sh *ServerHandlers) ServerContext() ServerContext {
serverCTX := sh.ServerHandlers.ServerContext()
return newServerContext(serverCTX)
}
// Init invoked before the server is started
// If you override ths method, must call
func (sh *ServerHandlers) Init(serverCTX ServerContext) error {
if err := sh.ServerHandlers.Init(); nil != err {
func (sh *ServerHandlers) Init(serverCTX cwf.ServerContext) error {
if err := sh.ServerHandlers.Init(serverCTX); nil != err {
return err
}
@ -56,12 +62,12 @@ func (sh *ServerHandlers) Init(serverCTX ServerContext) error {
return nil
}
func (sh *ServerHandlers) Listen(serverCTX ServerContext) (net.Listener, error) {
func (sh *ServerHandlers) Listen(serverCTX cwf.ServerContext) (net.Listener, error) {
return net.Listen(config.Config.Server.Network, config.Config.Server.Addr)
}
func (sh *ServerHandlers) OnStart(serverCTX ServerContext) {
sh.ServerHandlers.OnStart()
func (sh *ServerHandlers) OnStart(serverCTX cwf.ServerContext) {
sh.ServerHandlers.OnStart(serverCTX)
sh.redisSubscriber = oosr.New(redis.RedisPool.Get())
if err := sh.redisSubscriber.Start(); nil != err {
@ -74,16 +80,16 @@ func (sh *ServerHandlers) OnStart(serverCTX ServerContext) {
}
}
func (sh *ServerHandlers) OnError(serverCTX ServerContext, ctx *fasthttp.RequestCtx, status int, reason error) {
sh.OnError(ctx, status, reason)
func (sh *ServerHandlers) OnError(serverCTX cwf.ServerContext, ctx *fasthttp.RequestCtx, status int, reason error) {
sh.ServerHandlers.OnError(serverCTX, ctx, status, reason)
}
func (sh *ServerHandlers) OnStop(serverCTX ServerContext) {
func (sh *ServerHandlers) OnStop(serverCTX cwf.ServerContext) {
sh.redisSubscriber.Stop()
external.ExternalDestroy()
sh.ServerHandlers.OnStop()
sh.ServerHandlers.OnStop(serverCTX)
}
func (sh *ServerHandlers) RegisterServlet(servletName string, servletHandler servlet.ServletHandler) {
@ -107,14 +113,14 @@ func (sh *ServerHandlers) RegisterServlet(servletName string, servletHandler ser
}
func (sh *ServerHandlers) RegisterRPCGatewayServlet(servletName string, rpcGatewayHandler rpc.RPCGatewayHandler) servlet.ServletHandler {
func (sh *ServerHandlers) RegisterRPCGatewayServlet(servletName string, gatewayRPCHandler rpc.GatewayRPCHandler) servlet.ServletHandler {
cfg := config.Config.Servlets[servletName]
if nil == cfg {
logging.Logger().Panic(fmt.Sprintf("Gateway Websocket: config of servlet[%s] is not exist", servletName))
return nil
}
rpcSH := oogwisr.NewRPCGatewayServletHandler(rpcGatewayHandler)
rpcSH := oogwisr.NewRPCGatewayServletHandler(gatewayRPCHandler)
rpcSHs := rpcSH.(*oogwisr.RPCGatewayServletHandlers)
rpcSHs.MaxMessageSize = cfg.Socket.MaxMessageSize

View File

@ -0,0 +1,20 @@
package servlet
import (
cwf "git.loafle.net/commons_go/websocket_fasthttp"
)
type ServletContext interface {
cwf.SocketContext
}
func NewServletContext(socketCTX cwf.SocketContext) SocketContext {
sCTX := &servletContext{}
sCTX.SocketContext = socketCTX
return sCTX
}
type servletContext struct {
cwf.SocketContext
}

View File

@ -7,41 +7,3 @@ import (
type ServletHandlers struct {
cwf.SocketHandlers
}
// func (sh *ServletHandlers) Init() error {
// if err := sh.SocketHandlers.Init(); nil != err {
// return err
// }
// return nil
// }
// // OnConnect invoked when client is connected
// // If you override ths method, must call
// func (sh *ServletHandlers) OnConnect(soc cwf.Socket) cwf.Socket {
// soc = sh.SocketHandlers.OnConnect(soc)
// newSoc := ...
// return newSoc
// }
// // OnDisconnect invoked when client is disconnected
// // If you override ths method, must call
// func (sh *ServletHandlers) OnDisconnect(soc cwf.Socket) {
// ...
// sh.SocketHandlers.OnDisconnect(soc)
// }
// // Destroy invoked when server is stopped
// // If you override ths method, must call
// func (sh *ServletHandlers) Destroy() {
// sh.SocketHandlers.Destroy()
// }
// func (sh *ServletHandlers) Validate() {
// sh.SocketHandlers.Validate()
// if "" == sh.EntryPath {
// logging.Logger().Panic("Geteway Server: The path of entry must be specified")
// }
// }