package servlet import ( "context" logging "git.loafle.net/commons/logging-go" crp "git.loafle.net/commons/rpc-go/protocol" crpj "git.loafle.net/commons/rpc-go/protocol/json" "git.loafle.net/commons/server-go" cssw "git.loafle.net/commons/server-go/socket/web" og "git.loafle.net/overflow/gateway" ogeg "git.loafle.net/overflow/gateway/external/grpc" "github.com/valyala/fasthttp" "google.golang.org/grpc/metadata" ) type RPCServlet interface { cssw.Servlet } type RPCServlets struct { cssw.Servlets } func init() { // member RSA file read } func (s *RPCServlets) Handshake(servletCtx server.ServletCtx, ctx *fasthttp.RequestCtx) (*fasthttp.ResponseHeader, error) { // member auth token extraction // auth token parse // auth token valid // servletCtx set member return nil, nil } func (s *RPCServlets) Handle(servletCtx server.ServletCtx, stopChan <-chan struct{}, doneChan chan<- struct{}, readChan <-chan []byte, writeChan chan<- []byte) { defer func() { doneChan <- struct{}{} }() var ( md metadata.MD src crp.ServerRequestCodec method string params []string grpcCtx context.Context grpcReply string replyBuff []byte err error ) sc := crpj.NewServerCodec() _clientType := servletCtx.GetAttribute(og.SessionClientTypeKey) _sessionID := servletCtx.GetAttribute(og.SessionIDKey) _targetID := servletCtx.GetAttribute(og.SessionTargetIDKey) if nil != _clientType && nil != _sessionID && nil != _targetID { md = metadata.Pairs( og.GRPCClientTypeKey.String(), _clientType.(og.ClientType).String(), og.GRPCSessionIDKey.String(), _sessionID.(string), og.GRPCTargetIDKey.String(), _targetID.(string), ) } servletCtx.SetAttribute(og.SessionWriteChanKey, writeChan) for { select { case msg, ok := <-readChan: if !ok { return } // grpc exec method call src, err = sc.NewRequest(msg) if nil != err { logging.Logger().Error(err) break } method = src.Method() params, err = src.Params() if nil != err { logging.Logger().Error(err) s.writeError(src, writeChan, crp.E_BAD_PARAMS, "", err) break } grpcCtx = metadata.NewOutgoingContext(context.Background(), md) grpcReply, err = ogeg.Exec(grpcCtx, method, params...) replyBuff, err = src.NewResponseWithString(grpcReply, err) if nil != err { logging.Logger().Error(err) s.writeError(src, writeChan, crp.E_INTERNAL, "", err) break } writeChan <- replyBuff case <-stopChan: return } } } func (s *RPCServlets) writeError(src crp.ServerRequestCodec, writeChan chan<- []byte, code crp.ErrorCode, message string, data interface{}) { if !src.HasResponse() { return } pErr := &crp.Error{ Code: code, Message: message, Data: data, } buf, err := src.NewResponse(nil, pErr) if nil != err { logging.Logger().Error(err) return } writeChan <- buf }