diff --git a/service/CollectorService.go b/service/CollectorService.go index 8a9e6af..f6bfa93 100644 --- a/service/CollectorService.go +++ b/service/CollectorService.go @@ -4,6 +4,7 @@ import ( "fmt" "reflect" "strconv" + "sync" "time" cda "git.loafle.net/commons/di-go/annotation" @@ -28,11 +29,11 @@ type CollectorService struct { ocsp.CollectorService cda.TypeAnnotation `annotation:"@overflow:RPCService()"` - ProbeService *ProbeService `annotation:"@Inject()"` - CrawlerService *CrawlerService `annotation:"@Inject()"` - SensorConfigService *SensorConfigService `annotation:"@Inject()"` + ProbeService *ProbeService `annotation:"@Inject()"` + CrawlerService *CrawlerService `annotation:"@Inject()"` - scheduler *cuts.Scheduler + scheduler *cuts.Scheduler + scheduleMap sync.Map } func (s *CollectorService) InitService() error { @@ -47,9 +48,6 @@ func (s *CollectorService) StartService() error { return err } - if err := s.addScheduleAll(); nil != err { - return err - } return nil } @@ -61,8 +59,7 @@ func (s *CollectorService) DestroyService() { } -func (s *CollectorService) addScheduleAll() error { - sensorConfigs := s.SensorConfigService.sensorConfigs +func (s *CollectorService) AddSensorConfigs(sensorConfigs []*ocmsc.SensorConfig) error { if nil == sensorConfigs || 0 == len(sensorConfigs) { return nil } @@ -79,8 +76,25 @@ func (s *CollectorService) addScheduleAll() error { return nil } +func (s *CollectorService) RemoveSensorConfigs(sensorConfigs []*ocmsc.SensorConfig) error { + if nil == sensorConfigs || 0 == len(sensorConfigs) { + return nil + } + + for _, sensorConfig := range sensorConfigs { + s.scheduleMap.Delete(sensorConfig.ConfigID) + } + + return nil +} + func (s *CollectorService) addSchedule(interval int64, sensorConfig *ocmsc.SensorConfig) { - s.scheduler.RunEvery(time.Duration(interval)*time.Second, s.collectTask, sensorConfig) + scheduleID, err := s.scheduler.RunEvery(time.Duration(interval)*time.Second, s.collectTask, sensorConfig) + if nil != err { + logging.Logger().Errorf("Cannot add schedule for config[%s] %v", sensorConfig.ConfigID, err) + return + } + s.scheduleMap.Store(sensorConfig.ConfigID, scheduleID) } func (s *CollectorService) collectTask(sensorConfig *ocmsc.SensorConfig) { diff --git a/service/SensorConfigService.go b/service/SensorConfigService.go index 4a7ff69..70448ed 100644 --- a/service/SensorConfigService.go +++ b/service/SensorConfigService.go @@ -22,6 +22,8 @@ func init() { type SensorConfigService struct { cda.TypeAnnotation `annotation:"@overflow:RPCService()"` + CollectorService *CollectorService `annotation:"@Inject()"` + sensorConfigs map[string]*ocmsc.SensorConfig } @@ -53,6 +55,10 @@ func (s *SensorConfigService) InitConfig(sensorConfigs []*ocmsc.SensorConfig) er s.sensorConfigs[sensorConfig.ConfigID] = sensorConfig } + if err := s.CollectorService.AddSensorConfigs(sensorConfigs); nil != err { + return err + } + logging.Logger().Debugf("Sensor configs[%d] were added", len(sensorConfigs)) return nil @@ -69,6 +75,10 @@ func (s *SensorConfigService) AddConfig(sensorConfig *ocmsc.SensorConfig) error } s.sensorConfigs[configID] = sensorConfig + if err := s.CollectorService.AddSensorConfigs([]*ocmsc.SensorConfig{sensorConfig}); nil != err { + return err + } + logging.Logger().Debugf("Sensor config[%s] was added", configID) return nil @@ -86,17 +96,29 @@ func (s *SensorConfigService) UpdateConfig(sensorConfig *ocmsc.SensorConfig) err delete(s.sensorConfigs, configID) s.sensorConfigs[configID] = sensorConfig + if err := s.CollectorService.RemoveSensorConfigs([]*ocmsc.SensorConfig{sensorConfig}); nil != err { + return err + } + if err := s.CollectorService.AddSensorConfigs([]*ocmsc.SensorConfig{sensorConfig}); nil != err { + return err + } + logging.Logger().Debugf("Sensor config[%s] was updated", configID) return nil } func (s *SensorConfigService) RemoveConfig(sensorConfigID string) error { - if _, ok := s.sensorConfigs[sensorConfigID]; !ok { + sensorConfig, ok := s.sensorConfigs[sensorConfigID] + if !ok { return fmt.Errorf("Sensor config[%s] is not exist", sensorConfigID) } delete(s.sensorConfigs, sensorConfigID) + if err := s.CollectorService.RemoveSensorConfigs([]*ocmsc.SensorConfig{sensorConfig}); nil != err { + return err + } + logging.Logger().Debugf("Sensor config[%d] was removed", sensorConfigID) return nil }