agent/agent.go

262 lines
5.7 KiB
Go
Raw Normal View History

2017-05-11 09:39:01 +00:00
package main
import (
2017-05-16 06:08:08 +00:00
"loafle.com/overflow/agent_api/config_manager"
2017-05-15 10:32:25 +00:00
msg "loafle.com/overflow/agent_api/messages"
col "loafle.com/overflow/collector_go"
2017-05-16 07:12:00 +00:00
cfg "loafle.com/overflow/config_manager_go"
crm "loafle.com/overflow/crawler_manager_go"
2017-05-15 10:32:25 +00:00
dat "loafle.com/overflow/data_sender_go"
evt "loafle.com/overflow/event_sender_go"
2017-05-15 04:57:38 +00:00
pol "loafle.com/overflow/long_poller_go"
2017-05-11 10:29:32 +00:00
"log"
2017-05-16 07:12:00 +00:00
"sync"
2017-05-11 09:39:01 +00:00
)
2017-05-16 06:08:08 +00:00
// PATH is the configuration directory handed to cfg.Start as its second
// argument during agent startup.
//
// NOTE(review): this is an absolute path on a developer machine; it looks like
// a test fixture location and probably should come from a flag or environment
// variable — confirm before deploying.
const PATH = "/home/insanity/Develop/gopath/src/loafle.com/overflow/config_manager_go/test_agent"
2017-05-15 10:32:25 +00:00
func main() {
2017-05-11 10:29:32 +00:00
log.SetPrefix("Agent : ")
2017-05-16 09:03:18 +00:00
var wg sync.WaitGroup
2017-05-16 07:12:00 +00:00
wg.Add(1)
go StartAgent()
wg.Wait()
2017-05-15 05:46:12 +00:00
}
2017-05-15 04:57:38 +00:00
type Agent struct {
2017-05-16 06:08:08 +00:00
cm config_manager.ConfigManager
2017-05-15 05:46:12 +00:00
taskCh chan interface{}
2017-05-15 04:57:38 +00:00
}
2017-05-15 05:46:12 +00:00
func StartAgent() {
agt := Agent{}
2017-05-15 04:57:38 +00:00
agt.startAgent()
}
func (agt *Agent) startAgent() {
2017-05-15 10:32:25 +00:00
//1. cfg start()
2017-05-16 07:12:00 +00:00
cfgStarted := make(chan *config_manager.GlobalConfig, 1)
2017-05-16 06:08:08 +00:00
if err := cfg.Start(cfgStarted, PATH); err != nil {
log.Println(err)
return
}
globalConf := <-cfgStarted
2017-05-15 10:32:25 +00:00
//2. evt start()
2017-05-16 07:12:00 +00:00
evtStarted := make(chan bool, 1)
2017-05-16 06:10:29 +00:00
if err := evt.Start(evtStarted); err != nil {
log.Println(err)
return
}
<-evtStarted
2017-05-16 07:12:00 +00:00
2017-05-15 10:32:25 +00:00
//3. dat start()
2017-05-16 07:12:00 +00:00
datStarted := make(chan bool, 1)
2017-05-16 06:08:08 +00:00
dat.Start(datStarted, globalConf)
2017-05-15 10:32:25 +00:00
<-datStarted
2017-05-16 07:12:00 +00:00
2017-05-16 06:08:08 +00:00
//4. pol start()
2017-05-16 07:12:00 +00:00
polStarted := make(chan bool, 1)
2017-05-16 06:10:29 +00:00
if err := pol.Start(polStarted, globalConf); err != nil {
2017-05-16 06:08:08 +00:00
log.Println(err)
2017-05-15 10:32:25 +00:00
return
2017-05-15 05:46:12 +00:00
}
2017-05-16 06:08:08 +00:00
<-polStarted
agt.taskCh = pol.GetTaskCh()
//5. scm start(authKey)
2017-05-16 07:12:00 +00:00
scmStarted := make(chan config_manager.ConfigManager, 1)
2017-05-16 06:08:08 +00:00
if err := cfg.StartSensorConfig(scmStarted, "temp"); err != nil {
log.Println(err)
return
}
agt.cm = <-scmStarted
2017-05-15 10:32:25 +00:00
//6. crm start()
2017-05-16 07:12:00 +00:00
crmStarted := make(chan bool, 1)
crm.Start(crmStarted, agt.cm)
<-crmStarted
2017-05-16 06:08:08 +00:00
//7. col start()
2017-05-15 10:32:25 +00:00
dataCh := make(chan interface{})
2017-05-16 07:12:00 +00:00
colStarted := make(chan bool, 1)
2017-05-15 10:32:25 +00:00
col.Start(colStarted, dataCh, agt.cm)
<-colStarted
2017-05-16 06:08:08 +00:00
//8.evt result
2017-05-16 09:03:18 +00:00
res := msg.Event{msg.EVT_TYPE_NONE, msg.AGT_START, nil}
2017-05-15 10:32:25 +00:00
evt.AddEventData(res)
2017-05-15 05:46:12 +00:00
2017-05-15 10:32:25 +00:00
go agt.processCollectingData(dataCh)
2017-05-15 04:57:38 +00:00
2017-05-15 10:32:25 +00:00
//8. processing tasks
2017-05-15 05:46:12 +00:00
go agt.waitAgentTask()
2017-05-11 09:39:01 +00:00
}
2017-05-15 10:32:25 +00:00
func (agt *Agent) waitAgentTask() {
2017-05-15 04:57:38 +00:00
for {
2017-05-15 10:32:25 +00:00
d := <-agt.taskCh
task := d.(msg.AgentTask)
switch task.Command {
case msg.TASK_POLLER_INTERVAL_UPDATE:
agt.processUpdatePolInterval(task)
break
case msg.TASK_SENSOR_START:
agt.processStartSensor(task)
break
case msg.TASK_SENSOR_STOP:
agt.processStopSensor(task)
break
case msg.TASK_SENSOR_ADD:
agt.processAddSensor(task)
break
case msg.TASK_SENSOR_REMOVE:
agt.processRemoveSensor(task)
break
case msg.TASK_SENSOR_UPDATE:
agt.processUpdateSensor(task)
break
case msg.TASK_AGENT_UPDATE:
agt.processUpdateAgent(task)
break
case msg.TASK_CRAWLER_UPDATE:
agt.processUpdateCRM(task)
break
case msg.TASK_LOG_SEND:
agt.processSendLog(task)
break
2017-05-15 04:57:38 +00:00
default:
}
}
}
2017-05-11 09:39:01 +00:00
2017-05-15 10:32:25 +00:00
// processCollectingData forwards every value received on ch to dat.AddData.
//
// It returns when ch is closed. A bare receive loop would keep spinning on a
// closed channel and hand nil values to dat.AddData indefinitely.
func (agt *Agent) processCollectingData(ch chan interface{}) {
	for data := range ch {
		dat.AddData(data)
	}
}
2017-05-11 10:29:32 +00:00
2017-05-15 04:57:38 +00:00
func (agt *Agent) stopAgent() {
2017-05-15 10:32:25 +00:00
//col stop()
2017-05-16 07:12:00 +00:00
colStopped := make(chan bool, 1)
2017-05-15 10:32:25 +00:00
col.Stop(colStopped)
<-colStopped
2017-05-16 07:12:00 +00:00
2017-05-15 10:32:25 +00:00
//crm stop()
2017-05-16 07:12:00 +00:00
crmStopped := make(chan bool, 1)
crm.Stop(crmStopped)
<-crmStopped
2017-05-16 06:08:08 +00:00
//cfg stop()
2017-05-16 07:12:00 +00:00
cfgStopped := make(chan bool, 1)
2017-05-16 06:08:08 +00:00
cfg.Stop(cfgStopped)
<-cfgStopped
2017-05-16 07:12:00 +00:00
2017-05-15 10:32:25 +00:00
//dat stop()
2017-05-16 07:12:00 +00:00
datStopped := make(chan bool, 1)
2017-05-15 10:32:25 +00:00
dat.Stop(datStopped)
<-datStopped
2017-05-16 07:12:00 +00:00
2017-05-15 10:32:25 +00:00
//pol stop()
2017-05-16 07:12:00 +00:00
polStopped := make(chan bool, 1)
if err := pol.Stop(polStopped); err != nil {
2017-05-15 10:32:25 +00:00
return
}
<-polStopped
//evt result
2017-05-16 09:03:18 +00:00
evt.AddEventData(msg.Event{msg.EVT_TYPE_NONE, msg.AGT_STOP, nil})
2017-05-15 10:32:25 +00:00
//evt stop()
2017-05-16 04:07:28 +00:00
evt.Stop()
2017-05-15 04:57:38 +00:00
}
2017-05-11 10:29:32 +00:00
2017-05-15 10:32:25 +00:00
func (agt *Agent) processStartSensor(task msg.AgentTask) {
2017-05-15 04:57:38 +00:00
//col startSensor() -> evt result
2017-05-15 10:32:25 +00:00
id := task.Params["sensorId"]
col.StartSensor(id)
2017-05-16 09:03:18 +00:00
evt.AddEventData(msg.Event{msg.EVT_TYPE_TASK, task.Params["id"], nil})
2017-05-15 04:57:38 +00:00
}
2017-05-11 10:29:32 +00:00
2017-05-15 10:32:25 +00:00
func (agt *Agent) processStopSensor(task msg.AgentTask) {
2017-05-15 04:57:38 +00:00
//col stopSensor() -> evt result
2017-05-15 10:32:25 +00:00
id := task.Params["sensorId"]
col.StopSensor(id)
2017-05-16 09:03:18 +00:00
evt.AddEventData(msg.Event{msg.EVT_TYPE_TASK, task.Params["id"], nil})
2017-05-15 04:57:38 +00:00
}
2017-05-11 10:29:32 +00:00
2017-05-15 10:32:25 +00:00
func (agt *Agent) processAddSensor(task msg.AgentTask) {
id := task.Params["sensorId"]
//scm addSensor()
//crm addSensor()
//col addSensor()
err := col.AddSensor(id)
if err != nil {
return
}
2017-05-16 09:03:18 +00:00
evt.AddEventData(msg.Event{msg.EVT_TYPE_TASK, task.Params["id"], nil})
2017-05-15 04:57:38 +00:00
}
2017-05-11 10:29:32 +00:00
2017-05-15 10:32:25 +00:00
func (agt *Agent) processRemoveSensor(task msg.AgentTask) {
id := task.Params["sensorId"]
//col removeSensor()
err := col.RemSensor(id)
if err != nil {
2017-05-16 09:03:18 +00:00
log.Println(err)
return
2017-05-15 10:32:25 +00:00
}
//crm removeSensor()
//scm removeSensor()
//evt.AddEventData(msg.TaskResult{msg.TASK_SENSOR_REMOVE, true, nil })
2017-05-16 09:03:18 +00:00
evt.AddEventData(msg.Event{msg.EVT_TYPE_TASK, task.Params["id"], nil})
2017-05-15 04:57:38 +00:00
}
2017-05-11 10:29:32 +00:00
2017-05-15 10:32:25 +00:00
func (agt *Agent) processUpdateSensor(task msg.AgentTask) {
2017-05-16 06:08:08 +00:00
id := task.Params["sensorId"]
2017-05-15 10:32:25 +00:00
//col stopSensor()
2017-05-16 06:08:08 +00:00
if err := col.StopSensor(id); err != nil {
2017-05-16 09:03:18 +00:00
log.Println(err)
return
2017-05-16 06:08:08 +00:00
}
2017-05-15 10:32:25 +00:00
//scm updateSensor()
//crm updateSensor()
//col updateSensor()
2017-05-16 06:08:08 +00:00
if err := col.UpdateSensor(id); err != nil {
2017-05-16 09:03:18 +00:00
log.Println(err)
return
2017-05-16 06:08:08 +00:00
}
2017-05-16 09:03:18 +00:00
evt.AddEventData(msg.Event{msg.EVT_TYPE_TASK, task.Params["id"], nil})
2017-05-15 04:57:38 +00:00
}
2017-05-11 10:29:32 +00:00
2017-05-15 10:32:25 +00:00
func (agt *Agent) processUpdateCRM(task msg.AgentTask) {
//(loop)col stopSensor()
2017-05-15 04:57:38 +00:00
//crm updateCRM()
2017-05-15 10:32:25 +00:00
//(loop)col startSensor()
2017-05-16 09:03:18 +00:00
evt.AddEventData(msg.Event{msg.EVT_TYPE_TASK, task.Params["id"], nil})
2017-05-15 04:57:38 +00:00
}
2017-05-11 10:29:32 +00:00
2017-05-15 10:32:25 +00:00
func (agt *Agent) processUpdateAgent(task msg.AgentTask) {
//mola
2017-05-16 09:03:18 +00:00
evt.AddEventData(msg.Event{msg.EVT_TYPE_TASK, task.Params["id"], nil})
2017-05-15 04:57:38 +00:00
}
2017-05-11 09:39:01 +00:00
2017-05-15 10:32:25 +00:00
func (agt *Agent) processSendLog(task msg.AgentTask) {
//mola
//evt.AddEventData(msg.TaskResult{msg.TASK_LOG_SEND, true, nil })
2017-05-16 09:03:18 +00:00
evt.AddEventData(msg.Event{msg.EVT_TYPE_TASK, task.Params["id"], nil})
2017-05-15 05:46:12 +00:00
}
2017-05-15 10:32:25 +00:00
func (agt *Agent) processUpdatePolInterval(task msg.AgentTask) {
interval := task.Params["interval"]
err := pol.UpdateInterval(interval)
if err != nil {
2017-05-16 09:03:18 +00:00
log.Println(err)
2017-05-15 10:32:25 +00:00
return
}
2017-05-16 09:03:18 +00:00
evt.AddEventData(msg.Event{msg.EVT_TYPE_TASK, task.Params["id"], nil})
2017-05-15 05:46:12 +00:00
}