This commit is contained in:
insanity@loafle.com 2017-05-16 16:12:00 +09:00
parent 79b3f6728d
commit d61f2c58df

View File

@ -1,15 +1,16 @@
package main package main
import ( import (
//crm "loafle.com/overflow/crawler_manager_go"
"loafle.com/overflow/agent_api/config_manager" "loafle.com/overflow/agent_api/config_manager"
cfg "loafle.com/overflow/config_manager_go"
msg "loafle.com/overflow/agent_api/messages" msg "loafle.com/overflow/agent_api/messages"
col "loafle.com/overflow/collector_go" col "loafle.com/overflow/collector_go"
cfg "loafle.com/overflow/config_manager_go"
crm "loafle.com/overflow/crawler_manager_go"
dat "loafle.com/overflow/data_sender_go" dat "loafle.com/overflow/data_sender_go"
evt "loafle.com/overflow/event_sender_go" evt "loafle.com/overflow/event_sender_go"
pol "loafle.com/overflow/long_poller_go" pol "loafle.com/overflow/long_poller_go"
"log" "log"
"sync"
) )
const ( const (
@ -18,13 +19,16 @@ const (
func main() { func main() {
log.SetPrefix("Agent : ") log.SetPrefix("Agent : ")
StartAgent() var wg sync.WaitGroup
wg.Add(1)
go StartAgent()
wg.Wait()
} }
type Agent struct { type Agent struct {
cm config_manager.ConfigManager cm config_manager.ConfigManager
taskCh chan interface{} taskCh chan interface{}
} }
func StartAgent() { func StartAgent() {
@ -34,7 +38,7 @@ func StartAgent() {
func (agt *Agent) startAgent() { func (agt *Agent) startAgent() {
//1. cfg start() //1. cfg start()
cfgStarted := make(chan *config_manager.GlobalConfig) cfgStarted := make(chan *config_manager.GlobalConfig, 1)
if err := cfg.Start(cfgStarted, PATH); err != nil { if err := cfg.Start(cfgStarted, PATH); err != nil {
log.Println(err) log.Println(err)
return return
@ -42,18 +46,20 @@ func (agt *Agent) startAgent() {
globalConf := <-cfgStarted globalConf := <-cfgStarted
//2. evt start() //2. evt start()
evtStarted := make(chan bool, 0) evtStarted := make(chan bool, 1)
if err := evt.Start(evtStarted); err != nil { if err := evt.Start(evtStarted); err != nil {
log.Println(err) log.Println(err)
return return
} }
<-evtStarted <-evtStarted
//3. dat start() //3. dat start()
datStarted := make(chan bool) datStarted := make(chan bool, 1)
dat.Start(datStarted, globalConf) dat.Start(datStarted, globalConf)
<-datStarted <-datStarted
//4. pol start() //4. pol start()
polStarted := make(chan bool) polStarted := make(chan bool, 1)
if err := pol.Start(polStarted, globalConf); err != nil { if err := pol.Start(polStarted, globalConf); err != nil {
log.Println(err) log.Println(err)
return return
@ -62,7 +68,7 @@ func (agt *Agent) startAgent() {
agt.taskCh = pol.GetTaskCh() agt.taskCh = pol.GetTaskCh()
//5. scm start(authKey) //5. scm start(authKey)
scmStarted := make(chan config_manager.ConfigManager) scmStarted := make(chan config_manager.ConfigManager, 1)
if err := cfg.StartSensorConfig(scmStarted, "temp"); err != nil { if err := cfg.StartSensorConfig(scmStarted, "temp"); err != nil {
log.Println(err) log.Println(err)
return return
@ -70,9 +76,13 @@ func (agt *Agent) startAgent() {
agt.cm = <-scmStarted agt.cm = <-scmStarted
//6. crm start() //6. crm start()
crmStarted := make(chan bool, 1)
crm.Start(crmStarted, agt.cm)
<-crmStarted
//7. col start() //7. col start()
dataCh := make(chan interface{}) dataCh := make(chan interface{})
colStarted := make(chan bool) colStarted := make(chan bool, 1)
col.Start(colStarted, dataCh, agt.cm) col.Start(colStarted, dataCh, agt.cm)
<-colStarted <-colStarted
@ -133,29 +143,35 @@ func (agt *Agent) processCollectingData(ch chan interface{}) {
func (agt *Agent) stopAgent() { func (agt *Agent) stopAgent() {
//col stop() //col stop()
colStopped := make(chan bool) colStopped := make(chan bool, 1)
col.Stop(colStopped) col.Stop(colStopped)
<-colStopped <-colStopped
//crm stop() //crm stop()
crmStopped := make(chan bool, 1)
crm.Stop(crmStopped)
<-crmStopped
//cfg stop() //cfg stop()
cfgStopped := make(chan bool) cfgStopped := make(chan bool, 1)
cfg.Stop(cfgStopped) cfg.Stop(cfgStopped)
<-cfgStopped <-cfgStopped
//dat stop() //dat stop()
datStopped := make(chan bool) datStopped := make(chan bool, 1)
dat.Stop(datStopped) dat.Stop(datStopped)
<-datStopped <-datStopped
//pol stop() //pol stop()
polStopped := make(chan bool) polStopped := make(chan bool, 1)
err := pol.Stop(polStopped) if err := pol.Stop(polStopped); err != nil {
if err != nil { evt.AddEventData(msg.TaskResult{msg.AGT_STOP, false, err})
evt.AddEventData(msg.TaskResult{msg.AGT_STOP, false, err })
return return
} }
<-polStopped <-polStopped
//evt result //evt result
evt.AddEventData(msg.TaskResult{msg.AGT_STOP, true, nil }) evt.AddEventData(msg.TaskResult{msg.AGT_STOP, true, nil})
//evt stop() //evt stop()
evt.Stop() evt.Stop()
@ -165,14 +181,14 @@ func (agt *Agent) processStartSensor(task msg.AgentTask) {
//col startSensor() -> evt result //col startSensor() -> evt result
id := task.Params["sensorId"] id := task.Params["sensorId"]
col.StartSensor(id) col.StartSensor(id)
evt.AddEventData(msg.TaskResult{msg.TASK_SENSOR_START, true, nil }) evt.AddEventData(msg.TaskResult{msg.TASK_SENSOR_START, true, nil})
} }
func (agt *Agent) processStopSensor(task msg.AgentTask) { func (agt *Agent) processStopSensor(task msg.AgentTask) {
//col stopSensor() -> evt result //col stopSensor() -> evt result
id := task.Params["sensorId"] id := task.Params["sensorId"]
col.StopSensor(id) col.StopSensor(id)
evt.AddEventData(msg.TaskResult{msg.TASK_SENSOR_STOP, true, nil }) evt.AddEventData(msg.TaskResult{msg.TASK_SENSOR_STOP, true, nil})
} }
func (agt *Agent) processAddSensor(task msg.AgentTask) { func (agt *Agent) processAddSensor(task msg.AgentTask) {
@ -182,7 +198,7 @@ func (agt *Agent) processAddSensor(task msg.AgentTask) {
//col addSensor() //col addSensor()
err := col.AddSensor(id) err := col.AddSensor(id)
if err != nil { if err != nil {
evt.AddEventData(msg.TaskResult{msg.TASK_SENSOR_ADD, false, err }) evt.AddEventData(msg.TaskResult{msg.TASK_SENSOR_ADD, false, err})
return return
} }
//evt.AddEventData(msg.TaskResult{msg.TASK_SENSOR_ADD, true, nil }) //evt.AddEventData(msg.TaskResult{msg.TASK_SENSOR_ADD, true, nil })
@ -193,7 +209,7 @@ func (agt *Agent) processRemoveSensor(task msg.AgentTask) {
//col removeSensor() //col removeSensor()
err := col.RemSensor(id) err := col.RemSensor(id)
if err != nil { if err != nil {
evt.AddEventData(msg.TaskResult{msg.TASK_SENSOR_REMOVE, false, err }) evt.AddEventData(msg.TaskResult{msg.TASK_SENSOR_REMOVE, false, err})
} }
//crm removeSensor() //crm removeSensor()
//scm removeSensor() //scm removeSensor()
@ -204,13 +220,13 @@ func (agt *Agent) processUpdateSensor(task msg.AgentTask) {
id := task.Params["sensorId"] id := task.Params["sensorId"]
//col stopSensor() //col stopSensor()
if err := col.StopSensor(id); err != nil { if err := col.StopSensor(id); err != nil {
evt.AddEventData(msg.TaskResult{msg.TASK_SENSOR_UPDATE, false, err }) evt.AddEventData(msg.TaskResult{msg.TASK_SENSOR_UPDATE, false, err})
} }
//scm updateSensor() //scm updateSensor()
//crm updateSensor() //crm updateSensor()
//col updateSensor() //col updateSensor()
if err := col.UpdateSensor(id); err != nil { if err := col.UpdateSensor(id); err != nil {
evt.AddEventData(msg.TaskResult{msg.TASK_SENSOR_UPDATE, false, err }) evt.AddEventData(msg.TaskResult{msg.TASK_SENSOR_UPDATE, false, err})
} }
//evt.AddEventData(msg.TaskResult{msg.TASK_SENSOR_UPDATE, true, nil }) //evt.AddEventData(msg.TaskResult{msg.TASK_SENSOR_UPDATE, true, nil })
} }
@ -236,9 +252,8 @@ func (agt *Agent) processUpdatePolInterval(task msg.AgentTask) {
interval := task.Params["interval"] interval := task.Params["interval"]
err := pol.UpdateInterval(interval) err := pol.UpdateInterval(interval)
if err != nil { if err != nil {
evt.AddEventData(msg.TaskResult{msg.TASK_POLLER_INTERVAL_UPDATE, false, err }) evt.AddEventData(msg.TaskResult{msg.TASK_POLLER_INTERVAL_UPDATE, false, err})
return return
} }
evt.AddEventData(msg.TaskResult{msg.TASK_POLLER_INTERVAL_UPDATE, true, nil }) evt.AddEventData(msg.TaskResult{msg.TASK_POLLER_INTERVAL_UPDATE, true, nil})
} }