package service import ( "fmt" "reflect" "strconv" "sync" "time" cda "git.loafle.net/commons/di-go/annotation" cdr "git.loafle.net/commons/di-go/registry" logging "git.loafle.net/commons/logging-go" cuts "git.loafle.net/commons/util-go/time/scheduler" cutss "git.loafle.net/commons/util-go/time/scheduler/storage" occu "git.loafle.net/overflow/commons-go/core/util" ocmd "git.loafle.net/overflow/commons-go/model/data" ocmsc "git.loafle.net/overflow/commons-go/model/sensorconfig" ocsp "git.loafle.net/overflow/commons-go/service/probe" // For annotation _ "git.loafle.net/overflow/commons-go/core/annotation" ) var CollectorServiceType = reflect.TypeOf((*CollectorService)(nil)) func init() { cdr.RegisterType(CollectorServiceType) } type CollectorService struct { ocsp.CollectorService cda.TypeAnnotation `annotation:"@overflow:RPCService()"` ProbeService *ProbeService `annotation:"@Inject()"` CrawlerService *CrawlerService `annotation:"@Inject()"` scheduler *cuts.Scheduler scheduleMap sync.Map } func (s *CollectorService) InitService() error { _storage := cutss.NewMemoryStorage() s.scheduler = cuts.New(_storage) return nil } func (s *CollectorService) StartService() error { if err := s.scheduler.Start(); nil != err { return err } return nil } func (s *CollectorService) StopService() { s.scheduler.Stop() } func (s *CollectorService) DestroyService() { } func (s *CollectorService) AddSensorConfigs(sensorConfigs []*ocmsc.SensorConfig) error { if nil == sensorConfigs || 0 == len(sensorConfigs) { return nil } for _, sensorConfig := range sensorConfigs { interval, err := strconv.ParseInt(sensorConfig.Schedule.Interval, 10, 64) if nil != err { return fmt.Errorf("Cannot convert interval[%s] %v", sensorConfig.Schedule.Interval, err) } s.addSchedule(interval, sensorConfig) logging.Logger().Debugf("scheduler of config[%s] has been added", sensorConfig.ConfigID) } return nil } func (s *CollectorService) RemoveSensorConfigs(sensorConfigs []*ocmsc.SensorConfig) error { if nil == sensorConfigs || 0 == len(sensorConfigs) { return nil } for _, sensorConfig := range sensorConfigs { s.removeSchedule(sensorConfig.ConfigID) } return nil } func (s *CollectorService) addSchedule(interval int64, sensorConfig *ocmsc.SensorConfig) { s.removeSchedule(sensorConfig.ConfigID) scheduleID, err := s.scheduler.RunEvery(time.Duration(interval)*time.Second, s.collectTask, sensorConfig) if nil != err { logging.Logger().Errorf("Cannot add schedule for config[%s] %v", sensorConfig.ConfigID, err) return } s.scheduleMap.Store(sensorConfig.ConfigID, scheduleID) } func (s *CollectorService) removeSchedule(configID string) { scheduleID, ok := s.scheduleMap.Load(configID) if !ok { return } s.scheduleMap.Delete(configID) s.scheduler.Cancel(scheduleID.(string)) } func (s *CollectorService) collectTask(sensorConfig *ocmsc.SensorConfig) { logging.Logger().Debugf("CollectorService.collectTask for sensor config id[%s] of crawler[%s]", sensorConfig.ConfigID, sensorConfig.Crawler.Name) result, err := s.CrawlerService.Get(sensorConfig.ConfigID) if nil != err { logging.Logger().Errorf("Cannot get data from crawler[%s] %v", sensorConfig.Crawler.Name, err) return } m := &ocmd.Metric{ SensorConfigID: sensorConfig.ConfigID, Data: result, CollectDate: occu.NowPtr(), } if err := s.ProbeService.Send("MetricService.Send", m); nil != err { logging.Logger().Errorf("Cannot send data from config id[%s] of crawler[%s] %v", sensorConfig.ConfigID, sensorConfig.Crawler.Name, err) } }