This commit is contained in:
insanity@loafle.com 2017-05-15 19:32:25 +09:00
parent 1bceb63bc4
commit 072de33afb

206
agent.go
View File

@ -2,29 +2,24 @@ package main
import ( import (
//crm "loafle.com/overflow/crawler_manager_go" //crm "loafle.com/overflow/crawler_manager_go"
//col "loafle.com/overflow/collector_go" cm "loafle.com/overflow/agent_api/config_manager"
//dat "loafle.com/overflow/data_sender_go" msg "loafle.com/overflow/agent_api/messages"
//evt "loafle.com/overflow/event_sender_go" col "loafle.com/overflow/collector_go"
dat "loafle.com/overflow/data_sender_go"
evt "loafle.com/overflow/event_sender_go"
pol "loafle.com/overflow/long_poller_go" pol "loafle.com/overflow/long_poller_go"
"log" "log"
"loafle.com/overflow/agent_api/observer/messages"
cm "loafle.com/overflow/agent_api/config_manager"
) )
func main() {
func main() {
log.SetPrefix("Agent : ") log.SetPrefix("Agent : ")
StartAgent() StartAgent()
} }
type AgentTask struct {
command string
params interface{}
}
type Agent struct { type Agent struct {
cm cm.ConfigManager
taskCh chan interface{} taskCh chan interface{}
cm cm.ConfigManager
} }
func StartAgent() { func StartAgent() {
@ -32,95 +27,180 @@ func StartAgent() {
agt.startAgent() agt.startAgent()
} }
func (agt *Agent) startAgent() { func (agt *Agent) startAgent() {
//cfg start() (chan confmanager, error) //1. cfg start()
//evt start() (chan bool, error) //2. evt start()
//dat start() (chan bool, error) //3. dat start()
datStarted := make(chan bool)
//pol start() (chan string, error) -> getting an Auth key from API server and store that key. dat.Start(datStarted, agt.cm.GetGlobalConfig())
ch, err := pol.Start(agt.cm.GetGlobalConfig()) <-datStarted
//4. pol start() -> getting an Auth key from API server and store that key.
authKey := make(chan string)
ch, err := pol.Start(authKey, agt.cm.GetGlobalConfig())
if err != nil { if err != nil {
processError(err) evt.AddEventData(msg.TaskResult{msg.AGT_STOP, false, err })
return
} }
<-authKey
agt.taskCh = ch agt.taskCh = ch
//5. scm start(secretKey)
//6. crm start()
//7. col start() -> evt result
dataCh := make(chan interface{})
colStarted := make(chan bool)
col.Start(colStarted, dataCh, agt.cm)
<-colStarted
//scm start() with the Auth key (chan bool, error) res := msg.TaskResult{msg.AGT_START, true, nil}
//crm start() (chan bool, error) evt.AddEventData(res)
//col start() -> evt result
go agt.processCollectingData(dataCh)
//8. processing tasks
go agt.waitAgentTask() go agt.waitAgentTask()
} }
func (agt *Agent)waitAgentTask() { func (agt *Agent) waitAgentTask() {
for { for {
task := <- agt.taskCh d := <-agt.taskCh
switch task.(AgentTask).command { task := d.(msg.AgentTask)
case messages.TASK_SENSOR_START: switch task.Command {
agt.processStartSensor() case msg.TASK_POLLER_INTERVAL_UPDATE:
break; agt.processUpdatePolInterval(task)
break
case msg.TASK_SENSOR_START:
agt.processStartSensor(task)
break
case msg.TASK_SENSOR_STOP:
agt.processStopSensor(task)
break
case msg.TASK_SENSOR_ADD:
agt.processAddSensor(task)
break
case msg.TASK_SENSOR_REMOVE:
agt.processRemoveSensor(task)
break
case msg.TASK_SENSOR_UPDATE:
agt.processUpdateSensor(task)
break
case msg.TASK_AGENT_UPDATE:
agt.processUpdateAgent(task)
break
case msg.TASK_CRAWLER_UPDATE:
agt.processUpdateCRM(task)
break
case msg.TASK_LOG_SEND:
agt.processSendLog(task)
break
default: default:
} }
} }
} }
func (agt *Agent) processCollectingData(ch chan interface{}) {
for {
data := <-ch
dat.AddData(data)
}
}
func (agt *Agent) stopAgent() { func (agt *Agent) stopAgent() {
//col stop() (chan bool, error) //col stop()
//crm stop() (chan bool, error) colStopped := make(chan bool)
//scm stop() (chan bool, error) col.Stop(colStopped)
//dat stop() (chan bool, error) <-colStopped
//pol stop() (chan bool, error) //crm stop()
//evt stop() -> evt result (before stop) //scm stop()
//dat stop()
datStopped := make(chan bool)
dat.Stop(datStopped)
<-datStopped
//pol stop()
polStopped := make(chan bool)
err := pol.Stop(polStopped)
if err != nil {
evt.AddEventData(msg.TaskResult{msg.AGT_STOP, false, err })
return
}
<-polStopped
//evt result
evt.AddEventData(msg.TaskResult{msg.AGT_STOP, true, nil })
//evt stop()
} }
func (agt *Agent) processStartSensor() { func (agt *Agent) processStartSensor(task msg.AgentTask) {
//col startSensor() -> evt result //col startSensor() -> evt result
id := task.Params["sensorId"]
col.StartSensor(id)
evt.AddEventData(msg.TaskResult{msg.TASK_SENSOR_START, true, nil })
} }
func (agt *Agent) processstopSensor() { func (agt *Agent) processStopSensor(task msg.AgentTask) {
//col stopSensor() -> evt result //col stopSensor() -> evt result
id := task.Params["sensorId"]
col.StopSensor(id)
evt.AddEventData(msg.TaskResult{msg.TASK_SENSOR_STOP, true, nil })
} }
func (agt *Agent) processAddSensor() { func (agt *Agent) processAddSensor(task msg.AgentTask) {
//scm addSensor() (id chan string, error) id := task.Params["sensorId"]
//crm addSensor() (id chan string, error) //scm addSensor()
//col addSensor() -> evt result //crm addSensor()
//col addSensor()
err := col.AddSensor(id)
if err != nil {
evt.AddEventData(msg.TaskResult{msg.TASK_SENSOR_ADD, false, err })
return
}
//evt.AddEventData(msg.TaskResult{msg.TASK_SENSOR_ADD, true, nil })
} }
func (agt *Agent) processRemoveSensor() { func (agt *Agent) processRemoveSensor(task msg.AgentTask) {
//col removeSensor() (id chan string, error) id := task.Params["sensorId"]
//crm removeSensor() (id chan string, error) //col removeSensor()
//scm removeSensor() -> evt result err := col.RemSensor(id)
if err != nil {
evt.AddEventData(msg.TaskResult{msg.TASK_SENSOR_REMOVE, false, err })
}
//crm removeSensor()
//scm removeSensor()
//evt.AddEventData(msg.TaskResult{msg.TASK_SENSOR_REMOVE, true, nil })
} }
func (agt *Agent) processUpdateSensor() { func (agt *Agent) processUpdateSensor(task msg.AgentTask) {
//col stopSensor() (id chan string, error) //col stopSensor()
//scm updateSensor() (id chan string, error) //scm updateSensor()
//crm updateSensor() (id chan string, error) //crm updateSensor()
//col updateSensor() -> evt result //col updateSensor()
//evt.AddEventData(msg.TaskResult{msg.TASK_SENSOR_UPDATE, true, nil })
} }
func (agt *Agent) processUpdateCRM() { func (agt *Agent) processUpdateCRM(task msg.AgentTask) {
//(loop)col stopSensor() (chan []string?, error) //(loop)col stopSensor()
//crm updateCRM() //crm updateCRM()
//(loop)col startSensor() -> evt result //(loop)col startSensor()
//evt.AddEventData(msg.TaskResult{msg.TASK_CRAWLER_UPDATE, true, nil })
} }
func (agt *Agent) processUpdateAgent() { func (agt *Agent) processUpdateAgent(task msg.AgentTask) {
// //mola
//evt.AddEventData(msg.TaskResult{msg.TASK_AGENT_UPDATE, true, nil })
} }
func (agt *Agent) processSendLog() { func (agt *Agent) processSendLog(task msg.AgentTask) {
// //mola
//evt.AddEventData(msg.TaskResult{msg.TASK_LOG_SEND, true, nil })
} }
func (agt *Agent) processUpdatePolInterval() { func (agt *Agent) processUpdatePolInterval(task msg.AgentTask) {
//pol updateInterval() interval := task.Params["interval"]
err := pol.UpdateInterval(interval)
if err != nil {
evt.AddEventData(msg.TaskResult{msg.TASK_POLLER_INTERVAL_UPDATE, false, err })
return
}
evt.AddEventData(msg.TaskResult{msg.TASK_POLLER_INTERVAL_UPDATE, true, nil })
} }
func processError(err error) {
log.Println(err)
return
}