This commit is contained in:
insanity@loafle.com 2017-05-16 16:11:50 +09:00
parent 447a3b372b
commit 41d98d4b91

View File

@ -1,10 +1,8 @@
package long_poller_go package long_poller_go
import ( import (
"context"
"google.golang.org/grpc"
cm "loafle.com/overflow/agent_api/config_manager" cm "loafle.com/overflow/agent_api/config_manager"
pb "loafle.com/overflow/crawler_go/grpc" //temp //pb "loafle.com/overflow/crawler_go/grpc" //temp
s "loafle.com/overflow/scheduler_go" s "loafle.com/overflow/scheduler_go"
"log" "log"
"strconv" "strconv"
@ -27,22 +25,20 @@ type Poller struct {
taskCh chan interface{} taskCh chan interface{}
} }
func Start(ch chan string, conf *cm.GlobalConfig) (chan interface{}, error) { func Start(ch chan bool, conf *cm.GlobalConfig) error {
p := GetInstance()
p.gconf = conf
p.startPolling()
ch <- true
return nil
}
func GetTaskCh() chan interface{} {
p := GetInstance() p := GetInstance()
p.taskCh = make(chan interface{}) p.taskCh = make(chan interface{})
err := p.startPolling(conf) return p.taskCh
if err != nil {
return nil, err
}
key := getSecretKey() //todo. getting api key
ch <- key
return p.taskCh, nil
} }
func getSecretKey() string {
return "tempkey"
}
func Stop(polStopped chan bool) error { func Stop(polStopped chan bool) error {
GetInstance().stop() GetInstance().stop()
@ -63,56 +59,59 @@ func UpdateInterval(interval string) error {
return err return err
} }
GetInstance().updateInterval(v) GetInstance().updateInterval(v)
}
func (p *Poller) startPolling(conf *cm.GlobalConfig) error {
p.gconf = conf
p.scheduler = &s.Scheduler{}
p.scheduler.Start()
err := p.scheduler.NewSchedule(POLLING_ID, uint64(p.gconf.IntervalSecond), p.polling)
if err != nil {
return err
}
return nil return nil
} }
func (p *Poller) handleInterval(ch chan interface{}) { func (p *Poller) startPolling() {
go func() { go func() {
data := <-ch p.scheduler = &s.Scheduler{}
interval := data.(string) p.scheduler.Start()
i, err := strconv.Atoi(interval) p.scheduler.NewSchedule(POLLING_ID, uint64(p.gconf.IntervalSecond), p.polling)
if err != nil {
log.Println(err)
return
}
p.scheduler.UpdateInterval(POLLING_ID, uint64(i))
}() }()
} }
//func (p *Poller) handleInterval(ch chan interface{}) {
// go func() {
// data := <-ch
// interval := data.(string)
// i, err := strconv.Atoi(interval)
// if err != nil {
// log.Println(err)
// return
// }
// p.scheduler.UpdateInterval(POLLING_ID, uint64(i))
// }()
//}
func (p *Poller) stop() { func (p *Poller) stop() {
p.scheduler.Stop() p.scheduler.Stop()
} }
func (p *Poller) polling(agentId string) { func (p *Poller) polling(agentId string) {
addr := p.gconf.Central.Address + ":" + string(p.gconf.Central.Port) go func() {
conn, err := grpc.Dial(addr, grpc.WithInsecure()) log.Println("POLLING....")
if err != nil { /*
log.Println(err) addr := p.gconf.Central.Address + ":" + string(p.gconf.Central.Port)
return conn, err := grpc.Dial(addr, grpc.WithInsecure())
} if err != nil {
defer conn.Close() log.Println(err)
return
}
defer conn.Close()
//todo temporary //todo temporary
client := pb.NewStatusClient(conn) client := pb.NewStatusClient(conn)
//printStatusStream(client) //stream //printStatusStream(client) //stream
out, err := client.Status(context.Background(), &pb.Empty{}) out, err := client.Status(context.Background(), &pb.Empty{})
if err != nil { if err != nil {
log.Println(err) log.Println(err)
} }
p.taskCh <- out p.taskCh <- out
*/
}()
} }
func (p *Poller) updateInterval(interval int) { func (p *Poller) updateInterval(interval int) {