From d61f2c58df9e19b0480edf2c0d4794be4bde19b1 Mon Sep 17 00:00:00 2001 From: "insanity@loafle.com" Date: Tue, 16 May 2017 16:12:00 +0900 Subject: [PATCH] agent --- agent.go | 69 ++++++++++++++++++++++++++++++++++---------------------- 1 file changed, 42 insertions(+), 27 deletions(-) diff --git a/agent.go b/agent.go index 3f2890c..0d7ffea 100644 --- a/agent.go +++ b/agent.go @@ -1,15 +1,16 @@ package main import ( - //crm "loafle.com/overflow/crawler_manager_go" "loafle.com/overflow/agent_api/config_manager" - cfg "loafle.com/overflow/config_manager_go" msg "loafle.com/overflow/agent_api/messages" col "loafle.com/overflow/collector_go" + cfg "loafle.com/overflow/config_manager_go" + crm "loafle.com/overflow/crawler_manager_go" dat "loafle.com/overflow/data_sender_go" evt "loafle.com/overflow/event_sender_go" pol "loafle.com/overflow/long_poller_go" "log" + "sync" ) const ( @@ -18,13 +19,16 @@ const ( func main() { log.SetPrefix("Agent : ") - StartAgent() + var wg sync.WaitGroup + wg.Add(1) + go StartAgent() + wg.Wait() } - type Agent struct { cm config_manager.ConfigManager taskCh chan interface{} + } func StartAgent() { @@ -34,7 +38,7 @@ func StartAgent() { func (agt *Agent) startAgent() { //1. cfg start() - cfgStarted := make(chan *config_manager.GlobalConfig) + cfgStarted := make(chan *config_manager.GlobalConfig, 1) if err := cfg.Start(cfgStarted, PATH); err != nil { log.Println(err) return @@ -42,18 +46,20 @@ func (agt *Agent) startAgent() { globalConf := <-cfgStarted //2. evt start() - evtStarted := make(chan bool, 0) + evtStarted := make(chan bool, 1) if err := evt.Start(evtStarted); err != nil { log.Println(err) return } <-evtStarted + //3. dat start() - datStarted := make(chan bool) + datStarted := make(chan bool, 1) dat.Start(datStarted, globalConf) <-datStarted + //4. pol start() - polStarted := make(chan bool) + polStarted := make(chan bool, 1) if err := pol.Start(polStarted, globalConf); err != nil { log.Println(err) return @@ -62,7 +68,7 @@ func (agt *Agent) startAgent() { agt.taskCh = pol.GetTaskCh() //5. scm start(authKey) - scmStarted := make(chan config_manager.ConfigManager) + scmStarted := make(chan config_manager.ConfigManager, 1) if err := cfg.StartSensorConfig(scmStarted, "temp"); err != nil { log.Println(err) return @@ -70,9 +76,13 @@ func (agt *Agent) startAgent() { agt.cm = <-scmStarted //6. crm start() + crmStarted := make(chan bool, 1) + crm.Start(crmStarted, agt.cm) + <-crmStarted + //7. col start() dataCh := make(chan interface{}) - colStarted := make(chan bool) + colStarted := make(chan bool, 1) col.Start(colStarted, dataCh, agt.cm) <-colStarted @@ -133,29 +143,35 @@ func (agt *Agent) processCollectingData(ch chan interface{}) { func (agt *Agent) stopAgent() { //col stop() - colStopped := make(chan bool) + colStopped := make(chan bool, 1) col.Stop(colStopped) <-colStopped + //crm stop() + crmStopped := make(chan bool, 1) + crm.Stop(crmStopped) + <-crmStopped + //cfg stop() - cfgStopped := make(chan bool) + cfgStopped := make(chan bool, 1) cfg.Stop(cfgStopped) <-cfgStopped + //dat stop() - datStopped := make(chan bool) + datStopped := make(chan bool, 1) dat.Stop(datStopped) <-datStopped + //pol stop() - polStopped := make(chan bool) - err := pol.Stop(polStopped) - if err != nil { - evt.AddEventData(msg.TaskResult{msg.AGT_STOP, false, err }) + polStopped := make(chan bool, 1) + if err := pol.Stop(polStopped); err != nil { + evt.AddEventData(msg.TaskResult{msg.AGT_STOP, false, err}) return } <-polStopped //evt result - evt.AddEventData(msg.TaskResult{msg.AGT_STOP, true, nil }) + evt.AddEventData(msg.TaskResult{msg.AGT_STOP, true, nil}) //evt stop() evt.Stop() @@ -165,14 +181,14 @@ func (agt *Agent) processStartSensor(task msg.AgentTask) { //col startSensor() -> evt result id := task.Params["sensorId"] col.StartSensor(id) - evt.AddEventData(msg.TaskResult{msg.TASK_SENSOR_START, true, nil }) + evt.AddEventData(msg.TaskResult{msg.TASK_SENSOR_START, true, nil}) } func (agt *Agent) processStopSensor(task msg.AgentTask) { //col stopSensor() -> evt result id := task.Params["sensorId"] col.StopSensor(id) - evt.AddEventData(msg.TaskResult{msg.TASK_SENSOR_STOP, true, nil }) + evt.AddEventData(msg.TaskResult{msg.TASK_SENSOR_STOP, true, nil}) } func (agt *Agent) processAddSensor(task msg.AgentTask) { @@ -182,7 +198,7 @@ func (agt *Agent) processAddSensor(task msg.AgentTask) { //col addSensor() err := col.AddSensor(id) if err != nil { - evt.AddEventData(msg.TaskResult{msg.TASK_SENSOR_ADD, false, err }) + evt.AddEventData(msg.TaskResult{msg.TASK_SENSOR_ADD, false, err}) return } //evt.AddEventData(msg.TaskResult{msg.TASK_SENSOR_ADD, true, nil }) @@ -193,7 +209,7 @@ func (agt *Agent) processRemoveSensor(task msg.AgentTask) { //col removeSensor() err := col.RemSensor(id) if err != nil { - evt.AddEventData(msg.TaskResult{msg.TASK_SENSOR_REMOVE, false, err }) + evt.AddEventData(msg.TaskResult{msg.TASK_SENSOR_REMOVE, false, err}) } //crm removeSensor() //scm removeSensor() @@ -204,13 +220,13 @@ func (agt *Agent) processUpdateSensor(task msg.AgentTask) { id := task.Params["sensorId"] //col stopSensor() if err := col.StopSensor(id); err != nil { - evt.AddEventData(msg.TaskResult{msg.TASK_SENSOR_UPDATE, false, err }) + evt.AddEventData(msg.TaskResult{msg.TASK_SENSOR_UPDATE, false, err}) } //scm updateSensor() //crm updateSensor() //col updateSensor() if err := col.UpdateSensor(id); err != nil { - evt.AddEventData(msg.TaskResult{msg.TASK_SENSOR_UPDATE, false, err }) + evt.AddEventData(msg.TaskResult{msg.TASK_SENSOR_UPDATE, false, err}) } //evt.AddEventData(msg.TaskResult{msg.TASK_SENSOR_UPDATE, true, nil }) } @@ -236,9 +252,8 @@ func (agt *Agent) processUpdatePolInterval(task msg.AgentTask) { interval := task.Params["interval"] err := pol.UpdateInterval(interval) if err != nil { - evt.AddEventData(msg.TaskResult{msg.TASK_POLLER_INTERVAL_UPDATE, false, err }) + evt.AddEventData(msg.TaskResult{msg.TASK_POLLER_INTERVAL_UPDATE, false, err}) return } - evt.AddEventData(msg.TaskResult{msg.TASK_POLLER_INTERVAL_UPDATE, true, nil }) + evt.AddEventData(msg.TaskResult{msg.TASK_POLLER_INTERVAL_UPDATE, true, nil}) } -