This commit is contained in:
crusader 2018-05-03 15:14:22 +09:00
parent f330a9f4f5
commit e4955d0250
3 changed files with 20 additions and 42 deletions

View File

@ -4,45 +4,14 @@ import (
"fmt"
"net/url"
"path"
"reflect"
"git.loafle.net/commons/logging-go"
crc "git.loafle.net/commons/rpc-go/client"
crpj "git.loafle.net/commons/rpc-go/protocol/json"
csswc "git.loafle.net/commons/server-go/socket/web/client"
occc "git.loafle.net/overflow/commons-go/config/container"
occp "git.loafle.net/overflow/commons-go/config/probe"
)
func New(containerType occp.ContainerType, services []interface{}, orderedServices []reflect.Type, portNumber int) (*crc.Client, error) {
connector, err := newConnector(containerType.String(), portNumber)
if nil != err {
return nil, err
}
codec := crpj.NewClientCodec()
var rpcRegistry crr.RPCRegistry
if nil != services && 0 < len(services) {
rpcRegistry = crr.NewRPCRegistry()
rpcRegistry.RegisterServices(services...)
}
ch := &ClientHandlers{
Services: services,
OrderedServices: orderedServices,
}
ch.Name = containerType.String()
ch.Connector = connector
ch.RPCCodec = codec
ch.RPCInvoker = rpcRegistry
return nil, &crc.Client{
ClientHandler: ch,
}
}
func newConnector(containerType occp.ContainerType, portNumber int) (*csswc.Connectors, error) {
func NewConnector(containerType occp.ContainerType, portNumber int) (*csswc.Connectors, error) {
u := url.URL{
Scheme: "ws",
Host: fmt.Sprintf("127.0.0.1:%d", port),

View File

@ -3,9 +3,5 @@ package container
const (
CONTAINER_CRAWLERS = "CONTAINER_CRAWLERS"
CONTAINER_RPC_WRITE_CHAN = "CONTAINER_RPC_WRITE_CHAN"
CONTAINER_RPC_CLIENT_CODEC = "CONTAINER_RPC_CLIENT_CODEC"
)
type RPCNotification struct {
Method string
Params []interface{}
}

View File

@ -1,11 +1,12 @@
package service
import (
"fmt"
"reflect"
cda "git.loafle.net/commons/di-go/annotation"
cdr "git.loafle.net/commons/di-go/registry"
crc "git.loafle.net/commons/rpc-go/client"
crp "git.loafle.net/commons/rpc-go/protocol"
// For annotation
_ "git.loafle.net/overflow/commons-go/core/annotation"
@ -20,7 +21,8 @@ func init() {
type ProbeService struct {
cda.TypeAnnotation `annotation:"@overflow:RPCService()"`
Client *crc.Client `annotation:"@Resource(name='CONTAINER_CLIENT')"`
RPCWriteChan chan []byte `annotation:"@Resource(name='CONTAINER_RPC_WRITE_CHAN')"`
ClientCodec crp.ClientCodec `annotation:"@Resource(name='CONTAINER_RPC_CLIENT_CODEC')"`
}
func (s *ProbeService) InitService() error {
@ -41,5 +43,16 @@ func (s *ProbeService) DestroyService() {
}
func (s *ProbeService) Send(method string, params ...interface{}) error {
return s.Client.Send(method, params...)
buff, err := s.ClientCodec.NewRequest(method, params, nil)
if nil != err {
return err
}
select {
case s.RPCWriteChan <- buff:
default:
return fmt.Errorf("Cannot send notification method[%s] params[%v]", method, params)
}
return nil
}