package service import ( "fmt" "reflect" "sync" "git.loafle.net/commons/logging-go" cda "git.loafle.net/commons/di-go/annotation" cdr "git.loafle.net/commons/di-go/registry" ocmd "git.loafle.net/overflow/commons-go/model/discovery" ocscd "git.loafle.net/overflow/commons-go/service/container/discovery" ocs "git.loafle.net/overflow/container-go/service" "git.loafle.net/overflow/container_discovery/internal/discoverer" // For annotation _ "git.loafle.net/overflow/commons-go/core/annotation" ) var DiscoveryServiceType = reflect.TypeOf((*DiscoveryService)(nil)) func init() { cdr.RegisterType(DiscoveryServiceType) } type DiscoveryService struct { ocscd.DiscoveryService cda.TypeAnnotation `annotation:"@overflow:RPCService()"` ProbeService *ocs.ProbeService `annotation:"@Inject()"` pendingDiscovery sync.Map discoverer discoverer.Discoverer } func (s *DiscoveryService) InitService() error { s.discoverer = discoverer.GetDiscoverer() return nil } func (s *DiscoveryService) StartService() error { return nil } func (s *DiscoveryService) StopService() { } func (s *DiscoveryService) DestroyService() { } func (s *DiscoveryService) DiscoverZone(requesterID string, dz *ocmd.DiscoveryZone) error { go s.handleDiscovery(requesterID, func(dataChan chan *discoverer.DiscoveryData) { s.discoverer.DiscoverZone(dataChan, dz) }) return nil } func (s *DiscoveryService) DiscoverHost(requesterID string, zone *ocmd.Zone, dh *ocmd.DiscoveryHost) error { go s.handleDiscovery(requesterID, func(dataChan chan *discoverer.DiscoveryData) { s.discoverer.DiscoverHost(dataChan, zone, dh) }) return nil } func (s *DiscoveryService) DiscoverPort(requesterID string, host *ocmd.Host, dp *ocmd.DiscoveryPort) error { go s.handleDiscovery(requesterID, func(dataChan chan *discoverer.DiscoveryData) { s.discoverer.DiscoverPort(dataChan, host, dp) }) return nil } func (s *DiscoveryService) DiscoverService(requesterID string, port *ocmd.Port, ds *ocmd.DiscoveryService) error { go s.handleDiscovery(requesterID, func(dataChan chan *discoverer.DiscoveryData) { s.discoverer.DiscoverSerice(dataChan, port, ds) }) return nil } func (s *DiscoveryService) StopRequest(requesterID string) error { _stopChan, ok := s.pendingDiscovery.Load(requesterID) if !ok { return fmt.Errorf("discovery request for [%s] is not exist", requesterID) } stopChan := _stopChan.(chan struct{}) close(stopChan) return nil } func (s *DiscoveryService) handleDiscovery(requesterID string, discoveryFunc func(dataChan chan *discoverer.DiscoveryData)) error { stopChan := make(chan struct{}) s.pendingDiscovery.Store(requesterID, stopChan) defer func() { s.pendingDiscovery.Delete(requesterID) }() var dataChan chan *discoverer.DiscoveryData retainChan := make(chan struct{}) go func() { dataChan = s.discoverer.Retain() close(retainChan) }() select { case <-stopChan: return nil case <-retainChan: } defer func() { s.discoverer.Release(dataChan) }() go discoveryFunc(dataChan) for { select { case data, ok := <-dataChan: if !ok { return nil } switch data.Type { case discoverer.DiscoveryDataTypeStart: logging.Logger().Debugf("DiscoveryService.DiscoveryStart") s.ProbeService.Send("DiscoveryService.DiscoveryStart", requesterID, data.Time) case discoverer.DiscoveryDataTypeStop: logging.Logger().Debugf("DiscoveryService.DiscoveryStop") s.ProbeService.Send("DiscoveryService.DiscoveryStop", requesterID, data.Time) data.Release() return nil case discoverer.DiscoveryDataTypeError: s.ProbeService.Send("DiscoveryService.DiscoveryError", requesterID, data.Error) case discoverer.DiscoveryDataTypeZone: s.ProbeService.Send("DiscoveryService.DiscoveredZone", requesterID, data.Result) case discoverer.DiscoveryDataTypeHost: s.ProbeService.Send("DiscoveryService.DiscoveredHost", requesterID, data.Result) case discoverer.DiscoveryDataTypePort: s.ProbeService.Send("DiscoveryService.DiscoveredPort", requesterID, data.Result) case discoverer.DiscoveryDataTypeService: s.ProbeService.Send("DiscoveryService.DiscoveredService", requesterID, data.Result) } data.Release() case <-stopChan: return nil } } }