diff --git a/config.json b/config.json index dbd3cf0..6a1370e 100644 --- a/config.json +++ b/config.json @@ -6,7 +6,7 @@ "writeBufferSize": 8192 }, "probe": { - "key": "52abd6fd57e511e7ac52080027658d13" + "key": "4af5b56531ad11e8b7900242ac120004" }, "paths": { "root": "/project/overFlow/probe" diff --git a/noauthprobe.json b/noauthprobe.json index ec6f0be..95af194 100644 --- a/noauthprobe.json +++ b/noauthprobe.json @@ -1,3 +1,3 @@ { - "tempKey": "30e31f01-dc04-47ee-baa0-689ea32158eb" + "tempKey": "230d56c431ad11e8b7900242ac120004" } \ No newline at end of file diff --git a/probe/probe.go b/probe/probe.go index 8134315..0b10dc5 100644 --- a/probe/probe.go +++ b/probe/probe.go @@ -2,7 +2,6 @@ package probe import ( "context" - "reflect" "sync" cdr "git.loafle.net/commons_go/di/registry" @@ -18,15 +17,6 @@ import ( "git.loafle.net/overflow/overflow_probes/service" ) -var ( - servicesToStartAndStop = []reflect.Type{ - reflect.TypeOf((*service.CentralService)(nil)), - reflect.TypeOf((*service.SensorConfigService)(nil)), - reflect.TypeOf((*service.CrawlerService)(nil)), - reflect.TypeOf((*service.CollectorService)(nil)), - } -) - func New() ProbeManager { a := &probeManagers{} @@ -103,13 +93,13 @@ func (pm *probeManagers) handleProbe(services []interface{}) { err error ) - err = oocu.ExecuteStarters(services, servicesToStartAndStop, false) + err = oocu.ExecuteStarters(services, service.ServicesToStartAndStop, false) if nil != err { logging.Logger().Panic(err) } defer func() { - err = oocu.ExecuteStoppers(services, servicesToStartAndStop, true) + err = oocu.ExecuteStoppers(services, service.ServicesToStartAndStop, true) if nil != err { logging.Logger().Error(err) } diff --git a/service/CollectorService.go b/service/CollectorService.go index a0a4a97..a68d135 100644 --- a/service/CollectorService.go +++ b/service/CollectorService.go @@ -2,11 +2,15 @@ package service import ( "context" + "fmt" "reflect" + "strconv" cda "git.loafle.net/commons_go/di/annotation" cdr "git.loafle.net/commons_go/di/registry" + "git.loafle.net/commons_go/logging" oocmci "git.loafle.net/overflow/overflow_commons_go/modules/commons/interfaces" + sensorConfigM "git.loafle.net/overflow/overflow_commons_go/modules/sensor_config/model" "git.loafle.net/overflow/overflow_probes/commons/scheduler" ) @@ -26,9 +30,17 @@ type CollectorService struct { } func (cs *CollectorService) Start() error { - // cs.crawlingScheduler = scheduler.NewScheduler() + cs.crawlingScheduler = scheduler.NewScheduler() - // cs.crawlingScheduler.Every(1).Second().Do(cs.test) + if err := cs.addScheduleAll(); nil != err { + return err + } + + schedulerStopCh := cs.crawlingScheduler.Start() + go func(stopCh chan bool) { + <-stopCh + logging.Logger().Infof("Scheduler of collector has been stopped") + }(schedulerStopCh) return nil } @@ -38,6 +50,37 @@ func (cs *CollectorService) Stop(ctx context.Context) error { return nil } -func (cs *CollectorService) test() { +func (cs *CollectorService) addScheduleAll() error { + sensorConfigs := cs.SensorConfigService.sensorConfigs + if nil == sensorConfigs || 0 == len(sensorConfigs) { + return nil + } + for _, sensorConfig := range sensorConfigs { + interval, err := strconv.ParseUint(sensorConfig.Schedule.Interval, 10, 64) + if nil != err { + return fmt.Errorf("Cannot convert interval[%s] %v", sensorConfig.Schedule.Interval, err) + } + cs.addSchedule(interval, sensorConfig) + } + + return nil +} + +func (cs *CollectorService) addSchedule(interval uint64, sensorConfig *sensorConfigM.SensorConfig) { + cs.crawlingScheduler.Every(interval).Seconds().Do(cs.collectTask, sensorConfig) +} + +func (cs *CollectorService) collectTask(sensorConfig *sensorConfigM.SensorConfig) { + go func(_sensorConfig *sensorConfigM.SensorConfig) { + logging.Logger().Debugf("CollectorService.collectTask for sensor config id[%s] container[%s] crawler[%s]", _sensorConfig.ID.String(), _sensorConfig.Crawler.Container, _sensorConfig.Crawler.Name) + var result map[string]string + err := cs.ContainerService.Call(_sensorConfig.Crawler.Container, &result, "CrawlerService.GetData", _sensorConfig.ID.String()) + if nil != err { + logging.Logger().Errorf("Cannot get data from crawler[%s] of container[%s] %v", _sensorConfig.Crawler.Name, _sensorConfig.Crawler.Container, err) + return + } + logging.Logger().Debugf("Data[%v] received from crawler[%s] of container[%s]", result, _sensorConfig.Crawler.Name, _sensorConfig.Crawler.Container) + + }(sensorConfig) } diff --git a/service/CrawlerService.go b/service/CrawlerService.go index 857d9d7..e48cdb5 100644 --- a/service/CrawlerService.go +++ b/service/CrawlerService.go @@ -70,12 +70,7 @@ func sortSensorConfigPerContainer(sensorConfigMap map[string]*sensorConfigM.Sens for _, sensorConfig := range sensorConfigMap { containerName := sensorConfig.Crawler.Container - scs, ok := m[containerName] - if !ok { - scs = make([]*sensorConfigM.SensorConfig, 0) - m[containerName] = scs - } - scs = append(scs, sensorConfig) + m[containerName] = append(m[containerName], sensorConfig) } return m diff --git a/service/service.go b/service/service.go index fb7d721..c816c20 100644 --- a/service/service.go +++ b/service/service.go @@ -1,5 +1,16 @@ package service +import "reflect" + +var ( + ServicesToStartAndStop = []reflect.Type{ + reflect.TypeOf((*CentralService)(nil)), + reflect.TypeOf((*SensorConfigService)(nil)), + reflect.TypeOf((*CrawlerService)(nil)), + reflect.TypeOf((*CollectorService)(nil)), + } +) + func InitService() { }