111 lines
3.0 KiB
Go
111 lines
3.0 KiB
Go
package collector_go
|
|
|
|
import (
|
|
"loafle.com/overflow/agent_api/config_manager"
|
|
"loafle.com/overflow/agent_api/observer"
|
|
"loafle.com/overflow/agent_api/observer/messages"
|
|
)
|
|
|
|
func handleConfigLoaded() {
|
|
ch := make(chan interface{}, 0)
|
|
observer.Add(messages.CRM_READY, ch)
|
|
|
|
data := <-ch
|
|
confMng := data.(config_manager.ConfigManager)
|
|
|
|
coll := GetInstance()
|
|
coll.start(confMng)
|
|
observer.Remove(messages.CRM_READY, ch)
|
|
}
|
|
|
|
func (c *Collector) notifyCollectorReady() {
|
|
observer.Notify(messages.CLT_READY, nil)
|
|
}
|
|
|
|
func (c *Collector) notifyCollectorStopped() {
|
|
observer.Notify(messages.CLT_STOPPED, nil)
|
|
}
|
|
|
|
func (c *Collector) handleAgentWillStop() {
|
|
ch := make(chan interface{}, 0)
|
|
observer.Add(messages.AGT_WILL_STOPPED, ch)
|
|
data := <-ch
|
|
if data {
|
|
c.Stop()
|
|
}
|
|
observer.Remove(messages.AGT_WILL_STOPPED, ch)
|
|
}
|
|
|
|
func (c *Collector) handleSensorStart() {
|
|
observer.Add(messages.TASK_SENSOR_START, c.startSensorCh)
|
|
configId := <-c.startSensorCh
|
|
err := c.addSensor(configId.(string))
|
|
if err == nil {
|
|
//todo task done
|
|
}
|
|
}
|
|
|
|
func (c *Collector) handleSensorStop() {
|
|
observer.Add(messages.TASK_SENSOR_STOP, c.stopSensorCh)
|
|
configId := <-c.stopSensorCh
|
|
c.removeSensor(configId.(string))
|
|
//todo task done
|
|
}
|
|
|
|
func (c *Collector) handleSensorAdd() {
|
|
observer.Add(messages.CRM_SENSOR_ADD_DONE, c.addSensorCh)
|
|
configId := <-c.addSensorCh
|
|
err := c.addSensor(configId.(string))
|
|
if err == nil {
|
|
//todo task done
|
|
}
|
|
}
|
|
|
|
func (c *Collector) handleSensorRemove() {
|
|
observer.Add(messages.TASK_SENSOR_REMOVE, c.remSensorCh)
|
|
configId := <-c.remSensorCh
|
|
c.removeSensor(configId.(string))
|
|
observer.Notify(messages.CLT_SENSOR_REMOVE_DONE, configId)
|
|
}
|
|
|
|
func (c *Collector) handleSensorUpdate() {
|
|
observer.Add(messages.TASK_SENSOR_UPDATE, c.updateSensorCh)
|
|
|
|
configId := <-c.updateSensorCh
|
|
//todo update logic
|
|
observer.Notify(messages.CLT_SENSOR_UPDATE_DONE, configId)
|
|
}
|
|
|
|
func (c *Collector) handleCrmSensorUpdateDone() {
|
|
observer.Add(messages.CRM_SENSOR_UPDATE_DONE, c.crmSensorUpdateCh)
|
|
|
|
//configId := <-c.crmSensorUpdateCh
|
|
//todo update logic
|
|
//todo task done
|
|
}
|
|
|
|
func (c *Collector) handleCrawlerUpdate() {
|
|
observer.Add(messages.TASK_CRAWLER_UPDATE, c.crmUpdateCh)
|
|
//todo. do what collector has to do
|
|
}
|
|
|
|
func (c *Collector) handleCrawlerUpdateDone() {
|
|
observer.Add(messages.CRM_UPDATE_DONE, c.crmUpdateDoneCh)
|
|
//todo. task done
|
|
}
|
|
|
|
func (c *Collector) cleanObserver(ach chan interface{}, rch chan interface{}) {
|
|
observer.Remove(messages.TASK_SENSOR_START, c.startSensorCh)
|
|
observer.Remove(messages.TASK_SENSOR_STOP, c.stopSensorCh)
|
|
observer.Remove(messages.CRM_SENSOR_ADD_DONE, c.addSensorCh)
|
|
observer.Remove(messages.TASK_SENSOR_REMOVE, c.remSensorCh)
|
|
observer.Remove(messages.TASK_SENSOR_UPDATE, c.updateSensorCh)
|
|
observer.Remove(messages.TASK_CRAWLER_UPDATE, c.crmUpdateCh)
|
|
observer.Remove(messages.CRM_SENSOR_UPDATE_DONE, c.crmSensorUpdateCh)
|
|
observer.Remove(messages.CRM_UPDATE_DONE, c.crmUpdateDoneCh)
|
|
}
|
|
|
|
func (c *Collector) notifyData(data interface{}) {
|
|
observer.Notify(messages.CLT_DATA_SEND, data)
|
|
}
|