long_poller_go/poller.go
insanity@loafle.com 1b09bf2c49 poller
2017-05-23 11:13:46 +09:00

158 lines
2.9 KiB
Go

package long_poller_go
import (
cm "loafle.com/overflow/agent_api/config_manager"
//pb "loafle.com/overflow/crawler_go/grpc" //temp
"loafle.com/overflow/agent_api/messages"
s "loafle.com/overflow/scheduler_go"
"log"
"strconv"
"sync"
"time"
)
const (
POLLING_ID = "OVERFLOW_LONG_POLLING"
)
var (
instance *Poller
once sync.Once
)
type Poller struct {
runStat chan bool
gconf *cm.GlobalConfig
scheduler *s.Scheduler
taskCh chan interface{}
}
func Start(ch chan bool, conf *cm.GlobalConfig) error {
p := GetInstance()
p.gconf = conf
p.startPolling()
ch <- true
return nil
}
func GetTaskCh() chan interface{} {
p := GetInstance()
p.taskCh = make(chan interface{})
return p.taskCh
}
func Stop(polStopped chan bool) error {
GetInstance().stop()
polStopped <- true
return nil
}
func GetInstance() *Poller {
once.Do(func() {
instance = &Poller{}
})
return instance
}
func UpdateInterval(interval string) error {
v, err := strconv.Atoi(interval)
if err != nil {
return err
}
GetInstance().updateInterval(v)
return nil
}
func (p *Poller) startPolling() {
go func() {
p.scheduler = &s.Scheduler{}
p.scheduler.Start()
p.scheduler.NewSchedule(POLLING_ID, uint64(p.gconf.IntervalSecond), p.polling)
}()
}
//func (p *Poller) handleInterval(ch chan interface{}) {
// go func() {
// data := <-ch
// interval := data.(string)
// i, err := strconv.Atoi(interval)
// if err != nil {
// log.Println(err)
// return
// }
// p.scheduler.UpdateInterval(POLLING_ID, uint64(i))
// }()
//}
func (p *Poller) stop() {
p.scheduler.Stop()
}
var once1 sync.Once
func (p *Poller) polling(agentId string) {
go func() {
log.Println("POLLING....")
time.Sleep(time.Second * 20)
once1.Do(func() {
newTask := messages.AgentTask{}
newTask.Command = messages.TASK_POL_INTERVAL_UPDATE
param := make(map[string]string)
param["interval"] = "3"
newTask.Params = param
p.taskCh <- newTask
})
/*
addr := p.gconf.Central.Address + ":" + string(p.gconf.Central.Port)
conn, err := grpc.Dial(addr, grpc.WithInsecure())
if err != nil {
log.Println(err)
return
}
defer conn.Close()
//todo temporary
client := pb.NewStatusClient(conn)
//printStatusStream(client) //stream
out, err := client.Status(context.Background(), &pb.Empty{})
if err != nil {
log.Println(err)
}
p.taskCh <- out
*/
}()
}
func (p *Poller) updateInterval(interval int) {
p.scheduler.UpdateInterval(POLLING_ID, uint64(interval))
log.Printf("Polling interval has changed to %d seconds.\n", interval)
}
/*
func printStatusStream(client pb.StatusClient) {
stream, err := client.Status(context.Background(), &pb.Empty{})
if err != nil {
grpclog.Fatalf("%v.List(_) = _, %v", client, err)
}
for {
feature, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
grpclog.Fatalf("%v.List(_) = _, %v", client, err)
}
grpclog.Println(feature)
}
grpclog.Println("--------------------------")
}
*/