293 lines
6.2 KiB
Go
293 lines
6.2 KiB
Go
|
package main
|
||
|
|
||
|
import (
|
||
|
log "github.com/cihub/seelog"
|
||
|
"loafle.com/overflow/agent/bootstrap"
|
||
|
"loafle.com/overflow/agent_api/config_manager"
|
||
|
msg "loafle.com/overflow/agent_api/messages"
|
||
|
col "loafle.com/overflow/collector_go"
|
||
|
cfg "loafle.com/overflow/config_manager_go"
|
||
|
crm "loafle.com/overflow/crawler_manager_go"
|
||
|
dat "loafle.com/overflow/data_sender_go"
|
||
|
evt "loafle.com/overflow/event_sender_go"
|
||
|
ini "loafle.com/overflow/initializer_go"
|
||
|
pol "loafle.com/overflow/long_poller_go"
|
||
|
"sync"
|
||
|
)
|
||
|
|
||
|
const (
|
||
|
// /home/geek/develop/gopath/src/loafle.com/overflow/config_manager_go/test_agent
|
||
|
PATH = "/home/insanity/Develop/gopath/src/loafle.com/overflow/config_manager_go/test_agent"
|
||
|
)
|
||
|
|
||
|
var wg sync.WaitGroup
|
||
|
|
||
|
func main() {
|
||
|
wg.Add(1)
|
||
|
go StartAgent()
|
||
|
wg.Wait()
|
||
|
}
|
||
|
|
||
|
type Agent struct {
|
||
|
cm config_manager.ConfigManager
|
||
|
taskCh chan interface{}
|
||
|
secKey string
|
||
|
}
|
||
|
|
||
|
func StartAgent() {
|
||
|
stopch := make(chan bool, 1)
|
||
|
bootstrap.HandleShell(stopch)
|
||
|
bootstrap.HandleSignal(stopch)
|
||
|
|
||
|
agt := Agent{}
|
||
|
agt.startAgent()
|
||
|
|
||
|
agt.handleAgentStop(stopch)
|
||
|
}
|
||
|
|
||
|
func (agt *Agent) startAgent() {
|
||
|
//1. cfg start()
|
||
|
cfgStarted := make(chan *config_manager.GlobalConfig, 1)
|
||
|
if err := cfg.Start(cfgStarted, PATH); err != nil {
|
||
|
log.Error(err)
|
||
|
return
|
||
|
}
|
||
|
globalConf := <-cfgStarted
|
||
|
|
||
|
// ini start()
|
||
|
iniStarted := make(chan string, 1)
|
||
|
if err := ini.Start(iniStarted, globalConf); err != nil {
|
||
|
log.Error(err)
|
||
|
return
|
||
|
}
|
||
|
agt.secKey = <-iniStarted
|
||
|
|
||
|
//2. evt start()
|
||
|
evtStarted := make(chan bool, 1)
|
||
|
if err := evt.Start(evtStarted); err != nil {
|
||
|
log.Error(err)
|
||
|
return
|
||
|
}
|
||
|
<-evtStarted
|
||
|
|
||
|
//3. dat start()
|
||
|
datStarted := make(chan bool, 1)
|
||
|
dat.Start(datStarted, globalConf)
|
||
|
<-datStarted
|
||
|
|
||
|
//4. pol start()
|
||
|
polStarted := make(chan bool, 1)
|
||
|
if err := pol.Start(polStarted, globalConf); err != nil {
|
||
|
log.Error(err)
|
||
|
return
|
||
|
}
|
||
|
<-polStarted
|
||
|
agt.taskCh = pol.GetTaskCh()
|
||
|
|
||
|
/**
|
||
|
Todo : Getting an auth key from API server.
|
||
|
*/
|
||
|
|
||
|
//5. scm start()
|
||
|
scmStarted := make(chan config_manager.ConfigManager, 1)
|
||
|
if err := cfg.StartSensorConfig(scmStarted, agt.secKey); err != nil {
|
||
|
log.Error(err)
|
||
|
return
|
||
|
}
|
||
|
agt.cm = <-scmStarted
|
||
|
|
||
|
//6. crm start()
|
||
|
crmStarted := make(chan bool, 1)
|
||
|
crm.Start(crmStarted, agt.cm)
|
||
|
<-crmStarted
|
||
|
|
||
|
//7. col start()
|
||
|
dataCh := make(chan interface{})
|
||
|
colStarted := make(chan bool, 1)
|
||
|
col.Start(colStarted, dataCh, agt.cm)
|
||
|
<-colStarted
|
||
|
|
||
|
//8.evt result
|
||
|
res := msg.Event{msg.EVT_TYPE_NONE, msg.EVENT_AGT_START, nil}
|
||
|
evt.AddEventData(res)
|
||
|
|
||
|
go agt.processCollectingData(dataCh)
|
||
|
go agt.waitAgentTask()
|
||
|
}
|
||
|
|
||
|
func (agt *Agent) handleAgentStop(ch chan bool) {
|
||
|
go func() {
|
||
|
if <-ch {
|
||
|
agt.stopAgent()
|
||
|
log.Info("Agent has stopped.")
|
||
|
log.Flush()
|
||
|
close(ch)
|
||
|
wg.Done()
|
||
|
}
|
||
|
}()
|
||
|
}
|
||
|
|
||
|
func (agt *Agent) waitAgentTask() {
|
||
|
for {
|
||
|
d := <-agt.taskCh
|
||
|
task := d.(msg.AgentTask)
|
||
|
switch task.Command {
|
||
|
case msg.TASK_POL_INTERVAL_UPDATE:
|
||
|
agt.processUpdatePolInterval(task)
|
||
|
break
|
||
|
case msg.TASK_SENSOR_START:
|
||
|
agt.processStartSensor(task)
|
||
|
break
|
||
|
case msg.TASK_SENSOR_STOP:
|
||
|
agt.processStopSensor(task)
|
||
|
break
|
||
|
case msg.TASK_SENSOR_ADD:
|
||
|
agt.processAddSensor(task)
|
||
|
break
|
||
|
case msg.TASK_SENSOR_REMOVE:
|
||
|
agt.processRemoveSensor(task)
|
||
|
break
|
||
|
case msg.TASK_SENSOR_UPDATE:
|
||
|
agt.processUpdateSensor(task)
|
||
|
break
|
||
|
case msg.TASK_AGENT_UPDATE:
|
||
|
agt.processUpdateAgent(task)
|
||
|
break
|
||
|
case msg.TASK_CRAWLER_UPDATE:
|
||
|
agt.processUpdateCRM(task)
|
||
|
break
|
||
|
case msg.TASK_LOG_SEND:
|
||
|
agt.processSendLog(task)
|
||
|
break
|
||
|
default:
|
||
|
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (agt *Agent) processCollectingData(ch chan interface{}) {
|
||
|
for {
|
||
|
data := <-ch
|
||
|
dat.AddData(data)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (agt *Agent) stopAgent() {
|
||
|
//col stop()
|
||
|
colStopped := make(chan bool, 1)
|
||
|
col.Stop(colStopped)
|
||
|
<-colStopped
|
||
|
|
||
|
//crm stop()
|
||
|
crmStopped := make(chan bool, 1)
|
||
|
crm.Stop(crmStopped)
|
||
|
<-crmStopped
|
||
|
|
||
|
//cfg stop()
|
||
|
cfgStopped := make(chan bool, 1)
|
||
|
cfg.Stop(cfgStopped)
|
||
|
<-cfgStopped
|
||
|
|
||
|
//dat stop()
|
||
|
datStopped := make(chan bool, 1)
|
||
|
dat.Stop(datStopped)
|
||
|
<-datStopped
|
||
|
|
||
|
//pol stop()
|
||
|
polStopped := make(chan bool, 1)
|
||
|
if err := pol.Stop(polStopped); err != nil {
|
||
|
return
|
||
|
}
|
||
|
<-polStopped
|
||
|
|
||
|
evt.AddEventData(msg.Event{msg.EVT_TYPE_NONE, msg.EVENT_AGT_STOP, nil})
|
||
|
evt.Stop()
|
||
|
|
||
|
}
|
||
|
|
||
|
func (agt *Agent) processStartSensor(task msg.AgentTask) {
|
||
|
id := task.Params["sensorId"]
|
||
|
if err := col.StartSensor(id); err != nil {
|
||
|
log.Error(err)
|
||
|
return
|
||
|
}
|
||
|
|
||
|
evt.AddEventData(msg.Event{msg.EVT_TYPE_TASK, task.Params["id"], nil})
|
||
|
}
|
||
|
|
||
|
func (agt *Agent) processStopSensor(task msg.AgentTask) {
|
||
|
id := task.Params["sensorId"]
|
||
|
if err := col.StopSensor(id); err != nil {
|
||
|
log.Error(err)
|
||
|
return
|
||
|
}
|
||
|
evt.AddEventData(msg.Event{msg.EVT_TYPE_TASK, task.Params["id"], nil})
|
||
|
}
|
||
|
|
||
|
func (agt *Agent) processAddSensor(task msg.AgentTask) {
|
||
|
path := task.Params["path"]
|
||
|
id := task.Params["id"]
|
||
|
cfg.AddSensor(path)
|
||
|
crm.AddSensor(id)
|
||
|
err := col.AddSensor(id)
|
||
|
if err != nil {
|
||
|
return
|
||
|
}
|
||
|
evt.AddEventData(msg.Event{msg.EVT_TYPE_TASK, task.Params["id"], nil})
|
||
|
}
|
||
|
|
||
|
func (agt *Agent) processRemoveSensor(task msg.AgentTask) {
|
||
|
id := task.Params["sensorId"]
|
||
|
err := col.RemSensor(id)
|
||
|
if err != nil {
|
||
|
log.Error(err)
|
||
|
return
|
||
|
}
|
||
|
crm.RemoveSensor(id)
|
||
|
cfg.RemoveSensor(id)
|
||
|
evt.AddEventData(msg.Event{msg.EVT_TYPE_TASK, task.Params["id"], nil})
|
||
|
}
|
||
|
|
||
|
func (agt *Agent) processUpdateSensor(task msg.AgentTask) {
|
||
|
path := task.Params["path"]
|
||
|
id := task.Params["id"]
|
||
|
if err := col.StopSensor(id); err != nil {
|
||
|
log.Error(err)
|
||
|
return
|
||
|
}
|
||
|
cfg.UpdateSensor(path)
|
||
|
crm.UpdateSensor(id)
|
||
|
if err := col.UpdateSensor(id); err != nil {
|
||
|
log.Error(err)
|
||
|
return
|
||
|
}
|
||
|
evt.AddEventData(msg.Event{msg.EVT_TYPE_TASK, task.Params["id"], nil})
|
||
|
}
|
||
|
|
||
|
func (agt *Agent) processUpdateCRM(task msg.AgentTask) {
|
||
|
//(loop)col stopSensor()
|
||
|
//crm updateCRM()
|
||
|
//(loop)col startSensor()
|
||
|
evt.AddEventData(msg.Event{msg.EVT_TYPE_TASK, task.Params["id"], nil})
|
||
|
}
|
||
|
|
||
|
func (agt *Agent) processUpdateAgent(task msg.AgentTask) {
|
||
|
//Todo. Updating agent
|
||
|
evt.AddEventData(msg.Event{msg.EVT_TYPE_TASK, task.Params["id"], nil})
|
||
|
}
|
||
|
|
||
|
func (agt *Agent) processSendLog(task msg.AgentTask) {
|
||
|
//Todo. Sending logs
|
||
|
evt.AddEventData(msg.Event{msg.EVT_TYPE_TASK, task.Params["id"], nil})
|
||
|
}
|
||
|
|
||
|
func (agt *Agent) processUpdatePolInterval(task msg.AgentTask) {
|
||
|
interval := task.Params["interval"]
|
||
|
err := pol.UpdateInterval(interval)
|
||
|
if err != nil {
|
||
|
log.Error(err)
|
||
|
return
|
||
|
}
|
||
|
evt.AddEventData(msg.Event{msg.EVT_TYPE_TASK, task.Params["id"], nil})
|
||
|
}
|