long poller

This commit is contained in:
insanity@loafle.com 2017-05-15 19:05:54 +09:00
parent 2627e114ff
commit 416eb4460b
3 changed files with 29 additions and 66 deletions

View File

@ -12,7 +12,7 @@ import (
) )
const ( const (
POLLING_ID = "OVERFLOW_LONG_POLLING" POLLING_ID = "OVERFLOW_LONG_POLLING"
) )
var ( var (
@ -21,23 +21,35 @@ var (
) )
type Poller struct { type Poller struct {
once sync.Once
runStat chan bool runStat chan bool
gconf *cm.GlobalConfig gconf *cm.GlobalConfig
scheduler *s.Scheduler scheduler *s.Scheduler
taskCh chan interface{} taskCh chan interface{}
} }
func Start(conf *cm.GlobalConfig) (chan interface{}, error) { func Start(ch chan string, conf *cm.GlobalConfig) (chan interface{}, error) {
p := GetInstance() p := GetInstance()
p.taskCh = make(chan interface{}) p.taskCh = make(chan interface{})
err := p.startPolling(conf) err := p.startPolling(conf)
if err != nil { if err != nil {
return nil, err return nil, err
} }
key := getSecretKey() //todo. getting api key
ch <- key
return p.taskCh, nil return p.taskCh, nil
} }
func getSecretKey() string {
return "tempkey"
}
func Stop(polStopped chan bool) error {
GetInstance().stop()
polStopped <- true
return nil
}
func GetInstance() *Poller { func GetInstance() *Poller {
once.Do(func() { once.Do(func() {
instance = &Poller{} instance = &Poller{}
@ -45,6 +57,14 @@ func GetInstance() *Poller {
return instance return instance
} }
func UpdateInterval(interval string) error {
v, err := strconv.Atoi(interval)
if err != nil {
return err
}
GetInstance().updateInterval(v)
}
func (p *Poller) startPolling(conf *cm.GlobalConfig) error { func (p *Poller) startPolling(conf *cm.GlobalConfig) error {
p.gconf = conf p.gconf = conf
p.scheduler = &s.Scheduler{} p.scheduler = &s.Scheduler{}
@ -70,9 +90,8 @@ func (p *Poller) handleInterval(ch chan interface{}) {
}() }()
} }
func (p *Poller) Stop(ch chan interface{}) { func (p *Poller) stop() {
p.scheduler.Stop() p.scheduler.Stop()
p.runStat <- false
} }
func (p *Poller) polling(agentId string) { func (p *Poller) polling(agentId string) {
@ -87,28 +106,18 @@ func (p *Poller) polling(agentId string) {
//todo temporary //todo temporary
client := pb.NewStatusClient(conn) client := pb.NewStatusClient(conn)
//printStatusStream(client) //stream //printStatusStream(client) //stream
out, err := client.Status(context.Background(), &pb.Empty{}) out, err := client.Status(context.Background(), &pb.Empty{})
if err != nil { if err != nil {
log.Println(err) log.Println(err)
} }
p.dispatchNotify(out)
p.taskCh <- out
} }
func (p *Poller) dispatchNotify(result interface{}) { func (p *Poller) updateInterval(interval int) {
log.Printf("Polling result - %s ", result) p.scheduler.UpdateInterval(POLLING_ID, uint64(interval))
p.taskCh <- result
}
func (p *Poller) updateInterval(interval string) {
i, err := strconv.Atoi(interval)
if err != nil {
log.Println(err)
return
}
p.scheduler.UpdateInterval(POLLING_ID, uint64(i))
} }
/* /*

View File

@ -1,27 +0,0 @@
package long_poller_go
//import (
// "loafle.com/overflow/agent_api/observer"
// "loafle.com/overflow/agent_api/observer/messages"
//)
//
//func handleConfigLoaded() {
// ch := make(chan interface{}, 0)
// observer.Add(messages.CFG_LOADED, ch)
// startPoller(ch)
//}
//
//func (p *Poller) handleAgentStop() {
// ch := make(chan interface{}, 0)
// observer.Add(messages.AGT_STOPPED, ch)
// _ = <-ch
// p.Stop(ch)
//}
//
//func (p *Poller) removeConfigLoadedHandler(ch chan interface{}) {
// observer.Remove(messages.CFG_LOADED, ch)
//}
//
//func (p *Poller) removeAgentStopHandler(ch chan interface{}) {
// observer.Remove(messages.AGT_STOPPED, ch)
//}

View File

@ -1,20 +1 @@
package long_poller_go package long_poller_go
//import (
// "loafle.com/overflow/agent_api/observer"
// "loafle.com/overflow/agent_api/observer/messages"
// "testing"
// "time"
//)
//
//func TestPolling(t *testing.T) {
// poller := &Poller{}
// poller.start()
//}
//
//func TestTotal(t *testing.T) {
// time.Sleep(time.Second * 5)
//
// observer.Notify(messages.CFG_LOADED, "") //CONFIG LOCATION
// time.Sleep(time.Second * 100)
//}