This commit is contained in:
crusader 2018-04-19 23:07:14 +09:00
parent 21b4085bcc
commit 7ec23e0ba2
3 changed files with 24 additions and 15 deletions

View File

@ -5,3 +5,8 @@ const (
CONTAINER_RPC_CLIENT_CODEC = "CONTAINER_RPC_CLIENT_CODEC"
CONTAINER_RPC_WRITE_CHAN = "CONTAINER_RPC_WRITE_CHAN"
)
type RPCNotification struct {
Method string
Params []interface{}
}

View File

@ -6,7 +6,7 @@ import (
cda "git.loafle.net/commons/di-go/annotation"
cdr "git.loafle.net/commons/di-go/registry"
crp "git.loafle.net/commons/rpc-go/protocol"
"git.loafle.net/overflow/container-go"
// For annotation
_ "git.loafle.net/overflow/commons-go/core/annotation"
@ -21,8 +21,7 @@ func init() {
type ProbeService struct {
cda.TypeAnnotation `annotation:"@overflow:RPCService()"`
RPCClientCodec crp.ClientCodec `annotation:"@Resource(name='CONTAINER_RPC_CLIENT_CODEC')"`
RPCWriteChan chan<- []byte `annotation:"@Resource(name='CONTAINER_RPC_WRITE_CHAN')"`
RPCWriteChan chan<- *container.RPCNotification `annotation:"@Resource(name='CONTAINER_RPC_WRITE_CHAN')"`
}
func (s *ProbeService) InitService() error {
@ -43,13 +42,8 @@ func (s *ProbeService) DestroyService() {
}
func (s *ProbeService) Send(method string, params ...interface{}) error {
buf, err := s.RPCClientCodec.NewRequest(method, params, nil)
if nil != err {
return err
}
select {
case s.RPCWriteChan <- buf:
case s.RPCWriteChan <- &container.RPCNotification{Method: method, Params: params}:
default:
return fmt.Errorf("cannot write to rpcWriteChan")
}

View File

@ -5,11 +5,11 @@ import (
"git.loafle.net/commons/logging-go"
crp "git.loafle.net/commons/rpc-go/protocol"
crpj "git.loafle.net/commons/rpc-go/protocol/json"
crr "git.loafle.net/commons/rpc-go/registry"
"git.loafle.net/commons/server-go"
css "git.loafle.net/commons/server-go/socket"
cssn "git.loafle.net/commons/server-go/socket/net"
"git.loafle.net/overflow/container-go"
)
type RPCServlet interface {
@ -19,8 +19,9 @@ type RPCServlet interface {
type RPCServlets struct {
cssn.Servlets
RPCInvoker crr.RPCInvoker
RPCWriteChan <-chan []byte
RPCServerCodec crp.ServerCodec
RPCInvoker crr.RPCInvoker
RPCWriteChan <-chan *container.RPCNotification
}
func (s *RPCServlets) Handshake(servletCtx server.ServletCtx, conn net.Conn) error {
@ -49,7 +50,7 @@ func (s *RPCServlets) Handle(servletCtx server.ServletCtx,
replyBuff []byte
err error
)
sc := crpj.NewServerCodec()
go s.handleRPCWrite(stopChan, writeChan)
for {
@ -59,7 +60,7 @@ func (s *RPCServlets) Handle(servletCtx server.ServletCtx,
return
}
// grpc exec method call
src, err = sc.NewRequest(msg)
src, err = s.RPCServerCodec.NewRequest(msg)
if nil != err {
logging.Logger().Error(err)
break
@ -81,13 +82,22 @@ func (s *RPCServlets) Handle(servletCtx server.ServletCtx,
}
func (s *RPCServlets) handleRPCWrite(stopChan <-chan struct{}, writeChan chan<- []byte) {
var (
buf []byte
err error
)
for {
select {
case buf, ok := <-s.RPCWriteChan:
case noti, ok := <-s.RPCWriteChan:
if !ok {
break
}
buf, err = s.RPCServerCodec.NewNotification(noti.Method, noti.Params)
if nil != err {
logging.Logger().Error(err)
}
writeChan <- buf
case <-stopChan:
return