overflow_probes/probe/probe.go
crusader fb7d1ca4d6 ing
2017-12-08 17:31:45 +09:00

99 lines
2.2 KiB
Go

package probe
import (
"fmt"
"sync"
"git.loafle.net/commons_go/logging"
crr "git.loafle.net/commons_go/rpc/registry"
oocmp "git.loafle.net/overflow/overflow_commons_go/modules/probe"
oopccd "git.loafle.net/overflow/overflow_probes/client/central/data"
oopccp "git.loafle.net/overflow/overflow_probes/client/central/probe"
"git.loafle.net/overflow/overflow_probes/service"
)
// New returns a ProbeManager backed by a fresh, not-yet-started probeManagers.
func New() ProbeManager {
	return &probeManagers{}
}
// ProbeManager controls the lifecycle of the probe.
type ProbeManager interface {
// Start brings the probe up and reports any error encountered while doing so.
Start() error
// Stop shuts the probe down.
Stop()
}
// probeManagers is the ProbeManager implementation returned by New.
type probeManagers struct {
// stopChan is created by Start and closed by Stop to tell handleProbe to
// exit; Start and Stop treat a nil value as "not running".
stopChan chan struct{}
// stopWg counts the handleProbe goroutine so Stop can wait for it to finish.
stopWg sync.WaitGroup
}
// Start registers the probe's RPC services, connects the central probe and
// data clients, hands both clients to the CentralService, and launches the
// handleProbe goroutine.
//
// It returns the error from whichever client fails to connect first. If the
// manager already has a stop channel (i.e. Start was called without a matching
// Stop) it logs at Panic level.
//
// NOTE(review): when the data client fails to connect, the probe client that
// was connected just before is not closed here — confirm whether it should be.
func (pm *probeManagers) Start() error {
	if pm.stopChan != nil {
		logging.Logger().Panic("Probe: already running. Stop it before starting it again")
	}

	// Build the registry holding every service exposed by the probe.
	registry := crr.NewRPCRegistry()
	registry.RegisterService(&service.CentralService{}, "")
	registry.RegisterService(&service.ConfigService{}, "")
	registry.RegisterService(&service.CrawlerService{}, "")
	registry.RegisterService(&service.DiscoveryService{}, "")
	registry.RegisterService(&service.LogService{}, "")
	registry.RegisterService(&service.ProbeService{}, "")
	registry.RegisterService(&service.SensorService{}, "")

	// Connect to central: the probe client first, then the data client.
	probeClient := oopccp.New(registry)
	if err := probeClient.Connect(); err != nil {
		return err
	}

	dataClient := oopccd.New()
	if err := dataClient.Connect(); err != nil {
		return err
	}

	// Make both clients reachable through the registered CentralService.
	central := registry.GetService("CentralService").(*service.CentralService)
	central.PutClient(oocmp.HTTPEntry_Probe, probeClient)
	central.PutClient(oocmp.HTTPEntry_Data, dataClient)

	// Mark the manager as running and start the background goroutine.
	pm.stopChan = make(chan struct{})
	pm.stopWg.Add(1)
	go pm.handleProbe()

	return nil
}
// Stop signals the handleProbe goroutine to exit by closing stopChan, waits
// for it to finish, and resets stopChan to nil so the manager can be started
// again. Calling Stop on a manager that is not running only logs a warning.
func (pm *probeManagers) Stop() {
	if pm.stopChan == nil {
		logging.Logger().Warn("Probe: probe must be started before stopping it")
		// Nothing to stop; closing a nil channel would panic.
		return
	}

	close(pm.stopChan)
	pm.stopWg.Wait()
	pm.stopChan = nil

	// pm.cClient.Close()
	logging.Logger().Info(fmt.Sprintf("Probe: stopped"))
}
// handleProbe is the background goroutine launched by Start. It blocks until
// stopChan is closed and then marks itself done on stopWg.
func (pm *probeManagers) handleProbe() {
	// Only signal completion here. This goroutine exits because Stop has
	// already closed stopChan and is waiting on stopWg; calling Stop again
	// from here would close the channel a second time and panic.
	defer pm.stopWg.Done()

	<-pm.stopChan
}