146 lines
4.0 KiB
Go
146 lines
4.0 KiB
Go
package service
|
|
|
|
import (
|
|
"reflect"
|
|
"sync"
|
|
|
|
cda "git.loafle.net/commons/di-go/annotation"
|
|
cdr "git.loafle.net/commons/di-go/registry"
|
|
logging "git.loafle.net/commons/logging-go"
|
|
crp "git.loafle.net/commons/rpc-go/protocol"
|
|
occp "git.loafle.net/overflow/commons-go/config/probe"
|
|
ocsp "git.loafle.net/overflow/commons-go/service/probe"
|
|
"git.loafle.net/overflow/probe/container"
|
|
|
|
// For annotation
|
|
_ "git.loafle.net/overflow/commons-go/core/annotation"
|
|
)
|
|
|
|
// ContainerServiceType is the reflect.Type of *ContainerService. It is the
// value handed to the DI registry by this package's init function.
var ContainerServiceType = reflect.TypeOf((*ContainerService)(nil))
|
|
|
|
func init() {
|
|
cdr.RegisterType(ContainerServiceType)
|
|
}
|
|
|
|
type ContainerService struct {
|
|
ocsp.ContainerService
|
|
cda.TypeAnnotation `annotation:"@overflow:RPCService()"`
|
|
|
|
DiscoveryService *DiscoveryService `annotation:"@Inject()"`
|
|
SensorConfigService *SensorConfigService `annotation:"@Inject()"`
|
|
|
|
OnConnectChan <-chan *container.OnConnectInfo `annotation:"@Resource(name='PROBE_ONCONNECT_CHAN')"`
|
|
OnDisconnectChan <-chan occp.ContainerType `annotation:"@Resource(name='PROBE_ONDISCONNECT_CHAN')"`
|
|
RPCServerCodec crp.ServerCodec `annotation:"@Resource(name='PROBE_RPC_SERVER_CODEC')"`
|
|
|
|
containerSessions sync.Map
|
|
|
|
stopChan chan struct{}
|
|
stopWg sync.WaitGroup
|
|
}
|
|
|
|
func (s *ContainerService) InitService() error {
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *ContainerService) StartService() error {
|
|
s.stopChan = make(chan struct{})
|
|
|
|
s.stopWg.Add(1)
|
|
go s.handleCotainerConnection()
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *ContainerService) StopService() {
|
|
close(s.stopChan)
|
|
s.stopWg.Wait()
|
|
|
|
s.containerSessions.Range(func(_containerType, _containerSession interface{}) bool {
|
|
containerSession := _containerSession.(*container.ContainerSession)
|
|
containerSession.Stop()
|
|
|
|
return true
|
|
})
|
|
}
|
|
|
|
func (s *ContainerService) DestroyService() {
|
|
|
|
}
|
|
|
|
func (s *ContainerService) handleCotainerConnection() {
|
|
for {
|
|
select {
|
|
case onConnectInfo := <-s.OnConnectChan:
|
|
_containerSession, ok := s.containerSessions.Load(onConnectInfo.ContainerType)
|
|
if !ok {
|
|
// var err error
|
|
// _containerSession, err = s.newContainerSession(onConnectInfo.ContainerType)
|
|
// if nil != err {
|
|
// logging.Logger().Errorf("ContainerSession is not exist for %s", onConnectInfo.ContainerType.String())
|
|
// }
|
|
logging.Logger().Errorf("ContainerSession is not exist for %s", onConnectInfo.ContainerType.String())
|
|
break
|
|
}
|
|
containerSession := _containerSession.(*container.ContainerSession)
|
|
containerSession.Connected(onConnectInfo.WriteChan)
|
|
s.SensorConfigService.SendInitConfig(onConnectInfo.ContainerType)
|
|
|
|
case containerType := <-s.OnDisconnectChan:
|
|
_containerSession, ok := s.containerSessions.Load(containerType)
|
|
if ok {
|
|
containerSession := _containerSession.(*container.ContainerSession)
|
|
containerSession.Disconnected()
|
|
}
|
|
case <-s.stopChan:
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *ContainerService) Send(containerType occp.ContainerType, method string, params ...interface{}) error {
|
|
var err error
|
|
_containerSession, ok := s.containerSessions.Load(containerType)
|
|
if !ok {
|
|
_containerSession, err = s.newContainerSession(containerType)
|
|
if nil != err {
|
|
return err
|
|
}
|
|
}
|
|
containerSession := _containerSession.(*container.ContainerSession)
|
|
|
|
messageType, message, err := s.RPCServerCodec.NewNotification(method, params)
|
|
if nil != err {
|
|
return err
|
|
}
|
|
|
|
return containerSession.Send(messageType, message)
|
|
}
|
|
|
|
func (s *ContainerService) RunContainer(containerType occp.ContainerType) error {
|
|
var err error
|
|
_, ok := s.containerSessions.Load(containerType)
|
|
if !ok {
|
|
_, err = s.newContainerSession(containerType)
|
|
if nil != err {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *ContainerService) newContainerSession(containerType occp.ContainerType) (*container.ContainerSession, error) {
|
|
_containerSession, ok := s.containerSessions.Load(containerType)
|
|
if !ok {
|
|
_containerSession = container.NewContainerSession(containerType)
|
|
s.containerSessions.Store(containerType, _containerSession)
|
|
if err := _containerSession.(*container.ContainerSession).Start(); nil != err {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
return _containerSession.(*container.ContainerSession), nil
|
|
}
|