This commit is contained in:
crusader 2018-03-27 21:35:59 +09:00
parent a3ad29ff73
commit 058c15bb8a
6 changed files with 62 additions and 23 deletions

View File

@ -6,7 +6,7 @@
"writeBufferSize": 8192 "writeBufferSize": 8192
}, },
"probe": { "probe": {
"key": "52abd6fd57e511e7ac52080027658d13" "key": "4af5b56531ad11e8b7900242ac120004"
}, },
"paths": { "paths": {
"root": "/project/overFlow/probe" "root": "/project/overFlow/probe"

View File

@ -1,3 +1,3 @@
{ {
"tempKey": "30e31f01-dc04-47ee-baa0-689ea32158eb" "tempKey": "230d56c431ad11e8b7900242ac120004"
} }

View File

@ -2,7 +2,6 @@ package probe
import ( import (
"context" "context"
"reflect"
"sync" "sync"
cdr "git.loafle.net/commons_go/di/registry" cdr "git.loafle.net/commons_go/di/registry"
@ -18,15 +17,6 @@ import (
"git.loafle.net/overflow/overflow_probes/service" "git.loafle.net/overflow/overflow_probes/service"
) )
var (
servicesToStartAndStop = []reflect.Type{
reflect.TypeOf((*service.CentralService)(nil)),
reflect.TypeOf((*service.SensorConfigService)(nil)),
reflect.TypeOf((*service.CrawlerService)(nil)),
reflect.TypeOf((*service.CollectorService)(nil)),
}
)
func New() ProbeManager { func New() ProbeManager {
a := &probeManagers{} a := &probeManagers{}
@ -103,13 +93,13 @@ func (pm *probeManagers) handleProbe(services []interface{}) {
err error err error
) )
err = oocu.ExecuteStarters(services, servicesToStartAndStop, false) err = oocu.ExecuteStarters(services, service.ServicesToStartAndStop, false)
if nil != err { if nil != err {
logging.Logger().Panic(err) logging.Logger().Panic(err)
} }
defer func() { defer func() {
err = oocu.ExecuteStoppers(services, servicesToStartAndStop, true) err = oocu.ExecuteStoppers(services, service.ServicesToStartAndStop, true)
if nil != err { if nil != err {
logging.Logger().Error(err) logging.Logger().Error(err)
} }

View File

@ -2,11 +2,15 @@ package service
import ( import (
"context" "context"
"fmt"
"reflect" "reflect"
"strconv"
cda "git.loafle.net/commons_go/di/annotation" cda "git.loafle.net/commons_go/di/annotation"
cdr "git.loafle.net/commons_go/di/registry" cdr "git.loafle.net/commons_go/di/registry"
"git.loafle.net/commons_go/logging"
oocmci "git.loafle.net/overflow/overflow_commons_go/modules/commons/interfaces" oocmci "git.loafle.net/overflow/overflow_commons_go/modules/commons/interfaces"
sensorConfigM "git.loafle.net/overflow/overflow_commons_go/modules/sensor_config/model"
"git.loafle.net/overflow/overflow_probes/commons/scheduler" "git.loafle.net/overflow/overflow_probes/commons/scheduler"
) )
@ -26,9 +30,17 @@ type CollectorService struct {
} }
func (cs *CollectorService) Start() error { func (cs *CollectorService) Start() error {
// cs.crawlingScheduler = scheduler.NewScheduler() cs.crawlingScheduler = scheduler.NewScheduler()
// cs.crawlingScheduler.Every(1).Second().Do(cs.test) if err := cs.addScheduleAll(); nil != err {
return err
}
schedulerStopCh := cs.crawlingScheduler.Start()
go func(stopCh chan bool) {
<-stopCh
logging.Logger().Infof("Scheduler of collector has been stopped")
}(schedulerStopCh)
return nil return nil
} }
@ -38,6 +50,37 @@ func (cs *CollectorService) Stop(ctx context.Context) error {
return nil return nil
} }
func (cs *CollectorService) test() { func (cs *CollectorService) addScheduleAll() error {
sensorConfigs := cs.SensorConfigService.sensorConfigs
if nil == sensorConfigs || 0 == len(sensorConfigs) {
return nil
}
for _, sensorConfig := range sensorConfigs {
interval, err := strconv.ParseUint(sensorConfig.Schedule.Interval, 10, 64)
if nil != err {
return fmt.Errorf("Cannot convert interval[%s] %v", sensorConfig.Schedule.Interval, err)
}
cs.addSchedule(interval, sensorConfig)
}
return nil
}
func (cs *CollectorService) addSchedule(interval uint64, sensorConfig *sensorConfigM.SensorConfig) {
cs.crawlingScheduler.Every(interval).Seconds().Do(cs.collectTask, sensorConfig)
}
func (cs *CollectorService) collectTask(sensorConfig *sensorConfigM.SensorConfig) {
go func(_sensorConfig *sensorConfigM.SensorConfig) {
logging.Logger().Debugf("CollectorService.collectTask for sensor config id[%s] container[%s] crawler[%s]", _sensorConfig.ID.String(), _sensorConfig.Crawler.Container, _sensorConfig.Crawler.Name)
var result map[string]string
err := cs.ContainerService.Call(_sensorConfig.Crawler.Container, &result, "CrawlerService.GetData", _sensorConfig.ID.String())
if nil != err {
logging.Logger().Errorf("Cannot get data from crawler[%s] of container[%s] %v", _sensorConfig.Crawler.Name, _sensorConfig.Crawler.Container, err)
return
}
logging.Logger().Debugf("Data[%v] received from crawler[%s] of container[%s]", result, _sensorConfig.Crawler.Name, _sensorConfig.Crawler.Container)
}(sensorConfig)
} }

View File

@ -70,12 +70,7 @@ func sortSensorConfigPerContainer(sensorConfigMap map[string]*sensorConfigM.Sens
for _, sensorConfig := range sensorConfigMap { for _, sensorConfig := range sensorConfigMap {
containerName := sensorConfig.Crawler.Container containerName := sensorConfig.Crawler.Container
scs, ok := m[containerName] m[containerName] = append(m[containerName], sensorConfig)
if !ok {
scs = make([]*sensorConfigM.SensorConfig, 0)
m[containerName] = scs
}
scs = append(scs, sensorConfig)
} }
return m return m

View File

@ -1,5 +1,16 @@
package service package service
import "reflect"
var (
ServicesToStartAndStop = []reflect.Type{
reflect.TypeOf((*CentralService)(nil)),
reflect.TypeOf((*SensorConfigService)(nil)),
reflect.TypeOf((*CrawlerService)(nil)),
reflect.TypeOf((*CollectorService)(nil)),
}
)
func InitService() { func InitService() {
} }