This commit is contained in:
insanity@loafle.com 2017-08-04 17:03:32 +09:00
parent f0cd311ed0
commit 92ee474889

View File

@ -39,13 +39,13 @@ func StartAgent() {
bootstrap.HandleShell(stopch) bootstrap.HandleShell(stopch)
bootstrap.HandleSignal(stopch) bootstrap.HandleSignal(stopch)
agt := Probe{} prb := Probe{}
agt.startProbe() prb.startProbe()
agt.handleProbeStop(stopch) prb.handleProbeStop(stopch)
} }
func (agt *Probe) startProbe() { func (prb *Probe) startProbe() {
//1. cfg start() //1. cfg start()
cfgStarted := make(chan *config_manager.GlobalConfig, 1) cfgStarted := make(chan *config_manager.GlobalConfig, 1)
if err := cfg.Start(cfgStarted, PATH); err != nil { if err := cfg.Start(cfgStarted, PATH); err != nil {
@ -60,7 +60,7 @@ func (agt *Probe) startProbe() {
log.Error(err) log.Error(err)
return return
} }
agt.secKey = <-iniStarted prb.secKey = <-iniStarted
//2. evt start() //2. evt start()
evtStarted := make(chan bool, 1) evtStarted := make(chan bool, 1)
@ -82,7 +82,7 @@ func (agt *Probe) startProbe() {
// return // return
//} //}
//<-polStarted //<-polStarted
//agt.taskCh = pol.GetTaskCh() //prb.taskCh = pol.GetTaskCh()
/** /**
Todo : Getting an auth key from API server. Todo : Getting an auth key from API server.
@ -90,35 +90,35 @@ func (agt *Probe) startProbe() {
//5. scm start() //5. scm start()
scmStarted := make(chan config_manager.ConfigManager, 1) scmStarted := make(chan config_manager.ConfigManager, 1)
if err := cfg.StartSensorConfig(scmStarted, agt.secKey); err != nil { if err := cfg.StartSensorConfig(scmStarted, prb.secKey); err != nil {
log.Error(err) log.Error(err)
return return
} }
agt.cm = <-scmStarted prb.cm = <-scmStarted
//6. crm start() //6. crm start()
crmStarted := make(chan bool, 1) crmStarted := make(chan bool, 1)
crm.Start(crmStarted, agt.cm) crm.Start(crmStarted, prb.cm)
<-crmStarted <-crmStarted
//7. col start() //7. col start()
dataCh := make(chan interface{}) dataCh := make(chan interface{})
colStarted := make(chan bool, 1) colStarted := make(chan bool, 1)
col.Start(colStarted, dataCh, agt.cm) col.Start(colStarted, dataCh, prb.cm)
<-colStarted <-colStarted
//8.evt result //8.evt result
res := msg.Event{msg.EVT_TYPE_NONE, msg.EVENT_AGT_START, nil} res := msg.Event{msg.EVT_TYPE_NONE, msg.EVENT_AGT_START, nil}
evt.AddEventData(res) evt.AddEventData(res)
go agt.processCollectingData(dataCh) go prb.processCollectingData(dataCh)
go agt.waitAgentTask() go prb.waitAgentTask()
} }
func (agt *Probe) handleProbeStop(ch chan bool) { func (prb *Probe) handleProbeStop(ch chan bool) {
go func() { go func() {
if <-ch { if <-ch {
agt.stopProbe() prb.stopProbe()
log.Info("Agent has stopped.") log.Info("Agent has stopped.")
log.Flush() log.Flush()
close(ch) close(ch)
@ -127,37 +127,37 @@ func (agt *Probe) handleProbeStop(ch chan bool) {
}() }()
} }
func (agt *Probe) waitAgentTask() { func (prb *Probe) waitAgentTask() {
for { for {
d := <-agt.taskCh d := <-prb.taskCh
task := d.(msg.AgentTask) task := d.(msg.AgentTask)
switch task.Command { switch task.Command {
//case msg.TASK_POL_INTERVAL_UPDATE: //case msg.TASK_POL_INTERVAL_UPDATE:
// agt.processUpdatePolInterval(task) // prb.processUpdatePolInterval(task)
// break // break
case msg.TASK_SENSOR_START: case msg.TASK_SENSOR_START:
agt.processStartSensor(task) prb.processStartSensor(task)
break break
case msg.TASK_SENSOR_STOP: case msg.TASK_SENSOR_STOP:
agt.processStopSensor(task) prb.processStopSensor(task)
break break
case msg.TASK_SENSOR_ADD: case msg.TASK_SENSOR_ADD:
agt.processAddSensor(task) prb.processAddSensor(task)
break break
case msg.TASK_SENSOR_REMOVE: case msg.TASK_SENSOR_REMOVE:
agt.processRemoveSensor(task) prb.processRemoveSensor(task)
break break
case msg.TASK_SENSOR_UPDATE: case msg.TASK_SENSOR_UPDATE:
agt.processUpdateSensor(task) prb.processUpdateSensor(task)
break break
case msg.TASK_AGENT_UPDATE: case msg.TASK_AGENT_UPDATE:
agt.processUpdateAgent(task) prb.processUpdateAgent(task)
break break
case msg.TASK_CRAWLER_UPDATE: case msg.TASK_CRAWLER_UPDATE:
agt.processUpdateCRM(task) prb.processUpdateCRM(task)
break break
case msg.TASK_LOG_SEND: case msg.TASK_LOG_SEND:
agt.processSendLog(task) prb.processSendLog(task)
break break
default: default:
@ -165,14 +165,14 @@ func (agt *Probe) waitAgentTask() {
} }
} }
func (agt *Probe) processCollectingData(ch chan interface{}) { func (prb *Probe) processCollectingData(ch chan interface{}) {
for { for {
data := <-ch data := <-ch
dat.AddData(data) dat.AddData(data)
} }
} }
func (agt *Probe) stopProbe() { func (prb *Probe) stopProbe() {
//col stop() //col stop()
colStopped := make(chan bool, 1) colStopped := make(chan bool, 1)
col.Stop(colStopped) col.Stop(colStopped)
@ -205,7 +205,7 @@ func (agt *Probe) stopProbe() {
} }
func (agt *Probe) processStartSensor(task msg.AgentTask) { func (prb *Probe) processStartSensor(task msg.AgentTask) {
id := task.Params["sensorId"] id := task.Params["sensorId"]
if err := col.StartSensor(id); err != nil { if err := col.StartSensor(id); err != nil {
log.Error(err) log.Error(err)
@ -215,7 +215,7 @@ func (agt *Probe) processStartSensor(task msg.AgentTask) {
evt.AddEventData(msg.Event{msg.EVT_TYPE_TASK, task.Params["id"], nil}) evt.AddEventData(msg.Event{msg.EVT_TYPE_TASK, task.Params["id"], nil})
} }
func (agt *Probe) processStopSensor(task msg.AgentTask) { func (prb *Probe) processStopSensor(task msg.AgentTask) {
id := task.Params["sensorId"] id := task.Params["sensorId"]
if err := col.StopSensor(id); err != nil { if err := col.StopSensor(id); err != nil {
log.Error(err) log.Error(err)
@ -224,7 +224,7 @@ func (agt *Probe) processStopSensor(task msg.AgentTask) {
evt.AddEventData(msg.Event{msg.EVT_TYPE_TASK, task.Params["id"], nil}) evt.AddEventData(msg.Event{msg.EVT_TYPE_TASK, task.Params["id"], nil})
} }
func (agt *Probe) processAddSensor(task msg.AgentTask) { func (prb *Probe) processAddSensor(task msg.AgentTask) {
path := task.Params["path"] path := task.Params["path"]
id := task.Params["id"] id := task.Params["id"]
cfg.AddSensor(path) cfg.AddSensor(path)
@ -236,7 +236,7 @@ func (agt *Probe) processAddSensor(task msg.AgentTask) {
evt.AddEventData(msg.Event{msg.EVT_TYPE_TASK, task.Params["id"], nil}) evt.AddEventData(msg.Event{msg.EVT_TYPE_TASK, task.Params["id"], nil})
} }
func (agt *Probe) processRemoveSensor(task msg.AgentTask) { func (prb *Probe) processRemoveSensor(task msg.AgentTask) {
id := task.Params["sensorId"] id := task.Params["sensorId"]
err := col.RemSensor(id) err := col.RemSensor(id)
if err != nil { if err != nil {
@ -248,7 +248,7 @@ func (agt *Probe) processRemoveSensor(task msg.AgentTask) {
evt.AddEventData(msg.Event{msg.EVT_TYPE_TASK, task.Params["id"], nil}) evt.AddEventData(msg.Event{msg.EVT_TYPE_TASK, task.Params["id"], nil})
} }
func (agt *Probe) processUpdateSensor(task msg.AgentTask) { func (prb *Probe) processUpdateSensor(task msg.AgentTask) {
path := task.Params["path"] path := task.Params["path"]
id := task.Params["id"] id := task.Params["id"]
if err := col.StopSensor(id); err != nil { if err := col.StopSensor(id); err != nil {
@ -264,24 +264,24 @@ func (agt *Probe) processUpdateSensor(task msg.AgentTask) {
evt.AddEventData(msg.Event{msg.EVT_TYPE_TASK, task.Params["id"], nil}) evt.AddEventData(msg.Event{msg.EVT_TYPE_TASK, task.Params["id"], nil})
} }
func (agt *Probe) processUpdateCRM(task msg.AgentTask) { func (prb *Probe) processUpdateCRM(task msg.AgentTask) {
//(loop)col stopSensor() //(loop)col stopSensor()
//crm updateCRM() //crm updateCRM()
//(loop)col startSensor() //(loop)col startSensor()
evt.AddEventData(msg.Event{msg.EVT_TYPE_TASK, task.Params["id"], nil}) evt.AddEventData(msg.Event{msg.EVT_TYPE_TASK, task.Params["id"], nil})
} }
func (agt *Probe) processUpdateAgent(task msg.AgentTask) { func (prb *Probe) processUpdateAgent(task msg.AgentTask) {
//Todo. Updating agent //Todo. Updating agent
evt.AddEventData(msg.Event{msg.EVT_TYPE_TASK, task.Params["id"], nil}) evt.AddEventData(msg.Event{msg.EVT_TYPE_TASK, task.Params["id"], nil})
} }
func (agt *Probe) processSendLog(task msg.AgentTask) { func (prb *Probe) processSendLog(task msg.AgentTask) {
//Todo. Sending logs //Todo. Sending logs
evt.AddEventData(msg.Event{msg.EVT_TYPE_TASK, task.Params["id"], nil}) evt.AddEventData(msg.Event{msg.EVT_TYPE_TASK, task.Params["id"], nil})
} }
//func (agt *Agent) processUpdatePolInterval(task msg.AgentTask) { //func (prb *Agent) processUpdatePolInterval(task msg.AgentTask) {
// interval := task.Params["interval"] // interval := task.Params["interval"]
// err := pol.UpdateInterval(interval) // err := pol.UpdateInterval(interval)
// if err != nil { // if err != nil {