overflow_probes/probe/probe.go

119 lines
2.6 KiB
Go
Raw Permalink Normal View History

2017-09-18 09:21:58 +00:00
package probe
2017-12-02 03:13:08 +00:00
import (
2018-03-26 03:50:29 +00:00
"context"
2017-12-02 03:13:08 +00:00
"sync"
2017-12-08 12:01:38 +00:00
cdr "git.loafle.net/commons_go/di/registry"
2017-12-02 03:13:08 +00:00
"git.loafle.net/commons_go/logging"
crr "git.loafle.net/commons_go/rpc/registry"
2018-03-26 03:50:29 +00:00
oocmci "git.loafle.net/overflow/overflow_commons_go/modules/commons/interfaces"
2017-12-08 08:31:45 +00:00
oocmp "git.loafle.net/overflow/overflow_commons_go/modules/probe"
2018-03-26 15:14:19 +00:00
oocu "git.loafle.net/overflow/overflow_commons_go/util"
2017-12-08 12:01:38 +00:00
oogwc "git.loafle.net/overflow/overflow_gateway_websocket/client"
2017-12-08 08:31:45 +00:00
oopccd "git.loafle.net/overflow/overflow_probes/client/central/data"
oopccp "git.loafle.net/overflow/overflow_probes/client/central/probe"
2018-03-15 13:52:23 +00:00
oopca "git.loafle.net/overflow/overflow_probes/commons/annotation"
2018-03-26 15:14:19 +00:00
"git.loafle.net/overflow/overflow_probes/service"
)
2017-12-02 03:13:08 +00:00
// New returns a ProbeManager backed by a zero-valued probeManagers.
// The returned manager is idle until Start is called.
func New() ProbeManager {
	return &probeManagers{}
}
// ProbeManager is the lifecycle handle returned by New. It is the union of
// the oocmci.Starter and oocmci.Stopper interfaces; in this file it is
// implemented by *probeManagers through its Start and Stop methods.
type ProbeManager interface {
2018-03-26 03:50:29 +00:00
oocmci.Starter
oocmci.Stopper
2017-12-02 03:13:08 +00:00
}
// probeManagers is the ProbeManager implementation returned by New.
type probeManagers struct {
// stopChan is created by Start and closed by Stop to tell the handleProbe
// goroutine to exit. It is nil while the manager is not running.
stopChan chan struct{}
// stopWg tracks the handleProbe goroutine: Start adds one before launching
// it, handleProbe marks it done on exit, and Stop waits on it.
stopWg sync.WaitGroup
}
func (pm *probeManagers) Start() error {
if nil != pm.stopChan {
2018-03-21 10:22:13 +00:00
logging.Logger().Panicf("Probe: already running. Stop it before starting it again")
2017-12-02 03:13:08 +00:00
}
2018-03-26 15:14:19 +00:00
service.InitService()
2018-03-16 06:35:37 +00:00
2017-12-08 08:31:45 +00:00
probeRPCRegistry := crr.NewRPCRegistry()
2017-12-15 08:18:32 +00:00
2017-12-08 08:31:45 +00:00
centralProbeClient := oopccp.New(probeRPCRegistry)
2017-12-08 12:01:38 +00:00
centralDataClient := oopccd.New()
centralClients := map[string]oogwc.Client{
oocmp.HTTPEntry_Probe: centralProbeClient,
oocmp.HTTPEntry_Data: centralDataClient,
}
cdr.RegisterResource("CentralClients", centralClients)
2018-03-22 16:10:38 +00:00
cdr.RegisterResource("ProbeRPCInvoker", probeRPCRegistry)
2017-12-08 12:01:38 +00:00
2018-03-21 10:22:13 +00:00
var (
services []interface{}
err error
)
2018-03-22 16:10:38 +00:00
services, err = cdr.GetInstancesByAnnotationName(oopca.ServiceTag)
if nil != err {
2018-03-21 10:22:13 +00:00
logging.Logger().Panicf("Probe: Cannot create instances of service %v", err)
}
2018-03-15 13:52:23 +00:00
for _, s := range services {
probeRPCRegistry.RegisterService(s, "")
}
2017-12-02 03:13:08 +00:00
pm.stopChan = make(chan struct{})
pm.stopWg.Add(1)
2018-03-26 06:55:38 +00:00
go pm.handleProbe(services)
2017-12-02 03:13:08 +00:00
return nil
}
2018-03-26 03:50:29 +00:00
func (pm *probeManagers) Stop(ctx context.Context) error {
2017-12-02 03:13:08 +00:00
if pm.stopChan == nil {
2018-03-21 10:22:13 +00:00
logging.Logger().Warnf("Probe: probe must be started before stopping it")
2017-12-02 03:13:08 +00:00
}
close(pm.stopChan)
pm.stopWg.Wait()
pm.stopChan = nil
2017-12-08 08:31:45 +00:00
// pm.cClient.Close()
2017-12-02 03:13:08 +00:00
2018-03-21 10:22:13 +00:00
logging.Logger().Infof("Probe: stopped")
2017-12-02 03:13:08 +00:00
2018-03-26 03:50:29 +00:00
return nil
2017-12-02 03:13:08 +00:00
}
2018-03-26 06:55:38 +00:00
func (pm *probeManagers) handleProbe(services []interface{}) {
2018-03-26 15:14:19 +00:00
var (
err error
)
2018-03-27 12:35:59 +00:00
err = oocu.ExecuteStarters(services, service.ServicesToStartAndStop, false)
2018-03-26 15:14:19 +00:00
if nil != err {
logging.Logger().Panic(err)
}
2017-12-02 03:13:08 +00:00
defer func() {
2018-03-27 12:35:59 +00:00
err = oocu.ExecuteStoppers(services, service.ServicesToStartAndStop, true)
2018-03-26 15:14:19 +00:00
if nil != err {
logging.Logger().Error(err)
}
2018-03-26 06:55:38 +00:00
2018-03-26 15:14:19 +00:00
pm.stopWg.Done()
2018-03-26 06:55:38 +00:00
2018-03-26 03:50:29 +00:00
pm.Stop(nil)
2017-12-02 03:13:08 +00:00
}()
for {
select {
case <-pm.stopChan:
return
}
}
}