probe/service/ContainerService.go
crusader 9317f5316d ing
2018-05-04 21:03:23 +09:00

146 lines
4.0 KiB
Go

package service
import (
"reflect"
"sync"
cda "git.loafle.net/commons/di-go/annotation"
cdr "git.loafle.net/commons/di-go/registry"
logging "git.loafle.net/commons/logging-go"
crp "git.loafle.net/commons/rpc-go/protocol"
occp "git.loafle.net/overflow/commons-go/config/probe"
ocsp "git.loafle.net/overflow/commons-go/service/probe"
"git.loafle.net/overflow/probe/container"
// For annotation
_ "git.loafle.net/overflow/commons-go/core/annotation"
)
// ContainerServiceType is the reflect.Type of *ContainerService. It is the
// value handed to the DI registry in init.
var ContainerServiceType = reflect.TypeOf((*ContainerService)(nil))
// init registers ContainerServiceType with the cdr type registry when the
// package is loaded.
func init() {
cdr.RegisterType(ContainerServiceType)
}
// ContainerService keeps one container.ContainerSession per container type,
// reacts to container connect/disconnect notifications and sends RPC
// notifications to containers.
//
// The exported fields carry annotation tags; presumably the DI framework
// fills them in before StartService runs — confirm against the registry.
type ContainerService struct {
ocsp.ContainerService
cda.TypeAnnotation `annotation:"@overflow:RPCService()"`
// Collaborating services. SensorConfigService is used to push the initial
// configuration when a container connects; DiscoveryService is not
// referenced by the methods in this file.
DiscoveryService *DiscoveryService `annotation:"@Inject()"`
SensorConfigService *SensorConfigService `annotation:"@Inject()"`
// Receive-only notification channels consumed by the connection loop.
OnConnectChan <-chan *container.OnConnectInfo `annotation:"@Resource(name='PROBE_ONCONNECT_CHAN')"`
OnDisconnectChan <-chan occp.ContainerType `annotation:"@Resource(name='PROBE_ONDISCONNECT_CHAN')"`
// Codec used by Send to encode outgoing notifications.
RPCServerCodec crp.ServerCodec `annotation:"@Resource(name='PROBE_RPC_SERVER_CODEC')"`
// containerSessions maps occp.ContainerType to *container.ContainerSession.
containerSessions sync.Map
// stopChan is created by StartService and closed by StopService to end the
// connection loop.
stopChan chan struct{}
// stopWg is incremented by StartService for the connection loop goroutine
// and waited on by StopService.
stopWg sync.WaitGroup
}
// InitService performs no work for this service and always returns nil.
func (s *ContainerService) InitService() error {
return nil
}
// StartService creates the stop channel and launches the connection loop in
// its own goroutine. It always returns nil.
func (s *ContainerService) StartService() error {
// A fresh channel per start; StopService closes it to end the loop.
s.stopChan = make(chan struct{})
// Account for the loop goroutine before it starts so StopService can wait on it.
s.stopWg.Add(1)
go s.handleCotainerConnection()
return nil
}
// StopService shuts the service down: it signals the connection loop to exit,
// waits on stopWg, and then calls Stop on every registered container session.
//
// The stop signal is skipped when there is no open stop channel (StartService
// was never called, or StopService already ran), because closing a nil or an
// already-closed channel panics.
func (s *ContainerService) StopService() {
	if nil != s.stopChan {
		close(s.stopChan)
		s.stopWg.Wait()
		// Forget the closed channel so a repeated StopService does not close it
		// again; StartService always creates a new one.
		s.stopChan = nil
	}

	s.containerSessions.Range(func(_, value interface{}) bool {
		value.(*container.ContainerSession).Stop()
		// Keep iterating so every session gets stopped.
		return true
	})
}
// DestroyService is currently a no-op.
func (s *ContainerService) DestroyService() {
}
// handleCotainerConnection is the event loop launched by StartService. It
// handles container connect and disconnect notifications until stopChan is
// closed.
//
// StartService calls stopWg.Add(1) before starting this goroutine, so Done
// must be signalled when the loop exits; without it the stopWg.Wait() in
// StopService never returns.
func (s *ContainerService) handleCotainerConnection() {
	defer s.stopWg.Done()

	for {
		select {
		case onConnectInfo := <-s.OnConnectChan:
			_containerSession, ok := s.containerSessions.Load(onConnectInfo.ContainerType)
			if !ok {
				// No session is created here; a connection for a container type
				// without a registered session is reported and ignored.
				logging.Logger().Errorf("ContainerSession is not exist for %s", onConnectInfo.ContainerType.String())
				continue
			}

			containerSession := _containerSession.(*container.ContainerSession)
			containerSession.Connected(onConnectInfo.WriteChan)
			// Push the initial sensor configuration once the session is
			// marked connected.
			s.SensorConfigService.SendInitConfig(onConnectInfo.ContainerType)

		case containerType := <-s.OnDisconnectChan:
			// A disconnect for an unknown container type is silently ignored.
			if _containerSession, ok := s.containerSessions.Load(containerType); ok {
				_containerSession.(*container.ContainerSession).Disconnected()
			}

		case <-s.stopChan:
			return
		}
	}
}
// Send encodes method and params as an RPC notification and hands the
// resulting buffer to the session for containerType.
//
// When no session is registered for containerType, one is obtained through
// newContainerSession first. Errors from session creation, from encoding the
// notification, or from the session's Send are returned unchanged.
func (s *ContainerService) Send(containerType occp.ContainerType, method string, params ...interface{}) error {
	var session *container.ContainerSession

	if found, exists := s.containerSessions.Load(containerType); exists {
		session = found.(*container.ContainerSession)
	} else {
		created, err := s.newContainerSession(containerType)
		if nil != err {
			return err
		}
		session = created
	}

	// params is passed on as a single slice value, not expanded.
	notification, err := s.RPCServerCodec.NewNotification(method, params)
	if nil != err {
		return err
	}

	return session.Send(notification)
}
// RunContainer makes sure a session is registered for containerType. If one
// already exists it returns nil immediately; otherwise it obtains one through
// newContainerSession and returns that call's error, if any.
func (s *ContainerService) RunContainer(containerType occp.ContainerType) error {
	if _, exists := s.containerSessions.Load(containerType); exists {
		return nil
	}

	_, err := s.newContainerSession(containerType)
	return err
}
// newContainerSession returns the session registered for containerType,
// creating, registering and starting a new one when none exists yet.
//
// The session is registered before Start is called, as in the original code
// (presumably so the connection loop can find it as soon as the container
// connects — confirm). If Start fails, the session is removed from the
// registry again and the error is returned, so a later call retries with a
// new session instead of reusing one that never started.
func (s *ContainerService) newContainerSession(containerType occp.ContainerType) (*container.ContainerSession, error) {
	if existing, ok := s.containerSessions.Load(containerType); ok {
		return existing.(*container.ContainerSession), nil
	}

	// LoadOrStore makes check-and-register atomic: if another goroutine
	// registered a session in the meantime, that one is returned and the
	// freshly built one is discarded without being started.
	actual, loaded := s.containerSessions.LoadOrStore(containerType, container.NewContainerSession(containerType))
	session := actual.(*container.ContainerSession)
	if loaded {
		return session, nil
	}

	if err := session.Start(); nil != err {
		// Do not leave a session that failed to start in the registry.
		s.containerSessions.Delete(containerType)
		return nil, err
	}

	return session, nil
}