package collector import ( "context" cm "git.loafle.net/overflow/overflow_probe/agent_api/config_manager" "git.loafle.net/overflow/overflow_probe/crawler/grpc" crm "git.loafle.net/overflow/overflow_probe/crawler_manager" s "git.loafle.net/overflow/overflow_probe/scheduler" log "github.com/cihub/seelog" "strconv" "sync" "time" ) var ( instance *Collector once sync.Once ) type Collector struct { scheduler *s.Scheduler cm cm.ConfigManager dataCh chan interface{} } func Start(started chan bool, dataCh chan interface{}, conf cm.ConfigManager) { c := GetInstance() c.dataCh = dataCh c.start(started, conf) } func Stop(stopped chan bool) { c := GetInstance() c.stop() stopped <- true } func GetInstance() *Collector { once.Do(func() { instance = &Collector{} }) return instance } func (c *Collector) start(started chan bool, conf cm.ConfigManager) { go func() { c.cm = conf c.scheduler = &s.Scheduler{} c.scheduler.Start() for _, conf := range c.cm.GetSensors() { if err := c.addSensor(conf.Id); err != nil { log.Debug(err) } } started <- true }() } func (c *Collector) stop() { c.scheduler.RemoveAllSchedule() c.scheduler.Stop() } func (c *Collector) collect(id string) { go func() { conf := c.cm.GetSensorById(id) conn, err := crm.GetInstance().GetClient(conf.Crawler.Container) if err != nil { log.Error(err) } defer conn.Close() dc := grpc.NewDataClient(conn) in := &grpc.Input{} in.Id = id in.Name = grpc.Crawlers(grpc.Crawlers_value[conf.Crawler.Name]) startAt := time.Now().Unix() out, err := dc.Get(context.Background(), in) finishedAt := time.Now().Unix() if err != nil { log.Error("Cannot collect [ID: %s] [Crawler : %s] - [Err : %s]\n", conf.Id, conf.Crawler.Name, err.Error()) return } out.StartDate = startAt out.EndDate = finishedAt log.Error("COLLECTED. [ID: %s] [Crawler : %s] [Result : %s]\n", conf.Id, conf.Crawler.Name, out.GetData()) c.dataCh <- out }() } func (c *Collector) addSensor(sensorId string) error { sensor := c.cm.GetSensorById(sensorId) interval, err := strconv.Atoi(sensor.Schedule.Interval) if err != nil { return err } return c.scheduler.NewSchedule(sensorId, uint64(interval), c.collect) } func (c *Collector) removeSensor(id string) error { if err := c.scheduler.RemoveSchedule(id); err != nil { return err } return nil } func (c *Collector) updateSensor(id string) error { err := c.removeSensor(id) if err != nil { return err } return c.addSensor(id) } func AddSensor(id string) error { return GetInstance().addSensor(id) } func RemSensor(id string) error { return GetInstance().removeSensor(id) } func UpdateSensor(id string) error { return GetInstance().updateSensor(id) } func StartSensor(id string) error { return GetInstance().scheduler.StartSchedule(id) } func StopSensor(id string) error { return GetInstance().scheduler.StopSchedule(id) }