From fbd63c533e9188217cbf240415f8f62b8f14aca1 Mon Sep 17 00:00:00 2001 From: "insanity@loafle.com" Date: Thu, 11 May 2017 18:36:20 +0900 Subject: [PATCH] long poller --- poller.go | 86 +++++++++++++++++++++++++++++++------------------ poller_event.go | 20 ++++++++++++ poller_test.go | 5 ++- 3 files changed, 77 insertions(+), 34 deletions(-) create mode 100644 poller_event.go diff --git a/poller.go b/poller.go index 9fbf82b..65cdc12 100644 --- a/poller.go +++ b/poller.go @@ -3,14 +3,12 @@ package long_poller_go import ( "context" "google.golang.org/grpc" - "loafle.com/overflow/cron_go" - "log" - "sync" - cm "loafle.com/overflow/agent_api/config_manager" - "loafle.com/overflow/agent_api/observer" - "loafle.com/overflow/agent_api/observer/messages" pb "loafle.com/overflow/crawler_go/grpc" //temp + s "loafle.com/overflow/scheduler_go" + "log" + "strconv" + "sync" ) const ( @@ -24,13 +22,15 @@ var ( ) type Poller struct { - once sync.Once - runStat chan bool - gconf *cm.GlobalConfig + once sync.Once + runStat chan bool + gconf *cm.GlobalConfig + intervalCh chan interface{} + scheduler *s.Scheduler } func init() { - addObservers() + go handleConfigLoaded() } func GetInstance() *Poller { @@ -40,34 +40,44 @@ func GetInstance() *Poller { return instance } -func addObservers() { - ch := make(chan interface{}, 0) - observer.Add(messages.CONFIGMANAGER_LOADED, ch) - handleInit(ch) -} - -func handleInit(ch chan interface{}) { - ds := GetInstance() +func startPoller(ch chan interface{}) { + p := GetInstance() go func() { data := <-ch - log.Println(data) - ds.gconf = data.(cm.ConfigManager).GetGlobalConfig() - ds.start() - close(ch) + p.gconf = data.(cm.ConfigManager).GetGlobalConfig() + p.start() + p.addObservers() + }() +} + +func (p *Poller) addObservers() { + go p.handleAgentStop() +} + +func (p *Poller) handleInterval(ch chan interface{}) { + go func() { + data := <-ch + interval := data.(string) + i, err := strconv.Atoi(interval) + if err != nil { + log.Println(err) + return + } + p.scheduler.UpdateInterval(POLLING_ID, uint64(i)) }() } func (p *Poller) start() { p.once.Do(func() { - cr := &cron.Cron{} - p.runStat = cr.Start() - cr.AddTask(POLLING_ID, DEFAULT_INTERVAL).Invoke(p.polling, agentIdentifier()) + p.scheduler = &s.Scheduler{} + p.scheduler.Start() + p.scheduler.NewSchedule(POLLING_ID, DEFAULT_INTERVAL, p.polling) }) } func (p *Poller) Stop() { + p.scheduler.Stop() p.runStat <- false - observer.Notify(messages.POLLER_STOPPED, true) } func (p *Poller) polling(agentId string) { @@ -83,7 +93,7 @@ func (p *Poller) polling(agentId string) { //todo temporary client := pb.NewStatusClient(conn) - //printStatus(client) //stream + //printStatusStream(client) //stream out, err := client.Status(context.Background(), &pb.Empty{}) if err != nil { @@ -95,15 +105,29 @@ func (p *Poller) polling(agentId string) { func (p *Poller) dispatchNotify(result interface{}) { log.Printf("Polling result - %s ", result) + /* + TASK_SENSOR_START + TASK_SENSOR_STOP + TASK_SENSOR_ADD + TASK_SENSOR_REMOVE + TASK_SENSOR_UPDATE + TASK_CRAWLER_UPDATE + TASK_AGENT_UPDATE + TASK_LOG_SEND + */ } -func agentIdentifier() string { - //todo - return "agent000001" +func (p *Poller) updateInterval(interval string) { + i, err := strconv.Atoi(interval) + if err != nil { + log.Println(err) + return + } + p.scheduler.UpdateInterval(POLLING_ID, uint64(i)) } /* -func printStassssssssstus(client pb.StatusClient) { +func printStatusStream(client pb.StatusClient) { stream, err := client.Status(context.Background(), &pb.Empty{}) if err != nil { grpclog.Fatalf("%v.List(_) = _, %v", client, err) diff --git a/poller_event.go b/poller_event.go new file mode 100644 index 0000000..66b52ba --- /dev/null +++ b/poller_event.go @@ -0,0 +1,20 @@ +package long_poller_go + +import ( + "loafle.com/overflow/agent_api/observer" + "loafle.com/overflow/agent_api/observer/messages" +) + +func handleConfigLoaded() { + ch := make(chan interface{}, 0) + observer.Add(messages.CFG_LOADED, ch) + startPoller(ch) + observer.Remove(messages.CFG_LOADED, ch) +} + +func (p *Poller) handleAgentStop() { + ch := make(chan interface{}, 0) + observer.Add(messages.AGT_STOPPED, ch) + p.Stop() + observer.Remove(messages.AGT_STOPPED, ch) +} diff --git a/poller_test.go b/poller_test.go index 9b7c52c..4a206f2 100644 --- a/poller_test.go +++ b/poller_test.go @@ -3,7 +3,6 @@ package long_poller_go import ( "loafle.com/overflow/agent_api/observer" "loafle.com/overflow/agent_api/observer/messages" - "log" "testing" "time" ) @@ -16,6 +15,6 @@ func TestPolling(t *testing.T) { func TestTotal(t *testing.T) { time.Sleep(time.Second * 5) - observer.Notify(messages.AGENT_STARTED, "") //CONFIG LOCATION - time.Sleep(time.Second * 5) + observer.Notify(messages.CONFIGMANAGER_LOADED, "") //CONFIG LOCATION + time.Sleep(time.Second * 100) }