105 lines
2.1 KiB
Go
105 lines
2.1 KiB
Go
package probe
|
|
|
|
import (
|
|
"fmt"
|
|
"sync"
|
|
|
|
cdr "git.loafle.net/commons_go/di/registry"
|
|
"git.loafle.net/commons_go/logging"
|
|
crr "git.loafle.net/commons_go/rpc/registry"
|
|
oocmp "git.loafle.net/overflow/overflow_commons_go/modules/probe"
|
|
oogwc "git.loafle.net/overflow/overflow_gateway_websocket/client"
|
|
oopccd "git.loafle.net/overflow/overflow_probes/client/central/data"
|
|
oopccp "git.loafle.net/overflow/overflow_probes/client/central/probe"
|
|
oopca "git.loafle.net/overflow/overflow_probes/commons/annotation"
|
|
oops "git.loafle.net/overflow/overflow_probes/service"
|
|
)
|
|
|
|
func New() ProbeManager {
|
|
a := &probeManagers{}
|
|
|
|
return a
|
|
}
|
|
|
|
type ProbeManager interface {
|
|
Start() error
|
|
Stop()
|
|
}
|
|
|
|
type probeManagers struct {
|
|
stopChan chan struct{}
|
|
stopWg sync.WaitGroup
|
|
}
|
|
|
|
func (pm *probeManagers) Start() error {
|
|
if nil != pm.stopChan {
|
|
logging.Logger().Panic("Probe: already running. Stop it before starting it again")
|
|
}
|
|
|
|
oops.InitService()
|
|
|
|
probeRPCRegistry := crr.NewRPCRegistry()
|
|
|
|
centralProbeClient := oopccp.New(probeRPCRegistry)
|
|
centralDataClient := oopccd.New()
|
|
centralClients := map[string]oogwc.Client{
|
|
oocmp.HTTPEntry_Probe: centralProbeClient,
|
|
oocmp.HTTPEntry_Data: centralDataClient,
|
|
}
|
|
cdr.RegisterResource("CentralClients", centralClients)
|
|
|
|
services := cdr.GetInstancesByAnnotationName(oopca.ServiceTag)
|
|
|
|
for _, s := range services {
|
|
probeRPCRegistry.RegisterService(s, "")
|
|
}
|
|
|
|
if err := centralProbeClient.Connect(); nil != err {
|
|
return err
|
|
}
|
|
|
|
if err := centralDataClient.Connect(); nil != err {
|
|
return err
|
|
}
|
|
|
|
pm.stopChan = make(chan struct{})
|
|
|
|
pm.stopWg.Add(1)
|
|
go pm.handleProbe()
|
|
|
|
return nil
|
|
}
|
|
|
|
func (pm *probeManagers) Stop() {
|
|
if pm.stopChan == nil {
|
|
logging.Logger().Warn("Probe: probe must be started before stopping it")
|
|
}
|
|
close(pm.stopChan)
|
|
pm.stopWg.Wait()
|
|
pm.stopChan = nil
|
|
|
|
// pm.cClient.Close()
|
|
|
|
logging.Logger().Info(fmt.Sprintf("Probe: stopped"))
|
|
|
|
}
|
|
|
|
func (pm *probeManagers) handleProbe() {
|
|
// var err error
|
|
defer func() {
|
|
pm.stopWg.Done()
|
|
pm.Stop()
|
|
}()
|
|
|
|
// if err = pm.cClient.Connect(); nil != err {
|
|
// return
|
|
// }
|
|
|
|
for {
|
|
select {
|
|
case <-pm.stopChan:
|
|
return
|
|
}
|
|
}
|
|
}
|