From f7017e0021ec71d48d22ba1f5a58d8b5aa855949 Mon Sep 17 00:00:00 2001 From: "insanity@loafle.com" Date: Tue, 16 May 2017 15:08:08 +0900 Subject: [PATCH] agent --- agent.go | 55 +++++++++++++++++++++++++++++++++++++++++++------------ 1 file changed, 43 insertions(+), 12 deletions(-) diff --git a/agent.go b/agent.go index b1adae9..0ade834 100644 --- a/agent.go +++ b/agent.go @@ -2,7 +2,8 @@ package main import ( //crm "loafle.com/overflow/crawler_manager_go" - cm "loafle.com/overflow/agent_api/config_manager" + "loafle.com/overflow/agent_api/config_manager" + cfg "loafle.com/overflow/config_manager_go" msg "loafle.com/overflow/agent_api/messages" col "loafle.com/overflow/collector_go" dat "loafle.com/overflow/data_sender_go" @@ -11,6 +12,10 @@ import ( "log" ) +const ( + PATH = "/home/insanity/Develop/gopath/src/loafle.com/overflow/config_manager_go/test_agent" +) + func main() { log.SetPrefix("Agent : ") StartAgent() @@ -18,7 +23,7 @@ func main() { type Agent struct { - cm cm.ConfigManager + cm config_manager.ConfigManager taskCh chan interface{} } @@ -29,28 +34,44 @@ func StartAgent() { func (agt *Agent) startAgent() { //1. cfg start() + cfgStarted := make(chan *config_manager.GlobalConfig) + if err := cfg.Start(cfgStarted, PATH); err != nil { + log.Println(err) + return + } + globalConf := <-cfgStarted + //2. evt start() //3. dat start() datStarted := make(chan bool) - dat.Start(datStarted, agt.cm.GetGlobalConfig()) + dat.Start(datStarted, globalConf) <-datStarted - //4. pol start() -> getting an Auth key from API server and store that key. - authKey := make(chan string) - ch, err := pol.Start(authKey, agt.cm.GetGlobalConfig()) + //4. pol start() + polStarted := make(chan bool) + err := pol.Start(polStarted, globalConf) if err != nil { - evt.AddEventData(msg.TaskResult{msg.AGT_STOP, false, err }) + log.Println(err) return } - <-authKey - agt.taskCh = ch - //5. scm start(secretKey) + <-polStarted + agt.taskCh = pol.GetTaskCh() + + //5. scm start(authKey) + scmStarted := make(chan config_manager.ConfigManager) + if err := cfg.StartSensorConfig(scmStarted, "temp"); err != nil { + log.Println(err) + return + } + agt.cm = <-scmStarted + //6. crm start() - //7. col start() -> evt result + //7. col start() dataCh := make(chan interface{}) colStarted := make(chan bool) col.Start(colStarted, dataCh, agt.cm) <-colStarted + //8.evt result res := msg.TaskResult{msg.AGT_START, true, nil} evt.AddEventData(res) @@ -111,7 +132,10 @@ func (agt *Agent) stopAgent() { col.Stop(colStopped) <-colStopped //crm stop() - //scm stop() + //cfg stop() + cfgStopped := make(chan bool) + cfg.Stop(cfgStopped) + <-cfgStopped //dat stop() datStopped := make(chan bool) dat.Stop(datStopped) @@ -170,10 +194,17 @@ func (agt *Agent) processRemoveSensor(task msg.AgentTask) { } func (agt *Agent) processUpdateSensor(task msg.AgentTask) { + id := task.Params["sensorId"] //col stopSensor() + if err := col.StopSensor(id); err != nil { + evt.AddEventData(msg.TaskResult{msg.TASK_SENSOR_UPDATE, false, err }) + } //scm updateSensor() //crm updateSensor() //col updateSensor() + if err := col.UpdateSensor(id); err != nil { + evt.AddEventData(msg.TaskResult{msg.TASK_SENSOR_UPDATE, false, err }) + } //evt.AddEventData(msg.TaskResult{msg.TASK_SENSOR_UPDATE, true, nil }) }