diff --git a/probe.go b/probe.go new file mode 100644 index 0000000..0058814 --- /dev/null +++ b/probe.go @@ -0,0 +1,292 @@ +package main + +import ( + log "github.com/cihub/seelog" + "loafle.com/overflow/agent/bootstrap" + "loafle.com/overflow/agent_api/config_manager" + msg "loafle.com/overflow/agent_api/messages" + col "loafle.com/overflow/collector_go" + cfg "loafle.com/overflow/config_manager_go" + crm "loafle.com/overflow/crawler_manager_go" + dat "loafle.com/overflow/data_sender_go" + evt "loafle.com/overflow/event_sender_go" + ini "loafle.com/overflow/initializer_go" + pol "loafle.com/overflow/long_poller_go" + "sync" +) + +const ( + // /home/geek/develop/gopath/src/loafle.com/overflow/config_manager_go/test_agent + PATH = "/home/insanity/Develop/gopath/src/loafle.com/overflow/config_manager_go/test_agent" +) + +var wg sync.WaitGroup + +func main() { + wg.Add(1) + go StartAgent() + wg.Wait() +} + +type Agent struct { + cm config_manager.ConfigManager + taskCh chan interface{} + secKey string +} + +func StartAgent() { + stopch := make(chan bool, 1) + bootstrap.HandleShell(stopch) + bootstrap.HandleSignal(stopch) + + agt := Agent{} + agt.startAgent() + + agt.handleAgentStop(stopch) +} + +func (agt *Agent) startAgent() { + //1. cfg start() + cfgStarted := make(chan *config_manager.GlobalConfig, 1) + if err := cfg.Start(cfgStarted, PATH); err != nil { + log.Error(err) + return + } + globalConf := <-cfgStarted + + // ini start() + iniStarted := make(chan string, 1) + if err := ini.Start(iniStarted, globalConf); err != nil { + log.Error(err) + return + } + agt.secKey = <-iniStarted + + //2. evt start() + evtStarted := make(chan bool, 1) + if err := evt.Start(evtStarted); err != nil { + log.Error(err) + return + } + <-evtStarted + + //3. dat start() + datStarted := make(chan bool, 1) + dat.Start(datStarted, globalConf) + <-datStarted + + //4. pol start() + polStarted := make(chan bool, 1) + if err := pol.Start(polStarted, globalConf); err != nil { + log.Error(err) + return + } + <-polStarted + agt.taskCh = pol.GetTaskCh() + + /** + Todo : Getting an auth key from API server. + */ + + //5. scm start() + scmStarted := make(chan config_manager.ConfigManager, 1) + if err := cfg.StartSensorConfig(scmStarted, agt.secKey); err != nil { + log.Error(err) + return + } + agt.cm = <-scmStarted + + //6. crm start() + crmStarted := make(chan bool, 1) + crm.Start(crmStarted, agt.cm) + <-crmStarted + + //7. col start() + dataCh := make(chan interface{}) + colStarted := make(chan bool, 1) + col.Start(colStarted, dataCh, agt.cm) + <-colStarted + + //8.evt result + res := msg.Event{msg.EVT_TYPE_NONE, msg.EVENT_AGT_START, nil} + evt.AddEventData(res) + + go agt.processCollectingData(dataCh) + go agt.waitAgentTask() +} + +func (agt *Agent) handleAgentStop(ch chan bool) { + go func() { + if <-ch { + agt.stopAgent() + log.Info("Agent has stopped.") + log.Flush() + close(ch) + wg.Done() + } + }() +} + +func (agt *Agent) waitAgentTask() { + for { + d := <-agt.taskCh + task := d.(msg.AgentTask) + switch task.Command { + case msg.TASK_POL_INTERVAL_UPDATE: + agt.processUpdatePolInterval(task) + break + case msg.TASK_SENSOR_START: + agt.processStartSensor(task) + break + case msg.TASK_SENSOR_STOP: + agt.processStopSensor(task) + break + case msg.TASK_SENSOR_ADD: + agt.processAddSensor(task) + break + case msg.TASK_SENSOR_REMOVE: + agt.processRemoveSensor(task) + break + case msg.TASK_SENSOR_UPDATE: + agt.processUpdateSensor(task) + break + case msg.TASK_AGENT_UPDATE: + agt.processUpdateAgent(task) + break + case msg.TASK_CRAWLER_UPDATE: + agt.processUpdateCRM(task) + break + case msg.TASK_LOG_SEND: + agt.processSendLog(task) + break + default: + + } + } +} + +func (agt *Agent) processCollectingData(ch chan interface{}) { + for { + data := <-ch + dat.AddData(data) + } +} + +func (agt *Agent) stopAgent() { + //col stop() + colStopped := make(chan bool, 1) + col.Stop(colStopped) + <-colStopped + + //crm stop() + crmStopped := make(chan bool, 1) + crm.Stop(crmStopped) + <-crmStopped + + //cfg stop() + cfgStopped := make(chan bool, 1) + cfg.Stop(cfgStopped) + <-cfgStopped + + //dat stop() + datStopped := make(chan bool, 1) + dat.Stop(datStopped) + <-datStopped + + //pol stop() + polStopped := make(chan bool, 1) + if err := pol.Stop(polStopped); err != nil { + return + } + <-polStopped + + evt.AddEventData(msg.Event{msg.EVT_TYPE_NONE, msg.EVENT_AGT_STOP, nil}) + evt.Stop() + +} + +func (agt *Agent) processStartSensor(task msg.AgentTask) { + id := task.Params["sensorId"] + if err := col.StartSensor(id); err != nil { + log.Error(err) + return + } + + evt.AddEventData(msg.Event{msg.EVT_TYPE_TASK, task.Params["id"], nil}) +} + +func (agt *Agent) processStopSensor(task msg.AgentTask) { + id := task.Params["sensorId"] + if err := col.StopSensor(id); err != nil { + log.Error(err) + return + } + evt.AddEventData(msg.Event{msg.EVT_TYPE_TASK, task.Params["id"], nil}) +} + +func (agt *Agent) processAddSensor(task msg.AgentTask) { + path := task.Params["path"] + id := task.Params["id"] + cfg.AddSensor(path) + crm.AddSensor(id) + err := col.AddSensor(id) + if err != nil { + return + } + evt.AddEventData(msg.Event{msg.EVT_TYPE_TASK, task.Params["id"], nil}) +} + +func (agt *Agent) processRemoveSensor(task msg.AgentTask) { + id := task.Params["sensorId"] + err := col.RemSensor(id) + if err != nil { + log.Error(err) + return + } + crm.RemoveSensor(id) + cfg.RemoveSensor(id) + evt.AddEventData(msg.Event{msg.EVT_TYPE_TASK, task.Params["id"], nil}) +} + +func (agt *Agent) processUpdateSensor(task msg.AgentTask) { + path := task.Params["path"] + id := task.Params["id"] + if err := col.StopSensor(id); err != nil { + log.Error(err) + return + } + cfg.UpdateSensor(path) + crm.UpdateSensor(id) + if err := col.UpdateSensor(id); err != nil { + log.Error(err) + return + } + evt.AddEventData(msg.Event{msg.EVT_TYPE_TASK, task.Params["id"], nil}) +} + +func (agt *Agent) processUpdateCRM(task msg.AgentTask) { + //(loop)col stopSensor() + //crm updateCRM() + //(loop)col startSensor() + evt.AddEventData(msg.Event{msg.EVT_TYPE_TASK, task.Params["id"], nil}) +} + +func (agt *Agent) processUpdateAgent(task msg.AgentTask) { + //Todo. Updating agent + evt.AddEventData(msg.Event{msg.EVT_TYPE_TASK, task.Params["id"], nil}) +} + +func (agt *Agent) processSendLog(task msg.AgentTask) { + //Todo. Sending logs + evt.AddEventData(msg.Event{msg.EVT_TYPE_TASK, task.Params["id"], nil}) +} + +func (agt *Agent) processUpdatePolInterval(task msg.AgentTask) { + interval := task.Params["interval"] + err := pol.UpdateInterval(interval) + if err != nil { + log.Error(err) + return + } + evt.AddEventData(msg.Event{msg.EVT_TYPE_TASK, task.Params["id"], nil}) +} diff --git a/probe_test.go b/probe_test.go new file mode 100644 index 0000000..63217a8 --- /dev/null +++ b/probe_test.go @@ -0,0 +1,7 @@ +package main + +import "testing" + +func TestFFF(t *testing.T) { + t.Log("aaaa") +} \ No newline at end of file