long_poller_go/poller.go
insanity@loafle.com fef63ceb94 long poller
2017-04-28 13:16:39 +09:00

124 lines
2.2 KiB
Go

package long_poller_go
import (
	"context"
	"fmt"
	"log"
	"sync"

	"google.golang.org/grpc"
	cm "loafle.com/overflow/agent_api/config_manager"
	"loafle.com/overflow/agent_api/observer"
	"loafle.com/overflow/agent_api/observer/messages"
	pb "loafle.com/overflow/crawler_go/grpc" //temp
	"loafle.com/overflow/cron_go"
)
// POLLING_ID is the task name under which the polling job is registered
// with the cron scheduler.
const POLLING_ID = "OVERFLOW_LONG_POLLING"

// DEFAULT_INTERVAL is the interval handed to the cron scheduler for the
// polling task. NOTE(review): the unit is defined by the cron package —
// confirm there.
const DEFAULT_INTERVAL uint64 = 3
// instance is the package-wide Poller handed out by GetInstance.
var instance *Poller

// once makes sure instance is allocated a single time.
var once sync.Once
// Poller holds the state of the long-polling job: the one-time start guard,
// the scheduler's run-state channel and the global configuration.
type Poller struct {
// once guards the body of start so the cron scheduler is set up only once.
once sync.Once
// runStat is the channel returned by the cron scheduler's Start; Stop sends
// false on it.
runStat chan bool
// gconf is the global configuration taken from the ConfigManager delivered
// with the CONFIGMANAGER_LOADED message; polling reads Central.Address and
// Central.Port from it.
gconf *cm.GlobalConfig
}
// init runs at package initialization and registers this package's
// observers by calling addObservers.
func init() {
addObservers()
}
// GetInstance returns the package-wide Poller, allocating it on the first
// call. The package-level once guarantees a single allocation.
func GetInstance() *Poller {
	allocate := func() {
		instance = new(Poller)
	}
	once.Do(allocate)
	return instance
}
// addObservers subscribes an unbuffered channel to the CONFIGMANAGER_LOADED
// message and passes that channel to handleInit, which waits on it.
func addObservers() {
	loaded := make(chan interface{})
	observer.Add(messages.CONFIGMANAGER_LOADED, loaded)
	handleInit(loaded)
}
// handleInit spawns a goroutine that waits for a single message on ch, logs
// it, stores the global configuration it carries on the Poller singleton and
// starts the poller. The channel is closed once that message has been handled.
//
// The payload is expected to be a cm.ConfigManager. Any other payload is
// logged and the poller is left unstarted, rather than letting a failed type
// assertion panic inside the goroutine and take the whole process down.
func handleInit(ch chan interface{}) {
	ds := GetInstance()
	go func() {
		// NOTE(review): ch is also held by the observer registry; closing it
		// here assumes the observer never sends on it again — confirm.
		defer close(ch)

		data := <-ch
		log.Println(data)

		manager, ok := data.(cm.ConfigManager)
		if !ok {
			log.Printf("long poller: unexpected config payload of type %T, poller not started", data)
			return
		}
		ds.gconf = manager.GetGlobalConfig()
		ds.start()
	}()
}
// start creates a cron scheduler, starts it, keeps its run-state channel in
// p.runStat and registers p.polling as the POLLING_ID task with
// DEFAULT_INTERVAL. The whole sequence is guarded by p.once, so repeated
// calls are no-ops.
func (p *Poller) start() {
	launch := func() {
		scheduler := new(cron.Cron)
		p.runStat = scheduler.Start()

		task := scheduler.AddTask(POLLING_ID, DEFAULT_INTERVAL)
		task.Invoke(p.polling, agentIdentifier())
	}
	p.once.Do(launch)
}
// Stop signals the cron scheduler to stop by sending false on the run-state
// channel and then publishes POLLER_STOPPED to the observers.
//
// If the poller was never started, runStat is still nil. A send on a nil
// channel blocks forever, so the send is skipped in that case and only the
// notification is published.
func (p *Poller) Stop() {
	if p.runStat != nil {
		p.runStat <- false
	}
	observer.Notify(messages.POLLER_STOPPED, true)
}
// polling performs one poll against the central server: it dials the address
// built from the global configuration, issues a Status call and hands the
// reply to dispatchNotify. Dial and call errors are logged and end the poll
// without dispatching anything.
//
// agentId is the identifier supplied when the task was registered; it is not
// used yet.
func (p *Poller) polling(agentId string) {
	// fmt.Sprint renders the port as text whatever its declared type is.
	// string(port) on an integer would yield the rune with that code point
	// instead of the decimal digits.
	addr := p.gconf.Central.Address + ":" + fmt.Sprint(p.gconf.Central.Port)

	conn, err := grpc.Dial(addr, grpc.WithInsecure())
	if err != nil {
		log.Println(err)
		return
	}
	defer conn.Close()

	// TODO: temporary client, to be replaced.
	client := pb.NewStatusClient(conn)
	//printStatus(client) //stream
	out, err := client.Status(context.Background(), &pb.Empty{})
	if err != nil {
		// The reply is meaningless when the call failed; do not dispatch it.
		log.Println(err)
		return
	}
	p.dispatchNotify(out)
}
// dispatchNotify reports a polling result. For now it only writes the result
// to the standard logger.
func (p *Poller) dispatchNotify(result interface{}) {
	const format = "Polling result - %s "
	log.Printf(format, result)
}
// agentIdentifier returns the identifier of this agent.
// TODO: currently a fixed placeholder value.
func agentIdentifier() string {
	const placeholderID = "agent000001"
	return placeholderID
}
/*
func printStassssssssstus(client pb.StatusClient) {
stream, err := client.Status(context.Background(), &pb.Empty{})
if err != nil {
grpclog.Fatalf("%v.List(_) = _, %v", client, err)
}
for {
feature, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
grpclog.Fatalf("%v.List(_) = _, %v", client, err)
}
grpclog.Println(feature)
}
grpclog.Println("--------------------------")
}
*/