This commit is contained in:
crusader 2017-11-28 19:40:45 +09:00
parent c977d2c651
commit 5ddd34f39d
3 changed files with 24 additions and 14 deletions

View File

@ -6,7 +6,7 @@ import (
var (
ServletSocketKey = cuc.ContextKey("ServletSocket")
ServletContentTypeKey = cuc.ContextKey("ServletContentType")
SocketIDKey = cuc.ContextKey("SocketID")
)
const (

View File

@ -2,9 +2,7 @@ package rpc
import (
"context"
"fmt"
"git.loafle.net/commons_go/logging"
"git.loafle.net/commons_go/rpc"
crcwf "git.loafle.net/commons_go/rpc/connection/websocket/fasthttp"
"git.loafle.net/commons_go/rpc/gateway"
@ -17,23 +15,36 @@ import (
func newGatewayRPCServletHandler() GatewayRPCServletHandler {
sh := &GatewayRPCServletHandlers{}
sh.RegisterCodec(json.NewServerCodec(), json.Name)
sh.RegisterCodec(json.Name, json.NewServerCodec())
return sh
}
type GatewayRPCServletHandlers struct {
gateway.ServletHandlers
crcwf.ServletHandlers
rpcIO crcwf.ServletHandlers
}
func (sh *GatewayRPCServletHandlers) GetRequest(servletCTX rpc.ServletContext, codec protocol.ServerCodec, conn interface{}) (protocol.ServerRequestCodec, error) {
return sh.rpcIO.GetRequest(servletCTX, codec, conn)
}
func (sh *GatewayRPCServletHandlers) SendResponse(servletCTX rpc.ServletContext, conn interface{}, requestCodec protocol.ServerRequestCodec, result interface{}, err error) error {
return sh.rpcIO.SendResponse(servletCTX, conn, requestCodec, result, err)
}
func (sh *GatewayRPCServletHandlers) SendNotification(servletCTX rpc.ServletContext, conn interface{}, codec protocol.ServerCodec, method string, args ...interface{}) error {
return sh.rpcIO.SendNotification(servletCTX, conn, codec, method, args...)
}
func (sh *GatewayRPCServletHandlers) Invoke(servletCTX rpc.ServletContext, requestCodec protocol.RegistryCodec) (result interface{}, err error) {
if !sh.RPCRegistry.HasMethod(requestCodec.Method()) {
return nil, fmt.Errorf("RPC Servlet Handler: Method[%s] is not exist", requestCodec.Method())
}
md := metadata.Pairs(oogw.GRPCUserIDKey, soc.ID())
md := metadata.Pairs(oogw.GRPCUserIDKey, servletCTX.GetAttribute(oogw.SocketIDKey).(string))
grpcCTX := metadata.NewOutgoingContext(context.Background(), md)
params, err := requestCodec.Params()
if nil != err {
return nil, err
}
r, err := grpc.Exec(grpcCTX, requestCodec.Method(), params)
if err != nil {
return nil, err
@ -45,7 +56,4 @@ func (sh *GatewayRPCServletHandlers) Invoke(servletCTX rpc.ServletContext, reque
func (sh *GatewayRPCServletHandlers) Validate() {
sh.ServletHandlers.Validate()
if nil == sh.RPCRegistry {
logging.Logger().Panic(fmt.Sprintf("RPC Servlet Handler: RPC Registry must be specified"))
}
}

View File

@ -8,6 +8,7 @@ import (
cRPC "git.loafle.net/commons_go/rpc"
"git.loafle.net/commons_go/rpc/protocol/json"
cwf "git.loafle.net/commons_go/websocket_fasthttp"
oogw "git.loafle.net/overflow/overflow_gateway_websocket"
"git.loafle.net/overflow/overflow_gateway_websocket/rpc"
"git.loafle.net/overflow/overflow_gateway_websocket/servlet"
)
@ -57,6 +58,7 @@ func (sh *SocketHandlers) SocketContext(serverCTX cwf.ServerContext) cwf.SocketC
func (sh *RPCGatewayServletHandlers) OnConnect(soc cwf.Socket) {
sh.ServletHandlers.OnConnect(soc)
soc.Context().SetAttribute(oogw.SocketIDKey, soc.ID())
soc.Context().SetAttribute(cRPC.ContentTypeKey, json.Name)
}