This commit is contained in:
crusader 2017-11-14 14:57:03 +09:00
parent 23d5f5406a
commit 8c19c1a887
5 changed files with 116 additions and 26 deletions

View File

@ -1,15 +1,17 @@
package overflow_gateway_websocket package overflow_gateway_websocket
import ( import (
"git.loafle.net/commons_go/util" cuc "git.loafle.net/commons_go/util/context"
) )
var ( var (
ServletSocketKey = util.ContextKey("ServletSocket") ServletSocketKey = cuc.ContextKey("ServletSocket")
ServletContentTypeKey = util.ContextKey("ServletContentType") ServletContentTypeKey = cuc.ContextKey("ServletContentType")
) )
const ( const (
ConfigPathFlagName = "config-dir" ConfigPathFlagName = "config-dir"
ConfigFileName = "config.json" ConfigFileName = "config.json"
GRPCUserIDKey = "GRPCUserID"
) )

View File

@ -0,0 +1,69 @@
package rpc
import (
"context"
"io"
"google.golang.org/grpc/metadata"
"git.loafle.net/commons_go/rpc/gateway"
oogw "git.loafle.net/overflow/overflow_gateway_websocket"
"git.loafle.net/overflow/overflow_gateway_websocket/external/grpc"
)
func newRPCGatewayServerHandler() RPCGatewayServerHandler {
sh := &RPCGatewayServerHandlers{}
return sh
}
type RPCGatewayServerHandler interface {
gateway.ServerHandler
}
type RPCGatewayServerHandlers struct {
gateway.ServerHandlers
}
func (sh *RPCGatewayServerHandlers) OnPreRead(r io.Reader) {
// no op
}
func (sh *RPCGatewayServerHandlers) OnPostRead(r io.Reader) {
// no op
}
func (sh *RPCGatewayServerHandlers) OnPreWriteResult(w io.Writer, result interface{}) {
// no op
}
func (sh *RPCGatewayServerHandlers) OnPostWriteResult(w io.Writer, result interface{}) {
// no op
}
func (sh *RPCGatewayServerHandlers) OnPreWriteError(w io.Writer, err error) {
// no op
}
func (sh *RPCGatewayServerHandlers) OnPostWriteError(w io.Writer, err error) {
// no op
}
func (sh *RPCGatewayServerHandlers) Invoke(ctx context.Context, method string, params []string) (result interface{}, err error) {
soc := ctx.Value(oogw.ServletSocketKey).(Socket)
md := metadata.Pairs(oogw.GRPCUserIDKey, soc.ID())
grpcCTX := metadata.NewOutgoingContext(context.Background(), md)
r, err := grpc.Exec(grpcCTX, method, params)
if err != nil {
return nil, err
}
return r, nil
}
func (sh *RPCGatewayServerHandlers) Validate() {
sh.ServerHandlers.Validate()
}

View File

@ -10,14 +10,30 @@ import (
"git.loafle.net/commons_go/server" "git.loafle.net/commons_go/server"
cwf "git.loafle.net/commons_go/websocket_fasthttp" cwf "git.loafle.net/commons_go/websocket_fasthttp"
oogw "git.loafle.net/overflow/overflow_gateway_websocket" oogw "git.loafle.net/overflow/overflow_gateway_websocket"
"git.loafle.net/overflow/overflow_gateway_websocket/servlet"
) )
func NewRPCGatewayServletHandler() RPCGatewayServletHandler {
rpcSH := newRPCGatewayServerHandler()
sh := &RPCGatewayServletHandlers{
rpcSH: rpcSH,
}
return sh
}
type RPCGatewayServletHandler interface {
servlet.ServletHandler
}
type RPCGatewayServletHandlers struct { type RPCGatewayServletHandlers struct {
oogw.ServerHandlers servlet.ServletHandlers
rpcSH RPCGatewayServerHandler
} }
func (sh *RPCGatewayServletHandlers) Init() error { func (sh *RPCGatewayServletHandlers) Init() error {
if err := sh.SocketHandlers.Init(); nil != err { if err := sh.ServletHandlers.Init(); nil != err {
return err return err
} }
@ -31,13 +47,14 @@ func (sh *RPCGatewayServletHandlers) Handshake(ctx *fasthttp.RequestCtx) (id str
// OnConnect invoked when client is connected // OnConnect invoked when client is connected
// If you override ths method, must call // If you override ths method, must call
func (sh *RPCGatewayServletHandlers) OnConnect(soc cwf.Socket) cwf.Socket { func (sh *RPCGatewayServletHandlers) OnConnect(soc cwf.Socket) cwf.Socket {
soc = sh.SocketHandlers.OnConnect(soc) soc = sh.ServletHandlers.OnConnect(soc)
return newSocket(soc, "json") return newSocket(soc, "json")
} }
func (sh *RPCGatewayServletHandlers) Handle(soc cwf.Socket, stopChan <-chan struct{}, doneChan chan<- struct{}) { func (sh *RPCGatewayServletHandlers) Handle(soc cwf.Socket, stopChan <-chan struct{}, doneChan chan<- struct{}) {
codec, err := sh.rpcServerHandler.GetCodec(sh.ContentType) rpcSoc := soc.(Socket)
codec, err := sh.rpcSH.GetCodec(rpcSoc.GetContentType())
if nil != err { if nil != err {
log.Printf("RPC Handle: %v", err) log.Printf("RPC Handle: %v", err)
doneChan <- struct{}{} doneChan <- struct{}{}
@ -45,11 +62,11 @@ func (sh *RPCGatewayServletHandlers) Handle(soc cwf.Socket, stopChan <-chan stru
} }
var socConn *cwf.SocketConn var socConn *cwf.SocketConn
ctx := context.WithValue(context.Background(), oogw.ServletSocketKey, soc) ctx := context.WithValue(context.Background(), oogw.ServletSocketKey, rpcSoc)
// conn.SetPongHandler(func(string) error { c.conn.SetReadDeadline(time.Now().Add(pongWait)); return nil }) // conn.SetPongHandler(func(string) error { c.conn.SetReadDeadline(time.Now().Add(pongWait)); return nil })
for { for {
if socConn, err = soc.WaitRequest(); nil != err { if socConn, err = rpcSoc.WaitRequest(); nil != err {
doneChan <- struct{}{} doneChan <- struct{}{}
return return
} }
@ -60,7 +77,7 @@ func (sh *RPCGatewayServletHandlers) Handle(soc cwf.Socket, stopChan <-chan stru
// case websocket.BinaryMessage: // case websocket.BinaryMessage:
// } // }
if err = gateway.Handle(ctx, sh.rpcServerHandler, codec, socConn, socConn); nil != err { if err = gateway.Handle(ctx, sh.rpcSH, codec, socConn, socConn); nil != err {
if server.IsClientDisconnect(err) { if server.IsClientDisconnect(err) {
doneChan <- struct{}{} doneChan <- struct{}{}
return return
@ -86,17 +103,17 @@ func (sh *RPCGatewayServletHandlers) Handle(soc cwf.Socket, stopChan <-chan stru
// If you override ths method, must call // If you override ths method, must call
func (sh *RPCGatewayServletHandlers) OnDisconnect(soc cwf.Socket) { func (sh *RPCGatewayServletHandlers) OnDisconnect(soc cwf.Socket) {
sh.SocketHandlers.OnDisconnect(soc) sh.ServletHandlers.OnDisconnect(soc)
} }
// Destroy invoked when server is stopped // Destroy invoked when server is stopped
// If you override ths method, must call // If you override ths method, must call
func (sh *RPCGatewayServletHandlers) Destroy() { func (sh *RPCGatewayServletHandlers) Destroy() {
sh.SocketHandlers.Destroy() sh.ServletHandlers.Destroy()
} }
func (sh *RPCGatewayServletHandlers) Validate() { func (sh *RPCGatewayServletHandlers) Validate() {
sh.SocketHandlers.Validate() sh.ServletHandlers.Validate()
} }

View File

@ -94,16 +94,17 @@ func (sh *ServerHandlers) RegisterRPCGatewayServlet(entryPath string, servletHan
return return
} }
rpcSH := &rpc.RPCGatewayServletHandlers{} rpcSH := rpc.NewRPCGatewayServletHandler()
rpcSHs := rpcSH.(*rpc.RPCGatewayServletHandlers)
rpcSH.MaxMessageSize = cfg.Socket.MaxMessageSize rpcSHs.MaxMessageSize = cfg.Socket.MaxMessageSize
rpcSH.WriteTimeout = cfg.Socket.WriteTimeout * time.Second rpcSHs.WriteTimeout = cfg.Socket.WriteTimeout * time.Second
rpcSH.ReadTimeout = cfg.Socket.ReadTimeout * time.Second rpcSHs.ReadTimeout = cfg.Socket.ReadTimeout * time.Second
rpcSH.PongTimeout = cfg.Socket.PongTimeout * time.Second rpcSHs.PongTimeout = cfg.Socket.PongTimeout * time.Second
rpcSH.PingTimeout = cfg.Socket.PingTimeout * time.Second rpcSHs.PingTimeout = cfg.Socket.PingTimeout * time.Second
rpcSH.PingPeriod = cfg.Socket.PingPeriod * time.Second rpcSHs.PingPeriod = cfg.Socket.PingPeriod * time.Second
sh.RegisterSocketHandler(entryPath, rpcSH) sh.RegisterSocketHandler(entryPath, rpcSHs)
} }
func (sh *ServerHandlers) RegisterSubscriber(subscriberHandler oos.SubscriberHandler) { func (sh *ServerHandlers) RegisterSubscriber(subscriberHandler oos.SubscriberHandler) {

View File

@ -18,15 +18,16 @@ type ServletHandlers struct {
// // OnConnect invoked when client is connected // // OnConnect invoked when client is connected
// // If you override ths method, must call // // If you override ths method, must call
// func (sh *ServletHandlers) OnConnect(soc *cwf.Socket) { // func (sh *ServletHandlers) OnConnect(soc cwf.Socket) cwf.Socket {
// sh.SocketHandlers.OnConnect(soc) // soc = sh.SocketHandlers.OnConnect(soc)
// newSoc := ...
// return newSoc
// } // }
// // OnDisconnect invoked when client is disconnected // // OnDisconnect invoked when client is disconnected
// // If you override ths method, must call // // If you override ths method, must call
// func (sh *ServletHandlers) OnDisconnect(soc *cwf.Socket) { // func (sh *ServletHandlers) OnDisconnect(soc cwf.Socket) {
// ...
// sh.SocketHandlers.OnDisconnect(soc) // sh.SocketHandlers.OnDisconnect(soc)
// } // }