From 41d98d4b91631a7e3754ff4023c8ae5de514e741 Mon Sep 17 00:00:00 2001 From: "insanity@loafle.com" Date: Tue, 16 May 2017 16:11:50 +0900 Subject: [PATCH] poller --- poller.go | 99 +++++++++++++++++++++++++++---------------------------- 1 file changed, 49 insertions(+), 50 deletions(-) diff --git a/poller.go b/poller.go index 4241ea8..3c81e9a 100644 --- a/poller.go +++ b/poller.go @@ -1,10 +1,8 @@ package long_poller_go import ( - "context" - "google.golang.org/grpc" cm "loafle.com/overflow/agent_api/config_manager" - pb "loafle.com/overflow/crawler_go/grpc" //temp + //pb "loafle.com/overflow/crawler_go/grpc" //temp s "loafle.com/overflow/scheduler_go" "log" "strconv" @@ -27,22 +25,20 @@ type Poller struct { taskCh chan interface{} } -func Start(ch chan string, conf *cm.GlobalConfig) (chan interface{}, error) { +func Start(ch chan bool, conf *cm.GlobalConfig) error { + p := GetInstance() + p.gconf = conf + p.startPolling() + ch <- true + return nil +} + +func GetTaskCh() chan interface{} { p := GetInstance() p.taskCh = make(chan interface{}) - err := p.startPolling(conf) - if err != nil { - return nil, err - } - - key := getSecretKey() //todo. getting api key - ch <- key - return p.taskCh, nil + return p.taskCh } -func getSecretKey() string { - return "tempkey" -} func Stop(polStopped chan bool) error { GetInstance().stop() @@ -63,56 +59,59 @@ func UpdateInterval(interval string) error { return err } GetInstance().updateInterval(v) -} - -func (p *Poller) startPolling(conf *cm.GlobalConfig) error { - p.gconf = conf - p.scheduler = &s.Scheduler{} - p.scheduler.Start() - - err := p.scheduler.NewSchedule(POLLING_ID, uint64(p.gconf.IntervalSecond), p.polling) - if err != nil { - return err - } return nil } -func (p *Poller) handleInterval(ch chan interface{}) { +func (p *Poller) startPolling() { + go func() { - data := <-ch - interval := data.(string) - i, err := strconv.Atoi(interval) - if err != nil { - log.Println(err) - return - } - p.scheduler.UpdateInterval(POLLING_ID, uint64(i)) + p.scheduler = &s.Scheduler{} + p.scheduler.Start() + p.scheduler.NewSchedule(POLLING_ID, uint64(p.gconf.IntervalSecond), p.polling) }() } +//func (p *Poller) handleInterval(ch chan interface{}) { +// go func() { +// data := <-ch +// interval := data.(string) +// i, err := strconv.Atoi(interval) +// if err != nil { +// log.Println(err) +// return +// } +// p.scheduler.UpdateInterval(POLLING_ID, uint64(i)) +// }() +//} + func (p *Poller) stop() { p.scheduler.Stop() } func (p *Poller) polling(agentId string) { - addr := p.gconf.Central.Address + ":" + string(p.gconf.Central.Port) - conn, err := grpc.Dial(addr, grpc.WithInsecure()) - if err != nil { - log.Println(err) - return - } - defer conn.Close() + go func() { + log.Println("POLLING....") + /* + addr := p.gconf.Central.Address + ":" + string(p.gconf.Central.Port) + conn, err := grpc.Dial(addr, grpc.WithInsecure()) + if err != nil { + log.Println(err) + return + } + defer conn.Close() - //todo temporary - client := pb.NewStatusClient(conn) - //printStatusStream(client) //stream - out, err := client.Status(context.Background(), &pb.Empty{}) - if err != nil { - log.Println(err) - } + //todo temporary + client := pb.NewStatusClient(conn) + //printStatusStream(client) //stream + out, err := client.Status(context.Background(), &pb.Empty{}) + if err != nil { + log.Println(err) + } - p.taskCh <- out + p.taskCh <- out + */ + }() } func (p *Poller) updateInterval(interval int) {