This commit is contained in:
crusader 2018-07-01 01:44:44 +09:00
parent 1db82cb967
commit b7d6a6fcb3
2 changed files with 35 additions and 35 deletions

33
Gopkg.lock generated
View File

@ -17,7 +17,7 @@
branch = "master"
name = "git.loafle.net/commons/rpc-go"
packages = ["protocol"]
revision = "dcc3af07239b3f6fcbae3529bcb52c522b02053d"
revision = "f5810183daf8aad248da6aab13fc5785ab984be4"
[[projects]]
branch = "master"
@ -27,7 +27,7 @@
"socket",
"socket/web"
]
revision = "1cae23cf7819d56fd399346a1dc30af6a82d1a93"
revision = "d1d485c711ef8c59ad51fbf17dba80158f1a8ace"
[[projects]]
branch = "master"
@ -45,7 +45,7 @@
branch = "master"
name = "git.loafle.net/overflow/commons-go"
packages = ["config/external"]
revision = "687985e7f2122d12e43a0111f13d6df9921c3d82"
revision = "7e2ff26648aefbafaee2ca28c1a958332ae4c6f9"
[[projects]]
branch = "master"
@ -99,8 +99,8 @@
"models",
"pkg/escape"
]
revision = "89e084a80fb1e0bf5e7d38038e3367f821fdf3d7"
version = "v1.5.3"
revision = "4e4e00bc5ab85a3ff5e988c91020cf0399a87026"
version = "v1.5.4"
[[projects]]
name = "github.com/klauspost/compress"
@ -109,8 +109,8 @@
"gzip",
"zlib"
]
revision = "6c8db69c4b49dd4df1fff66996cf556176d0b9bf"
version = "v1.2.1"
revision = "5fb1f31b0a61e9858f12f39266e059848a5f1cea"
version = "v1.3.0"
[[projects]]
name = "github.com/klauspost/cpuid"
@ -118,12 +118,6 @@
revision = "ae7887de9fa5d2db4eaa8174a7eff2c1ac00f2da"
version = "v1.1"
[[projects]]
name = "github.com/klauspost/crc32"
packages = ["."]
revision = "cb6bfca970f6908083f26f39a79009d608efd5cd"
version = "v1.1"
[[projects]]
branch = "master"
name = "github.com/segmentio/kafka-go"
@ -176,7 +170,7 @@
"internal/timeseries",
"trace"
]
revision = "db08ff08e8622530d9ed3a0e8ac279f6d4c02196"
revision = "4cb1c02c05b0e749b0365f61ae859a8e0cfceed9"
[[projects]]
name = "golang.org/x/text"
@ -203,7 +197,7 @@
branch = "master"
name = "google.golang.org/genproto"
packages = ["googleapis/rpc/status"]
revision = "32ee49c4dd805befd833990acba36cb75042378c"
revision = "ff3583edef7de132f219f0efc00e097cabcc0ec0"
[[projects]]
name = "google.golang.org/grpc"
@ -212,15 +206,16 @@
"balancer",
"balancer/base",
"balancer/roundrobin",
"channelz",
"codes",
"connectivity",
"credentials",
"encoding",
"encoding/proto",
"grpclb/grpc_lb_v1/messages",
"grpclog",
"internal",
"internal/backoff",
"internal/channelz",
"internal/grpcrand",
"keepalive",
"metadata",
"naming",
@ -233,8 +228,8 @@
"tap",
"transport"
]
revision = "7a6a684ca69eb4cae85ad0a484f2e531598c047b"
version = "v1.12.2"
revision = "168a6198bcb0ef175f7dacec0b8691fc141dc9b8"
version = "v1.13.0"
[[projects]]
name = "gopkg.in/yaml.v2"

View File

@ -58,20 +58,22 @@ func (s *RPCServlets) OnDisconnect(servletCtx server.ServletCtx) {
func (s *RPCServlets) Handle(servletCtx server.ServletCtx,
stopChan <-chan struct{}, doneChan chan<- struct{},
readChan <-chan []byte, writeChan chan<- []byte) {
readChan <-chan css.SocketMessage, writeChan chan<- css.SocketMessage) {
defer func() {
doneChan <- struct{}{}
}()
var (
md metadata.MD
src crp.ServerRequestCodec
method string
params []string
grpcCtx context.Context
grpcReply string
replyBuff []byte
err error
md metadata.MD
src crp.ServerRequestCodec
messageType int
message []byte
method string
params []string
grpcCtx context.Context
grpcReply string
replyBuff []byte
err error
)
_clientType := servletCtx.GetAttribute(og.SessionClientTypeKey)
@ -90,12 +92,15 @@ func (s *RPCServlets) Handle(servletCtx server.ServletCtx,
for {
select {
case msg, ok := <-readChan:
case socketMessage, ok := <-readChan:
if !ok {
return
}
messageType, message = socketMessage()
// grpc exec method call
src, err = s.RPCServerCodec.NewRequest(msg)
src, err = s.RPCServerCodec.NewRequest(messageType, message)
if nil != err {
logging.Logger().Error(err)
break
@ -118,14 +123,14 @@ func (s *RPCServlets) Handle(servletCtx server.ServletCtx,
if !src.HasResponse() {
break
}
replyBuff, err = src.NewResponseWithString(grpcReply, err)
messageType, replyBuff, err = src.NewResponseWithString(grpcReply, err)
if nil != err {
logging.Logger().Error(err)
s.writeError(src, writeChan, crp.E_INTERNAL, "", err)
break
}
writeChan <- replyBuff
writeChan <- css.MakeSocketMessage(messageType, replyBuff)
case <-stopChan:
return
}
@ -134,7 +139,7 @@ func (s *RPCServlets) Handle(servletCtx server.ServletCtx,
}
func (s *RPCServlets) writeError(src crp.ServerRequestCodec, writeChan chan<- []byte, code crp.ErrorCode, message string, data interface{}) {
func (s *RPCServlets) writeError(src crp.ServerRequestCodec, writeChan chan<- css.SocketMessage, code crp.ErrorCode, message string, data interface{}) {
if !src.HasResponse() {
return
}
@ -145,12 +150,12 @@ func (s *RPCServlets) writeError(src crp.ServerRequestCodec, writeChan chan<- []
Data: data,
}
buf, err := src.NewResponse(nil, pErr)
messageType, buf, err := src.NewResponse(nil, pErr)
if nil != err {
logging.Logger().Error(err)
return
}
writeChan <- buf
writeChan <- css.MakeSocketMessage(messageType, buf)
}
func (s *RPCServlets) GetSessions(sessionIDs []string) []*Session {