// Package long_poller_go owns a process-wide Poller that registers a polling
// job with the scheduler and publishes the tasks it produces on a channel.
package long_poller_go

import (
	"errors"
	"log"
	"strconv"
	"sync"
	"time"

	cm "git.loafle.net/overflow/agent_api/config_manager"
	//pb "loafle.com/overflow/crawler_go/grpc"

	//temp
	"git.loafle.net/overflow/agent_api/messages"
	s "git.loafle.net/overflow/scheduler_go"
)

const (
	// POLLING_ID is the identifier under which the polling job is registered
	// with the scheduler and later addressed when its interval is updated.
	POLLING_ID = "OVERFLOW_LONG_POLLING"
)

var (
	instance *Poller   // process-wide Poller, created lazily by GetInstance
	once     sync.Once // guards the one-time creation of instance
)

// Poller bundles the state of the long-polling job: the configuration it was
// started with, the scheduler that drives it and the channel tasks are
// published on.
type Poller struct {
	runStat   chan bool // not used by any code in this file
	gconf     *cm.GlobalConfig
	scheduler *s.Scheduler
	taskCh    chan interface{}
}

// Start stores conf on the shared Poller, kicks off polling and then sends
// true on ch.
//
// A nil conf is rejected: false is sent on ch (so a goroutine waiting on the
// channel is not left hanging) and an error is returned. Without this check
// the nil config would be dereferenced later and crash the process.
func Start(ch chan bool, conf *cm.GlobalConfig) error {
	if conf == nil {
		ch <- false
		return errors.New("long_poller_go: global config is nil")
	}

	p := GetInstance()
	p.gconf = conf
	p.startPolling()
	ch <- true
	return nil
}

// GetTaskCh returns the unbuffered channel on which polling publishes tasks.
//
// The channel is created once together with the Poller, so every caller gets
// the same channel. Re-creating it on each call would orphan earlier
// receivers and leave the channel nil until the first call, making the send
// in polling block forever.
func GetTaskCh() chan interface{} {
	return GetInstance().taskCh
}

// Stop stops the scheduler of the shared Poller (if one was started) and then
// sends true on polStopped.
func Stop(polStopped chan bool) error {
	GetInstance().stop()
	polStopped <- true
	return nil
}

// GetInstance returns the process-wide Poller, creating it together with its
// task channel on first use.
func GetInstance() *Poller {
	once.Do(func() {
		instance = &Poller{
			taskCh: make(chan interface{}),
		}
	})
	return instance
}

// UpdateInterval parses interval as a decimal number of seconds and applies it
// to the polling schedule.
//
// It returns the strconv error when interval is not a valid integer, and an
// error when the value is negative (the scheduler takes a uint64, so a
// negative value would wrap around to a huge interval) or when polling has
// not been started yet.
func UpdateInterval(interval string) error {
	v, err := strconv.Atoi(interval)
	if err != nil {
		return err
	}
	if v < 0 {
		return errors.New("long_poller_go: polling interval must not be negative: " + interval)
	}

	p := GetInstance()
	if p.scheduler == nil {
		return errors.New("long_poller_go: polling has not been started")
	}
	p.updateInterval(v)
	return nil
}

// startPolling creates the scheduler and, in a background goroutine, starts it
// and registers the polling job under POLLING_ID with the configured interval.
//
// The scheduler pointer is published before the goroutine is launched so that
// stop and updateInterval never observe a nil scheduler once Start has
// returned.
// NOTE(review): Start/NewSchedule stay in the goroutine because their blocking
// behaviour is not visible from this file — confirm against scheduler_go.
func (p *Poller) startPolling() {
	sched := &s.Scheduler{}
	p.scheduler = sched
	interval := uint64(p.gconf.IntervalSecond)

	go func() {
		sched.Start()
		sched.NewSchedule(POLLING_ID, interval, p.polling)
	}()
}

//func (p *Poller) handleInterval(ch chan interface{}) {
//	go func() {
//		data := <-ch
//		interval := data.(string)
//		i, err := strconv.Atoi(interval)
//		if err != nil {
//			log.Println(err)
//			return
//		}
//		p.scheduler.UpdateInterval(POLLING_ID, uint64(i))
//	}()
//}

// stop stops the scheduler. It is a no-op when polling was never started.
func (p *Poller) stop() {
	if p.scheduler == nil {
		return
	}
	p.scheduler.Stop()
}

// once1 limits the temporary interval-update task in polling to a single send.
var once1 sync.Once

// polling is the callback registered with the scheduler; agentId is not used.
//
// The current body is a temporary stand-in for the real status request (kept
// in the comment below): in a new goroutine it logs, sleeps for 20 seconds
// and, the first time only, publishes a TASK_POL_INTERVAL_UPDATE task asking
// for an interval of "3" on the task channel. The send blocks until a
// receiver takes the task from the channel.
func (p *Poller) polling(agentId string) {
	go func() {
		log.Println("POLLING....")
		time.Sleep(time.Second * 20)

		once1.Do(func() {
			newTask := messages.AgentTask{}
			newTask.Command = messages.TASK_POL_INTERVAL_UPDATE
			param := make(map[string]string)
			param["interval"] = "3"
			newTask.Params = param
			p.taskCh <- newTask
		})

		/*
			addr := p.gconf.Central.Address + ":" + string(p.gconf.Central.Port)
			conn, err := grpc.Dial(addr, grpc.WithInsecure())
			if err != nil {
				log.Println(err)
				return
			}
			defer conn.Close()

			//todo temporary
			client := pb.NewStatusClient(conn)
			//printStatusStream(client) //stream
			out, err := client.Status(context.Background(), &pb.Empty{})
			if err != nil {
				log.Println(err)
			}
			p.taskCh <- out
		*/
	}()
}

// updateInterval forwards the new interval (in seconds) to the scheduler for
// the POLLING_ID schedule and logs the change. Callers must make sure the
// scheduler exists and that interval is not negative.
func (p *Poller) updateInterval(interval int) {
	p.scheduler.UpdateInterval(POLLING_ID, uint64(interval))
	log.Printf("Polling interval has changed to %d seconds.\n", interval)
}

/*
func printStatusStream(client pb.StatusClient) {
	stream, err := client.Status(context.Background(), &pb.Empty{})
	if err != nil {
		grpclog.Fatalf("%v.List(_) = _, %v", client, err)
	}
	for {
		feature, err := stream.Recv()
		if err == io.EOF {
			break
		}
		if err != nil {
			grpclog.Fatalf("%v.List(_) = _, %v", client, err)
		}
		grpclog.Println(feature)
	}
	grpclog.Println("--------------------------")
}
*/