This commit is contained in:
crusader 2018-05-03 20:00:07 +09:00
parent 053f55dd67
commit 7b191518b0
2 changed files with 47 additions and 11 deletions

View File

@ -4,6 +4,7 @@ import (
"fmt"
"reflect"
"strconv"
"sync"
"time"
cda "git.loafle.net/commons/di-go/annotation"
@ -28,11 +29,11 @@ type CollectorService struct {
ocsp.CollectorService
cda.TypeAnnotation `annotation:"@overflow:RPCService()"`
ProbeService *ProbeService `annotation:"@Inject()"`
CrawlerService *CrawlerService `annotation:"@Inject()"`
SensorConfigService *SensorConfigService `annotation:"@Inject()"`
ProbeService *ProbeService `annotation:"@Inject()"`
CrawlerService *CrawlerService `annotation:"@Inject()"`
scheduler *cuts.Scheduler
scheduler *cuts.Scheduler
scheduleMap sync.Map
}
func (s *CollectorService) InitService() error {
@ -47,9 +48,6 @@ func (s *CollectorService) StartService() error {
return err
}
if err := s.addScheduleAll(); nil != err {
return err
}
return nil
}
@ -61,8 +59,7 @@ func (s *CollectorService) DestroyService() {
}
func (s *CollectorService) addScheduleAll() error {
sensorConfigs := s.SensorConfigService.sensorConfigs
func (s *CollectorService) AddSensorConfigs(sensorConfigs []*ocmsc.SensorConfig) error {
if nil == sensorConfigs || 0 == len(sensorConfigs) {
return nil
}
@ -79,8 +76,25 @@ func (s *CollectorService) addScheduleAll() error {
return nil
}
func (s *CollectorService) RemoveSensorConfigs(sensorConfigs []*ocmsc.SensorConfig) error {
if nil == sensorConfigs || 0 == len(sensorConfigs) {
return nil
}
for _, sensorConfig := range sensorConfigs {
s.scheduleMap.Delete(sensorConfig.ConfigID)
}
return nil
}
func (s *CollectorService) addSchedule(interval int64, sensorConfig *ocmsc.SensorConfig) {
s.scheduler.RunEvery(time.Duration(interval)*time.Second, s.collectTask, sensorConfig)
scheduleID, err := s.scheduler.RunEvery(time.Duration(interval)*time.Second, s.collectTask, sensorConfig)
if nil != err {
logging.Logger().Errorf("Cannot add schedule for config[%s] %v", sensorConfig.ConfigID, err)
return
}
s.scheduleMap.Store(sensorConfig.ConfigID, scheduleID)
}
func (s *CollectorService) collectTask(sensorConfig *ocmsc.SensorConfig) {

View File

@ -22,6 +22,8 @@ func init() {
type SensorConfigService struct {
cda.TypeAnnotation `annotation:"@overflow:RPCService()"`
CollectorService *CollectorService `annotation:"@Inject()"`
sensorConfigs map[string]*ocmsc.SensorConfig
}
@ -53,6 +55,10 @@ func (s *SensorConfigService) InitConfig(sensorConfigs []*ocmsc.SensorConfig) er
s.sensorConfigs[sensorConfig.ConfigID] = sensorConfig
}
if err := s.CollectorService.AddSensorConfigs(sensorConfigs); nil != err {
return err
}
logging.Logger().Debugf("Sensor configs[%d] were added", len(sensorConfigs))
return nil
@ -69,6 +75,10 @@ func (s *SensorConfigService) AddConfig(sensorConfig *ocmsc.SensorConfig) error
}
s.sensorConfigs[configID] = sensorConfig
if err := s.CollectorService.AddSensorConfigs([]*ocmsc.SensorConfig{sensorConfig}); nil != err {
return err
}
logging.Logger().Debugf("Sensor config[%s] was added", configID)
return nil
@ -86,17 +96,29 @@ func (s *SensorConfigService) UpdateConfig(sensorConfig *ocmsc.SensorConfig) err
delete(s.sensorConfigs, configID)
s.sensorConfigs[configID] = sensorConfig
if err := s.CollectorService.RemoveSensorConfigs([]*ocmsc.SensorConfig{sensorConfig}); nil != err {
return err
}
if err := s.CollectorService.AddSensorConfigs([]*ocmsc.SensorConfig{sensorConfig}); nil != err {
return err
}
logging.Logger().Debugf("Sensor config[%s] was updated", configID)
return nil
}
func (s *SensorConfigService) RemoveConfig(sensorConfigID string) error {
if _, ok := s.sensorConfigs[sensorConfigID]; !ok {
sensorConfig, ok := s.sensorConfigs[sensorConfigID]
if !ok {
return fmt.Errorf("Sensor config[%s] is not exist", sensorConfigID)
}
delete(s.sensorConfigs, sensorConfigID)
if err := s.CollectorService.RemoveSensorConfigs([]*ocmsc.SensorConfig{sensorConfig}); nil != err {
return err
}
logging.Logger().Debugf("Sensor config[%d] was removed", sensorConfigID)
return nil
}