long poller

This commit is contained in:
insanity@loafle.com 2017-05-11 18:36:20 +09:00
parent fef63ceb94
commit fbd63c533e
3 changed files with 77 additions and 34 deletions

View File

@ -3,14 +3,12 @@ package long_poller_go
import ( import (
"context" "context"
"google.golang.org/grpc" "google.golang.org/grpc"
"loafle.com/overflow/cron_go"
"log"
"sync"
cm "loafle.com/overflow/agent_api/config_manager" cm "loafle.com/overflow/agent_api/config_manager"
"loafle.com/overflow/agent_api/observer"
"loafle.com/overflow/agent_api/observer/messages"
pb "loafle.com/overflow/crawler_go/grpc" //temp pb "loafle.com/overflow/crawler_go/grpc" //temp
s "loafle.com/overflow/scheduler_go"
"log"
"strconv"
"sync"
) )
const ( const (
@ -24,13 +22,15 @@ var (
) )
type Poller struct { type Poller struct {
once sync.Once once sync.Once
runStat chan bool runStat chan bool
gconf *cm.GlobalConfig gconf *cm.GlobalConfig
intervalCh chan interface{}
scheduler *s.Scheduler
} }
func init() { func init() {
addObservers() go handleConfigLoaded()
} }
func GetInstance() *Poller { func GetInstance() *Poller {
@ -40,34 +40,44 @@ func GetInstance() *Poller {
return instance return instance
} }
func addObservers() { func startPoller(ch chan interface{}) {
ch := make(chan interface{}, 0) p := GetInstance()
observer.Add(messages.CONFIGMANAGER_LOADED, ch)
handleInit(ch)
}
func handleInit(ch chan interface{}) {
ds := GetInstance()
go func() { go func() {
data := <-ch data := <-ch
log.Println(data) p.gconf = data.(cm.ConfigManager).GetGlobalConfig()
ds.gconf = data.(cm.ConfigManager).GetGlobalConfig() p.start()
ds.start() p.addObservers()
close(ch) }()
}
func (p *Poller) addObservers() {
go p.handleAgentStop()
}
func (p *Poller) handleInterval(ch chan interface{}) {
go func() {
data := <-ch
interval := data.(string)
i, err := strconv.Atoi(interval)
if err != nil {
log.Println(err)
return
}
p.scheduler.UpdateInterval(POLLING_ID, uint64(i))
}() }()
} }
func (p *Poller) start() { func (p *Poller) start() {
p.once.Do(func() { p.once.Do(func() {
cr := &cron.Cron{} p.scheduler = &s.Scheduler{}
p.runStat = cr.Start() p.scheduler.Start()
cr.AddTask(POLLING_ID, DEFAULT_INTERVAL).Invoke(p.polling, agentIdentifier()) p.scheduler.NewSchedule(POLLING_ID, DEFAULT_INTERVAL, p.polling)
}) })
} }
func (p *Poller) Stop() { func (p *Poller) Stop() {
p.scheduler.Stop()
p.runStat <- false p.runStat <- false
observer.Notify(messages.POLLER_STOPPED, true)
} }
func (p *Poller) polling(agentId string) { func (p *Poller) polling(agentId string) {
@ -83,7 +93,7 @@ func (p *Poller) polling(agentId string) {
//todo temporary //todo temporary
client := pb.NewStatusClient(conn) client := pb.NewStatusClient(conn)
//printStatus(client) //stream //printStatusStream(client) //stream
out, err := client.Status(context.Background(), &pb.Empty{}) out, err := client.Status(context.Background(), &pb.Empty{})
if err != nil { if err != nil {
@ -95,15 +105,29 @@ func (p *Poller) polling(agentId string) {
func (p *Poller) dispatchNotify(result interface{}) { func (p *Poller) dispatchNotify(result interface{}) {
log.Printf("Polling result - %s ", result) log.Printf("Polling result - %s ", result)
/*
TASK_SENSOR_START
TASK_SENSOR_STOP
TASK_SENSOR_ADD
TASK_SENSOR_REMOVE
TASK_SENSOR_UPDATE
TASK_CRAWLER_UPDATE
TASK_AGENT_UPDATE
TASK_LOG_SEND
*/
} }
func agentIdentifier() string { func (p *Poller) updateInterval(interval string) {
//todo i, err := strconv.Atoi(interval)
return "agent000001" if err != nil {
log.Println(err)
return
}
p.scheduler.UpdateInterval(POLLING_ID, uint64(i))
} }
/* /*
func printStassssssssstus(client pb.StatusClient) { func printStatusStream(client pb.StatusClient) {
stream, err := client.Status(context.Background(), &pb.Empty{}) stream, err := client.Status(context.Background(), &pb.Empty{})
if err != nil { if err != nil {
grpclog.Fatalf("%v.List(_) = _, %v", client, err) grpclog.Fatalf("%v.List(_) = _, %v", client, err)

20
poller_event.go Normal file
View File

@ -0,0 +1,20 @@
package long_poller_go
import (
"loafle.com/overflow/agent_api/observer"
"loafle.com/overflow/agent_api/observer/messages"
)
func handleConfigLoaded() {
ch := make(chan interface{}, 0)
observer.Add(messages.CFG_LOADED, ch)
startPoller(ch)
observer.Remove(messages.CFG_LOADED, ch)
}
func (p *Poller) handleAgentStop() {
ch := make(chan interface{}, 0)
observer.Add(messages.AGT_STOPPED, ch)
p.Stop()
observer.Remove(messages.AGT_STOPPED, ch)
}

View File

@ -3,7 +3,6 @@ package long_poller_go
import ( import (
"loafle.com/overflow/agent_api/observer" "loafle.com/overflow/agent_api/observer"
"loafle.com/overflow/agent_api/observer/messages" "loafle.com/overflow/agent_api/observer/messages"
"log"
"testing" "testing"
"time" "time"
) )
@ -16,6 +15,6 @@ func TestPolling(t *testing.T) {
func TestTotal(t *testing.T) { func TestTotal(t *testing.T) {
time.Sleep(time.Second * 5) time.Sleep(time.Second * 5)
observer.Notify(messages.AGENT_STARTED, "") //CONFIG LOCATION observer.Notify(messages.CONFIGMANAGER_LOADED, "") //CONFIG LOCATION
time.Sleep(time.Second * 5) time.Sleep(time.Second * 100)
} }