probe/service/ContainerService.go

146 lines
4.0 KiB
Go
Raw Normal View History

2018-04-13 11:59:46 +00:00
package service
import (
"reflect"
2018-05-03 11:24:07 +00:00
"sync"
2018-04-13 11:59:46 +00:00
cda "git.loafle.net/commons/di-go/annotation"
cdr "git.loafle.net/commons/di-go/registry"
2018-05-03 11:24:07 +00:00
logging "git.loafle.net/commons/logging-go"
crp "git.loafle.net/commons/rpc-go/protocol"
2018-04-26 08:39:32 +00:00
occp "git.loafle.net/overflow/commons-go/config/probe"
2018-04-26 08:43:40 +00:00
ocsp "git.loafle.net/overflow/commons-go/service/probe"
2018-05-03 11:24:07 +00:00
"git.loafle.net/overflow/probe/container"
2018-04-18 14:56:13 +00:00
// For annotation
2018-04-13 11:59:46 +00:00
_ "git.loafle.net/overflow/commons-go/core/annotation"
)
// ContainerServiceType is the reflect.Type of *ContainerService. It is the
// value handed to the registry in init below.
var ContainerServiceType = reflect.TypeOf((*ContainerService)(nil))
// init registers ContainerServiceType with the di-go registry when the package
// is loaded.
func init() {
cdr.RegisterType(ContainerServiceType)
}
type ContainerService struct {
2018-04-26 08:43:40 +00:00
ocsp.ContainerService
2018-04-13 11:59:46 +00:00
cda.TypeAnnotation `annotation:"@overflow:RPCService()"`
2018-04-18 14:56:13 +00:00
2018-05-03 11:24:07 +00:00
DiscoveryService *DiscoveryService `annotation:"@Inject()"`
SensorConfigService *SensorConfigService `annotation:"@Inject()"`
2018-04-18 14:56:13 +00:00
2018-05-03 11:24:07 +00:00
OnConnectChan <-chan *container.OnConnectInfo `annotation:"@Resource(name='PROBE_ONCONNECT_CHAN')"`
OnDisconnectChan <-chan occp.ContainerType `annotation:"@Resource(name='PROBE_ONDISCONNECT_CHAN')"`
RPCServerCodec crp.ServerCodec `annotation:"@Resource(name='PROBE_RPC_SERVER_CODEC')"`
containerSessions sync.Map
stopChan chan struct{}
stopWg sync.WaitGroup
2018-04-13 11:59:46 +00:00
}
2018-04-17 14:11:13 +00:00
func (s *ContainerService) InitService() error {
2018-04-18 14:56:13 +00:00
2018-04-17 14:11:13 +00:00
return nil
}
func (s *ContainerService) StartService() error {
2018-05-03 11:24:07 +00:00
s.stopChan = make(chan struct{})
s.stopWg.Add(1)
go s.handleCotainerConnection()
2018-04-17 14:11:13 +00:00
return nil
}
func (s *ContainerService) StopService() {
2018-05-03 11:24:07 +00:00
close(s.stopChan)
s.stopWg.Wait()
2018-04-17 14:11:13 +00:00
2018-05-03 11:24:07 +00:00
s.containerSessions.Range(func(_containerType, _containerSession interface{}) bool {
containerSession := _containerSession.(*container.ContainerSession)
containerSession.Stop()
2018-04-18 14:56:13 +00:00
2018-05-03 11:24:07 +00:00
return true
})
2018-04-18 14:56:13 +00:00
}
2018-05-03 11:24:07 +00:00
func (s *ContainerService) DestroyService() {
2018-04-18 14:56:13 +00:00
}
2018-04-13 11:59:46 +00:00
2018-05-03 11:24:07 +00:00
func (s *ContainerService) handleCotainerConnection() {
for {
select {
case onConnectInfo := <-s.OnConnectChan:
_containerSession, ok := s.containerSessions.Load(onConnectInfo.ContainerType)
if !ok {
// var err error
// _containerSession, err = s.newContainerSession(onConnectInfo.ContainerType)
// if nil != err {
// logging.Logger().Errorf("ContainerSession is not exist for %s", onConnectInfo.ContainerType.String())
// }
logging.Logger().Errorf("ContainerSession is not exist for %s", onConnectInfo.ContainerType.String())
break
}
containerSession := _containerSession.(*container.ContainerSession)
containerSession.Connected(onConnectInfo.WriteChan)
s.SensorConfigService.SendInitConfig(onConnectInfo.ContainerType)
case containerType := <-s.OnDisconnectChan:
_containerSession, ok := s.containerSessions.Load(containerType)
if ok {
containerSession := _containerSession.(*container.ContainerSession)
containerSession.Disconnected()
}
case <-s.stopChan:
return
2018-04-20 11:19:14 +00:00
}
}
2018-04-19 15:03:58 +00:00
}
2018-05-03 11:24:07 +00:00
func (s *ContainerService) Send(containerType occp.ContainerType, method string, params ...interface{}) error {
var err error
_containerSession, ok := s.containerSessions.Load(containerType)
if !ok {
_containerSession, err = s.newContainerSession(containerType)
if nil != err {
2018-04-18 14:56:13 +00:00
return err
}
}
2018-05-03 11:24:07 +00:00
containerSession := _containerSession.(*container.ContainerSession)
2018-04-18 14:56:13 +00:00
2018-07-01 04:09:50 +00:00
messageType, message, err := s.RPCServerCodec.NewNotification(method, params)
2018-05-03 11:24:07 +00:00
if nil != err {
return err
2018-04-27 07:42:59 +00:00
}
2018-04-18 14:56:13 +00:00
2018-07-01 04:09:50 +00:00
return containerSession.Send(messageType, message)
2018-04-18 14:56:13 +00:00
}
2018-05-04 12:03:23 +00:00
// RunContainer makes sure a session exists for containerType. If one is
// already registered it does nothing; otherwise it creates one via
// newContainerSession and returns that call's error, if any.
func (s *ContainerService) RunContainer(containerType occp.ContainerType) error {
	if _, registered := s.containerSessions.Load(containerType); registered {
		return nil
	}

	_, err := s.newContainerSession(containerType)
	return err
}
2018-05-03 11:24:07 +00:00
func (s *ContainerService) newContainerSession(containerType occp.ContainerType) (*container.ContainerSession, error) {
_containerSession, ok := s.containerSessions.Load(containerType)
if !ok {
_containerSession = container.NewContainerSession(containerType)
s.containerSessions.Store(containerType, _containerSession)
if err := _containerSession.(*container.ContainerSession).Start(); nil != err {
return nil, err
2018-04-18 14:56:13 +00:00
}
}
2018-05-03 11:24:07 +00:00
return _containerSession.(*container.ContainerSession), nil
2018-04-18 14:56:13 +00:00
}