diff --git a/auth/auth.go b/auth/auth.go index 46008bf..2f0d561 100644 --- a/auth/auth.go +++ b/auth/auth.go @@ -1,6 +1,7 @@ package auth import ( + "context" "fmt" "net/http" "path" @@ -18,26 +19,33 @@ const ( noAuthEntryPoint = "/auth" ) +type Auther interface { + commons.EndableStarter + commons.Shutdowner +} + type auth struct { - *commons.Handlers - c client.Client - entryURL string + centralClient client.Client + entryURL string noAuthConfigPath string noAuthConfig config.NoAuthProbeConfig - acceptedChan chan bool - deniedChan chan error + endded chan<- error + + shutdown chan bool + accepted chan bool + denied chan error } -func New() (AuthHandler, error) { +func New() (Auther, error) { var err error a := &auth{ - acceptedChan: make(chan bool), - deniedChan: make(chan error), + shutdown: make(chan bool), + accepted: make(chan bool), + denied: make(chan error), } - a.Handlers = commons.NewHandlers() if a.entryURL, err = opuu.Join(config.CFG.Central.URL, noAuthEntryPoint); nil != err { return nil, err @@ -52,12 +60,58 @@ func New() (AuthHandler, error) { } } - a.c = client.New() - a.c.OnNotify(a.onNotify) + a.centralClient = client.New() + a.centralClient.OnNotify(a.onNotify) return a, nil } +func (a *auth) EndableStart(endded chan<- error) error { + a.endded = endded + return a.start() +} + +func (a *auth) start() error { + if nil != config.CFG.Probe.Key || "" != *config.CFG.Probe.Key { + return nil + } + + if nil != a.noAuthConfig.DenyDate { + return fmt.Errorf("Cannot start because this probe have been denied from overFlow at %s", a.noAuthConfig.DenyDate.String()) + } + + var err error + if nil != a.noAuthConfig.TempKey && "" != *a.noAuthConfig.TempKey { + err = a.serveConnect(*a.noAuthConfig.TempKey) + } else { + err = a.serveRegistration() + } + if nil != err { + return err + } + + a.listen() + + return nil +} + +func (a *auth) listen() { + go func() { + for { + select { + case <-a.shutdown: + break + case <-a.accepted: + a.stop(nil) + break + case err := <-a.denied: + a.stop(err) + break + } + } + }() +} + func (a *auth) serveRegistration() error { var err error header := http.Header{} @@ -69,7 +123,7 @@ func (a *auth) serveRegistration() error { header[module.NoAuthProbeHeader_NoAuthRegist] = []string{enc} var res *http.Response - if res, err = a.c.Dial(a.entryURL, header); nil != err { + if res, err = a.centralClient.Dial(a.entryURL, header); nil != err { return err } @@ -88,7 +142,7 @@ func (a *auth) serveConnect(noAuthTempKey string) error { header[module.NoAuthProbeHeader_NoAuthID] = []string{noAuthTempKey} var res *http.Response - if res, err = a.c.Dial(a.entryURL, header); nil != err { + if res, err = a.centralClient.Dial(a.entryURL, header); nil != err { return err } @@ -96,3 +150,28 @@ func (a *auth) serveConnect(noAuthTempKey string) error { return nil } + +func (a *auth) Shutdown(ctx context.Context) error { + + for { + a.stop(fmt.Errorf("Shutdown")) + select { + case <-ctx.Done(): + return ctx.Err() + } + } +} + +func (a *auth) stop(err error) { + defer close(a.shutdown) + a.shutdown <- true + close(a.accepted) + close(a.denied) + + ctx := context.Background() + if err := a.centralClient.Shutdown(ctx); nil != err { + logging.Logger.Error(fmt.Sprintf("Client of Central: %v", err)) + } + + a.endded <- err +} diff --git a/auth/auth_handler.go b/auth/auth_handler.go deleted file mode 100644 index 0587155..0000000 --- a/auth/auth_handler.go +++ /dev/null @@ -1,55 +0,0 @@ -package auth - -import ( - "context" - "errors" - "fmt" - - "git.loafle.net/overflow/overflow_probes/commons" - "git.loafle.net/overflow/overflow_probes/config" -) - -type AuthHandler interface { - commons.Handler -} - -func (a *auth) Serve() error { - if nil != config.CFG.Probe.Key || "" != *config.CFG.Probe.Key { - return nil - } - - if nil != a.noAuthConfig.DenyDate { - return fmt.Errorf("Cannot start because this probe have been denied from overFlow[%s]", a.noAuthConfig.DenyDate.String()) - } - - var err error - if nil != a.noAuthConfig.TempKey && "" != *a.noAuthConfig.TempKey { - err = a.serveConnect(*a.noAuthConfig.TempKey) - } else { - err = a.serveRegistration() - } - if nil != err { - return err - } - - err = nil -ListenLoop: - for { - select { - case <-a.ShutdownChan: - err = errors.New("Shutting down") - break ListenLoop - case <-a.acceptedChan: - break ListenLoop - case err = <-a.deniedChan: - break ListenLoop - } - } - - return err -} - -func (a *auth) Shutdown(ctx context.Context) error { - a.ShutdownChan <- true - return nil -} diff --git a/auth/on_notify.go b/auth/on_notify.go index 27457e3..87aaa32 100644 --- a/auth/on_notify.go +++ b/auth/on_notify.go @@ -36,11 +36,10 @@ func (a *auth) onNoAuthProbeAccept(params []string) { if err = lfcc.Save(*config.CFG, *config.ConfigFilePath, true); nil != err { logging.Logger.Error(fmt.Sprintf("Auth: Saving of config file[%s] failed error[%v]", *config.ConfigFilePath, err)) - a.ShutdownChan <- true return } - a.acceptedChan <- true + a.accepted <- true } func (a *auth) onNoAuthProbeDeny(params []string) { @@ -48,9 +47,9 @@ func (a *auth) onNoAuthProbeDeny(params []string) { a.noAuthConfig.DenyDate = &n if err := lfcc.Save(a.noAuthConfig, a.noAuthConfigPath, true); nil != err { logging.Logger.Error(fmt.Sprintf("Auth: Saving of NoAuth config file[%s] failed error[%v]", a.noAuthConfigPath, err)) - a.ShutdownChan <- true + return } - a.deniedChan <- fmt.Errorf("This probe have been denied from overFlow") + a.denied <- fmt.Errorf("This probe have been denied from overFlow") } diff --git a/central/client/probe.go b/central/client/probe.go new file mode 100644 index 0000000..e501f23 --- /dev/null +++ b/central/client/probe.go @@ -0,0 +1,21 @@ +package client + +import ( + "net/http" + + "git.loafle.net/overflow/overflow_probes/central/api/module" + "git.loafle.net/overflow/overflow_probes/config" +) + +func ConnectToCentralAsProbe(c Client, entryURL string) (*http.Response, error) { + header := http.Header{} + header[module.ProbeHeader_ProbeKey] = []string{*config.CFG.Probe.Key} + + var res *http.Response + var err error + if res, err = c.Dial(entryURL, header); nil != err { + return nil, err + } + + return res, nil +} diff --git a/collector/collector.go b/collector/collector.go index 49a89b7..75702f8 100644 --- a/collector/collector.go +++ b/collector/collector.go @@ -1,22 +1,14 @@ package collector -import ( - "git.loafle.net/overflow/overflow_probes/commons" -) - -const ( - metricEntryPoint = "/metric" -) - -type collector struct { - *commons.Handlers +type Collector interface { } -func New() (CollectorHandler, error) { - var err error +type collector struct { +} + +func New() (Collector, error) { c := &collector{} - c.Handlers = commons.NewHandlers() return c, nil } diff --git a/collector/collector_handler.go b/collector/collector_handler.go deleted file mode 100644 index d316b0c..0000000 --- a/collector/collector_handler.go +++ /dev/null @@ -1,37 +0,0 @@ -package collector - -import ( - "context" - "errors" - - "git.loafle.net/overflow/overflow_probes/commons" -) - -type CollectorHandler interface { - commons.Handler - Start() error - Stop() error - Add() error - Remove() error - Update() error -} - -func (c *collector) Serve() error { - - var err error -ListenLoop: - for { - select { - case <-c.ShutdownChan: - err = errors.New("Shutting down") - break ListenLoop - } - } - - return err -} - -func (c *collector) Shutdown(ctx context.Context) error { - c.ShutdownChan <- true - return nil -} diff --git a/commons/endable_starter.go b/commons/endable_starter.go new file mode 100644 index 0000000..6304e59 --- /dev/null +++ b/commons/endable_starter.go @@ -0,0 +1,5 @@ +package commons + +type EndableStarter interface { + EndableStart(endded chan<- error) error +} diff --git a/commons/handler.go b/commons/handler.go deleted file mode 100644 index dbb3b7e..0000000 --- a/commons/handler.go +++ /dev/null @@ -1,42 +0,0 @@ -package commons - -import ( - "context" - "net/http" - - "git.loafle.net/overflow/overflow_probes/central/api/module" - "git.loafle.net/overflow/overflow_probes/central/client" - "git.loafle.net/overflow/overflow_probes/config" -) - -type Handler interface { - Serve() error - Shutdown(ctx context.Context) error -} - -type Handlers struct { - ShutdownChan chan bool - IsRunning bool -} - -func NewHandlers() *Handlers { - h := &Handlers{ - ShutdownChan: make(chan bool), - IsRunning: false, - } - - return h -} - -func (h *Handlers) ConnectToCentralAsProbe(c client.Client, entryURL string) (*http.Response, error) { - header := http.Header{} - header[module.ProbeHeader_ProbeKey] = []string{*config.CFG.Probe.Key} - - var res *http.Response - var err error - if res, err = c.Dial(entryURL, header); nil != err { - return nil, err - } - - return res, nil -} diff --git a/commons/shutdowner.go b/commons/shutdowner.go new file mode 100644 index 0000000..ac1a704 --- /dev/null +++ b/commons/shutdowner.go @@ -0,0 +1,7 @@ +package commons + +import "context" + +type Shutdowner interface { + Shutdown(ctx context.Context) error +} diff --git a/commons/starter.go b/commons/starter.go new file mode 100644 index 0000000..a6d0f03 --- /dev/null +++ b/commons/starter.go @@ -0,0 +1,5 @@ +package commons + +type Starter interface { + Start() error +} diff --git a/main.go b/main.go index 4cafdf9..284c9c5 100644 --- a/main.go +++ b/main.go @@ -10,10 +10,11 @@ import ( "syscall" "time" + "git.loafle.net/overflow/overflow_probes/commons" + lfcc "git.loafle.net/commons_go/config" "git.loafle.net/commons_go/logging" "git.loafle.net/overflow/overflow_probes/auth" - "git.loafle.net/overflow/overflow_probes/commons" "git.loafle.net/overflow/overflow_probes/config" "git.loafle.net/overflow/overflow_probes/probe" ) @@ -55,16 +56,16 @@ func init() { func main() { var err error - var handler commons.Handler + var instance interface{} defer logging.Logger.Sync() printBanner() if dir, err := lfcc.ABSPathify(*configDir); nil != err { - logging.Logger.Panic(fmt.Sprintf("Probe: config path[%s] is not valid", *configDir)) + logging.Logger.Panic(fmt.Sprintf("Config path[%s] is not valid", *configDir)) } else { - logging.Logger.Debug(fmt.Sprintf("Probe: config path[%s]", dir)) + logging.Logger.Debug(fmt.Sprintf("Config path: %s", dir)) config.ConfigDir = &dir } cfp := path.Join(*config.ConfigDir, config.ConfigFileName) @@ -73,27 +74,34 @@ func main() { conf := lfcc.New() config.CFG = &config.Config{} if err := conf.Load(config.CFG, *config.ConfigFilePath); nil != err { - logging.Logger.Panic(fmt.Sprintf("Probe: config is not valid error[%v]", err)) + logging.Logger.Panic(fmt.Sprintf("Config is not valid: %v", err)) } go func() { - if handler, err = auth.New(); nil != err { - logging.Logger.Error(fmt.Sprintf("Auth: error: %v", err)) + if instance, err = auth.New(); nil != err { + logging.Logger.Error(fmt.Sprintf("Auth error: %v", err)) return } - if err := handler.Serve(); err != nil { - logging.Logger.Error(fmt.Sprintf("Auth: Stopped[%v]", err)) + endded := make(chan error) + defer close(endded) + if err := instance.(commons.EndableStarter).EndableStart(endded); err != nil { + logging.Logger.Error(fmt.Sprintf("Auther error: %v", err)) + return + } + if err := <-endded; nil != err { + logging.Logger.Error(fmt.Sprintf("Auther error: %v", err)) return } - if handler, err = probe.New(); nil != err { - logging.Logger.Error(fmt.Sprintf("Probe: error: %v", err)) + if instance, err = probe.New(); nil != err { + logging.Logger.Error(fmt.Sprintf("Probe error: %v", err)) return } - if err := handler.Serve(); err != nil { - logging.Logger.Error(fmt.Sprintf("Probe: error: %v", err)) + if err := instance.(commons.Starter).Start(); err != nil { + logging.Logger.Error(fmt.Sprintf("Probe error: %v", err)) return } + }() // // Set up channel on which to send signal notifications. @@ -111,7 +119,7 @@ func main() { <-interrupt ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - if err := handler.Shutdown(ctx); err != nil { + if err := instance.(commons.Shutdowner).Shutdown(ctx); err != nil { logging.Logger.Error(fmt.Sprintf("Probe error: %v", err)) } diff --git a/probe/on_notify.go b/probe/on_notify.go index d6c5c31..5842523 100644 --- a/probe/on_notify.go +++ b/probe/on_notify.go @@ -20,7 +20,6 @@ func (p *probe) onNotify(method string, params []string) { break case module.SensorService_Start: - err = p.collector.Start() break case module.SensorService_Stop: diff --git a/probe/probe.go b/probe/probe.go index 9e40825..675fb75 100644 --- a/probe/probe.go +++ b/probe/probe.go @@ -1,25 +1,30 @@ package probe import ( + "context" "fmt" "net/http" "git.loafle.net/commons_go/logging" "git.loafle.net/overflow/overflow_probes/central/api/module" "git.loafle.net/overflow/overflow_probes/central/client" - "git.loafle.net/overflow/overflow_probes/collector" "git.loafle.net/overflow/overflow_probes/commons" "git.loafle.net/overflow/overflow_probes/config" opuu "git.loafle.net/overflow/overflow_probes/util/url" ) const ( - probeEntryPoint = "/probe" - fileEntryPoint = "/file" + probeEntryPoint = "/probe" + fileEntryPoint = "/file" + metricEntryPoint = "/metric" ) +type Prober interface { + commons.Starter + commons.Shutdowner +} + type probe struct { - *commons.Handlers probeEntryURL string fileEntryURL string metricEntryURL string @@ -28,12 +33,13 @@ type probe struct { fileClient client.Client metricClient client.Client - collector collector.CollectorHandler + shutdown chan bool } -func New() (ProbeHandler, error) { - p := &probe{} - p.Handlers = commons.NewHandlers() +func New() (Prober, error) { + p := &probe{ + shutdown: make(chan bool), + } var err error @@ -43,25 +49,47 @@ func New() (ProbeHandler, error) { if p.fileEntryURL, err = opuu.Join(config.CFG.Central.URL, fileEntryPoint); nil != err { return nil, err } - if c.metricEntryURL, err = opuu.Join(config.CFG.Central.URL, metricEntryPoint); nil != err { + if p.metricEntryURL, err = opuu.Join(config.CFG.Central.URL, metricEntryPoint); nil != err { return nil, err } p.probeClient = client.New() p.fileClient = client.New() - c.metricClient = client.New() - - if p.collector, err = collector.New(); nil != err { - return nil, err - } + p.metricClient = client.New() return p, nil } -func (p *probe) connectToCentralProbe() error { +func (p *probe) Start() error { + + return p.start() +} + +func (p *probe) start() error { + if err := p.connectToCentral(); nil != err { + return err + } + + p.listen() + + return nil +} + +func (p *probe) listen() { + go func() { + for { + select { + case <-p.shutdown: + break + } + } + }() +} + +func (p *probe) connectToCentral() error { var err error var res *http.Response - if res, err = p.ConnectToCentralAsProbe(p.probeClient, p.probeEntryURL); nil != err { + if res, err = client.ConnectToCentralAsProbe(p.probeClient, p.probeEntryURL); nil != err { return err } @@ -70,7 +98,7 @@ func (p *probe) connectToCentralProbe() error { p.probeClient.OnNotify(p.onNotify) - if _, err = p.ConnectToCentralAsProbe(p.metricClient, p.metricEntryURL); nil != err { + if _, err = client.ConnectToCentralAsProbe(p.metricClient, p.metricEntryURL); nil != err { return err } @@ -79,6 +107,20 @@ func (p *probe) connectToCentralProbe() error { func (p *probe) sendNotifyToCentral(method string, params ...string) { if err := p.probeClient.Notify(method, params); nil != err { - logging.Logger.Error(fmt.Sprintf("Probe notify error: [%v]", err)) + logging.Logger.Error(fmt.Sprintf("Probe notify: %v", err)) } } + +func (p *probe) Shutdown(ctx context.Context) error { + for { + p.stop(fmt.Errorf("Shutdown")) + select { + case <-ctx.Done(): + return ctx.Err() + } + } +} + +func (p *probe) stop(err error) { + defer close(p.shutdown) +} diff --git a/probe/probe_handler.go b/probe/probe_handler.go deleted file mode 100644 index 07a2b61..0000000 --- a/probe/probe_handler.go +++ /dev/null @@ -1,44 +0,0 @@ -package probe - -import ( - "context" - "errors" - - "git.loafle.net/overflow/overflow_probes/central/api/module" - "git.loafle.net/overflow/overflow_probes/commons" -) - -type ProbeHandler interface { - commons.Handler -} - -func (p *probe) Serve() error { - if err := p.connectToCentralProbe(); nil != err { - return err - } - - if err := p.collector.Serve(); nil != err { - return err - } - - p.sendNotifyToCentral(module.ProbeService_Started) - - var err error -ListenLoop: - for { - select { - case <-p.ShutdownChan: - err = errors.New("Shutting down") - break ListenLoop - } - } - - p.sendNotifyToCentral(module.ProbeService_Stopped) - - return err -} - -func (p *probe) Shutdown(ctx context.Context) error { - p.ShutdownChan <- true - return nil -}