overflow_probes/probe/probe.go

114 lines
2.9 KiB
Go
Raw Normal View History

2017-09-18 09:21:58 +00:00
package probe
2017-12-02 03:13:08 +00:00
import (
"fmt"
"sync"
2017-12-08 12:01:38 +00:00
cdr "git.loafle.net/commons_go/di/registry"
2017-12-02 03:13:08 +00:00
"git.loafle.net/commons_go/logging"
crr "git.loafle.net/commons_go/rpc/registry"
2017-12-08 08:31:45 +00:00
oocmp "git.loafle.net/overflow/overflow_commons_go/modules/probe"
2017-12-08 12:01:38 +00:00
oogwc "git.loafle.net/overflow/overflow_gateway_websocket/client"
2017-12-08 08:31:45 +00:00
oopccd "git.loafle.net/overflow/overflow_probes/client/central/data"
oopccp "git.loafle.net/overflow/overflow_probes/client/central/probe"
"git.loafle.net/overflow/overflow_probes/service"
2017-12-02 03:13:08 +00:00
)
func New() ProbeManager {
a := &probeManagers{}
return a
}
type ProbeManager interface {
Start() error
Stop()
}
type probeManagers struct {
stopChan chan struct{}
stopWg sync.WaitGroup
}
func (pm *probeManagers) Start() error {
if nil != pm.stopChan {
logging.Logger().Panic("Probe: already running. Stop it before starting it again")
}
2017-12-08 08:31:45 +00:00
probeRPCRegistry := crr.NewRPCRegistry()
2017-12-15 08:18:32 +00:00
2017-12-08 08:31:45 +00:00
centralProbeClient := oopccp.New(probeRPCRegistry)
2017-12-08 12:01:38 +00:00
centralDataClient := oopccd.New()
centralClients := map[string]oogwc.Client{
oocmp.HTTPEntry_Probe: centralProbeClient,
oocmp.HTTPEntry_Data: centralDataClient,
}
cdr.RegisterResource("CentralClients", centralClients)
centralService := service.GetService("CentralService").(*service.CentralService)
configService := service.GetService("ConfigService").(*service.ConfigService)
crawlerService := service.GetService("CrawlerService").(*service.CrawlerService)
discoveryService := service.GetService("DiscoveryService").(*service.DiscoveryService)
logService := service.GetService("LogService").(*service.LogService)
probeService := service.GetService("ProbeService").(*service.ProbeService)
sensorService := service.GetService("SensorService").(*service.SensorService)
probeRPCRegistry.RegisterService(centralService, "")
probeRPCRegistry.RegisterService(configService, "")
probeRPCRegistry.RegisterService(crawlerService, "")
probeRPCRegistry.RegisterService(discoveryService, "")
probeRPCRegistry.RegisterService(logService, "")
probeRPCRegistry.RegisterService(probeService, "")
probeRPCRegistry.RegisterService(sensorService, "")
2017-12-08 15:59:00 +00:00
logging.Logger().Debug(fmt.Sprintf("%v", centralService.CentralClients))
2017-12-08 08:31:45 +00:00
if err := centralProbeClient.Connect(); nil != err {
return err
}
2017-12-02 03:13:08 +00:00
2017-12-08 08:31:45 +00:00
if err := centralDataClient.Connect(); nil != err {
return err
}
2017-12-02 03:13:08 +00:00
pm.stopChan = make(chan struct{})
pm.stopWg.Add(1)
go pm.handleProbe()
return nil
}
func (pm *probeManagers) Stop() {
if pm.stopChan == nil {
2017-12-08 08:31:45 +00:00
logging.Logger().Warn("Probe: probe must be started before stopping it")
2017-12-02 03:13:08 +00:00
}
close(pm.stopChan)
pm.stopWg.Wait()
pm.stopChan = nil
2017-12-08 08:31:45 +00:00
// pm.cClient.Close()
2017-12-02 03:13:08 +00:00
2017-12-08 08:31:45 +00:00
logging.Logger().Info(fmt.Sprintf("Probe: stopped"))
2017-12-02 03:13:08 +00:00
}
func (pm *probeManagers) handleProbe() {
2017-12-08 08:31:45 +00:00
// var err error
2017-12-02 03:13:08 +00:00
defer func() {
pm.stopWg.Done()
pm.Stop()
}()
2017-12-08 08:31:45 +00:00
// if err = pm.cClient.Connect(); nil != err {
// return
// }
2017-12-02 03:13:08 +00:00
for {
select {
case <-pm.stopChan:
return
}
}
}