collector_go/collector_event.go

109 lines
2.9 KiB
Go
Raw Normal View History

2017-04-28 04:16:08 +00:00
package collector_go
import (
"loafle.com/overflow/agent_api/config_manager"
"loafle.com/overflow/agent_api/observer"
"loafle.com/overflow/agent_api/observer/messages"
)
func handleConfigLoaded() {
ch := make(chan interface{}, 0)
2017-05-11 09:34:47 +00:00
observer.Add(messages.CRM_READY, ch)
2017-04-28 04:16:08 +00:00
data := <-ch
confMng := data.(config_manager.ConfigManager)
coll := GetInstance()
coll.start(confMng)
2017-05-11 11:06:04 +00:00
//observer.Remove(messages.CRM_READY, ch)
2017-05-11 09:34:47 +00:00
}
2017-04-28 04:16:08 +00:00
2017-05-11 09:34:47 +00:00
func (c *Collector) notifyCollectorReady() {
observer.Notify(messages.CLT_READY, nil)
2017-04-28 04:16:08 +00:00
}
2017-05-11 09:34:47 +00:00
// notifyCollectorStopped publishes messages.CLT_STOPPED through
// observer.Notify with a nil payload.
func (c *Collector) notifyCollectorStopped() {
	observer.Notify(messages.CLT_STOPPED, nil)
}
2017-04-28 04:16:08 +00:00
2017-05-11 09:34:47 +00:00
func (c *Collector) handleAgentWillStop() {
ch := make(chan interface{}, 0)
2017-05-11 10:05:37 +00:00
observer.Add(messages.AGT_WILL_STOP, ch)
2017-05-11 09:34:47 +00:00
data := <-ch
2017-05-11 10:05:37 +00:00
c.Stop(data)
2017-05-11 11:06:04 +00:00
//observer.Remove(messages.AGT_WILL_STOP, ch)
2017-04-28 04:16:08 +00:00
}
2017-05-11 09:34:47 +00:00
// handleSensorStart passes c.startSensorCh to observer.Add for
// messages.TASK_SENSOR_START, receives one value from it and hands that value,
// asserted to string, to c.addSensor.
//
// NOTE(review): the assertion to string is unchecked (a non-string payload
// panics) and a non-nil result from addSensor is dropped without being
// reported — confirm both are acceptable.
func (c *Collector) handleSensorStart() {
	observer.Add(messages.TASK_SENSOR_START, c.startSensorCh)

	payload := <-c.startSensorCh
	if err := c.addSensor(payload.(string)); err != nil {
		return
	}
	// TODO: task done
}
// handleSensorStop passes c.stopSensorCh to observer.Add for
// messages.TASK_SENSOR_STOP, receives one value from it and hands that value,
// asserted to string, to c.removeSensor.
//
// NOTE(review): the assertion to string is unchecked; a non-string payload
// panics.
func (c *Collector) handleSensorStop() {
	observer.Add(messages.TASK_SENSOR_STOP, c.stopSensorCh)

	payload := <-c.stopSensorCh
	sensorID := payload.(string)
	c.removeSensor(sensorID)
	// TODO: task done
}
2017-04-28 04:16:08 +00:00
2017-05-11 09:34:47 +00:00
func (c *Collector) handleSensorAdd() {
observer.Add(messages.CRM_SENSOR_ADD_DONE, c.addSensorCh)
configId := <-c.addSensorCh
err := c.addSensor(configId.(string))
if err == nil {
//todo task done
2017-04-28 04:16:08 +00:00
}
}
2017-05-11 09:34:47 +00:00
// handleSensorRemove passes c.remSensorCh to observer.Add for
// messages.TASK_SENSOR_REMOVE, receives one value from it, hands that value
// (asserted to string) to c.removeSensor, and then publishes
// messages.CLT_SENSOR_REMOVE_DONE with the received value as payload.
//
// NOTE(review): the assertion to string is unchecked; a non-string payload
// panics before the done notification is sent.
func (c *Collector) handleSensorRemove() {
	observer.Add(messages.TASK_SENSOR_REMOVE, c.remSensorCh)

	payload := <-c.remSensorCh
	sensorID := payload.(string)
	c.removeSensor(sensorID)

	// The notification carries the value exactly as it was received.
	observer.Notify(messages.CLT_SENSOR_REMOVE_DONE, payload)
}
// handleSensorUpdate passes c.updateSensorCh to observer.Add for
// messages.TASK_SENSOR_UPDATE, receives one value from it and publishes
// messages.CLT_SENSOR_UPDATE_DONE with that value as payload.
//
// No update work is performed between the receive and the notification yet.
func (c *Collector) handleSensorUpdate() {
	observer.Add(messages.TASK_SENSOR_UPDATE, c.updateSensorCh)

	payload := <-c.updateSensorCh
	// TODO: update logic

	observer.Notify(messages.CLT_SENSOR_UPDATE_DONE, payload)
}
// handleCrmSensorUpdateDone passes c.crmSensorUpdateCh to observer.Add for
// messages.CRM_SENSOR_UPDATE_DONE.
//
// Nothing is received from the channel here; the receive is still disabled.
func (c *Collector) handleCrmSensorUpdateDone() {
	observer.Add(messages.CRM_SENSOR_UPDATE_DONE, c.crmSensorUpdateCh)

	//configId := <-c.crmSensorUpdateCh
	// TODO: update logic
	// TODO: task done
}
// handleCrawlerUpdate passes c.crmUpdateCh to observer.Add for
// messages.TASK_CRAWLER_UPDATE. It does not receive from the channel.
func (c *Collector) handleCrawlerUpdate() {
	observer.Add(messages.TASK_CRAWLER_UPDATE, c.crmUpdateCh)
	// TODO: do what the collector has to do
}
// handleCrawlerUpdateDone passes c.crmUpdateDoneCh to observer.Add for
// messages.CRM_UPDATE_DONE. It does not receive from the channel.
func (c *Collector) handleCrawlerUpdateDone() {
	observer.Add(messages.CRM_UPDATE_DONE, c.crmUpdateDoneCh)
	// TODO: task done
}
// cleanObserver calls observer.Remove for each event/channel pair that the
// collector's handler methods pass to observer.Add using a channel field of c.
//
// NOTE(review): the ach and rch parameters are not used by the body — confirm
// whether they were meant to carry additional channels to remove.
func (c *Collector) cleanObserver(ach chan interface{}, rch chan interface{}) {
	// Sensor task and sensor-add events.
	observer.Remove(messages.TASK_SENSOR_START, c.startSensorCh)
	observer.Remove(messages.TASK_SENSOR_STOP, c.stopSensorCh)
	observer.Remove(messages.CRM_SENSOR_ADD_DONE, c.addSensorCh)
	observer.Remove(messages.TASK_SENSOR_REMOVE, c.remSensorCh)
	observer.Remove(messages.TASK_SENSOR_UPDATE, c.updateSensorCh)

	// Crawler update and update-done events.
	observer.Remove(messages.TASK_CRAWLER_UPDATE, c.crmUpdateCh)
	observer.Remove(messages.CRM_SENSOR_UPDATE_DONE, c.crmSensorUpdateCh)
	observer.Remove(messages.CRM_UPDATE_DONE, c.crmUpdateDoneCh)
}
func (c *Collector) notifyData(data interface{}) {
2017-05-11 09:58:10 +00:00
observer.Notify(messages.QUEUE_DATA, data)
2017-04-28 04:16:08 +00:00
}