package main import ( "git.loafle.net/overflow/overflow_probe/agent_api/config_manager" msg "git.loafle.net/overflow/overflow_probe/agent_api/messages" "git.loafle.net/overflow/overflow_probe/bootstrap" discovery "git.loafle.net/overflow/overflow_probe/discovery/bootstrap" col "git.loafle.net/overflow/overflow_probe/collector" cfg "git.loafle.net/overflow/overflow_probe/config_manager" crm "git.loafle.net/overflow/overflow_probe/crawler_manager" dat "git.loafle.net/overflow/overflow_probe/data_sender" evt "git.loafle.net/overflow/overflow_probe/event_sender" ini "git.loafle.net/overflow/overflow_probe/initializer" log "github.com/cihub/seelog" //pol "git.loafle.net/overflow/overflow_probe/long_poller" "sync" ) const ( // /home/geek/develop/gopath/src/git.loafle.net/overflow/overflow_probe/config_manager/test_agent PATH = "/home/geek/Develop/gopath/src/git.loafle.net/overflow/overflow_probe/config_manager/test_agent" ) var wg sync.WaitGroup func main() { wg.Add(1) go StartAgent() wg.Wait() } type Probe struct { cm config_manager.ConfigManager taskCh chan interface{} secKey string } func StartAgent() { stopch := make(chan bool, 1) bootstrap.HandleShell(stopch) bootstrap.HandleSignal(stopch) prb := Probe{} prb.startProbe() // Todo discovery start delete discovery.Start() prb.handleProbeStop(stopch) } func (prb *Probe) startProbe() { //1. cfg start() cfgStarted := make(chan *config_manager.GlobalConfig, 1) if err := cfg.Start(cfgStarted, PATH); err != nil { log.Error(err) return } globalConf := <-cfgStarted // ini start() iniStarted := make(chan string, 1) if err := ini.Start(iniStarted, globalConf); err != nil { log.Error(err) return } prb.secKey = <-iniStarted //2. evt start() evtStarted := make(chan bool, 1) if err := evt.Start(evtStarted); err != nil { log.Error(err) return } <-evtStarted //3. dat start() datStarted := make(chan bool, 1) dat.Start(datStarted, globalConf) <-datStarted //4. pol start() //polStarted := make(chan bool, 1) //if err := pol.Start(polStarted, globalConf); err != nil { // log.Error(err) // return //} //<-polStarted //prb.taskCh = pol.GetTaskCh() /** Todo : Getting an auth key from API server. */ //5. scm start() scmStarted := make(chan config_manager.ConfigManager, 1) if err := cfg.StartSensorConfig(scmStarted, prb.secKey); err != nil { log.Error(err) return } prb.cm = <-scmStarted //6. crm start() crmStarted := make(chan bool, 1) crm.Start(crmStarted, prb.cm) <-crmStarted //7. col start() dataCh := make(chan interface{}) colStarted := make(chan bool, 1) col.Start(colStarted, dataCh, prb.cm) <-colStarted //8.evt result res := msg.Event{msg.EVT_TYPE_NONE, msg.EVENT_AGT_START, nil} evt.AddEventData(res) go prb.processCollectingData(dataCh) go prb.waitAgentTask() } func (prb *Probe) handleProbeStop(ch chan bool) { go func() { if <-ch { prb.stopProbe() log.Info("Agent has stopped.") log.Flush() close(ch) wg.Done() } }() } func (prb *Probe) waitAgentTask() { for { d := <-prb.taskCh task := d.(msg.AgentTask) switch task.Command { //case msg.TASK_POL_INTERVAL_UPDATE: // prb.processUpdatePolInterval(task) // break case msg.TASK_SENSOR_START: prb.processStartSensor(task) break case msg.TASK_SENSOR_STOP: prb.processStopSensor(task) break case msg.TASK_SENSOR_ADD: prb.processAddSensor(task) break case msg.TASK_SENSOR_REMOVE: prb.processRemoveSensor(task) break case msg.TASK_SENSOR_UPDATE: prb.processUpdateSensor(task) break case msg.TASK_AGENT_UPDATE: prb.processUpdateAgent(task) break case msg.TASK_CRAWLER_UPDATE: prb.processUpdateCRM(task) break case msg.TASK_LOG_SEND: prb.processSendLog(task) break case msg.DISCOVERY_START: discovery.Start() break case msg.DISCOVERY_STOP: break default: } } } func (prb *Probe) processCollectingData(ch chan interface{}) { for { data := <-ch dat.AddData(data) } } func (prb *Probe) stopProbe() { //col stop() colStopped := make(chan bool, 1) col.Stop(colStopped) <-colStopped //crm stop() crmStopped := make(chan bool, 1) crm.Stop(crmStopped) <-crmStopped //cfg stop() cfgStopped := make(chan bool, 1) cfg.Stop(cfgStopped) <-cfgStopped //dat stop() datStopped := make(chan bool, 1) dat.Stop(datStopped) <-datStopped ////pol stop() //polStopped := make(chan bool, 1) //if err := pol.Stop(polStopped); err != nil { // return //} //<-polStopped evt.AddEventData(msg.Event{msg.EVT_TYPE_NONE, msg.EVENT_AGT_STOP, nil}) evt.Stop() } func (prb *Probe) processStartSensor(task msg.AgentTask) { id := task.Params["sensorId"] if err := col.StartSensor(id); err != nil { log.Error(err) return } evt.AddEventData(msg.Event{msg.EVT_TYPE_TASK, task.Params["id"], nil}) } func (prb *Probe) processStopSensor(task msg.AgentTask) { id := task.Params["sensorId"] if err := col.StopSensor(id); err != nil { log.Error(err) return } evt.AddEventData(msg.Event{msg.EVT_TYPE_TASK, task.Params["id"], nil}) } func (prb *Probe) processAddSensor(task msg.AgentTask) { path := task.Params["path"] id := task.Params["id"] cfg.AddSensor(path) crm.AddSensor(id) err := col.AddSensor(id) if err != nil { return } evt.AddEventData(msg.Event{msg.EVT_TYPE_TASK, task.Params["id"], nil}) } func (prb *Probe) processRemoveSensor(task msg.AgentTask) { id := task.Params["sensorId"] err := col.RemSensor(id) if err != nil { log.Error(err) return } crm.RemoveSensor(id) cfg.RemoveSensor(id) evt.AddEventData(msg.Event{msg.EVT_TYPE_TASK, task.Params["id"], nil}) } func (prb *Probe) processUpdateSensor(task msg.AgentTask) { path := task.Params["path"] id := task.Params["id"] if err := col.StopSensor(id); err != nil { log.Error(err) return } cfg.UpdateSensor(path) crm.UpdateSensor(id) if err := col.UpdateSensor(id); err != nil { log.Error(err) return } evt.AddEventData(msg.Event{msg.EVT_TYPE_TASK, task.Params["id"], nil}) } func (prb *Probe) processUpdateCRM(task msg.AgentTask) { //(loop)col stopSensor() //crm updateCRM() //(loop)col startSensor() evt.AddEventData(msg.Event{msg.EVT_TYPE_TASK, task.Params["id"], nil}) } func (prb *Probe) processUpdateAgent(task msg.AgentTask) { //Todo. Updating agent evt.AddEventData(msg.Event{msg.EVT_TYPE_TASK, task.Params["id"], nil}) } func (prb *Probe) processSendLog(task msg.AgentTask) { //Todo. Sending logs evt.AddEventData(msg.Event{msg.EVT_TYPE_TASK, task.Params["id"], nil}) } //func (prb *Agent) processUpdatePolInterval(task msg.AgentTask) { // interval := task.Params["interval"] // err := pol.UpdateInterval(interval) // if err != nil { // log.Error(err) // return // } // evt.AddEventData(msg.Event{msg.EVT_TYPE_TASK, task.Params["id"], nil}) //}