diff --git a/agent.go b/agent.go index e45096a..b1adae9 100644 --- a/agent.go +++ b/agent.go @@ -2,29 +2,24 @@ package main import ( //crm "loafle.com/overflow/crawler_manager_go" - //col "loafle.com/overflow/collector_go" - //dat "loafle.com/overflow/data_sender_go" - //evt "loafle.com/overflow/event_sender_go" + cm "loafle.com/overflow/agent_api/config_manager" + msg "loafle.com/overflow/agent_api/messages" + col "loafle.com/overflow/collector_go" + dat "loafle.com/overflow/data_sender_go" + evt "loafle.com/overflow/event_sender_go" pol "loafle.com/overflow/long_poller_go" "log" - "loafle.com/overflow/agent_api/observer/messages" - cm "loafle.com/overflow/agent_api/config_manager" ) - -func main() { +func main() { log.SetPrefix("Agent : ") StartAgent() } -type AgentTask struct { - command string - params interface{} -} type Agent struct { + cm cm.ConfigManager taskCh chan interface{} - cm cm.ConfigManager } func StartAgent() { @@ -32,95 +27,180 @@ func StartAgent() { agt.startAgent() } - func (agt *Agent) startAgent() { - //cfg start() (chan confmanager, error) - //evt start() (chan bool, error) - //dat start() (chan bool, error) - - //pol start() (chan string, error) -> getting an Auth key from API server and store that key. - ch, err := pol.Start(agt.cm.GetGlobalConfig()) + //1. cfg start() + //2. evt start() + //3. dat start() + datStarted := make(chan bool) + dat.Start(datStarted, agt.cm.GetGlobalConfig()) + <-datStarted + //4. pol start() -> getting an Auth key from API server and store that key. + authKey := make(chan string) + ch, err := pol.Start(authKey, agt.cm.GetGlobalConfig()) if err != nil { - processError(err) + evt.AddEventData(msg.TaskResult{msg.AGT_STOP, false, err }) + return } + <-authKey agt.taskCh = ch + //5. scm start(secretKey) + //6. crm start() + //7. col start() -> evt result + dataCh := make(chan interface{}) + colStarted := make(chan bool) + col.Start(colStarted, dataCh, agt.cm) + <-colStarted - //scm start() with the Auth key (chan bool, error) - //crm start() (chan bool, error) - //col start() -> evt result + res := msg.TaskResult{msg.AGT_START, true, nil} + evt.AddEventData(res) + go agt.processCollectingData(dataCh) + + //8. processing tasks go agt.waitAgentTask() } -func (agt *Agent)waitAgentTask() { +func (agt *Agent) waitAgentTask() { for { - task := <- agt.taskCh - switch task.(AgentTask).command { - case messages.TASK_SENSOR_START: - agt.processStartSensor() - break; + d := <-agt.taskCh + task := d.(msg.AgentTask) + switch task.Command { + case msg.TASK_POLLER_INTERVAL_UPDATE: + agt.processUpdatePolInterval(task) + break + case msg.TASK_SENSOR_START: + agt.processStartSensor(task) + break + case msg.TASK_SENSOR_STOP: + agt.processStopSensor(task) + break + case msg.TASK_SENSOR_ADD: + agt.processAddSensor(task) + break + case msg.TASK_SENSOR_REMOVE: + agt.processRemoveSensor(task) + break + case msg.TASK_SENSOR_UPDATE: + agt.processUpdateSensor(task) + break + case msg.TASK_AGENT_UPDATE: + agt.processUpdateAgent(task) + break + case msg.TASK_CRAWLER_UPDATE: + agt.processUpdateCRM(task) + break + case msg.TASK_LOG_SEND: + agt.processSendLog(task) + break default: } } } +func (agt *Agent) processCollectingData(ch chan interface{}) { + for { + data := <-ch + dat.AddData(data) + } +} func (agt *Agent) stopAgent() { - //col stop() (chan bool, error) - //crm stop() (chan bool, error) - //scm stop() (chan bool, error) - //dat stop() (chan bool, error) - //pol stop() (chan bool, error) - //evt stop() -> evt result (before stop) + //col stop() + colStopped := make(chan bool) + col.Stop(colStopped) + <-colStopped + //crm stop() + //scm stop() + //dat stop() + datStopped := make(chan bool) + dat.Stop(datStopped) + <-datStopped + //pol stop() + polStopped := make(chan bool) + err := pol.Stop(polStopped) + if err != nil { + evt.AddEventData(msg.TaskResult{msg.AGT_STOP, false, err }) + return + } + <-polStopped + + //evt result + evt.AddEventData(msg.TaskResult{msg.AGT_STOP, true, nil }) + //evt stop() } -func (agt *Agent) processStartSensor() { +func (agt *Agent) processStartSensor(task msg.AgentTask) { //col startSensor() -> evt result + id := task.Params["sensorId"] + col.StartSensor(id) + evt.AddEventData(msg.TaskResult{msg.TASK_SENSOR_START, true, nil }) } -func (agt *Agent) processstopSensor() { +func (agt *Agent) processStopSensor(task msg.AgentTask) { //col stopSensor() -> evt result + id := task.Params["sensorId"] + col.StopSensor(id) + evt.AddEventData(msg.TaskResult{msg.TASK_SENSOR_STOP, true, nil }) } -func (agt *Agent) processAddSensor() { - //scm addSensor() (id chan string, error) - //crm addSensor() (id chan string, error) - //col addSensor() -> evt result +func (agt *Agent) processAddSensor(task msg.AgentTask) { + id := task.Params["sensorId"] + //scm addSensor() + //crm addSensor() + //col addSensor() + err := col.AddSensor(id) + if err != nil { + evt.AddEventData(msg.TaskResult{msg.TASK_SENSOR_ADD, false, err }) + return + } + //evt.AddEventData(msg.TaskResult{msg.TASK_SENSOR_ADD, true, nil }) } -func (agt *Agent) processRemoveSensor() { - //col removeSensor() (id chan string, error) - //crm removeSensor() (id chan string, error) - //scm removeSensor() -> evt result +func (agt *Agent) processRemoveSensor(task msg.AgentTask) { + id := task.Params["sensorId"] + //col removeSensor() + err := col.RemSensor(id) + if err != nil { + evt.AddEventData(msg.TaskResult{msg.TASK_SENSOR_REMOVE, false, err }) + } + //crm removeSensor() + //scm removeSensor() + //evt.AddEventData(msg.TaskResult{msg.TASK_SENSOR_REMOVE, true, nil }) } -func (agt *Agent) processUpdateSensor() { - //col stopSensor() (id chan string, error) - //scm updateSensor() (id chan string, error) - //crm updateSensor() (id chan string, error) - //col updateSensor() -> evt result +func (agt *Agent) processUpdateSensor(task msg.AgentTask) { + //col stopSensor() + //scm updateSensor() + //crm updateSensor() + //col updateSensor() + //evt.AddEventData(msg.TaskResult{msg.TASK_SENSOR_UPDATE, true, nil }) } -func (agt *Agent) processUpdateCRM() { - //(loop)col stopSensor() (chan []string?, error) +func (agt *Agent) processUpdateCRM(task msg.AgentTask) { + //(loop)col stopSensor() //crm updateCRM() - //(loop)col startSensor() -> evt result + //(loop)col startSensor() + //evt.AddEventData(msg.TaskResult{msg.TASK_CRAWLER_UPDATE, true, nil }) } -func (agt *Agent) processUpdateAgent() { - // +func (agt *Agent) processUpdateAgent(task msg.AgentTask) { + //mola + //evt.AddEventData(msg.TaskResult{msg.TASK_AGENT_UPDATE, true, nil }) } -func (agt *Agent) processSendLog() { - // +func (agt *Agent) processSendLog(task msg.AgentTask) { + //mola + //evt.AddEventData(msg.TaskResult{msg.TASK_LOG_SEND, true, nil }) } -func (agt *Agent) processUpdatePolInterval() { - //pol updateInterval() +func (agt *Agent) processUpdatePolInterval(task msg.AgentTask) { + interval := task.Params["interval"] + err := pol.UpdateInterval(interval) + if err != nil { + evt.AddEventData(msg.TaskResult{msg.TASK_POLLER_INTERVAL_UPDATE, false, err }) + return + } + evt.AddEventData(msg.TaskResult{msg.TASK_POLLER_INTERVAL_UPDATE, true, nil }) } -func processError(err error) { - log.Println(err) - return -} \ No newline at end of file