package collector_go import ( "context" confMng "loafle.com/overflow/agent_api/config_manager" "loafle.com/overflow/crawler_go/grpc" crm "loafle.com/overflow/crawler_manager_go" s "loafle.com/overflow/scheduler_go" "log" "strconv" "sync" "time" ) var ( instance *Collector once sync.Once ) func init() { handleConfigLoaded() } func GetInstance() *Collector { once.Do(func() { instance = &Collector{} }) return instance } type Collector struct { scheduler *s.Scheduler cm confMng.ConfigManager startSensorCh chan interface{} stopSensorCh chan interface{} addSensorCh chan interface{} remSensorCh chan interface{} updateSensorCh chan interface{} crmSensorUpdateCh chan interface{} crmUpdateCh chan interface{} crmUpdateDoneCh chan interface{} } func (c *Collector) start(conf confMng.ConfigManager) { go func() { c.cm = conf c.scheduler = &s.Scheduler{} c.scheduler.Start() c.addObservers() c.notifyCollectorReady() for _, conf := range c.cm.GetSensors() { if err := c.addSensor(conf.Id); err != nil { log.Println(err) } } }() } func (c *Collector) addObservers() { c.startSensorCh = make(chan interface{}) c.stopSensorCh = make(chan interface{}) c.addSensorCh = make(chan interface{}) c.remSensorCh = make(chan interface{}) c.updateSensorCh = make(chan interface{}) c.crmSensorUpdateCh = make(chan interface{}) c.crmUpdateCh = make(chan interface{}) c.crmUpdateDoneCh = make(chan interface{}) go c.handleAgentWillStop() go c.handleSensorStart() go c.handleSensorStop() go c.handleSensorAdd() go c.handleSensorRemove() go c.handleSensorUpdate() go c.handleCrmSensorUpdateDone() go c.handleCrawlerUpdate() go c.handleCrawlerUpdateDone() } func (c *Collector) Stop() { c.cleanObserver(c.addSensorCh, c.remSensorCh) c.scheduler.RemoveAllSchedule() c.scheduler.Stop() c.notifyCollectorStopped() } func (c *Collector) collect(id string) { conf := c.cm.GetSensorById(id) log.Printf("COLLECT %s - [ID: %s] [Crawler : %s]", time.Now(), conf.Id, conf.Crawler.Name) conn, err := crm.GetInstance().GetClient(conf.Crawler.Container) if err != nil { log.Println(err) } defer conn.Close() dc := grpc.NewDataClient(conn) in := &grpc.Input{} in.Id = id in.Name = grpc.Crawlers(grpc.Crawlers_value[conf.Crawler.Name]) out, err := dc.Get(context.Background(), in) if err != nil { log.Println(err) } log.Println(out) c.notifyData(out) } func (c *Collector) addSensor(sensorId string) error { sensor := c.cm.GetSensorById(sensorId) interval, err := strconv.Atoi(sensor.Schedule.Interval) if err != nil { return err } return c.scheduler.NewSchedule(sensorId, uint64(interval), c.collect) } func (c *Collector) removeSensor(id string) { if err := c.scheduler.RemoveSchedule(id); err != nil { log.Println(err) } } func (c *Collector) updateSensor(id string) { //update }