package service

import (
	"reflect"
	"sync"

	cda "git.loafle.net/commons/di-go/annotation"
	cdr "git.loafle.net/commons/di-go/registry"
	logging "git.loafle.net/commons/logging-go"
	crp "git.loafle.net/commons/rpc-go/protocol"
	occp "git.loafle.net/overflow/commons-go/config/probe"
	ocsp "git.loafle.net/overflow/commons-go/service/probe"
	"git.loafle.net/overflow/probe/container"

	// For annotation
	_ "git.loafle.net/overflow/commons-go/core/annotation"
)

// ContainerServiceType is the reflect.Type of *ContainerService; it is
// registered with the DI registry in init.
var ContainerServiceType = reflect.TypeOf((*ContainerService)(nil))

func init() {
	cdr.RegisterType(ContainerServiceType)
}

// ContainerService keeps one container.ContainerSession per container type
// and reacts to connect/disconnect events delivered on its channels.
type ContainerService struct {
	ocsp.ContainerService
	cda.TypeAnnotation `annotation:"@overflow:RPCService()"`

	DiscoveryService    *DiscoveryService    `annotation:"@Inject()"`
	SensorConfigService *SensorConfigService `annotation:"@Inject()"`

	OnConnectChan    <-chan *container.OnConnectInfo `annotation:"@Resource(name='PROBE_ONCONNECT_CHAN')"`
	OnDisconnectChan <-chan occp.ContainerType       `annotation:"@Resource(name='PROBE_ONDISCONNECT_CHAN')"`
	RPCServerCodec   crp.ServerCodec                 `annotation:"@Resource(name='PROBE_RPC_SERVER_CODEC')"`

	// containerSessions maps occp.ContainerType to *container.ContainerSession.
	containerSessions sync.Map
	// stopChan is closed by StopService to terminate the event loop.
	stopChan chan struct{}
	// stopWg tracks the event-loop goroutine started by StartService.
	stopWg sync.WaitGroup
}

// InitService performs no work and always returns nil.
func (s *ContainerService) InitService() error {
	return nil
}

// StartService creates the stop channel and launches the goroutine that
// handles connect/disconnect events. It always returns nil.
func (s *ContainerService) StartService() error {
	s.stopChan = make(chan struct{})

	s.stopWg.Add(1)
	go s.handleCotainerConnection()

	return nil
}

// StopService signals the event loop to exit, waits for it to finish and
// then stops every container session that has been created.
func (s *ContainerService) StopService() {
	close(s.stopChan)
	s.stopWg.Wait()

	s.containerSessions.Range(func(_, value interface{}) bool {
		value.(*container.ContainerSession).Stop()
		return true
	})
}

// DestroyService performs no work.
func (s *ContainerService) DestroyService() {
}

// handleCotainerConnection is the event loop started by StartService. It
// runs until stopChan is closed.
func (s *ContainerService) handleCotainerConnection() {
	// Balance the Add(1) in StartService; without this StopService would
	// block forever in stopWg.Wait().
	defer s.stopWg.Done()

	for {
		select {
		case onConnectInfo := <-s.OnConnectChan:
			value, ok := s.containerSessions.Load(onConnectInfo.ContainerType)
			if !ok {
				// var err error
				// _containerSession, err = s.newContainerSession(onConnectInfo.ContainerType)
				// if nil != err {
				// 	logging.Logger().Errorf("ContainerSession is not exist for %s", onConnectInfo.ContainerType.String())
				// }
				logging.Logger().Errorf("ContainerSession is not exist for %s", onConnectInfo.ContainerType.String())
				continue
			}

			session := value.(*container.ContainerSession)
			session.Connected(onConnectInfo.WriteChan)
			s.SensorConfigService.SendInitConfig(onConnectInfo.ContainerType)

		case containerType := <-s.OnDisconnectChan:
			if value, ok := s.containerSessions.Load(containerType); ok {
				value.(*container.ContainerSession).Disconnected()
			}

		case <-s.stopChan:
			return
		}
	}
}

// Send encodes method and params as a notification using RPCServerCodec and
// hands it to the session for containerType, creating and starting that
// session first when none exists yet. It returns the first error from session
// creation, encoding, or the session's Send.
func (s *ContainerService) Send(containerType occp.ContainerType, method string, params ...interface{}) error {
	// newContainerSession returns the existing session when there is one.
	session, err := s.newContainerSession(containerType)
	if err != nil {
		return err
	}

	buff, err := s.RPCServerCodec.NewNotification(method, params)
	if err != nil {
		return err
	}

	return session.Send(buff)
}

// newContainerSession returns the session registered for containerType,
// creating, registering and starting a new one when none exists. When Start
// fails the session is removed again and the error is returned.
func (s *ContainerService) newContainerSession(containerType occp.ContainerType) (*container.ContainerSession, error) {
	if existing, ok := s.containerSessions.Load(containerType); ok {
		return existing.(*container.ContainerSession), nil
	}

	session := container.NewContainerSession(containerType)
	// The session is registered before Start, as in the original ordering.
	// NOTE(review): presumably so the connect handler can find it while the
	// container is coming up — confirm against container.ContainerSession.
	s.containerSessions.Store(containerType, session)

	if err := session.Start(); err != nil {
		// Do not leave a session that failed to start in the map; a later
		// call can then retry instead of reusing the dead session.
		s.containerSessions.Delete(containerType)
		return nil, err
	}

	return session, nil
}