From 1b8ee9302658fab13a01494fb4ec368d7025d0a1 Mon Sep 17 00:00:00 2001 From: crusader Date: Fri, 29 Sep 2017 21:30:27 +0900 Subject: [PATCH] ing --- auth/auth.go | 84 +++++----------------- auth/auth_handler.go | 55 ++++++++++++++ auth/on_notify.go | 30 ++++---- auth/registration.go | 2 +- central/api/module/crawler.go | 7 ++ central/api/module/discovery.go | 6 ++ central/api/module/log.go | 5 ++ central/api/module/probe.go | 9 ++- central/api/module/sensor.go | 9 +++ central/client/client.go | 9 +-- collector/collector.go | 38 ++++++++++ collector/collector_handler.go | 37 ++++++++++ commons/handler.go | 36 +++++++++- config.json | 7 +- config/config.go | 15 ++-- main.go | 123 ++++---------------------------- probe/on_notify.go | 47 +++++++++++- probe/probe.go | 94 +++++++++++------------- probe/probe_handler.go | 44 ++++++++++++ 19 files changed, 395 insertions(+), 262 deletions(-) create mode 100644 auth/auth_handler.go create mode 100644 central/api/module/crawler.go create mode 100644 central/api/module/discovery.go create mode 100644 central/api/module/log.go create mode 100644 central/api/module/sensor.go create mode 100644 collector/collector.go create mode 100644 collector/collector_handler.go create mode 100644 probe/probe_handler.go diff --git a/auth/auth.go b/auth/auth.go index a5e2aa4..46008bf 100644 --- a/auth/auth.go +++ b/auth/auth.go @@ -1,8 +1,6 @@ package auth import ( - "context" - "errors" "fmt" "net/http" "path" @@ -10,7 +8,6 @@ import ( lfcc "git.loafle.net/commons_go/config" "git.loafle.net/commons_go/logging" "git.loafle.net/overflow/overflow_probes/central/api/module" - "git.loafle.net/overflow/overflow_probes/central/client" "git.loafle.net/overflow/overflow_probes/commons" "git.loafle.net/overflow/overflow_probes/config" @@ -21,18 +18,14 @@ const ( noAuthEntryPoint = "/auth" ) -type AuthHandler interface { - commons.Handler -} - -type authHandlers struct { +type auth struct { + *commons.Handlers c client.Client entryURL string noAuthConfigPath string noAuthConfig config.NoAuthProbeConfig - shutdownChan chan bool acceptedChan chan bool deniedChan chan error } @@ -40,71 +33,32 @@ type authHandlers struct { func New() (AuthHandler, error) { var err error - h := &authHandlers{ - shutdownChan: make(chan bool), + a := &auth{ acceptedChan: make(chan bool), deniedChan: make(chan error), } + a.Handlers = commons.NewHandlers() - if h.entryURL, err = opuu.Join(*config.CFG.Central.URL, noAuthEntryPoint); nil != err { + if a.entryURL, err = opuu.Join(config.CFG.Central.URL, noAuthEntryPoint); nil != err { return nil, err } - h.noAuthConfigPath = path.Join(*config.ConfigDir, config.NoAuthProbeConfigFileName) + a.noAuthConfigPath = path.Join(*config.ConfigDir, config.NoAuthProbeConfigFileName) conf := lfcc.New() - if lfcc.Exists(h.noAuthConfigPath) { - if err = conf.Load(&h.noAuthConfig, h.noAuthConfigPath); nil != err { - return nil, fmt.Errorf("Auth: Loading of NoAuth config file[%s] failed error[%v]", h.noAuthConfigPath, err) + if lfcc.Exists(a.noAuthConfigPath) { + if err = conf.Load(&a.noAuthConfig, a.noAuthConfigPath); nil != err { + return nil, fmt.Errorf("Auth: Loading of NoAuth config file[%s] failed error[%v]", a.noAuthConfigPath, err) } } - return h, nil + a.c = client.New() + a.c.OnNotify(a.onNotify) + + return a, nil } -func (h *authHandlers) Serve() error { - if nil != config.CFG.Probe.Key || "" != *config.CFG.Probe.Key { - return nil - } - - if nil != h.noAuthConfig.DenyDate { - return fmt.Errorf("Cannot start because this probe have been denied from overFlow[%s]", h.noAuthConfig.DenyDate.String()) - } - - h.c = client.New() - h.c.OnNotify(h.onNotify) - - var err error - if nil != h.noAuthConfig.TempKey && "" != *h.noAuthConfig.TempKey { - err = h.serveConnect(*h.noAuthConfig.TempKey) - } else { - err = h.serveRegistration() - } - if nil != err { - return err - } - -ListenLoop: - for { - select { - case <-h.shutdownChan: - return errors.New("Shutting down") - case <-h.acceptedChan: - break ListenLoop - case err := <-h.deniedChan: - return err - } - } - - return nil -} - -func (h *authHandlers) Shutdown(ctx context.Context) error { - h.shutdownChan <- true - return nil -} - -func (h *authHandlers) serveRegistration() error { +func (a *auth) serveRegistration() error { var err error header := http.Header{} @@ -115,26 +69,26 @@ func (h *authHandlers) serveRegistration() error { header[module.NoAuthProbeHeader_NoAuthRegist] = []string{enc} var res *http.Response - if res, err = h.c.Dial(h.entryURL, header, 4096, 4096); nil != err { + if res, err = a.c.Dial(a.entryURL, header); nil != err { return err } tempKey := res.Header.Get(module.NoAuthProbeHeader_SetNoAuthID) - h.noAuthConfig.TempKey = &tempKey - if err = lfcc.Save(h.noAuthConfig, h.noAuthConfigPath, true); nil != err { + a.noAuthConfig.TempKey = &tempKey + if err = lfcc.Save(a.noAuthConfig, a.noAuthConfigPath, true); nil != err { return err } return nil } -func (h *authHandlers) serveConnect(noAuthTempKey string) error { +func (a *auth) serveConnect(noAuthTempKey string) error { var err error header := http.Header{} header[module.NoAuthProbeHeader_NoAuthID] = []string{noAuthTempKey} var res *http.Response - if res, err = h.c.Dial(h.entryURL, header, 4096, 4096); nil != err { + if res, err = a.c.Dial(a.entryURL, header); nil != err { return err } diff --git a/auth/auth_handler.go b/auth/auth_handler.go new file mode 100644 index 0000000..0587155 --- /dev/null +++ b/auth/auth_handler.go @@ -0,0 +1,55 @@ +package auth + +import ( + "context" + "errors" + "fmt" + + "git.loafle.net/overflow/overflow_probes/commons" + "git.loafle.net/overflow/overflow_probes/config" +) + +type AuthHandler interface { + commons.Handler +} + +func (a *auth) Serve() error { + if nil != config.CFG.Probe.Key || "" != *config.CFG.Probe.Key { + return nil + } + + if nil != a.noAuthConfig.DenyDate { + return fmt.Errorf("Cannot start because this probe have been denied from overFlow[%s]", a.noAuthConfig.DenyDate.String()) + } + + var err error + if nil != a.noAuthConfig.TempKey && "" != *a.noAuthConfig.TempKey { + err = a.serveConnect(*a.noAuthConfig.TempKey) + } else { + err = a.serveRegistration() + } + if nil != err { + return err + } + + err = nil +ListenLoop: + for { + select { + case <-a.ShutdownChan: + err = errors.New("Shutting down") + break ListenLoop + case <-a.acceptedChan: + break ListenLoop + case err = <-a.deniedChan: + break ListenLoop + } + } + + return err +} + +func (a *auth) Shutdown(ctx context.Context) error { + a.ShutdownChan <- true + return nil +} diff --git a/auth/on_notify.go b/auth/on_notify.go index 3d89788..27457e3 100644 --- a/auth/on_notify.go +++ b/auth/on_notify.go @@ -10,25 +10,25 @@ import ( "git.loafle.net/overflow/overflow_probes/config" ) -func (h *authHandlers) onNotify(method string, params []string) { +func (a *auth) onNotify(method string, params []string) { switch method { case module.NoAuthProbeService_AcceptNoAuthProbe: - h.onNoAuthProbeAccept(params) + a.onNoAuthProbeAccept(params) break case module.NoAuthProbeService_DenyNoauthProbe: - h.onNoAuthProbeDeny(params) + a.onNoAuthProbeDeny(params) break } } -func (h *authHandlers) onNoAuthProbeAccept(params []string) { +func (a *auth) onNoAuthProbeAccept(params []string) { var err error probeKey := params[0] - // if lfcc.Exists(h.probeConfigPath) { - // if err = lfcc.Load(&h.probeConfig, h.probeConfigPath); nil != err { - // logging.Logger.Error(fmt.Sprintf("Auth: Loading of Probe config file[%s] failed error[%v]", h.probeConfigPath, err)) + // if lfcc.Exists(a.probeConfigPath) { + // if err = lfcc.Load(&a.probeConfig, a.probeConfigPath); nil != err { + // logging.Logger.Error(fmt.Sprintf("Auth: Loading of Probe config file[%s] failed error[%v]", a.probeConfigPath, err)) // } // } @@ -36,21 +36,21 @@ func (h *authHandlers) onNoAuthProbeAccept(params []string) { if err = lfcc.Save(*config.CFG, *config.ConfigFilePath, true); nil != err { logging.Logger.Error(fmt.Sprintf("Auth: Saving of config file[%s] failed error[%v]", *config.ConfigFilePath, err)) - h.shutdownChan <- true + a.ShutdownChan <- true return } - h.acceptedChan <- true + a.acceptedChan <- true } -func (h *authHandlers) onNoAuthProbeDeny(params []string) { +func (a *auth) onNoAuthProbeDeny(params []string) { n := time.Now() - h.noAuthConfig.DenyDate = &n - if err := lfcc.Save(h.noAuthConfig, h.noAuthConfigPath, true); nil != err { - logging.Logger.Error(fmt.Sprintf("Auth: Saving of NoAuth config file[%s] failed error[%v]", h.noAuthConfigPath, err)) - h.shutdownChan <- true + a.noAuthConfig.DenyDate = &n + if err := lfcc.Save(a.noAuthConfig, a.noAuthConfigPath, true); nil != err { + logging.Logger.Error(fmt.Sprintf("Auth: Saving of NoAuth config file[%s] failed error[%v]", a.noAuthConfigPath, err)) + a.ShutdownChan <- true return } - h.deniedChan <- fmt.Errorf("This probe have been denied from overFlow") + a.deniedChan <- fmt.Errorf("This probe have been denied from overFlow") } diff --git a/auth/registration.go b/auth/registration.go index 06eee32..0cae0f2 100644 --- a/auth/registration.go +++ b/auth/registration.go @@ -16,7 +16,7 @@ import ( func getRegistHeader() (string, error) { var err error nap := module.NoAuthProbe{ - APIKey: *config.CFG.Central.APIKey, + APIKey: config.CFG.Central.APIKey, } var nad *module.NoAuthProbeDescription if nad, err = getDescription(); nil != err { diff --git a/central/api/module/crawler.go b/central/api/module/crawler.go new file mode 100644 index 0000000..7f63874 --- /dev/null +++ b/central/api/module/crawler.go @@ -0,0 +1,7 @@ +package module + +const ( + CrawlerService_Install = "CrawlerService.install" + CrawlerService_Uninstall = "CrawlerService.uninstall" + CrawlerService_Update = "CrawlerService.update" +) diff --git a/central/api/module/discovery.go b/central/api/module/discovery.go new file mode 100644 index 0000000..8ceb969 --- /dev/null +++ b/central/api/module/discovery.go @@ -0,0 +1,6 @@ +package module + +const ( + DiscoveryService_Start = "DiscoveryService.start" + DiscoveryService_Stop = "DiscoveryService.stop" +) diff --git a/central/api/module/log.go b/central/api/module/log.go new file mode 100644 index 0000000..7d5780c --- /dev/null +++ b/central/api/module/log.go @@ -0,0 +1,5 @@ +package module + +const ( + LogService_Send = "LogService.send" +) diff --git a/central/api/module/probe.go b/central/api/module/probe.go index d490db1..b1c2bf2 100644 --- a/central/api/module/probe.go +++ b/central/api/module/probe.go @@ -3,7 +3,14 @@ package module import "time" const ( - ProbeHeader_ProbeKey = "overFlow-Probe-Key" + ProbeHeader_ProbeKey = "overFlow-Probe-Key" + ProbeHeader_Probe_EncryptionKey = "overFlow-Probe-EncryptionKey" +) + +const ( + ProbeService_Started = "ProbeService.started" + ProbeService_Stopped = "ProbeService.stopped" + ProbeService_Update = "ProbeService.update" ) type Probe struct { diff --git a/central/api/module/sensor.go b/central/api/module/sensor.go new file mode 100644 index 0000000..d22c8c0 --- /dev/null +++ b/central/api/module/sensor.go @@ -0,0 +1,9 @@ +package module + +const ( + SensorService_Start = "SensorService.start" + SensorService_Stop = "SensorService.stop" + SensorService_Add = "SensorService.add" + SensorService_Remove = "SensorService.remove" + SensorService_Update = "SensorService.update" +) diff --git a/central/client/client.go b/central/client/client.go index 0ac44e6..af527b1 100644 --- a/central/client/client.go +++ b/central/client/client.go @@ -11,6 +11,7 @@ import ( "git.loafle.net/commons_go/logging" "git.loafle.net/overflow/overflow_probes/central/client/protocol" + "git.loafle.net/overflow/overflow_probes/config" "github.com/gorilla/websocket" ) @@ -51,7 +52,7 @@ func (c *Call) done() { } type Client interface { - Dial(url string, header http.Header, readBufSize int, writeBufSize int) (*http.Response, error) + Dial(url string, header http.Header) (*http.Response, error) Call(method string, args []string, result interface{}) error Notify(method string, args []string) error OnNotify(cb OnNotifyFunc) @@ -83,13 +84,13 @@ func New() Client { return c } -func (c *client) Dial(url string, header http.Header, readBufSize int, writeBufSize int) (*http.Response, error) { +func (c *client) Dial(url string, header http.Header) (*http.Response, error) { var err error var res *http.Response dialer := websocket.Dialer{ - ReadBufferSize: readBufSize, - WriteBufferSize: writeBufSize, + ReadBufferSize: config.CFG.Central.ReadBufferSize, + WriteBufferSize: config.CFG.Central.WriteBufferSize, } if c.conn, res, err = dialer.Dial(url, header); nil != err { diff --git a/collector/collector.go b/collector/collector.go new file mode 100644 index 0000000..49a89b7 --- /dev/null +++ b/collector/collector.go @@ -0,0 +1,38 @@ +package collector + +import ( + "git.loafle.net/overflow/overflow_probes/commons" +) + +const ( + metricEntryPoint = "/metric" +) + +type collector struct { + *commons.Handlers +} + +func New() (CollectorHandler, error) { + var err error + + c := &collector{} + c.Handlers = commons.NewHandlers() + + return c, nil +} + +func (c *collector) Start() error { + return nil +} +func (c *collector) Stop() error { + return nil +} +func (c *collector) Add() error { + return nil +} +func (c *collector) Remove() error { + return nil +} +func (c *collector) Update() error { + return nil +} diff --git a/collector/collector_handler.go b/collector/collector_handler.go new file mode 100644 index 0000000..d316b0c --- /dev/null +++ b/collector/collector_handler.go @@ -0,0 +1,37 @@ +package collector + +import ( + "context" + "errors" + + "git.loafle.net/overflow/overflow_probes/commons" +) + +type CollectorHandler interface { + commons.Handler + Start() error + Stop() error + Add() error + Remove() error + Update() error +} + +func (c *collector) Serve() error { + + var err error +ListenLoop: + for { + select { + case <-c.ShutdownChan: + err = errors.New("Shutting down") + break ListenLoop + } + } + + return err +} + +func (c *collector) Shutdown(ctx context.Context) error { + c.ShutdownChan <- true + return nil +} diff --git a/commons/handler.go b/commons/handler.go index 91cb5fd..dbb3b7e 100644 --- a/commons/handler.go +++ b/commons/handler.go @@ -1,8 +1,42 @@ package commons -import "context" +import ( + "context" + "net/http" + + "git.loafle.net/overflow/overflow_probes/central/api/module" + "git.loafle.net/overflow/overflow_probes/central/client" + "git.loafle.net/overflow/overflow_probes/config" +) type Handler interface { Serve() error Shutdown(ctx context.Context) error } + +type Handlers struct { + ShutdownChan chan bool + IsRunning bool +} + +func NewHandlers() *Handlers { + h := &Handlers{ + ShutdownChan: make(chan bool), + IsRunning: false, + } + + return h +} + +func (h *Handlers) ConnectToCentralAsProbe(c client.Client, entryURL string) (*http.Response, error) { + header := http.Header{} + header[module.ProbeHeader_ProbeKey] = []string{*config.CFG.Probe.Key} + + var res *http.Response + var err error + if res, err = c.Dial(entryURL, header); nil != err { + return nil, err + } + + return res, nil +} diff --git a/config.json b/config.json index 2c9ba6f..d1123d1 100644 --- a/config.json +++ b/config.json @@ -1,10 +1,11 @@ { "central": { "url": "ws://127.0.0.1:19190", - "apikey": "52abd6fd57e511e7ac52080027658d13" + "apikey": "52abd6fd57e511e7ac52080027658d13", + "readBufferSize": 8192, + "writeBufferSize": 8192 }, - "probe": { "key": "" } -} \ No newline at end of file +} diff --git a/config/config.go b/config/config.go index f0a2528..e536846 100644 --- a/config/config.go +++ b/config/config.go @@ -4,9 +4,12 @@ const ( ConfigFileName = "config.json" ) -var ConfigDir *string -var ConfigFilePath *string -var CFG *Config +var ( + ConfigDir *string + ConfigFilePath *string + CFG *Config + EncryptionKey *string +) type Config struct { Central CentralConfig `json:"central" yaml:"central" toml:"central"` @@ -14,8 +17,10 @@ type Config struct { } type CentralConfig struct { - URL *string `json:"url" yaml:"url" toml:"url"` - APIKey *string `json:"apiKey" yaml:"apiKey" toml:"apiKey"` + URL string `required:"true" json:"url" yaml:"url" toml:"url"` + APIKey string `required:"true" json:"apiKey" yaml:"apiKey" toml:"apiKey"` + ReadBufferSize int `default:"8192" json:"readBufferSize" yaml:"readBufferSize" toml:"readBufferSize"` + WriteBufferSize int `default:"8192" json:"writeBufferSize" yaml:"writeBufferSize" toml:"writeBufferSize"` } type ProbeConfig struct { diff --git a/main.go b/main.go index 29ff9f5..4cafdf9 100644 --- a/main.go +++ b/main.go @@ -16,7 +16,6 @@ import ( "git.loafle.net/overflow/overflow_probes/commons" "git.loafle.net/overflow/overflow_probes/config" "git.loafle.net/overflow/overflow_probes/probe" - "github.com/takama/daemon" ) /* @@ -31,54 +30,11 @@ import ( */ -const ( - // name of the service - serviceName = "Probe" - serviceDescription = "Probe Service of overFlow" -) - -type daemonHandler struct { - daemon.Daemon -} - -// Manage by daemon commands or run the daemon -// cmd install -config=./path -// cmd install -// cmd remove -// cmd start -// cmd stop -// cmd status -// cmd -config=./path -// cmd -func (d *daemonHandler) Manage() (isRunning bool, status string, err error) { - - isRunning = true - - if nil != daemonCommand { - switch *daemonCommand { - case "install": - var runArgs = []string{} - runArgs = append(runArgs, fmt.Sprintf("-configDir=%s", *configDir)) - - isRunning = false - status, err = d.Install(runArgs...) - case "remove": - isRunning = false - status, err = d.Remove() - case "start": - isRunning = false - status, err = d.Start() - case "stop": - isRunning = false - status, err = d.Stop() - case "status": - isRunning = false - status, err = d.Status() - } - } - - return -} +// const ( +// // name of the service +// serviceName = "Probe" +// serviceDescription = "Probe Service of overFlow" +// ) var ( daemonCommand *string @@ -86,19 +42,11 @@ var ( ) func init() { - flag.Usage = func() { - fmt.Printf("Usage of %s\n", os.Args[0]) - fmt.Printf(" [install | remove | start | stop | status]\n") - flag.PrintDefaults() - } - - if len(os.Args) > 1 { - command := os.Args[1] - switch command { - case "install", "remove", "start", "stop", "status": - *daemonCommand = command - } - } + // flag.Usage = func() { + // fmt.Printf("Usage of %s\n", os.Args[0]) + // fmt.Printf(" [install | remove | start | stop | status]\n") + // flag.PrintDefaults() + // } configDir = flag.String("config-dir", ".", "The directory of config") flag.Parse() @@ -107,10 +55,7 @@ func init() { func main() { var err error - var srv daemon.Daemon - var status string var handler commons.Handler - isRunning := true defer logging.Logger.Sync() @@ -131,32 +76,10 @@ func main() { logging.Logger.Panic(fmt.Sprintf("Probe: config is not valid error[%v]", err)) } - if nil == config.CFG.Central.APIKey { - logging.Logger.Panic("Probe: APIKey is required") - } - if nil == config.CFG.Central.URL { - logging.Logger.Panic("Probe: URL of overFlow Central is required") - } - - if srv, err = daemon.New(serviceName, serviceDescription); nil != err { - logging.Logger.Panic(fmt.Sprintf("Probe: %v", err)) - } - - s := &daemonHandler{srv} - if isRunning, status, err = s.Manage(); nil != err { - logging.Logger.Error(fmt.Sprintf("Probe: status[%s] error: %v", status, err)) - os.Exit(1) - } - - if !isRunning { - logging.Logger.Info(fmt.Sprintf("Probe: status[%s]", status)) - os.Exit(0) - } - go func() { if handler, err = auth.New(); nil != err { logging.Logger.Error(fmt.Sprintf("Auth: error: %v", err)) - os.Exit(1) + return } if err := handler.Serve(); err != nil { logging.Logger.Error(fmt.Sprintf("Auth: Stopped[%v]", err)) @@ -189,31 +112,9 @@ func main() { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() if err := handler.Shutdown(ctx); err != nil { - logging.Logger.Error(fmt.Sprintf("Probe: status[%s] error: %v", status, err)) + logging.Logger.Error(fmt.Sprintf("Probe error: %v", err)) } - // // loop work cycle with accept connections or interrupt - // // by system signal - // ListenLoop: - // for { - // select { - // case s := <-interrupt: - // logging.Logger.Info(fmt.Sprintf("Probe: signal[%v]", s)) - // ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - // defer cancel() - // if err := p.Shutdown(ctx); err != nil { - // logging.Logger.Error(fmt.Sprintf("Probe: status[%s] error: %v", status, err)) - // } - - // if s == os.Interrupt { - // logging.Logger.Info("Probe was interruped by system signal") - // } else { - // logging.Logger.Info("Probe was killed") - // } - // break ListenLoop - // } - // } - } const ( diff --git a/probe/on_notify.go b/probe/on_notify.go index b18fb34..d6c5c31 100644 --- a/probe/on_notify.go +++ b/probe/on_notify.go @@ -1,15 +1,56 @@ package probe -import "git.loafle.net/overflow/overflow_probes/central/api/module" +import ( + "fmt" + + "git.loafle.net/commons_go/logging" + "git.loafle.net/overflow/overflow_probes/central/api/module" +) func (p *probe) onNotify(method string, params []string) { + var err error switch method { - case module.NoAuthProbeService_AcceptNoAuthProbe: + case module.CrawlerService_Install: break - case module.NoAuthProbeService_DenyNoauthProbe: + case module.CrawlerService_Uninstall: + + break + case module.CrawlerService_Update: + + break + case module.SensorService_Start: + err = p.collector.Start() + + break + case module.SensorService_Stop: + + break + case module.SensorService_Add: + + break + case module.SensorService_Remove: + + break + case module.SensorService_Update: + + break + case module.ProbeService_Update: + + break + case module.LogService_Send: + + break + case module.DiscoveryService_Start: + + break + case module.DiscoveryService_Stop: break } + if nil != err { + logging.Logger.Error(fmt.Sprintf("Probe notify error: %v", err)) + } + } diff --git a/probe/probe.go b/probe/probe.go index d3f8671..9e40825 100644 --- a/probe/probe.go +++ b/probe/probe.go @@ -1,96 +1,84 @@ package probe import ( - "context" - "errors" "fmt" "net/http" "git.loafle.net/commons_go/logging" "git.loafle.net/overflow/overflow_probes/central/api/module" "git.loafle.net/overflow/overflow_probes/central/client" + "git.loafle.net/overflow/overflow_probes/collector" "git.loafle.net/overflow/overflow_probes/commons" "git.loafle.net/overflow/overflow_probes/config" opuu "git.loafle.net/overflow/overflow_probes/util/url" ) const ( - probeEntryPoint = "/probe" - metricsEntryPoint = "/metrics" - fileEntryPoint = "/file" + probeEntryPoint = "/probe" + fileEntryPoint = "/file" ) -type Probe interface { - commons.Handler -} - type probe struct { - probeEntryURL string - metricsEntryURL string + *commons.Handlers + probeEntryURL string + fileEntryURL string + metricEntryURL string - probeClient client.Client - metricsClient client.Client + probeClient client.Client + fileClient client.Client + metricClient client.Client - shutdownChan chan bool + collector collector.CollectorHandler } -func New() (Probe, error) { - p := &probe{ - shutdownChan: make(chan bool), - } +func New() (ProbeHandler, error) { + p := &probe{} + p.Handlers = commons.NewHandlers() + var err error - if p.probeEntryURL, err = opuu.Join(*config.CFG.Central.URL, probeEntryPoint); nil != err { + if p.probeEntryURL, err = opuu.Join(config.CFG.Central.URL, probeEntryPoint); nil != err { return nil, err } - if p.metricsEntryURL, err = opuu.Join(*config.CFG.Central.URL, metricsEntryPoint); nil != err { + if p.fileEntryURL, err = opuu.Join(config.CFG.Central.URL, fileEntryPoint); nil != err { + return nil, err + } + if c.metricEntryURL, err = opuu.Join(config.CFG.Central.URL, metricEntryPoint); nil != err { + return nil, err + } + + p.probeClient = client.New() + p.fileClient = client.New() + c.metricClient = client.New() + + if p.collector, err = collector.New(); nil != err { return nil, err } return p, nil } -func (p *probe) Serve() error { +func (p *probe) connectToCentralProbe() error { var err error - - if err = p.connectToCentral(); nil != err { - return err - } - - // ListenLoop: - for { - select { - case <-p.shutdownChan: - return errors.New("Shutting down") - } - } - - return nil -} - -func (p *probe) Shutdown(ctx context.Context) error { - p.shutdownChan <- true - return nil -} - -func (p *probe) connectToCentral() error { - header := http.Header{} - header[module.ProbeHeader_ProbeKey] = []string{*config.CFG.Probe.Key} - var res *http.Response - var err error - p.probeClient = client.New() - if res, err = p.probeClient.Dial(p.probeEntryURL, header, 4096, 4096); nil != err { + if res, err = p.ConnectToCentralAsProbe(p.probeClient, p.probeEntryURL); nil != err { return err } - logging.Logger.Debug(fmt.Sprintf("Probe: Connect Probe HTTP Status[%s]", res.Status)) + + encryptionKey := res.Header.Get(module.ProbeHeader_Probe_EncryptionKey) + config.EncryptionKey = &encryptionKey + p.probeClient.OnNotify(p.onNotify) - p.metricsClient = client.New() - if res, err = p.metricsClient.Dial(p.metricsEntryURL, header, 4096, 4096); nil != err { + if _, err = p.ConnectToCentralAsProbe(p.metricClient, p.metricEntryURL); nil != err { return err } - logging.Logger.Debug(fmt.Sprintf("Probe: Connect Metrics HTTP Status[%s]", res.Status)) return nil } + +func (p *probe) sendNotifyToCentral(method string, params ...string) { + if err := p.probeClient.Notify(method, params); nil != err { + logging.Logger.Error(fmt.Sprintf("Probe notify error: [%v]", err)) + } +} diff --git a/probe/probe_handler.go b/probe/probe_handler.go new file mode 100644 index 0000000..07a2b61 --- /dev/null +++ b/probe/probe_handler.go @@ -0,0 +1,44 @@ +package probe + +import ( + "context" + "errors" + + "git.loafle.net/overflow/overflow_probes/central/api/module" + "git.loafle.net/overflow/overflow_probes/commons" +) + +type ProbeHandler interface { + commons.Handler +} + +func (p *probe) Serve() error { + if err := p.connectToCentralProbe(); nil != err { + return err + } + + if err := p.collector.Serve(); nil != err { + return err + } + + p.sendNotifyToCentral(module.ProbeService_Started) + + var err error +ListenLoop: + for { + select { + case <-p.ShutdownChan: + err = errors.New("Shutting down") + break ListenLoop + } + } + + p.sendNotifyToCentral(module.ProbeService_Stopped) + + return err +} + +func (p *probe) Shutdown(ctx context.Context) error { + p.ShutdownChan <- true + return nil +}