overflow_probe/probe.go
insanity@loafle.com 541ecfee2d fix
2017-08-04 12:11:38 +09:00

293 lines
6.5 KiB
Go

package main
import (
"git.loafle.net/overflow/overflow_probe/agent_api/config_manager"
msg "git.loafle.net/overflow/overflow_probe/agent_api/messages"
"git.loafle.net/overflow/overflow_probe/bootstrap"
col "git.loafle.net/overflow/overflow_probe/collector"
cfg "git.loafle.net/overflow/overflow_probe/config_manager"
crm "git.loafle.net/overflow/overflow_probe/crawler_manager"
dat "git.loafle.net/overflow/overflow_probe/data_sender"
evt "git.loafle.net/overflow/overflow_probe/event_sender"
ini "git.loafle.net/overflow/overflow_probe/initializer"
log "github.com/cihub/seelog"
//pol "git.loafle.net/overflow/overflow_probe/long_poller"
"sync"
)
const (
// PATH is the location passed to cfg.Start when the probe boots.
// NOTE(review): hard-coded absolute path under a developer's home directory —
// presumably a development placeholder; confirm before shipping.
// Same location under another checkout root:
// /home/geek/develop/gopath/src/git.loafle.net/overflow/overflow_probe/config_manager/test_agent
PATH = "/home/insanity/Develop/gopath/src/git.loafle.net/overflow/overflow_probe/config_manager/test_agent"
)
// wg keeps main blocked: main adds one unit before launching StartAgent and
// the stop handler started by handleProbeStop calls Done after shutdown.
var wg sync.WaitGroup
// main launches the agent in its own goroutine and then parks on the
// package-level WaitGroup until the stop handler releases it.
func main() {
	// One unit, released by the goroutine that handleProbeStop starts.
	wg.Add(1)

	go StartAgent()

	wg.Wait()
}
// Probe holds the state shared by the agent's start, stop and task-handling
// methods.
type Probe struct {
// cm is the sensor configuration manager received from the channel given to
// cfg.StartSensorConfig in startProbe.
cm config_manager.ConfigManager
// taskCh is the channel waitAgentTask receives tasks from. In the code
// visible in this file its only assignment (from the long poller) is
// commented out, so it stays nil.
taskCh chan interface{}
// secKey is the value received from the initializer's start channel in
// startProbe; it is later passed to cfg.StartSensorConfig.
secKey string
}
// StartAgent registers the stop sources, boots the probe and installs the
// shutdown handler.
//
// The same stop channel is handed to both bootstrap.HandleShell and
// bootstrap.HandleSignal and is later consumed by the goroutine that
// handleProbeStop starts.
func StartAgent() {
	stopCh := make(chan bool, 1)
	bootstrap.HandleShell(stopCh)
	bootstrap.HandleSignal(stopCh)

	// startProbe reports failures only through the log, so the stop handler
	// is installed regardless of whether start-up completed.
	probe := &Probe{}
	probe.startProbe()
	probe.handleProbeStop(stopCh)
}
// startProbe brings the probe's subsystems up one after another, waiting on
// each subsystem's "started" channel before moving on to the next.
//
// When a start call that is checked reports an error, the error is logged and
// startProbe returns immediately: the remaining subsystems and the two worker
// goroutines at the end are not started.
func (agt *Probe) startProbe() {
	// Step 1: global configuration; cfg.Start is given PATH.
	cfgReady := make(chan *config_manager.GlobalConfig, 1)
	if err := cfg.Start(cfgReady, PATH); err != nil {
		log.Error(err)
		return
	}
	globalConf := <-cfgReady

	// Initializer: the string it delivers is kept as the probe's secKey.
	iniReady := make(chan string, 1)
	if err := ini.Start(iniReady, globalConf); err != nil {
		log.Error(err)
		return
	}
	agt.secKey = <-iniReady

	// Step 2: event sender.
	evtReady := make(chan bool, 1)
	if err := evt.Start(evtReady); err != nil {
		log.Error(err)
		return
	}
	<-evtReady

	// Step 3: data sender.
	datReady := make(chan bool, 1)
	dat.Start(datReady, globalConf)
	<-datReady

	// Step 4: long poller (disabled). While this stays commented out nothing
	// in this function assigns agt.taskCh.
	//polStarted := make(chan bool, 1)
	//if err := pol.Start(polStarted, globalConf); err != nil {
	//	log.Error(err)
	//	return
	//}
	//<-polStarted
	//agt.taskCh = pol.GetTaskCh()

	// TODO: obtain an auth key from the API server.

	// Step 5: sensor configuration manager, keyed by secKey.
	scmReady := make(chan config_manager.ConfigManager, 1)
	if err := cfg.StartSensorConfig(scmReady, agt.secKey); err != nil {
		log.Error(err)
		return
	}
	agt.cm = <-scmReady

	// Step 6: crawler manager.
	crmReady := make(chan bool, 1)
	crm.Start(crmReady, agt.cm)
	<-crmReady

	// Step 7: collector; it is handed the unbuffered channel that
	// processCollectingData drains below.
	collected := make(chan interface{})
	colReady := make(chan bool, 1)
	col.Start(colReady, collected, agt.cm)
	<-colReady

	// Step 8: announce that the agent has started.
	evt.AddEventData(msg.Event{msg.EVT_TYPE_NONE, msg.EVENT_AGT_START, nil})

	// Background workers: forward collected data and dispatch agent tasks.
	go agt.processCollectingData(collected)
	go agt.waitAgentTask()
}
// handleProbeStop starts a goroutine that waits for a single value on ch.
// A true value triggers the shutdown sequence: stop the probe, log and flush,
// close ch, and release the WaitGroup that main is blocked on. A false value
// ends the goroutine without doing any of that.
//
// NOTE(review): ch is closed here on the receiving side; confirm that the
// bootstrap handlers never send on it afterwards.
func (agt *Probe) handleProbeStop(ch chan bool) {
	go func() {
		if stop := <-ch; !stop {
			return
		}

		agt.stopProbe()
		log.Info("Agent has stopped.")
		log.Flush()
		close(ch)
		wg.Done()
	}()
}
// waitAgentTask receives tasks from agt.taskCh and dispatches each one to the
// handler that matches its Command. Commands without a matching case are
// ignored.
//
// The loop ends when taskCh is closed. A value that is not a msg.AgentTask is
// logged and skipped rather than crashing the goroutine on a failed type
// assertion (which is also what a receive from a closed channel used to
// trigger, since it yields a nil interface value).
func (agt *Probe) waitAgentTask() {
	for d := range agt.taskCh {
		task, ok := d.(msg.AgentTask)
		if !ok {
			log.Errorf("unexpected agent task payload of type %T", d)
			continue
		}

		// Go switch cases do not fall through, so no explicit break is needed.
		switch task.Command {
		//case msg.TASK_POL_INTERVAL_UPDATE:
		//	agt.processUpdatePolInterval(task)
		case msg.TASK_SENSOR_START:
			agt.processStartSensor(task)
		case msg.TASK_SENSOR_STOP:
			agt.processStopSensor(task)
		case msg.TASK_SENSOR_ADD:
			agt.processAddSensor(task)
		case msg.TASK_SENSOR_REMOVE:
			agt.processRemoveSensor(task)
		case msg.TASK_SENSOR_UPDATE:
			agt.processUpdateSensor(task)
		case msg.TASK_AGENT_UPDATE:
			agt.processUpdateAgent(task)
		case msg.TASK_CRAWLER_UPDATE:
			agt.processUpdateCRM(task)
		case msg.TASK_LOG_SEND:
			agt.processSendLog(task)
		default:
			// Unknown command: nothing to do.
		}
	}
}
// processCollectingData forwards every value received on ch to dat.AddData.
//
// It returns once ch is closed and drained. A bare receive inside an endless
// loop would instead keep yielding nil values from the closed channel and
// feed them to dat.AddData in a busy loop.
func (agt *Probe) processCollectingData(ch chan interface{}) {
	for data := range ch {
		dat.AddData(data)
	}
}
// stopProbe shuts the subsystems down in a fixed order — collector, crawler
// manager, config manager, data sender — waiting on each one's "stopped"
// channel before continuing. It then queues the agent-stop event and stops
// the event sender.
func (agt *Probe) stopProbe() {
	// Collector.
	colDone := make(chan bool, 1)
	col.Stop(colDone)
	<-colDone

	// Crawler manager.
	crmDone := make(chan bool, 1)
	crm.Stop(crmDone)
	<-crmDone

	// Config manager.
	cfgDone := make(chan bool, 1)
	cfg.Stop(cfgDone)
	<-cfgDone

	// Data sender.
	datDone := make(chan bool, 1)
	dat.Stop(datDone)
	<-datDone

	// Long poller (disabled, mirrors the disabled start step).
	//polStopped := make(chan bool, 1)
	//if err := pol.Stop(polStopped); err != nil {
	//	return
	//}
	//<-polStopped

	// The stop event is queued before the event sender itself is stopped.
	stopEvent := msg.Event{msg.EVT_TYPE_NONE, msg.EVENT_AGT_STOP, nil}
	evt.AddEventData(stopEvent)
	evt.Stop()
}
// processStartSensor asks the collector to start the sensor named by the
// task's "sensorId" parameter. On failure the error is logged and no event is
// emitted; on success a task event carrying the task's "id" parameter is
// queued.
func (agt *Probe) processStartSensor(task msg.AgentTask) {
	sensorID := task.Params["sensorId"]

	err := col.StartSensor(sensorID)
	if err != nil {
		log.Error(err)
		return
	}

	done := msg.Event{msg.EVT_TYPE_TASK, task.Params["id"], nil}
	evt.AddEventData(done)
}
// processStopSensor asks the collector to stop the sensor named by the task's
// "sensorId" parameter. On failure the error is logged and no event is
// emitted; on success a task event carrying the task's "id" parameter is
// queued.
func (agt *Probe) processStopSensor(task msg.AgentTask) {
	sensorID := task.Params["sensorId"]

	err := col.StopSensor(sensorID)
	if err != nil {
		log.Error(err)
		return
	}

	done := msg.Event{msg.EVT_TYPE_TASK, task.Params["id"], nil}
	evt.AddEventData(done)
}
// processAddSensor registers a new sensor: the task's "path" parameter goes to
// the config manager, and its "id" parameter goes to the crawler manager and
// the collector. When the collector rejects the sensor the error is logged
// (matching the other task handlers) and no task event is emitted; otherwise
// a task event carrying the task's "id" parameter is queued.
func (agt *Probe) processAddSensor(task msg.AgentTask) {
	path := task.Params["path"]
	id := task.Params["id"]

	cfg.AddSensor(path)
	crm.AddSensor(id)

	if err := col.AddSensor(id); err != nil {
		// Previously swallowed silently, which hid failed additions.
		log.Error(err)
		return
	}

	evt.AddEventData(msg.Event{msg.EVT_TYPE_TASK, task.Params["id"], nil})
}
// processRemoveSensor removes the sensor named by the task's "sensorId"
// parameter, first from the collector and then from the crawler manager and
// the config manager. If the collector reports an error it is logged and the
// remaining removals and the task event are skipped.
func (agt *Probe) processRemoveSensor(task msg.AgentTask) {
	sensorID := task.Params["sensorId"]

	if err := col.RemSensor(sensorID); err != nil {
		log.Error(err)
		return
	}

	crm.RemoveSensor(sensorID)
	cfg.RemoveSensor(sensorID)

	done := msg.Event{msg.EVT_TYPE_TASK, task.Params["id"], nil}
	evt.AddEventData(done)
}
// processUpdateSensor applies a sensor update in four steps: stop the sensor
// in the collector, hand the task's "path" parameter to the config manager,
// notify the crawler manager, then update the sensor in the collector. An
// error from either collector call is logged and ends the handler early, so
// the task event is queued only when both succeed.
func (agt *Probe) processUpdateSensor(task msg.AgentTask) {
	confPath := task.Params["path"]
	sensorID := task.Params["id"]

	err := col.StopSensor(sensorID)
	if err != nil {
		log.Error(err)
		return
	}

	cfg.UpdateSensor(confPath)
	crm.UpdateSensor(sensorID)

	err = col.UpdateSensor(sensorID)
	if err != nil {
		log.Error(err)
		return
	}

	done := msg.Event{msg.EVT_TYPE_TASK, task.Params["id"], nil}
	evt.AddEventData(done)
}
// processUpdateCRM is a placeholder for the crawler-manager update. For now
// it only queues a task event carrying the task's "id" parameter.
//
// Planned steps (not implemented yet):
//   - stop every sensor in the collector
//   - update the crawler manager
//   - start every sensor in the collector again
func (agt *Probe) processUpdateCRM(task msg.AgentTask) {
	done := msg.Event{msg.EVT_TYPE_TASK, task.Params["id"], nil}
	evt.AddEventData(done)
}
// processUpdateAgent is a placeholder for the agent self-update. For now it
// only queues a task event carrying the task's "id" parameter.
func (agt *Probe) processUpdateAgent(task msg.AgentTask) {
	// TODO: update the agent.
	done := msg.Event{msg.EVT_TYPE_TASK, task.Params["id"], nil}
	evt.AddEventData(done)
}
// processSendLog is a placeholder for log shipping. For now it only queues a
// task event carrying the task's "id" parameter.
func (agt *Probe) processSendLog(task msg.AgentTask) {
	// TODO: send the logs.
	done := msg.Event{msg.EVT_TYPE_TASK, task.Params["id"], nil}
	evt.AddEventData(done)
}
//func (agt *Agent) processUpdatePolInterval(task msg.AgentTask) {
// interval := task.Params["interval"]
// err := pol.UpdateInterval(interval)
// if err != nil {
// log.Error(err)
// return
// }
// evt.AddEventData(msg.Event{msg.EVT_TYPE_TASK, task.Params["id"], nil})
//}