long poller

This commit is contained in:
insanity@loafle.com 2017-05-15 14:48:08 +09:00
parent 37b37b5870
commit 254b3f9392
2 changed files with 32 additions and 37 deletions

View File

@ -25,12 +25,18 @@ type Poller struct {
once sync.Once once sync.Once
runStat chan bool runStat chan bool
gconf *cm.GlobalConfig gconf *cm.GlobalConfig
intervalCh chan interface{}
scheduler *s.Scheduler scheduler *s.Scheduler
taskCh chan interface{}
} }
func init() { func Start(conf *cm.GlobalConfig) (chan interface{}, error) {
go handleConfigLoaded() p := GetInstance()
p.taskCh = make(chan interface{})
err := p.startPolling(conf)
if err != nil {
return nil, err
}
return p.taskCh, nil
} }
func GetInstance() *Poller { func GetInstance() *Poller {
@ -40,18 +46,16 @@ func GetInstance() *Poller {
return instance return instance
} }
func startPoller(ch chan interface{}) { func (p *Poller) startPolling(conf *cm.GlobalConfig) error {
p := GetInstance() p.gconf = conf
go func() { p.scheduler = &s.Scheduler{}
data := <-ch p.scheduler.Start()
p.gconf = data.(cm.ConfigManager).GetGlobalConfig()
p.start()
p.addObservers()
}()
}
func (p *Poller) addObservers() { err := p.scheduler.NewSchedule(POLLING_ID, uint64(p.gconf.IntervalSecond), p.polling)
go p.handleAgentStop() if err != nil {
return err
}
return nil
} }
func (p *Poller) handleInterval(ch chan interface{}) { func (p *Poller) handleInterval(ch chan interface{}) {
@ -67,17 +71,10 @@ func (p *Poller) handleInterval(ch chan interface{}) {
}() }()
} }
func (p *Poller) start() { func (p *Poller) Stop(ch chan interface{}) {
p.once.Do(func() {
p.scheduler = &s.Scheduler{}
p.scheduler.Start()
p.scheduler.NewSchedule(POLLING_ID, DEFAULT_INTERVAL, p.polling)
})
}
func (p *Poller) Stop() {
p.scheduler.Stop() p.scheduler.Stop()
p.runStat <- false p.runStat <- false
p.removeAgentStopHandler(ch)
} }
func (p *Poller) polling(agentId string) { func (p *Poller) polling(agentId string) {
@ -99,22 +96,12 @@ func (p *Poller) polling(agentId string) {
if err != nil { if err != nil {
log.Println(err) log.Println(err)
} }
p.dispatchNotify(out) p.dispatchNotify(out)
} }
func (p *Poller) dispatchNotify(result interface{}) { func (p *Poller) dispatchNotify(result interface{}) {
log.Printf("Polling result - %s ", result) log.Printf("Polling result - %s ", result)
/* p.taskCh <- result
TASK_SENSOR_START
TASK_SENSOR_STOP
TASK_SENSOR_ADD
TASK_SENSOR_REMOVE
TASK_SENSOR_UPDATE
TASK_CRAWLER_UPDATE
TASK_AGENT_UPDATE
TASK_LOG_SEND
*/
} }
func (p *Poller) updateInterval(interval string) { func (p *Poller) updateInterval(interval string) {
@ -126,6 +113,7 @@ func (p *Poller) updateInterval(interval string) {
p.scheduler.UpdateInterval(POLLING_ID, uint64(i)) p.scheduler.UpdateInterval(POLLING_ID, uint64(i))
} }
/* /*
func printStatusStream(client pb.StatusClient) { func printStatusStream(client pb.StatusClient) {
stream, err := client.Status(context.Background(), &pb.Empty{}) stream, err := client.Status(context.Background(), &pb.Empty{})

View File

@ -9,12 +9,19 @@ func handleConfigLoaded() {
ch := make(chan interface{}, 0) ch := make(chan interface{}, 0)
observer.Add(messages.CFG_LOADED, ch) observer.Add(messages.CFG_LOADED, ch)
startPoller(ch) startPoller(ch)
//observer.Remove(messages.CFG_LOADED, ch)
} }
func (p *Poller) handleAgentStop() { func (p *Poller) handleAgentStop() {
ch := make(chan interface{}, 0) ch := make(chan interface{}, 0)
observer.Add(messages.AGT_STOPPED, ch) observer.Add(messages.AGT_STOPPED, ch)
p.Stop() _ = <- ch
//observer.Remove(messages.AGT_STOPPED, ch) p.Stop(ch)
}
func (p *Poller) removeConfigLoadedHandler(ch chan interface{}) {
observer.Remove(messages.CFG_LOADED, ch)
}
func (p *Poller) removeAgentStopHandler(ch chan interface{}) {
observer.Remove(messages.AGT_STOPPED, ch)
} }