From 58c6c696ad08e9e4ae2dd37372fa8b48e7321060 Mon Sep 17 00:00:00 2001 From: "insanity@loafle.com" Date: Mon, 15 May 2017 19:32:48 +0900 Subject: [PATCH] collector --- collector.go | 87 +++++++++++++++++------------------- collector_event.go | 108 --------------------------------------------- 2 files changed, 41 insertions(+), 154 deletions(-) delete mode 100644 collector_event.go diff --git a/collector.go b/collector.go index 9501985..60f205b 100644 --- a/collector.go +++ b/collector.go @@ -2,7 +2,7 @@ package collector_go import ( "context" - confMng "loafle.com/overflow/agent_api/config_manager" + cm "loafle.com/overflow/agent_api/config_manager" "loafle.com/overflow/crawler_go/grpc" crm "loafle.com/overflow/crawler_manager_go" s "loafle.com/overflow/scheduler_go" @@ -17,8 +17,22 @@ var ( once sync.Once ) -func init() { - go handleConfigLoaded() +type Collector struct { + scheduler *s.Scheduler + cm cm.ConfigManager + dataCh chan interface{} +} + +func Start(started chan bool, dataCh chan interface{}, conf cm.ConfigManager) { + c := GetInstance() + c.dataCh = dataCh + c.start(started, conf) +} + +func Stop(stopped chan bool) { + c := GetInstance() + c.stop() + stopped <- true } func GetInstance() *Collector { @@ -28,63 +42,27 @@ func GetInstance() *Collector { return instance } -type Collector struct { - scheduler *s.Scheduler - cm confMng.ConfigManager - startSensorCh chan interface{} - stopSensorCh chan interface{} - addSensorCh chan interface{} - remSensorCh chan interface{} - updateSensorCh chan interface{} - crmSensorUpdateCh chan interface{} - crmUpdateCh chan interface{} - crmUpdateDoneCh chan interface{} -} -func (c *Collector) start(conf confMng.ConfigManager) { +func (c *Collector) start(started chan bool, conf cm.ConfigManager) { go func() { c.cm = conf c.scheduler = &s.Scheduler{} c.scheduler.Start() - c.addObservers() - c.notifyCollectorReady() for _, conf := range c.cm.GetSensors() { if err := c.addSensor(conf.Id); err != nil { log.Println(err) } } + started <- true }() } -func (c *Collector) addObservers() { - c.startSensorCh = make(chan interface{}) - c.stopSensorCh = make(chan interface{}) - c.addSensorCh = make(chan interface{}) - c.remSensorCh = make(chan interface{}) - c.updateSensorCh = make(chan interface{}) - c.crmSensorUpdateCh = make(chan interface{}) - c.crmUpdateCh = make(chan interface{}) - c.crmUpdateDoneCh = make(chan interface{}) - - go c.handleAgentWillStop() - go c.handleSensorStart() - go c.handleSensorStop() - go c.handleSensorAdd() - go c.handleSensorRemove() - go c.handleSensorUpdate() - go c.handleCrmSensorUpdateDone() - go c.handleCrawlerUpdate() - go c.handleCrawlerUpdateDone() -} - -func (c *Collector) Stop(d interface{}) { - c.cleanObserver(c.addSensorCh, c.remSensorCh) +func (c *Collector) stop() { c.scheduler.RemoveAllSchedule() c.scheduler.Stop() - c.notifyCollectorStopped() } func (c *Collector) collect(id string) { @@ -108,9 +86,9 @@ func (c *Collector) collect(id string) { if err != nil { log.Println(err) } - log.Println(out) + log.Println("collector get result : ", out) + c.dataCh <- out - c.notifyData(out) } func (c *Collector) addSensor(sensorId string) error { @@ -122,12 +100,29 @@ func (c *Collector) addSensor(sensorId string) error { return c.scheduler.NewSchedule(sensorId, uint64(interval), c.collect) } -func (c *Collector) removeSensor(id string) { +func (c *Collector) removeSensor(id string) error { if err := c.scheduler.RemoveSchedule(id); err != nil { - log.Println(err) + return err } + return nil } func (c *Collector) updateSensor(id string) { //update } + +func AddSensor(id string) error { + return GetInstance().addSensor(id) +} + +func RemSensor(id string) error { + return GetInstance().removeSensor(id) +} + +func StartSensor(id string) error { + return GetInstance().scheduler.StartSchedule(id) +} + +func StopSensor(id string) error { + return GetInstance().scheduler.StopSchedule(id) +} \ No newline at end of file diff --git a/collector_event.go b/collector_event.go deleted file mode 100644 index b2d4147..0000000 --- a/collector_event.go +++ /dev/null @@ -1,108 +0,0 @@ -package collector_go - -import ( - "loafle.com/overflow/agent_api/config_manager" - "loafle.com/overflow/agent_api/observer" - "loafle.com/overflow/agent_api/observer/messages" -) - -func handleConfigLoaded() { - ch := make(chan interface{}, 0) - observer.Add(messages.CRM_READY, ch) - - data := <-ch - confMng := data.(config_manager.ConfigManager) - - coll := GetInstance() - coll.start(confMng) - //observer.Remove(messages.CRM_READY, ch) -} - -func (c *Collector) notifyCollectorReady() { - observer.Notify(messages.CLT_READY, nil) -} - -func (c *Collector) notifyCollectorStopped() { - observer.Notify(messages.CLT_STOPPED, nil) -} - -func (c *Collector) handleAgentWillStop() { - ch := make(chan interface{}, 0) - observer.Add(messages.AGT_WILL_STOP, ch) - data := <-ch - c.Stop(data) - //observer.Remove(messages.AGT_WILL_STOP, ch) -} - -func (c *Collector) handleSensorStart() { - observer.Add(messages.TASK_SENSOR_START, c.startSensorCh) - configId := <-c.startSensorCh - err := c.addSensor(configId.(string)) - if err == nil { - //todo task done - } -} - -func (c *Collector) handleSensorStop() { - observer.Add(messages.TASK_SENSOR_STOP, c.stopSensorCh) - configId := <-c.stopSensorCh - c.removeSensor(configId.(string)) - //todo task done -} - -func (c *Collector) handleSensorAdd() { - observer.Add(messages.CRM_SENSOR_ADD_DONE, c.addSensorCh) - configId := <-c.addSensorCh - err := c.addSensor(configId.(string)) - if err == nil { - //todo task done - } -} - -func (c *Collector) handleSensorRemove() { - observer.Add(messages.TASK_SENSOR_REMOVE, c.remSensorCh) - configId := <-c.remSensorCh - c.removeSensor(configId.(string)) - observer.Notify(messages.CLT_SENSOR_REMOVE_DONE, configId) -} - -func (c *Collector) handleSensorUpdate() { - observer.Add(messages.TASK_SENSOR_UPDATE, c.updateSensorCh) - - configId := <-c.updateSensorCh - //todo update logic - observer.Notify(messages.CLT_SENSOR_UPDATE_DONE, configId) -} - -func (c *Collector) handleCrmSensorUpdateDone() { - observer.Add(messages.CRM_SENSOR_UPDATE_DONE, c.crmSensorUpdateCh) - - //configId := <-c.crmSensorUpdateCh - //todo update logic - //todo task done -} - -func (c *Collector) handleCrawlerUpdate() { - observer.Add(messages.TASK_CRAWLER_UPDATE, c.crmUpdateCh) - //todo. do what collector has to do -} - -func (c *Collector) handleCrawlerUpdateDone() { - observer.Add(messages.CRM_UPDATE_DONE, c.crmUpdateDoneCh) - //todo. task done -} - -func (c *Collector) cleanObserver(ach chan interface{}, rch chan interface{}) { - observer.Remove(messages.TASK_SENSOR_START, c.startSensorCh) - observer.Remove(messages.TASK_SENSOR_STOP, c.stopSensorCh) - observer.Remove(messages.CRM_SENSOR_ADD_DONE, c.addSensorCh) - observer.Remove(messages.TASK_SENSOR_REMOVE, c.remSensorCh) - observer.Remove(messages.TASK_SENSOR_UPDATE, c.updateSensorCh) - observer.Remove(messages.TASK_CRAWLER_UPDATE, c.crmUpdateCh) - observer.Remove(messages.CRM_SENSOR_UPDATE_DONE, c.crmSensorUpdateCh) - observer.Remove(messages.CRM_UPDATE_DONE, c.crmUpdateDoneCh) -} - -func (c *Collector) notifyData(data interface{}) { - observer.Notify(messages.QUEUE_DATA, data) -}