package probe

import (
	"context"
	"sync"

	cdr "git.loafle.net/commons_go/di/registry"
	"git.loafle.net/commons_go/logging"
	crr "git.loafle.net/commons_go/rpc/registry"
	oocmci "git.loafle.net/overflow/overflow_commons_go/modules/commons/interfaces"
	oocmp "git.loafle.net/overflow/overflow_commons_go/modules/probe"
	oocu "git.loafle.net/overflow/overflow_commons_go/util"
	oogwc "git.loafle.net/overflow/overflow_gateway_websocket/client"
	oopccd "git.loafle.net/overflow/overflow_probes/client/central/data"
	oopccp "git.loafle.net/overflow/overflow_probes/client/central/probe"
	oopca "git.loafle.net/overflow/overflow_probes/commons/annotation"
	"git.loafle.net/overflow/overflow_probes/service"
)

// New returns a ProbeManager that has not been started yet.
func New() ProbeManager {
	return &probeManagers{}
}

// ProbeManager controls the lifecycle of the probe: it can be started and
// stopped through the embedded Starter and Stopper interfaces.
type ProbeManager interface {
	oocmci.Starter
	oocmci.Stopper
}

// probeManagers is the ProbeManager implementation returned by New.
type probeManagers struct {
	// stopChan is non-nil while the probe is running; closing it tells the
	// handleProbe goroutine to shut down.
	stopChan chan struct{}
	// stopWg lets Stop wait until the handleProbe goroutine has finished.
	stopWg sync.WaitGroup
}

// Start initializes the services, registers the central clients and the RPC
// registry as DI resources, registers every service instance annotated with
// oopca.ServiceTag on the RPC registry, and launches the background goroutine
// that starts those services.
//
// It logs at panic level if the manager is already running or if the service
// instances cannot be created. It always returns nil.
func (pm *probeManagers) Start() error {
	if pm.stopChan != nil {
		logging.Logger().Panicf("Probe: already running. Stop it before starting it again")
	}

	service.InitService()

	probeRPCRegistry := crr.NewRPCRegistry()

	centralClients := map[string]oogwc.Client{
		oocmp.HTTPEntry_Probe: oopccp.New(probeRPCRegistry),
		oocmp.HTTPEntry_Data:  oopccd.New(),
	}

	cdr.RegisterResource("CentralClients", centralClients)
	cdr.RegisterResource("ProbeRPCInvoker", probeRPCRegistry)

	services, err := cdr.GetInstancesByAnnotationName(oopca.ServiceTag)
	if err != nil {
		logging.Logger().Panicf("Probe: Cannot create instances of service %v", err)
	}

	for _, s := range services {
		probeRPCRegistry.RegisterService(s, "")
	}

	// stopChan must be assigned before the goroutine is launched because
	// handleProbe reads it.
	pm.stopChan = make(chan struct{})

	pm.stopWg.Add(1)
	go pm.handleProbe(services)

	return nil
}

// Stop signals the background goroutine to shut down, waits for it to finish,
// and marks the manager as stopped so that Start may be called again.
//
// Calling Stop on a manager that is not running only logs a warning. ctx is
// currently unused. Stop always returns nil.
func (pm *probeManagers) Stop(ctx context.Context) error {
	if pm.stopChan == nil {
		logging.Logger().Warnf("Probe: probe must be started before stopping it")
		// Nothing to stop; closing a nil channel would panic.
		return nil
	}

	close(pm.stopChan)
	pm.stopWg.Wait()
	pm.stopChan = nil

	// pm.cClient.Close()

	logging.Logger().Infof("Probe: stopped")

	return nil
}

// handleProbe runs in its own goroutine: it executes the starters of the
// services listed in service.ServicesToStartAndStop, blocks until stopChan is
// closed, and then executes the matching stoppers.
func (pm *probeManagers) handleProbe(services []interface{}) {
	// Registered first so it runs last: Stop is released only after the
	// stoppers below have finished.
	defer pm.stopWg.Done()

	if err := oocu.ExecuteStarters(services, service.ServicesToStartAndStop, false); err != nil {
		logging.Logger().Panic(err)
	}

	defer func() {
		if err := oocu.ExecuteStoppers(services, service.ServicesToStartAndStop, true); err != nil {
			logging.Logger().Error(err)
		}
		// Stop must not be called from here: this code only runs after Stop
		// has closed stopChan, so calling it again would close an already
		// closed (or nil) channel and panic.
	}()

	<-pm.stopChan
}