package main import ( //crm "loafle.com/overflow/crawler_manager_go" cm "loafle.com/overflow/agent_api/config_manager" msg "loafle.com/overflow/agent_api/messages" col "loafle.com/overflow/collector_go" dat "loafle.com/overflow/data_sender_go" evt "loafle.com/overflow/event_sender_go" pol "loafle.com/overflow/long_poller_go" "log" ) func main() { log.SetPrefix("Agent : ") StartAgent() } type Agent struct { cm cm.ConfigManager taskCh chan interface{} } func StartAgent() { agt := Agent{} agt.startAgent() } func (agt *Agent) startAgent() { //1. cfg start() //2. evt start() evtStartChn := make(chan bool, 0) err := evt.Start(evtStartChn) //if err != nil { // processError(err) //} <- evtStartChn //3. dat start() datStarted := make(chan bool) dat.Start(datStarted, agt.cm.GetGlobalConfig()) <-datStarted //4. pol start() -> getting an Auth key from API server and store that key. authKey := make(chan string) ch, err := pol.Start(authKey, agt.cm.GetGlobalConfig()) if err != nil { evt.AddEventData(msg.TaskResult{msg.AGT_STOP, false, err }) return } <-authKey agt.taskCh = ch //5. scm start(secretKey) //6. crm start() //7. col start() -> evt result dataCh := make(chan interface{}) colStarted := make(chan bool) col.Start(colStarted, dataCh, agt.cm) <-colStarted res := msg.TaskResult{msg.AGT_START, true, nil} evt.AddEventData(res) go agt.processCollectingData(dataCh) //8. processing tasks go agt.waitAgentTask() } func (agt *Agent) waitAgentTask() { for { d := <-agt.taskCh task := d.(msg.AgentTask) switch task.Command { case msg.TASK_POLLER_INTERVAL_UPDATE: agt.processUpdatePolInterval(task) break case msg.TASK_SENSOR_START: agt.processStartSensor(task) break case msg.TASK_SENSOR_STOP: agt.processStopSensor(task) break case msg.TASK_SENSOR_ADD: agt.processAddSensor(task) break case msg.TASK_SENSOR_REMOVE: agt.processRemoveSensor(task) break case msg.TASK_SENSOR_UPDATE: agt.processUpdateSensor(task) break case msg.TASK_AGENT_UPDATE: agt.processUpdateAgent(task) break case msg.TASK_CRAWLER_UPDATE: agt.processUpdateCRM(task) break case msg.TASK_LOG_SEND: agt.processSendLog(task) break default: } } } func (agt *Agent) processCollectingData(ch chan interface{}) { for { data := <-ch dat.AddData(data) } } func (agt *Agent) stopAgent() { //col stop() colStopped := make(chan bool) col.Stop(colStopped) <-colStopped //crm stop() //scm stop() //dat stop() datStopped := make(chan bool) dat.Stop(datStopped) <-datStopped //pol stop() polStopped := make(chan bool) err := pol.Stop(polStopped) if err != nil { evt.AddEventData(msg.TaskResult{msg.AGT_STOP, false, err }) return } <-polStopped //evt result evt.AddEventData(msg.TaskResult{msg.AGT_STOP, true, nil }) //evt stop() evt.Stop() } func (agt *Agent) processStartSensor(task msg.AgentTask) { //col startSensor() -> evt result id := task.Params["sensorId"] col.StartSensor(id) evt.AddEventData(msg.TaskResult{msg.TASK_SENSOR_START, true, nil }) } func (agt *Agent) processStopSensor(task msg.AgentTask) { //col stopSensor() -> evt result id := task.Params["sensorId"] col.StopSensor(id) evt.AddEventData(msg.TaskResult{msg.TASK_SENSOR_STOP, true, nil }) } func (agt *Agent) processAddSensor(task msg.AgentTask) { id := task.Params["sensorId"] //scm addSensor() //crm addSensor() //col addSensor() err := col.AddSensor(id) if err != nil { evt.AddEventData(msg.TaskResult{msg.TASK_SENSOR_ADD, false, err }) return } //evt.AddEventData(msg.TaskResult{msg.TASK_SENSOR_ADD, true, nil }) } func (agt *Agent) processRemoveSensor(task msg.AgentTask) { id := task.Params["sensorId"] //col removeSensor() err := col.RemSensor(id) if err != nil { evt.AddEventData(msg.TaskResult{msg.TASK_SENSOR_REMOVE, false, err }) } //crm removeSensor() //scm removeSensor() //evt.AddEventData(msg.TaskResult{msg.TASK_SENSOR_REMOVE, true, nil }) } func (agt *Agent) processUpdateSensor(task msg.AgentTask) { //col stopSensor() //scm updateSensor() //crm updateSensor() //col updateSensor() //evt.AddEventData(msg.TaskResult{msg.TASK_SENSOR_UPDATE, true, nil }) } func (agt *Agent) processUpdateCRM(task msg.AgentTask) { //(loop)col stopSensor() //crm updateCRM() //(loop)col startSensor() //evt.AddEventData(msg.TaskResult{msg.TASK_CRAWLER_UPDATE, true, nil }) } func (agt *Agent) processUpdateAgent(task msg.AgentTask) { //mola //evt.AddEventData(msg.TaskResult{msg.TASK_AGENT_UPDATE, true, nil }) } func (agt *Agent) processSendLog(task msg.AgentTask) { //mola //evt.AddEventData(msg.TaskResult{msg.TASK_LOG_SEND, true, nil }) } func (agt *Agent) processUpdatePolInterval(task msg.AgentTask) { interval := task.Params["interval"] err := pol.UpdateInterval(interval) if err != nil { evt.AddEventData(msg.TaskResult{msg.TASK_POLLER_INTERVAL_UPDATE, false, err }) return } evt.AddEventData(msg.TaskResult{msg.TASK_POLLER_INTERVAL_UPDATE, true, nil }) }