collector_go/collector.go
insanity@loafle.com 39dabb08e7 collector
2017-05-11 19:27:04 +09:00

134 lines
2.9 KiB
Go

package collector_go
import (
"context"
confMng "loafle.com/overflow/agent_api/config_manager"
"loafle.com/overflow/crawler_go/grpc"
crm "loafle.com/overflow/crawler_manager_go"
s "loafle.com/overflow/scheduler_go"
"log"
"strconv"
"sync"
"time"
)
var (
instance *Collector
once sync.Once
)
func init() {
go handleConfigLoaded()
}
func GetInstance() *Collector {
once.Do(func() {
instance = &Collector{}
})
return instance
}
type Collector struct {
scheduler *s.Scheduler
cm confMng.ConfigManager
startSensorCh chan interface{}
stopSensorCh chan interface{}
addSensorCh chan interface{}
remSensorCh chan interface{}
updateSensorCh chan interface{}
crmSensorUpdateCh chan interface{}
crmUpdateCh chan interface{}
crmUpdateDoneCh chan interface{}
}
func (c *Collector) start(conf confMng.ConfigManager) {
go func() {
c.cm = conf
c.scheduler = &s.Scheduler{}
c.scheduler.Start()
c.addObservers()
c.notifyCollectorReady()
for _, conf := range c.cm.GetSensors() {
if err := c.addSensor(conf.Id); err != nil {
log.Println(err)
}
}
}()
}
func (c *Collector) addObservers() {
c.startSensorCh = make(chan interface{})
c.stopSensorCh = make(chan interface{})
c.addSensorCh = make(chan interface{})
c.remSensorCh = make(chan interface{})
c.updateSensorCh = make(chan interface{})
c.crmSensorUpdateCh = make(chan interface{})
c.crmUpdateCh = make(chan interface{})
c.crmUpdateDoneCh = make(chan interface{})
go c.handleAgentWillStop()
go c.handleSensorStart()
go c.handleSensorStop()
go c.handleSensorAdd()
go c.handleSensorRemove()
go c.handleSensorUpdate()
go c.handleCrmSensorUpdateDone()
go c.handleCrawlerUpdate()
go c.handleCrawlerUpdateDone()
}
func (c *Collector) Stop(d interface{}) {
c.cleanObserver(c.addSensorCh, c.remSensorCh)
c.scheduler.RemoveAllSchedule()
c.scheduler.Stop()
c.notifyCollectorStopped()
}
func (c *Collector) collect(id string) {
conf := c.cm.GetSensorById(id)
log.Printf("COLLECT %s - [ID: %s] [Crawler : %s]", time.Now(), conf.Id, conf.Crawler.Name)
conn, err := crm.GetInstance().GetClient(conf.Crawler.Container)
if err != nil {
log.Println(err)
}
defer conn.Close()
dc := grpc.NewDataClient(conn)
in := &grpc.Input{}
in.Id = id
in.Name = grpc.Crawlers(grpc.Crawlers_value[conf.Crawler.Name])
out, err := dc.Get(context.Background(), in)
if err != nil {
log.Println(err)
}
log.Println(out)
c.notifyData(out)
}
func (c *Collector) addSensor(sensorId string) error {
sensor := c.cm.GetSensorById(sensorId)
interval, err := strconv.Atoi(sensor.Schedule.Interval)
if err != nil {
return err
}
return c.scheduler.NewSchedule(sensorId, uint64(interval), c.collect)
}
func (c *Collector) removeSensor(id string) {
if err := c.scheduler.RemoveSchedule(id); err != nil {
log.Println(err)
}
}
func (c *Collector) updateSensor(id string) {
//update
}