2018-05-02 10:59:45 +00:00
package service
import (
"fmt"
"reflect"
2018-05-03 11:00:07 +00:00
"sync"
2018-05-02 10:59:45 +00:00
"time"
cda "git.loafle.net/commons/di-go/annotation"
cdr "git.loafle.net/commons/di-go/registry"
logging "git.loafle.net/commons/logging-go"
2018-07-04 02:35:52 +00:00
cuej "git.loafle.net/commons/util-go/encoding/json"
2018-05-02 10:59:45 +00:00
cuts "git.loafle.net/commons/util-go/time/scheduler"
cutss "git.loafle.net/commons/util-go/time/scheduler/storage"
2018-05-11 05:24:13 +00:00
occu "git.loafle.net/overflow/commons-go/core/util"
ocmd "git.loafle.net/overflow/commons-go/model/data"
2018-05-02 10:59:45 +00:00
ocmsc "git.loafle.net/overflow/commons-go/model/sensorconfig"
ocsp "git.loafle.net/overflow/commons-go/service/probe"
// For annotation
_ "git.loafle.net/overflow/commons-go/core/annotation"
)
// CollectorServiceType is the reflect.Type of *CollectorService. It is the
// value handed to cdr.RegisterType by this file's init function.
var CollectorServiceType = reflect.TypeOf((*CollectorService)(nil))
// init registers CollectorServiceType with the cdr registry when the package
// is loaded.
func init() {
	cdr.RegisterType(CollectorServiceType)
}
type CollectorService struct {
ocsp . CollectorService
cda . TypeAnnotation ` annotation:"@overflow:RPCService()" `
2018-05-03 11:00:07 +00:00
ProbeService * ProbeService ` annotation:"@Inject()" `
CrawlerService * CrawlerService ` annotation:"@Inject()" `
2018-05-02 10:59:45 +00:00
2018-05-03 11:00:07 +00:00
scheduler * cuts . Scheduler
scheduleMap sync . Map
2018-05-02 10:59:45 +00:00
}
// InitService builds the scheduler on top of the storage returned by
// cutss.NewMemoryStorage. It always returns nil.
func (s *CollectorService) InitService() error {
	s.scheduler = cuts.New(cutss.NewMemoryStorage())

	return nil
}
// StartService starts the scheduler and returns the error reported by it,
// or nil when the scheduler started successfully.
func (s *CollectorService) StartService() error {
	err := s.scheduler.Start()
	if err != nil {
		return err
	}

	return nil
}
// StopService stops the scheduler.
func (s *CollectorService) StopService() {
	s.scheduler.Stop()
}
// DestroyService is intentionally a no-op: this service has nothing to
// release on destruction.
func (s *CollectorService) DestroyService() {
}
2018-05-03 11:00:07 +00:00
func ( s * CollectorService ) AddSensorConfigs ( sensorConfigs [ ] * ocmsc . SensorConfig ) error {
2018-05-02 10:59:45 +00:00
if nil == sensorConfigs || 0 == len ( sensorConfigs ) {
return nil
}
for _ , sensorConfig := range sensorConfigs {
2018-07-04 02:35:52 +00:00
interval , err := cuej . NumberToInt ( sensorConfig . Schedule . Interval )
2018-05-02 10:59:45 +00:00
if nil != err {
return fmt . Errorf ( "Cannot convert interval[%s] %v" , sensorConfig . Schedule . Interval , err )
}
s . addSchedule ( interval , sensorConfig )
2018-07-04 02:35:52 +00:00
logging . Logger ( ) . Debugf ( "scheduler of config[%s] has been added" , sensorConfig . SensorID )
2018-05-02 10:59:45 +00:00
}
return nil
}
2018-05-03 11:00:07 +00:00
func ( s * CollectorService ) RemoveSensorConfigs ( sensorConfigs [ ] * ocmsc . SensorConfig ) error {
if nil == sensorConfigs || 0 == len ( sensorConfigs ) {
return nil
}
for _ , sensorConfig := range sensorConfigs {
2018-07-04 02:35:52 +00:00
s . removeSchedule ( sensorConfig . SensorID . String ( ) )
2018-05-03 11:00:07 +00:00
}
return nil
}
2018-07-04 02:35:52 +00:00
func ( s * CollectorService ) addSchedule ( interval int , sensorConfig * ocmsc . SensorConfig ) {
s . removeSchedule ( sensorConfig . SensorID . String ( ) )
2018-05-03 11:00:07 +00:00
scheduleID , err := s . scheduler . RunEvery ( time . Duration ( interval ) * time . Second , s . collectTask , sensorConfig )
if nil != err {
2018-07-04 02:35:52 +00:00
logging . Logger ( ) . Errorf ( "Cannot add schedule for config[%s] %v" , sensorConfig . SensorID , err )
2018-05-03 11:00:07 +00:00
return
}
2018-07-04 02:35:52 +00:00
s . scheduleMap . Store ( sensorConfig . SensorID , scheduleID )
2018-05-02 10:59:45 +00:00
}
2018-07-04 02:35:52 +00:00
func ( s * CollectorService ) removeSchedule ( sensorID string ) {
scheduleID , ok := s . scheduleMap . Load ( sensorID )
2018-05-04 11:44:16 +00:00
if ! ok {
return
}
2018-07-04 02:35:52 +00:00
s . scheduleMap . Delete ( sensorID )
2018-05-04 11:44:16 +00:00
s . scheduler . Cancel ( scheduleID . ( string ) )
}
2018-05-02 10:59:45 +00:00
func ( s * CollectorService ) collectTask ( sensorConfig * ocmsc . SensorConfig ) {
2018-07-04 02:35:52 +00:00
logging . Logger ( ) . Debugf ( "CollectorService.collectTask for sensor config id[%s] of crawler[%s]" , sensorConfig . SensorID , sensorConfig . Crawler . MetaCrawlerKey )
2018-05-02 10:59:45 +00:00
2018-07-04 02:35:52 +00:00
result , err := s . CrawlerService . Get ( sensorConfig . SensorID . String ( ) )
2018-05-02 10:59:45 +00:00
if nil != err {
2018-07-04 02:35:52 +00:00
logging . Logger ( ) . Errorf ( "Cannot get data from crawler[%s] %v" , sensorConfig . Crawler . MetaCrawlerKey , err )
2018-05-02 10:59:45 +00:00
return
}
2018-05-11 05:24:13 +00:00
m := & ocmd . Metric {
2018-07-04 02:35:52 +00:00
SensorConfigID : sensorConfig . SensorID . String ( ) ,
2018-05-11 05:24:13 +00:00
Data : result ,
CollectDate : occu . NowPtr ( ) ,
}
if err := s . ProbeService . Send ( "MetricService.Send" , m ) ; nil != err {
2018-07-04 02:35:52 +00:00
logging . Logger ( ) . Errorf ( "Cannot send data from config id[%s] of crawler[%s] %v" , sensorConfig . SensorID , sensorConfig . Crawler . MetaCrawlerKey , err )
2018-05-02 10:59:45 +00:00
}
}