package probe import ( "fmt" "sync" cdr "git.loafle.net/commons_go/di/registry" "git.loafle.net/commons_go/logging" crr "git.loafle.net/commons_go/rpc/registry" oocmp "git.loafle.net/overflow/overflow_commons_go/modules/probe" oogwc "git.loafle.net/overflow/overflow_gateway_websocket/client" oopccd "git.loafle.net/overflow/overflow_probes/client/central/data" oopccp "git.loafle.net/overflow/overflow_probes/client/central/probe" "git.loafle.net/overflow/overflow_probes/service" ) func New() ProbeManager { a := &probeManagers{} return a } type ProbeManager interface { Start() error Stop() } type probeManagers struct { stopChan chan struct{} stopWg sync.WaitGroup } func (pm *probeManagers) Start() error { if nil != pm.stopChan { logging.Logger().Panic("Probe: already running. Stop it before starting it again") } probeRPCRegistry := crr.NewRPCRegistry() centralProbeClient := oopccp.New(probeRPCRegistry) centralDataClient := oopccd.New() centralClients := map[string]oogwc.Client{ oocmp.HTTPEntry_Probe: centralProbeClient, oocmp.HTTPEntry_Data: centralDataClient, } cdr.RegisterResource("CentralClients", centralClients) centralService := service.GetService("CentralService").(*service.CentralService) configService := service.GetService("ConfigService").(*service.ConfigService) crawlerService := service.GetService("CrawlerService").(*service.CrawlerService) discoveryService := service.GetService("DiscoveryService").(*service.DiscoveryService) logService := service.GetService("LogService").(*service.LogService) probeService := service.GetService("ProbeService").(*service.ProbeService) sensorService := service.GetService("SensorService").(*service.SensorService) probeRPCRegistry.RegisterService(centralService, "") probeRPCRegistry.RegisterService(configService, "") probeRPCRegistry.RegisterService(crawlerService, "") probeRPCRegistry.RegisterService(discoveryService, "") probeRPCRegistry.RegisterService(logService, "") probeRPCRegistry.RegisterService(probeService, "") probeRPCRegistry.RegisterService(sensorService, "") logging.Logger().Debug(fmt.Sprintf("%v", centralService.CentralClients)) if err := centralProbeClient.Connect(); nil != err { return err } if err := centralDataClient.Connect(); nil != err { return err } pm.stopChan = make(chan struct{}) pm.stopWg.Add(1) go pm.handleProbe() return nil } func (pm *probeManagers) Stop() { if pm.stopChan == nil { logging.Logger().Warn("Probe: probe must be started before stopping it") } close(pm.stopChan) pm.stopWg.Wait() pm.stopChan = nil // pm.cClient.Close() logging.Logger().Info(fmt.Sprintf("Probe: stopped")) } func (pm *probeManagers) handleProbe() { // var err error defer func() { pm.stopWg.Done() pm.Stop() }() // if err = pm.cClient.Connect(); nil != err { // return // } for { select { case <-pm.stopChan: return } } }