diff --git a/Gopkg.lock b/Gopkg.lock index 2672664..5e22e9a 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -17,7 +17,7 @@ branch = "master" name = "git.loafle.net/commons/rpc-go" packages = ["protocol"] - revision = "dcc3af07239b3f6fcbae3529bcb52c522b02053d" + revision = "f5810183daf8aad248da6aab13fc5785ab984be4" [[projects]] branch = "master" @@ -27,7 +27,7 @@ "socket", "socket/web" ] - revision = "1cae23cf7819d56fd399346a1dc30af6a82d1a93" + revision = "d1d485c711ef8c59ad51fbf17dba80158f1a8ace" [[projects]] branch = "master" @@ -45,7 +45,7 @@ branch = "master" name = "git.loafle.net/overflow/commons-go" packages = ["config/external"] - revision = "687985e7f2122d12e43a0111f13d6df9921c3d82" + revision = "7e2ff26648aefbafaee2ca28c1a958332ae4c6f9" [[projects]] branch = "master" @@ -99,8 +99,8 @@ "models", "pkg/escape" ] - revision = "89e084a80fb1e0bf5e7d38038e3367f821fdf3d7" - version = "v1.5.3" + revision = "4e4e00bc5ab85a3ff5e988c91020cf0399a87026" + version = "v1.5.4" [[projects]] name = "github.com/klauspost/compress" @@ -109,8 +109,8 @@ "gzip", "zlib" ] - revision = "6c8db69c4b49dd4df1fff66996cf556176d0b9bf" - version = "v1.2.1" + revision = "5fb1f31b0a61e9858f12f39266e059848a5f1cea" + version = "v1.3.0" [[projects]] name = "github.com/klauspost/cpuid" @@ -118,12 +118,6 @@ revision = "ae7887de9fa5d2db4eaa8174a7eff2c1ac00f2da" version = "v1.1" -[[projects]] - name = "github.com/klauspost/crc32" - packages = ["."] - revision = "cb6bfca970f6908083f26f39a79009d608efd5cd" - version = "v1.1" - [[projects]] branch = "master" name = "github.com/segmentio/kafka-go" @@ -176,7 +170,7 @@ "internal/timeseries", "trace" ] - revision = "db08ff08e8622530d9ed3a0e8ac279f6d4c02196" + revision = "4cb1c02c05b0e749b0365f61ae859a8e0cfceed9" [[projects]] name = "golang.org/x/text" @@ -203,7 +197,7 @@ branch = "master" name = "google.golang.org/genproto" packages = ["googleapis/rpc/status"] - revision = "32ee49c4dd805befd833990acba36cb75042378c" + revision = "ff3583edef7de132f219f0efc00e097cabcc0ec0" [[projects]] name = "google.golang.org/grpc" @@ -212,15 +206,16 @@ "balancer", "balancer/base", "balancer/roundrobin", - "channelz", "codes", "connectivity", "credentials", "encoding", "encoding/proto", - "grpclb/grpc_lb_v1/messages", "grpclog", "internal", + "internal/backoff", + "internal/channelz", + "internal/grpcrand", "keepalive", "metadata", "naming", @@ -233,8 +228,8 @@ "tap", "transport" ] - revision = "7a6a684ca69eb4cae85ad0a484f2e531598c047b" - version = "v1.12.2" + revision = "168a6198bcb0ef175f7dacec0b8691fc141dc9b8" + version = "v1.13.0" [[projects]] name = "gopkg.in/yaml.v2" diff --git a/servlet/rpc-servlet.go b/servlet/rpc-servlet.go index fdc2660..417bb5d 100644 --- a/servlet/rpc-servlet.go +++ b/servlet/rpc-servlet.go @@ -58,20 +58,22 @@ func (s *RPCServlets) OnDisconnect(servletCtx server.ServletCtx) { func (s *RPCServlets) Handle(servletCtx server.ServletCtx, stopChan <-chan struct{}, doneChan chan<- struct{}, - readChan <-chan []byte, writeChan chan<- []byte) { + readChan <-chan css.SocketMessage, writeChan chan<- css.SocketMessage) { defer func() { doneChan <- struct{}{} }() var ( - md metadata.MD - src crp.ServerRequestCodec - method string - params []string - grpcCtx context.Context - grpcReply string - replyBuff []byte - err error + md metadata.MD + src crp.ServerRequestCodec + messageType int + message []byte + method string + params []string + grpcCtx context.Context + grpcReply string + replyBuff []byte + err error ) _clientType := servletCtx.GetAttribute(og.SessionClientTypeKey) @@ -90,12 +92,15 @@ func (s *RPCServlets) Handle(servletCtx server.ServletCtx, for { select { - case msg, ok := <-readChan: + case socketMessage, ok := <-readChan: if !ok { return } + + messageType, message = socketMessage() + // grpc exec method call - src, err = s.RPCServerCodec.NewRequest(msg) + src, err = s.RPCServerCodec.NewRequest(messageType, message) if nil != err { logging.Logger().Error(err) break @@ -118,14 +123,14 @@ func (s *RPCServlets) Handle(servletCtx server.ServletCtx, if !src.HasResponse() { break } - replyBuff, err = src.NewResponseWithString(grpcReply, err) + messageType, replyBuff, err = src.NewResponseWithString(grpcReply, err) if nil != err { logging.Logger().Error(err) s.writeError(src, writeChan, crp.E_INTERNAL, "", err) break } - writeChan <- replyBuff + writeChan <- css.MakeSocketMessage(messageType, replyBuff) case <-stopChan: return } @@ -134,7 +139,7 @@ func (s *RPCServlets) Handle(servletCtx server.ServletCtx, } -func (s *RPCServlets) writeError(src crp.ServerRequestCodec, writeChan chan<- []byte, code crp.ErrorCode, message string, data interface{}) { +func (s *RPCServlets) writeError(src crp.ServerRequestCodec, writeChan chan<- css.SocketMessage, code crp.ErrorCode, message string, data interface{}) { if !src.HasResponse() { return } @@ -145,12 +150,12 @@ func (s *RPCServlets) writeError(src crp.ServerRequestCodec, writeChan chan<- [] Data: data, } - buf, err := src.NewResponse(nil, pErr) + messageType, buf, err := src.NewResponse(nil, pErr) if nil != err { logging.Logger().Error(err) return } - writeChan <- buf + writeChan <- css.MakeSocketMessage(messageType, buf) } func (s *RPCServlets) GetSessions(sessionIDs []string) []*Session {