This commit is contained in:
crusader 2017-11-14 15:27:18 +09:00
parent e5a491c6bb
commit 38f2405ff3
9 changed files with 48 additions and 203 deletions

View File

@ -7,13 +7,14 @@ import (
"google.golang.org/grpc/metadata" "google.golang.org/grpc/metadata"
"git.loafle.net/commons_go/rpc/gateway" "git.loafle.net/commons_go/rpc/gateway"
"git.loafle.net/commons_go/rpc/protocol/json"
oogw "git.loafle.net/overflow/overflow_gateway_websocket" oogw "git.loafle.net/overflow/overflow_gateway_websocket"
"git.loafle.net/overflow/overflow_gateway_websocket/external/grpc" "git.loafle.net/overflow/overflow_gateway_websocket/external/grpc"
) )
func newRPCGatewayServerHandler() RPCGatewayServerHandler { func newRPCGatewayServerHandler() RPCGatewayServerHandler {
sh := &RPCGatewayServerHandlers{} sh := &RPCGatewayServerHandlers{}
sh.RegisterCodec(json.NewServerCodec(), json.Name)
return sh return sh
} }

View File

@ -7,16 +7,19 @@ import (
"github.com/valyala/fasthttp" "github.com/valyala/fasthttp"
"git.loafle.net/commons_go/rpc/gateway" "git.loafle.net/commons_go/rpc/gateway"
"git.loafle.net/commons_go/rpc/protocol/json"
"git.loafle.net/commons_go/server" "git.loafle.net/commons_go/server"
cwf "git.loafle.net/commons_go/websocket_fasthttp" cwf "git.loafle.net/commons_go/websocket_fasthttp"
oogw "git.loafle.net/overflow/overflow_gateway_websocket" oogw "git.loafle.net/overflow/overflow_gateway_websocket"
"git.loafle.net/overflow/overflow_gateway_websocket/rpc"
"git.loafle.net/overflow/overflow_gateway_websocket/servlet" "git.loafle.net/overflow/overflow_gateway_websocket/servlet"
) )
func NewRPCGatewayServletHandler() RPCGatewayServletHandler { func NewRPCGatewayServletHandler(rpcGH rpc.RPCGatewayHandler) RPCGatewayServletHandler {
rpcSH := newRPCGatewayServerHandler() rpcSH := newRPCGatewayServerHandler()
sh := &RPCGatewayServletHandlers{ sh := &RPCGatewayServletHandlers{
rpcSH: rpcSH, rpcSH: rpcSH,
rpcGH: rpcGH,
} }
return sh return sh
@ -30,6 +33,7 @@ type RPCGatewayServletHandlers struct {
servlet.ServletHandlers servlet.ServletHandlers
rpcSH RPCGatewayServerHandler rpcSH RPCGatewayServerHandler
rpcGH rpc.RPCGatewayHandler
} }
func (sh *RPCGatewayServletHandlers) Init() error { func (sh *RPCGatewayServletHandlers) Init() error {
@ -37,11 +41,15 @@ func (sh *RPCGatewayServletHandlers) Init() error {
return err return err
} }
if err := sh.rpcGH.Init(); nil != err {
return err
}
return nil return nil
} }
func (sh *RPCGatewayServletHandlers) Handshake(ctx *fasthttp.RequestCtx) (id string, extensionsHeader *fasthttp.ResponseHeader) { func (sh *RPCGatewayServletHandlers) Handshake(ctx *fasthttp.RequestCtx) (id string, extensionsHeader *fasthttp.ResponseHeader) {
return "", nil return sh.rpcGH.Handshake(ctx)
} }
// OnConnect invoked when client is connected // OnConnect invoked when client is connected
@ -49,7 +57,7 @@ func (sh *RPCGatewayServletHandlers) Handshake(ctx *fasthttp.RequestCtx) (id str
func (sh *RPCGatewayServletHandlers) OnConnect(soc cwf.Socket) cwf.Socket { func (sh *RPCGatewayServletHandlers) OnConnect(soc cwf.Socket) cwf.Socket {
soc = sh.ServletHandlers.OnConnect(soc) soc = sh.ServletHandlers.OnConnect(soc)
return newSocket(soc, "json") return newSocket(soc, json.Name)
} }
func (sh *RPCGatewayServletHandlers) Handle(soc cwf.Socket, stopChan <-chan struct{}, doneChan chan<- struct{}) { func (sh *RPCGatewayServletHandlers) Handle(soc cwf.Socket, stopChan <-chan struct{}, doneChan chan<- struct{}) {
@ -116,4 +124,5 @@ func (sh *RPCGatewayServletHandlers) Destroy() {
func (sh *RPCGatewayServletHandlers) Validate() { func (sh *RPCGatewayServletHandlers) Validate() {
sh.ServletHandlers.Validate() sh.ServletHandlers.Validate()
sh.rpcGH.Validate()
} }

View File

@ -1,101 +0,0 @@
package server
import (
"context"
"log"
"github.com/valyala/fasthttp"
"git.loafle.net/commons_go/rpc/gateway"
"git.loafle.net/commons_go/server"
cwf "git.loafle.net/commons_go/websocket_fasthttp"
oogw "git.loafle.net/overflow/overflow_gateway_websocket"
)
type SocketHandlers struct {
cwf.SocketHandlers
}
func (sh *SocketHandlers) Init() error {
if err := sh.SocketHandlers.Init(); nil != err {
return err
}
return nil
}
func (sh *SocketHandlers) Handshake(ctx *fasthttp.RequestCtx) (id string, extensionsHeader *fasthttp.ResponseHeader) {
return "", nil
}
// OnConnect invoked when client is connected
// If you override ths method, must call
func (sh *SocketHandlers) OnConnect(soc cwf.Socket) {
sh.SocketHandlers.OnConnect(soc)
}
func (sh *SocketHandlers) Handle(soc *cwf.Socket, stopChan <-chan struct{}, doneChan chan<- struct{}) {
codec, err := sh.rpcServerHandler.GetCodec(sh.ContentType)
if nil != err {
log.Printf("RPC Handle: %v", err)
doneChan <- struct{}{}
return
}
var socConn *cwf.SocketConn
ctx := context.WithValue(context.Background(), oogw.ServletSocketKey, soc)
// conn.SetPongHandler(func(string) error { c.conn.SetReadDeadline(time.Now().Add(pongWait)); return nil })
for {
if socConn, err = soc.WaitRequest(); nil != err {
doneChan <- struct{}{}
return
}
// // "git.loafle.net/commons_go/websocket_fasthttp/websocket"
// switch socConn.MessageType {
// case websocket.TextMessage:
// case websocket.BinaryMessage:
// }
if err = gateway.Handle(ctx, sh.rpcServerHandler, codec, socConn, socConn); nil != err {
if server.IsClientDisconnect(err) {
doneChan <- struct{}{}
return
}
log.Printf("RPC: %v", err)
}
if err = socConn.Close(); nil != err {
doneChan <- struct{}{}
return
}
select {
case <-stopChan:
return
default:
}
}
}
// OnDisconnect invoked when client is disconnected
// If you override ths method, must call
func (sh *SocketHandlers) OnDisconnect(soc *cwf.Socket) {
sh.SocketHandlers.OnDisconnect(soc)
}
// Destroy invoked when server is stopped
// If you override ths method, must call
func (sh *SocketHandlers) Destroy() {
sh.SocketHandlers.Destroy()
}
func (sh *SocketHandlers) Validate() {
sh.SocketHandlers.Validate()
}

View File

@ -0,0 +1,10 @@
package rpc
import "github.com/valyala/fasthttp"
type RPCGatewayHandler interface {
Init() error
Handshake(ctx *fasthttp.RequestCtx) (id string, extensionsHeader *fasthttp.ResponseHeader)
Validate()
}

View File

@ -1 +1,18 @@
package rpc package rpc
import "github.com/valyala/fasthttp"
type RPCGatewayHandlers struct {
}
func (sh *RPCGatewayHandlers) Init() error {
return nil
}
func (sh *RPCGatewayHandlers) Handshake(ctx *fasthttp.RequestCtx) (id string, extensionsHeader *fasthttp.ResponseHeader) {
return "", nil
}
func (sh *RPCGatewayHandlers) Validate() {
}

View File

@ -2,6 +2,7 @@ package server
import ( import (
cwf "git.loafle.net/commons_go/websocket_fasthttp" cwf "git.loafle.net/commons_go/websocket_fasthttp"
"git.loafle.net/overflow/overflow_gateway_websocket/rpc"
"git.loafle.net/overflow/overflow_gateway_websocket/servlet" "git.loafle.net/overflow/overflow_gateway_websocket/servlet"
oos "git.loafle.net/overflow/overflow_subscriber" oos "git.loafle.net/overflow/overflow_subscriber"
) )
@ -10,6 +11,6 @@ type ServerHandler interface {
cwf.ServerHandler cwf.ServerHandler
RegisterServlet(entryPath string, servletHandler servlet.ServletHandler) RegisterServlet(entryPath string, servletHandler servlet.ServletHandler)
RegisterRPCGatewayServlet(entryPath string, servletHandler servlet.RPCGatewayServletHandler) RegisterRPCGatewayServlet(entryPath string, rpcGatewayHandler rpc.RPCGatewayHandler)
RegisterSubscriber(subscriberHandler oos.SubscriberHandler) RegisterSubscriber(subscriberHandler oos.SubscriberHandler)
} }

View File

@ -10,7 +10,8 @@ import (
"git.loafle.net/overflow/overflow_gateway_websocket/config" "git.loafle.net/overflow/overflow_gateway_websocket/config"
"git.loafle.net/overflow/overflow_gateway_websocket/external" "git.loafle.net/overflow/overflow_gateway_websocket/external"
"git.loafle.net/overflow/overflow_gateway_websocket/external/redis" "git.loafle.net/overflow/overflow_gateway_websocket/external/redis"
"git.loafle.net/overflow/overflow_gateway_websocket/internal/server/rpc" oogwisr "git.loafle.net/overflow/overflow_gateway_websocket/internal/server/rpc"
"git.loafle.net/overflow/overflow_gateway_websocket/rpc"
"git.loafle.net/overflow/overflow_gateway_websocket/servlet" "git.loafle.net/overflow/overflow_gateway_websocket/servlet"
oos "git.loafle.net/overflow/overflow_subscriber" oos "git.loafle.net/overflow/overflow_subscriber"
oosr "git.loafle.net/overflow/overflow_subscriber/redis" oosr "git.loafle.net/overflow/overflow_subscriber/redis"
@ -87,15 +88,15 @@ func (sh *ServerHandlers) RegisterServlet(entryPath string, servletHandler servl
} }
func (sh *ServerHandlers) RegisterRPCGatewayServlet(entryPath string, servletHandler servlet.RPCGatewayServletHandler) { func (sh *ServerHandlers) RegisterRPCGatewayServlet(entryPath string, rpcGatewayHandler rpc.RPCGatewayHandler) {
cfg := config.Config.Servlets[entryPath] cfg := config.Config.Servlets[entryPath]
if nil == cfg { if nil == cfg {
logging.Logger().Panic(fmt.Sprintf("Gateway Websocket: config of entry path[%s] is not exist", entryPath)) logging.Logger().Panic(fmt.Sprintf("Gateway Websocket: config of entry path[%s] is not exist", entryPath))
return return
} }
rpcSH := rpc.NewRPCGatewayServletHandler() rpcSH := oogwisr.NewRPCGatewayServletHandler()
rpcSHs := rpcSH.(*rpc.RPCGatewayServletHandlers) rpcSHs := rpcSH.(*oogwisr.RPCGatewayServletHandlers)
rpcSHs.MaxMessageSize = cfg.Socket.MaxMessageSize rpcSHs.MaxMessageSize = cfg.Socket.MaxMessageSize
rpcSHs.WriteTimeout = cfg.Socket.WriteTimeout * time.Second rpcSHs.WriteTimeout = cfg.Socket.WriteTimeout * time.Second

View File

@ -1,58 +0,0 @@
package servlet
import (
cwf "git.loafle.net/commons_go/websocket_fasthttp"
"github.com/valyala/fasthttp"
)
type RPCGatewayServletHandler interface {
// Init invoked when server is stated
// If you override ths method, must call
//
// func (sh *SocketHandler) Init() error {
// if err := sh.SocketHandlers.Init(); nil != err {
// return err
// }
// ...
// return nil
// }
Init() error
// Handshake do handshake client and server
// id is identity of client socket. if id is "", disallow connection
Handshake(ctx *fasthttp.RequestCtx) (id string, extensionsHeader *fasthttp.ResponseHeader)
// OnConnect invoked when client is connected
// If you override ths method, must call
//
// func (sh *SocketHandler) OnConnect(soc *cwf.Socket) {
// ...
// newSoc := ....
// return sh.SocketHandlers.OnConnect(newSoc)
// }
OnConnect(soc cwf.Socket) cwf.Socket
Handle(soc cwf.Socket, stopChan <-chan struct{}, doneChan chan<- struct{})
// OnDisconnect invoked when client is disconnected
// If you override ths method, must call
//
// func (sh *SocketHandler) OnDisconnect(soc *cwf.Socket) {
// ...
// sh.SocketHandlers.OnDisconnect(soc)
// }
OnDisconnect(soc cwf.Socket)
// Destroy invoked when server is stopped
// If you override ths method, must call
//
// func (sh *SocketHandler) Destroy() {
// ...
// sh.SocketHandlers.Destroy()
// }
Destroy()
// Validate is check handler value
// If you override ths method, must call
//
// func (sh *SocketHandlers) Validate() {
// sh.SocketHandlers.Validate()
// ...
// }
Validate()
}

View File

@ -1,35 +0,0 @@
package servlet
import (
cwf "git.loafle.net/commons_go/websocket_fasthttp"
"github.com/valyala/fasthttp"
)
type RPCGatewayServletHandlers struct {
}
func (sh *RPCGatewayServletHandlers) Init() error {
return nil
}
func (sh *RPCGatewayServletHandlers) Handshake(ctx *fasthttp.RequestCtx) (id string, extensionsHeader *fasthttp.ResponseHeader) {
return "", nil
}
func (sh *RPCGatewayServletHandlers) OnConnect(soc *cwf.Socket) {
}
func (sh *RPCGatewayServletHandlers) Handle(soc *cwf.Socket, stopChan <-chan struct{}, doneChan chan<- struct{}) {
// no op
}
func (sh *RPCGatewayServletHandlers) OnDisconnect(soc *cwf.Socket) {
}
func (sh *RPCGatewayServletHandlers) Destroy() {
// no op
}
func (sh *RPCGatewayServletHandlers) Validate() {
}