diff --git a/poller.go b/poller.go index 56c9ab8..9fbf82b 100644 --- a/poller.go +++ b/poller.go @@ -7,39 +7,73 @@ import ( "log" "sync" + cm "loafle.com/overflow/agent_api/config_manager" + "loafle.com/overflow/agent_api/observer" + "loafle.com/overflow/agent_api/observer/messages" pb "loafle.com/overflow/crawler_go/grpc" //temp ) const ( POLLING_ID = "OVERFLOW_LONG_POLLING" DEFAULT_INTERVAL = uint64(3) - CENTRAL_ADDR = "127.0.0.1:50052" ) -type LongPoller struct { +var ( + instance *Poller + once sync.Once +) + +type Poller struct { once sync.Once runStat chan bool + gconf *cm.GlobalConfig } -func (p *LongPoller) Start() { +func init() { + addObservers() +} + +func GetInstance() *Poller { + once.Do(func() { + instance = &Poller{} + }) + return instance +} + +func addObservers() { + ch := make(chan interface{}, 0) + observer.Add(messages.CONFIGMANAGER_LOADED, ch) + handleInit(ch) +} + +func handleInit(ch chan interface{}) { + ds := GetInstance() + go func() { + data := <-ch + log.Println(data) + ds.gconf = data.(cm.ConfigManager).GetGlobalConfig() + ds.start() + close(ch) + }() +} + +func (p *Poller) start() { p.once.Do(func() { - p.startPolling() + cr := &cron.Cron{} + p.runStat = cr.Start() + cr.AddTask(POLLING_ID, DEFAULT_INTERVAL).Invoke(p.polling, agentIdentifier()) }) } -func (p *LongPoller) startPolling() { - cr := &cron.Cron{} - p.runStat = cr.Start() - cr.AddTask(POLLING_ID, DEFAULT_INTERVAL).Invoke(p.polling, agentIdentifier()) -} - -func (p *LongPoller) Stop() { +func (p *Poller) Stop() { p.runStat <- false + observer.Notify(messages.POLLER_STOPPED, true) } -func (p *LongPoller) polling(agentId string) { +func (p *Poller) polling(agentId string) { - conn, err := grpc.Dial(CENTRAL_ADDR, grpc.WithInsecure()) + addr := p.gconf.Central.Address + ":" + string(p.gconf.Central.Port) + conn, err := grpc.Dial(addr, grpc.WithInsecure()) if err != nil { log.Println(err) return @@ -59,7 +93,7 @@ func (p *LongPoller) polling(agentId string) { p.dispatchNotify(out) } -func (p *LongPoller) dispatchNotify(result interface{}) { +func (p *Poller) dispatchNotify(result interface{}) { log.Printf("Polling result - %s ", result) } diff --git a/poller_test.go b/poller_test.go index 40f9705..9b7c52c 100644 --- a/poller_test.go +++ b/poller_test.go @@ -1,15 +1,21 @@ package long_poller_go import ( + "loafle.com/overflow/agent_api/observer" + "loafle.com/overflow/agent_api/observer/messages" "log" "testing" "time" ) func TestPolling(t *testing.T) { - poller := &LongPoller{} - poller.Start() - - log.Println("hihihihihi") - + poller := &Poller{} + poller.start() +} + +func TestTotal(t *testing.T) { + time.Sleep(time.Second * 5) + + observer.Notify(messages.AGENT_STARTED, "") //CONFIG LOCATION + time.Sleep(time.Second * 5) }