diff --git a/poller.go b/poller.go index 65cdc12..da31b57 100644 --- a/poller.go +++ b/poller.go @@ -25,12 +25,18 @@ type Poller struct { once sync.Once runStat chan bool gconf *cm.GlobalConfig - intervalCh chan interface{} scheduler *s.Scheduler + taskCh chan interface{} } -func init() { - go handleConfigLoaded() +func Start(conf *cm.GlobalConfig) (chan interface{}, error) { + p := GetInstance() + p.taskCh = make(chan interface{}) + err := p.startPolling(conf) + if err != nil { + return nil, err + } + return p.taskCh, nil } func GetInstance() *Poller { @@ -40,18 +46,16 @@ func GetInstance() *Poller { return instance } -func startPoller(ch chan interface{}) { - p := GetInstance() - go func() { - data := <-ch - p.gconf = data.(cm.ConfigManager).GetGlobalConfig() - p.start() - p.addObservers() - }() -} +func (p *Poller) startPolling(conf *cm.GlobalConfig) error { + p.gconf = conf + p.scheduler = &s.Scheduler{} + p.scheduler.Start() -func (p *Poller) addObservers() { - go p.handleAgentStop() + err := p.scheduler.NewSchedule(POLLING_ID, uint64(p.gconf.IntervalSecond), p.polling) + if err != nil { + return err + } + return nil } func (p *Poller) handleInterval(ch chan interface{}) { @@ -67,17 +71,10 @@ func (p *Poller) handleInterval(ch chan interface{}) { }() } -func (p *Poller) start() { - p.once.Do(func() { - p.scheduler = &s.Scheduler{} - p.scheduler.Start() - p.scheduler.NewSchedule(POLLING_ID, DEFAULT_INTERVAL, p.polling) - }) -} - -func (p *Poller) Stop() { +func (p *Poller) Stop(ch chan interface{}) { p.scheduler.Stop() p.runStat <- false + p.removeAgentStopHandler(ch) } func (p *Poller) polling(agentId string) { @@ -99,22 +96,12 @@ func (p *Poller) polling(agentId string) { if err != nil { log.Println(err) } - p.dispatchNotify(out) } func (p *Poller) dispatchNotify(result interface{}) { log.Printf("Polling result - %s ", result) - /* - TASK_SENSOR_START - TASK_SENSOR_STOP - TASK_SENSOR_ADD - TASK_SENSOR_REMOVE - TASK_SENSOR_UPDATE - TASK_CRAWLER_UPDATE - TASK_AGENT_UPDATE - TASK_LOG_SEND - */ + p.taskCh <- result } func (p *Poller) updateInterval(interval string) { @@ -126,6 +113,7 @@ func (p *Poller) updateInterval(interval string) { p.scheduler.UpdateInterval(POLLING_ID, uint64(i)) } + /* func printStatusStream(client pb.StatusClient) { stream, err := client.Status(context.Background(), &pb.Empty{}) diff --git a/poller_event.go b/poller_event.go index 47d2e4b..24e3fa8 100644 --- a/poller_event.go +++ b/poller_event.go @@ -9,12 +9,19 @@ func handleConfigLoaded() { ch := make(chan interface{}, 0) observer.Add(messages.CFG_LOADED, ch) startPoller(ch) - //observer.Remove(messages.CFG_LOADED, ch) } func (p *Poller) handleAgentStop() { ch := make(chan interface{}, 0) observer.Add(messages.AGT_STOPPED, ch) - p.Stop() - //observer.Remove(messages.AGT_STOPPED, ch) + _ = <- ch + p.Stop(ch) +} + +func (p *Poller) removeConfigLoadedHandler(ch chan interface{}) { + observer.Remove(messages.CFG_LOADED, ch) +} + +func (p *Poller) removeAgentStopHandler(ch chan interface{}) { + observer.Remove(messages.AGT_STOPPED, ch) }