package service

import (
	"fmt"
	"reflect"
	"strconv"
	"time"

	cda "git.loafle.net/commons/di-go/annotation"
	cdr "git.loafle.net/commons/di-go/registry"
	cuts "git.loafle.net/commons/util-go/time/scheduler"
	cutss "git.loafle.net/commons/util-go/time/scheduler/storage"
	ocmsc "git.loafle.net/overflow/commons-go/model/sensorconfig"
	ocsp "git.loafle.net/overflow/commons-go/service/probe"

	// For annotation
	_ "git.loafle.net/overflow/commons-go/core/annotation"
)

// CollectorServiceType is the reflected pointer type of CollectorService; it
// is the value handed to the DI registry in init.
var CollectorServiceType = reflect.TypeOf((*CollectorService)(nil))

func init() {
	cdr.RegisterType(CollectorServiceType)
}

// CollectorService registers one periodic collect task per sensor
// configuration held by SensorConfigService, using an internal scheduler.
type CollectorService struct {
	ocsp.CollectorService
	cda.TypeAnnotation `annotation:"@overflow:RPCService()"`

	DataClientService   *DataClientService   `annotation:"@Inject()"`
	ContainerService    *ContainerService    `annotation:"@Inject()"`
	SensorConfigService *SensorConfigService `annotation:"@Inject()"`

	// scheduler is created by InitService; it is nil before that call.
	scheduler *cuts.Scheduler
}

// InitService creates the scheduler on top of a fresh in-memory storage.
// It always returns nil.
func (s *CollectorService) InitService() error {
	s.scheduler = cuts.New(cutss.NewMemoryStorage())

	return nil
}

// StartService starts the scheduler and then registers a schedule for every
// known sensor configuration.
//
// It returns an error when the scheduler has not been created by InitService,
// when the scheduler fails to start, or when any sensor configuration carries
// an unusable interval.
func (s *CollectorService) StartService() error {
	// Report a missing InitService call as an error instead of dereferencing
	// a nil scheduler.
	if nil == s.scheduler {
		return fmt.Errorf("CollectorService: StartService failed %v", "scheduler is not initialized")
	}

	if err := s.scheduler.Start(); nil != err {
		return fmt.Errorf("CollectorService: StartService failed %v", err)
	}

	return s.addScheduleAll()
}

// StopService stops the scheduler. It is a no-op when the scheduler was never
// created.
func (s *CollectorService) StopService() {
	if nil == s.scheduler {
		return
	}

	s.scheduler.Stop()
}

// DestroyService currently has nothing to release.
func (s *CollectorService) DestroyService() {

}

// addScheduleAll validates the interval of every sensor configuration and,
// only if all of them are valid, registers each one with the scheduler.
//
// Validation happens up front so that a malformed entry cannot leave a
// partially populated scheduler behind while an error is being reported.
// An interval must be a base-10 integer number of seconds in the range
// [1, maxIntervalSeconds].
func (s *CollectorService) addScheduleAll() error {
	// Largest whole number of seconds representable by time.Duration; larger
	// values would silently overflow when multiplied by time.Second.
	const maxIntervalSeconds = int64(time.Duration(1<<63-1) / time.Second)

	type pendingSchedule struct {
		interval     int64
		sensorConfig *ocmsc.SensorConfig
	}

	sensorConfigs := s.SensorConfigService.sensorConfigs
	// len of a nil collection is 0, so a separate nil check is unnecessary.
	if 0 == len(sensorConfigs) {
		return nil
	}

	pending := make([]pendingSchedule, 0, len(sensorConfigs))
	for _, sensorConfig := range sensorConfigs {
		if nil == sensorConfig {
			return fmt.Errorf("Cannot schedule a nil sensor config")
		}

		interval, err := strconv.ParseInt(sensorConfig.Schedule.Interval, 10, 64)
		if nil != err {
			return fmt.Errorf("Cannot convert interval[%s] %v", sensorConfig.Schedule.Interval, err)
		}
		if interval <= 0 || interval > maxIntervalSeconds {
			return fmt.Errorf("Invalid interval[%s] must be between 1 and %d seconds", sensorConfig.Schedule.Interval, maxIntervalSeconds)
		}

		pending = append(pending, pendingSchedule{interval: interval, sensorConfig: sensorConfig})
	}

	for _, p := range pending {
		s.addSchedule(p.interval, p.sensorConfig)
	}

	return nil
}

// addSchedule registers collectTask with the scheduler via RunEvery, using a
// period of interval seconds and sensorConfig as the task argument.
func (s *CollectorService) addSchedule(interval int64, sensorConfig *ocmsc.SensorConfig) {
	period := time.Duration(interval) * time.Second

	s.scheduler.RunEvery(period, s.collectTask, sensorConfig)
}

// collectTask is the function registered with the scheduler for each sensor
// configuration. Its implementation is currently disabled, so the task does
// nothing; the previous body is kept below for reference.
func (s *CollectorService) collectTask(sensorConfig *ocmsc.SensorConfig) {
	// go func(_sensorConfig *ocmsc.SensorConfig) {
	// 	logging.Logger().Debugf("CollectorService.collectTask for sensor config id[%s] container[%s] crawler[%s]", _sensorConfig.ConfigID, _sensorConfig.Crawler.Container, _sensorConfig.Crawler.Name)
	// 	var result map[string]string
	// 	err := s.ContainerService.Call(occp.ToContainerType(_sensorConfig.Crawler.Container), &result, "CrawlerService.Get", _sensorConfig.ConfigID)
	// 	if nil != err {
	// 		logging.Logger().Errorf("Cannot get data from crawler[%s] of container[%s] %v", _sensorConfig.Crawler.Name, _sensorConfig.Crawler.Container, err)
	// 		return
	// 	}
	// 	logging.Logger().Debugf("Data[%v] received from crawler[%s] of container[%s]", result, _sensorConfig.Crawler.Name, _sensorConfig.Crawler.Container)
	// 	//cs.CentralService.Send(oocmp.HTTPEntry_Data, "CrawlerService.Data", result)
	// }(sensorConfig)
}