This commit is contained in:
crusader 2017-09-29 21:30:27 +09:00
parent 0ece619ffb
commit 1b8ee93026
19 changed files with 395 additions and 262 deletions

View File

@ -1,8 +1,6 @@
package auth package auth
import ( import (
"context"
"errors"
"fmt" "fmt"
"net/http" "net/http"
"path" "path"
@ -10,7 +8,6 @@ import (
lfcc "git.loafle.net/commons_go/config" lfcc "git.loafle.net/commons_go/config"
"git.loafle.net/commons_go/logging" "git.loafle.net/commons_go/logging"
"git.loafle.net/overflow/overflow_probes/central/api/module" "git.loafle.net/overflow/overflow_probes/central/api/module"
"git.loafle.net/overflow/overflow_probes/central/client" "git.loafle.net/overflow/overflow_probes/central/client"
"git.loafle.net/overflow/overflow_probes/commons" "git.loafle.net/overflow/overflow_probes/commons"
"git.loafle.net/overflow/overflow_probes/config" "git.loafle.net/overflow/overflow_probes/config"
@ -21,18 +18,14 @@ const (
noAuthEntryPoint = "/auth" noAuthEntryPoint = "/auth"
) )
type AuthHandler interface { type auth struct {
commons.Handler *commons.Handlers
}
type authHandlers struct {
c client.Client c client.Client
entryURL string entryURL string
noAuthConfigPath string noAuthConfigPath string
noAuthConfig config.NoAuthProbeConfig noAuthConfig config.NoAuthProbeConfig
shutdownChan chan bool
acceptedChan chan bool acceptedChan chan bool
deniedChan chan error deniedChan chan error
} }
@ -40,71 +33,32 @@ type authHandlers struct {
func New() (AuthHandler, error) { func New() (AuthHandler, error) {
var err error var err error
h := &authHandlers{ a := &auth{
shutdownChan: make(chan bool),
acceptedChan: make(chan bool), acceptedChan: make(chan bool),
deniedChan: make(chan error), deniedChan: make(chan error),
} }
a.Handlers = commons.NewHandlers()
if h.entryURL, err = opuu.Join(*config.CFG.Central.URL, noAuthEntryPoint); nil != err { if a.entryURL, err = opuu.Join(config.CFG.Central.URL, noAuthEntryPoint); nil != err {
return nil, err return nil, err
} }
h.noAuthConfigPath = path.Join(*config.ConfigDir, config.NoAuthProbeConfigFileName) a.noAuthConfigPath = path.Join(*config.ConfigDir, config.NoAuthProbeConfigFileName)
conf := lfcc.New() conf := lfcc.New()
if lfcc.Exists(h.noAuthConfigPath) { if lfcc.Exists(a.noAuthConfigPath) {
if err = conf.Load(&h.noAuthConfig, h.noAuthConfigPath); nil != err { if err = conf.Load(&a.noAuthConfig, a.noAuthConfigPath); nil != err {
return nil, fmt.Errorf("Auth: Loading of NoAuth config file[%s] failed error[%v]", h.noAuthConfigPath, err) return nil, fmt.Errorf("Auth: Loading of NoAuth config file[%s] failed error[%v]", a.noAuthConfigPath, err)
} }
} }
return h, nil a.c = client.New()
a.c.OnNotify(a.onNotify)
return a, nil
} }
func (h *authHandlers) Serve() error { func (a *auth) serveRegistration() error {
if nil != config.CFG.Probe.Key || "" != *config.CFG.Probe.Key {
return nil
}
if nil != h.noAuthConfig.DenyDate {
return fmt.Errorf("Cannot start because this probe have been denied from overFlow[%s]", h.noAuthConfig.DenyDate.String())
}
h.c = client.New()
h.c.OnNotify(h.onNotify)
var err error
if nil != h.noAuthConfig.TempKey && "" != *h.noAuthConfig.TempKey {
err = h.serveConnect(*h.noAuthConfig.TempKey)
} else {
err = h.serveRegistration()
}
if nil != err {
return err
}
ListenLoop:
for {
select {
case <-h.shutdownChan:
return errors.New("Shutting down")
case <-h.acceptedChan:
break ListenLoop
case err := <-h.deniedChan:
return err
}
}
return nil
}
func (h *authHandlers) Shutdown(ctx context.Context) error {
h.shutdownChan <- true
return nil
}
func (h *authHandlers) serveRegistration() error {
var err error var err error
header := http.Header{} header := http.Header{}
@ -115,26 +69,26 @@ func (h *authHandlers) serveRegistration() error {
header[module.NoAuthProbeHeader_NoAuthRegist] = []string{enc} header[module.NoAuthProbeHeader_NoAuthRegist] = []string{enc}
var res *http.Response var res *http.Response
if res, err = h.c.Dial(h.entryURL, header, 4096, 4096); nil != err { if res, err = a.c.Dial(a.entryURL, header); nil != err {
return err return err
} }
tempKey := res.Header.Get(module.NoAuthProbeHeader_SetNoAuthID) tempKey := res.Header.Get(module.NoAuthProbeHeader_SetNoAuthID)
h.noAuthConfig.TempKey = &tempKey a.noAuthConfig.TempKey = &tempKey
if err = lfcc.Save(h.noAuthConfig, h.noAuthConfigPath, true); nil != err { if err = lfcc.Save(a.noAuthConfig, a.noAuthConfigPath, true); nil != err {
return err return err
} }
return nil return nil
} }
func (h *authHandlers) serveConnect(noAuthTempKey string) error { func (a *auth) serveConnect(noAuthTempKey string) error {
var err error var err error
header := http.Header{} header := http.Header{}
header[module.NoAuthProbeHeader_NoAuthID] = []string{noAuthTempKey} header[module.NoAuthProbeHeader_NoAuthID] = []string{noAuthTempKey}
var res *http.Response var res *http.Response
if res, err = h.c.Dial(h.entryURL, header, 4096, 4096); nil != err { if res, err = a.c.Dial(a.entryURL, header); nil != err {
return err return err
} }

55
auth/auth_handler.go Normal file
View File

@ -0,0 +1,55 @@
package auth
import (
"context"
"errors"
"fmt"
"git.loafle.net/overflow/overflow_probes/commons"
"git.loafle.net/overflow/overflow_probes/config"
)
type AuthHandler interface {
commons.Handler
}
func (a *auth) Serve() error {
if nil != config.CFG.Probe.Key || "" != *config.CFG.Probe.Key {
return nil
}
if nil != a.noAuthConfig.DenyDate {
return fmt.Errorf("Cannot start because this probe have been denied from overFlow[%s]", a.noAuthConfig.DenyDate.String())
}
var err error
if nil != a.noAuthConfig.TempKey && "" != *a.noAuthConfig.TempKey {
err = a.serveConnect(*a.noAuthConfig.TempKey)
} else {
err = a.serveRegistration()
}
if nil != err {
return err
}
err = nil
ListenLoop:
for {
select {
case <-a.ShutdownChan:
err = errors.New("Shutting down")
break ListenLoop
case <-a.acceptedChan:
break ListenLoop
case err = <-a.deniedChan:
break ListenLoop
}
}
return err
}
func (a *auth) Shutdown(ctx context.Context) error {
a.ShutdownChan <- true
return nil
}

View File

@ -10,25 +10,25 @@ import (
"git.loafle.net/overflow/overflow_probes/config" "git.loafle.net/overflow/overflow_probes/config"
) )
func (h *authHandlers) onNotify(method string, params []string) { func (a *auth) onNotify(method string, params []string) {
switch method { switch method {
case module.NoAuthProbeService_AcceptNoAuthProbe: case module.NoAuthProbeService_AcceptNoAuthProbe:
h.onNoAuthProbeAccept(params) a.onNoAuthProbeAccept(params)
break break
case module.NoAuthProbeService_DenyNoauthProbe: case module.NoAuthProbeService_DenyNoauthProbe:
h.onNoAuthProbeDeny(params) a.onNoAuthProbeDeny(params)
break break
} }
} }
func (h *authHandlers) onNoAuthProbeAccept(params []string) { func (a *auth) onNoAuthProbeAccept(params []string) {
var err error var err error
probeKey := params[0] probeKey := params[0]
// if lfcc.Exists(h.probeConfigPath) { // if lfcc.Exists(a.probeConfigPath) {
// if err = lfcc.Load(&h.probeConfig, h.probeConfigPath); nil != err { // if err = lfcc.Load(&a.probeConfig, a.probeConfigPath); nil != err {
// logging.Logger.Error(fmt.Sprintf("Auth: Loading of Probe config file[%s] failed error[%v]", h.probeConfigPath, err)) // logging.Logger.Error(fmt.Sprintf("Auth: Loading of Probe config file[%s] failed error[%v]", a.probeConfigPath, err))
// } // }
// } // }
@ -36,21 +36,21 @@ func (h *authHandlers) onNoAuthProbeAccept(params []string) {
if err = lfcc.Save(*config.CFG, *config.ConfigFilePath, true); nil != err { if err = lfcc.Save(*config.CFG, *config.ConfigFilePath, true); nil != err {
logging.Logger.Error(fmt.Sprintf("Auth: Saving of config file[%s] failed error[%v]", *config.ConfigFilePath, err)) logging.Logger.Error(fmt.Sprintf("Auth: Saving of config file[%s] failed error[%v]", *config.ConfigFilePath, err))
h.shutdownChan <- true a.ShutdownChan <- true
return return
} }
h.acceptedChan <- true a.acceptedChan <- true
} }
func (h *authHandlers) onNoAuthProbeDeny(params []string) { func (a *auth) onNoAuthProbeDeny(params []string) {
n := time.Now() n := time.Now()
h.noAuthConfig.DenyDate = &n a.noAuthConfig.DenyDate = &n
if err := lfcc.Save(h.noAuthConfig, h.noAuthConfigPath, true); nil != err { if err := lfcc.Save(a.noAuthConfig, a.noAuthConfigPath, true); nil != err {
logging.Logger.Error(fmt.Sprintf("Auth: Saving of NoAuth config file[%s] failed error[%v]", h.noAuthConfigPath, err)) logging.Logger.Error(fmt.Sprintf("Auth: Saving of NoAuth config file[%s] failed error[%v]", a.noAuthConfigPath, err))
h.shutdownChan <- true a.ShutdownChan <- true
return return
} }
h.deniedChan <- fmt.Errorf("This probe have been denied from overFlow") a.deniedChan <- fmt.Errorf("This probe have been denied from overFlow")
} }

View File

@ -16,7 +16,7 @@ import (
func getRegistHeader() (string, error) { func getRegistHeader() (string, error) {
var err error var err error
nap := module.NoAuthProbe{ nap := module.NoAuthProbe{
APIKey: *config.CFG.Central.APIKey, APIKey: config.CFG.Central.APIKey,
} }
var nad *module.NoAuthProbeDescription var nad *module.NoAuthProbeDescription
if nad, err = getDescription(); nil != err { if nad, err = getDescription(); nil != err {

View File

@ -0,0 +1,7 @@
package module
const (
CrawlerService_Install = "CrawlerService.install"
CrawlerService_Uninstall = "CrawlerService.uninstall"
CrawlerService_Update = "CrawlerService.update"
)

View File

@ -0,0 +1,6 @@
package module
const (
DiscoveryService_Start = "DiscoveryService.start"
DiscoveryService_Stop = "DiscoveryService.stop"
)

View File

@ -0,0 +1,5 @@
package module
const (
LogService_Send = "LogService.send"
)

View File

@ -3,7 +3,14 @@ package module
import "time" import "time"
const ( const (
ProbeHeader_ProbeKey = "overFlow-Probe-Key" ProbeHeader_ProbeKey = "overFlow-Probe-Key"
ProbeHeader_Probe_EncryptionKey = "overFlow-Probe-EncryptionKey"
)
const (
ProbeService_Started = "ProbeService.started"
ProbeService_Stopped = "ProbeService.stopped"
ProbeService_Update = "ProbeService.update"
) )
type Probe struct { type Probe struct {

View File

@ -0,0 +1,9 @@
package module
const (
SensorService_Start = "SensorService.start"
SensorService_Stop = "SensorService.stop"
SensorService_Add = "SensorService.add"
SensorService_Remove = "SensorService.remove"
SensorService_Update = "SensorService.update"
)

View File

@ -11,6 +11,7 @@ import (
"git.loafle.net/commons_go/logging" "git.loafle.net/commons_go/logging"
"git.loafle.net/overflow/overflow_probes/central/client/protocol" "git.loafle.net/overflow/overflow_probes/central/client/protocol"
"git.loafle.net/overflow/overflow_probes/config"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
) )
@ -51,7 +52,7 @@ func (c *Call) done() {
} }
type Client interface { type Client interface {
Dial(url string, header http.Header, readBufSize int, writeBufSize int) (*http.Response, error) Dial(url string, header http.Header) (*http.Response, error)
Call(method string, args []string, result interface{}) error Call(method string, args []string, result interface{}) error
Notify(method string, args []string) error Notify(method string, args []string) error
OnNotify(cb OnNotifyFunc) OnNotify(cb OnNotifyFunc)
@ -83,13 +84,13 @@ func New() Client {
return c return c
} }
func (c *client) Dial(url string, header http.Header, readBufSize int, writeBufSize int) (*http.Response, error) { func (c *client) Dial(url string, header http.Header) (*http.Response, error) {
var err error var err error
var res *http.Response var res *http.Response
dialer := websocket.Dialer{ dialer := websocket.Dialer{
ReadBufferSize: readBufSize, ReadBufferSize: config.CFG.Central.ReadBufferSize,
WriteBufferSize: writeBufSize, WriteBufferSize: config.CFG.Central.WriteBufferSize,
} }
if c.conn, res, err = dialer.Dial(url, header); nil != err { if c.conn, res, err = dialer.Dial(url, header); nil != err {

38
collector/collector.go Normal file
View File

@ -0,0 +1,38 @@
package collector
import (
"git.loafle.net/overflow/overflow_probes/commons"
)
const (
metricEntryPoint = "/metric"
)
type collector struct {
*commons.Handlers
}
func New() (CollectorHandler, error) {
var err error
c := &collector{}
c.Handlers = commons.NewHandlers()
return c, nil
}
func (c *collector) Start() error {
return nil
}
func (c *collector) Stop() error {
return nil
}
func (c *collector) Add() error {
return nil
}
func (c *collector) Remove() error {
return nil
}
func (c *collector) Update() error {
return nil
}

View File

@ -0,0 +1,37 @@
package collector
import (
"context"
"errors"
"git.loafle.net/overflow/overflow_probes/commons"
)
type CollectorHandler interface {
commons.Handler
Start() error
Stop() error
Add() error
Remove() error
Update() error
}
func (c *collector) Serve() error {
var err error
ListenLoop:
for {
select {
case <-c.ShutdownChan:
err = errors.New("Shutting down")
break ListenLoop
}
}
return err
}
func (c *collector) Shutdown(ctx context.Context) error {
c.ShutdownChan <- true
return nil
}

View File

@ -1,8 +1,42 @@
package commons package commons
import "context" import (
"context"
"net/http"
"git.loafle.net/overflow/overflow_probes/central/api/module"
"git.loafle.net/overflow/overflow_probes/central/client"
"git.loafle.net/overflow/overflow_probes/config"
)
type Handler interface { type Handler interface {
Serve() error Serve() error
Shutdown(ctx context.Context) error Shutdown(ctx context.Context) error
} }
type Handlers struct {
ShutdownChan chan bool
IsRunning bool
}
func NewHandlers() *Handlers {
h := &Handlers{
ShutdownChan: make(chan bool),
IsRunning: false,
}
return h
}
func (h *Handlers) ConnectToCentralAsProbe(c client.Client, entryURL string) (*http.Response, error) {
header := http.Header{}
header[module.ProbeHeader_ProbeKey] = []string{*config.CFG.Probe.Key}
var res *http.Response
var err error
if res, err = c.Dial(entryURL, header); nil != err {
return nil, err
}
return res, nil
}

View File

@ -1,10 +1,11 @@
{ {
"central": { "central": {
"url": "ws://127.0.0.1:19190", "url": "ws://127.0.0.1:19190",
"apikey": "52abd6fd57e511e7ac52080027658d13" "apikey": "52abd6fd57e511e7ac52080027658d13",
"readBufferSize": 8192,
"writeBufferSize": 8192
}, },
"probe": { "probe": {
"key": "" "key": ""
} }
} }

View File

@ -4,9 +4,12 @@ const (
ConfigFileName = "config.json" ConfigFileName = "config.json"
) )
var ConfigDir *string var (
var ConfigFilePath *string ConfigDir *string
var CFG *Config ConfigFilePath *string
CFG *Config
EncryptionKey *string
)
type Config struct { type Config struct {
Central CentralConfig `json:"central" yaml:"central" toml:"central"` Central CentralConfig `json:"central" yaml:"central" toml:"central"`
@ -14,8 +17,10 @@ type Config struct {
} }
type CentralConfig struct { type CentralConfig struct {
URL *string `json:"url" yaml:"url" toml:"url"` URL string `required:"true" json:"url" yaml:"url" toml:"url"`
APIKey *string `json:"apiKey" yaml:"apiKey" toml:"apiKey"` APIKey string `required:"true" json:"apiKey" yaml:"apiKey" toml:"apiKey"`
ReadBufferSize int `default:"8192" json:"readBufferSize" yaml:"readBufferSize" toml:"readBufferSize"`
WriteBufferSize int `default:"8192" json:"writeBufferSize" yaml:"writeBufferSize" toml:"writeBufferSize"`
} }
type ProbeConfig struct { type ProbeConfig struct {

123
main.go
View File

@ -16,7 +16,6 @@ import (
"git.loafle.net/overflow/overflow_probes/commons" "git.loafle.net/overflow/overflow_probes/commons"
"git.loafle.net/overflow/overflow_probes/config" "git.loafle.net/overflow/overflow_probes/config"
"git.loafle.net/overflow/overflow_probes/probe" "git.loafle.net/overflow/overflow_probes/probe"
"github.com/takama/daemon"
) )
/* /*
@ -31,54 +30,11 @@ import (
*/ */
const ( // const (
// name of the service // // name of the service
serviceName = "Probe" // serviceName = "Probe"
serviceDescription = "Probe Service of overFlow" // serviceDescription = "Probe Service of overFlow"
) // )
type daemonHandler struct {
daemon.Daemon
}
// Manage by daemon commands or run the daemon
// cmd install -config=./path
// cmd install
// cmd remove
// cmd start
// cmd stop
// cmd status
// cmd -config=./path
// cmd
func (d *daemonHandler) Manage() (isRunning bool, status string, err error) {
isRunning = true
if nil != daemonCommand {
switch *daemonCommand {
case "install":
var runArgs = []string{}
runArgs = append(runArgs, fmt.Sprintf("-configDir=%s", *configDir))
isRunning = false
status, err = d.Install(runArgs...)
case "remove":
isRunning = false
status, err = d.Remove()
case "start":
isRunning = false
status, err = d.Start()
case "stop":
isRunning = false
status, err = d.Stop()
case "status":
isRunning = false
status, err = d.Status()
}
}
return
}
var ( var (
daemonCommand *string daemonCommand *string
@ -86,19 +42,11 @@ var (
) )
func init() { func init() {
flag.Usage = func() { // flag.Usage = func() {
fmt.Printf("Usage of %s\n", os.Args[0]) // fmt.Printf("Usage of %s\n", os.Args[0])
fmt.Printf(" [install | remove | start | stop | status]\n") // fmt.Printf(" [install | remove | start | stop | status]\n")
flag.PrintDefaults() // flag.PrintDefaults()
} // }
if len(os.Args) > 1 {
command := os.Args[1]
switch command {
case "install", "remove", "start", "stop", "status":
*daemonCommand = command
}
}
configDir = flag.String("config-dir", ".", "The directory of config") configDir = flag.String("config-dir", ".", "The directory of config")
flag.Parse() flag.Parse()
@ -107,10 +55,7 @@ func init() {
func main() { func main() {
var err error var err error
var srv daemon.Daemon
var status string
var handler commons.Handler var handler commons.Handler
isRunning := true
defer logging.Logger.Sync() defer logging.Logger.Sync()
@ -131,32 +76,10 @@ func main() {
logging.Logger.Panic(fmt.Sprintf("Probe: config is not valid error[%v]", err)) logging.Logger.Panic(fmt.Sprintf("Probe: config is not valid error[%v]", err))
} }
if nil == config.CFG.Central.APIKey {
logging.Logger.Panic("Probe: APIKey is required")
}
if nil == config.CFG.Central.URL {
logging.Logger.Panic("Probe: URL of overFlow Central is required")
}
if srv, err = daemon.New(serviceName, serviceDescription); nil != err {
logging.Logger.Panic(fmt.Sprintf("Probe: %v", err))
}
s := &daemonHandler{srv}
if isRunning, status, err = s.Manage(); nil != err {
logging.Logger.Error(fmt.Sprintf("Probe: status[%s] error: %v", status, err))
os.Exit(1)
}
if !isRunning {
logging.Logger.Info(fmt.Sprintf("Probe: status[%s]", status))
os.Exit(0)
}
go func() { go func() {
if handler, err = auth.New(); nil != err { if handler, err = auth.New(); nil != err {
logging.Logger.Error(fmt.Sprintf("Auth: error: %v", err)) logging.Logger.Error(fmt.Sprintf("Auth: error: %v", err))
os.Exit(1) return
} }
if err := handler.Serve(); err != nil { if err := handler.Serve(); err != nil {
logging.Logger.Error(fmt.Sprintf("Auth: Stopped[%v]", err)) logging.Logger.Error(fmt.Sprintf("Auth: Stopped[%v]", err))
@ -189,31 +112,9 @@ func main() {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel() defer cancel()
if err := handler.Shutdown(ctx); err != nil { if err := handler.Shutdown(ctx); err != nil {
logging.Logger.Error(fmt.Sprintf("Probe: status[%s] error: %v", status, err)) logging.Logger.Error(fmt.Sprintf("Probe error: %v", err))
} }
// // loop work cycle with accept connections or interrupt
// // by system signal
// ListenLoop:
// for {
// select {
// case s := <-interrupt:
// logging.Logger.Info(fmt.Sprintf("Probe: signal[%v]", s))
// ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
// defer cancel()
// if err := p.Shutdown(ctx); err != nil {
// logging.Logger.Error(fmt.Sprintf("Probe: status[%s] error: %v", status, err))
// }
// if s == os.Interrupt {
// logging.Logger.Info("Probe was interruped by system signal")
// } else {
// logging.Logger.Info("Probe was killed")
// }
// break ListenLoop
// }
// }
} }
const ( const (

View File

@ -1,15 +1,56 @@
package probe package probe
import "git.loafle.net/overflow/overflow_probes/central/api/module" import (
"fmt"
"git.loafle.net/commons_go/logging"
"git.loafle.net/overflow/overflow_probes/central/api/module"
)
func (p *probe) onNotify(method string, params []string) { func (p *probe) onNotify(method string, params []string) {
var err error
switch method { switch method {
case module.NoAuthProbeService_AcceptNoAuthProbe: case module.CrawlerService_Install:
break break
case module.NoAuthProbeService_DenyNoauthProbe: case module.CrawlerService_Uninstall:
break
case module.CrawlerService_Update:
break
case module.SensorService_Start:
err = p.collector.Start()
break
case module.SensorService_Stop:
break
case module.SensorService_Add:
break
case module.SensorService_Remove:
break
case module.SensorService_Update:
break
case module.ProbeService_Update:
break
case module.LogService_Send:
break
case module.DiscoveryService_Start:
break
case module.DiscoveryService_Stop:
break break
} }
if nil != err {
logging.Logger.Error(fmt.Sprintf("Probe notify error: %v", err))
}
} }

View File

@ -1,96 +1,84 @@
package probe package probe
import ( import (
"context"
"errors"
"fmt" "fmt"
"net/http" "net/http"
"git.loafle.net/commons_go/logging" "git.loafle.net/commons_go/logging"
"git.loafle.net/overflow/overflow_probes/central/api/module" "git.loafle.net/overflow/overflow_probes/central/api/module"
"git.loafle.net/overflow/overflow_probes/central/client" "git.loafle.net/overflow/overflow_probes/central/client"
"git.loafle.net/overflow/overflow_probes/collector"
"git.loafle.net/overflow/overflow_probes/commons" "git.loafle.net/overflow/overflow_probes/commons"
"git.loafle.net/overflow/overflow_probes/config" "git.loafle.net/overflow/overflow_probes/config"
opuu "git.loafle.net/overflow/overflow_probes/util/url" opuu "git.loafle.net/overflow/overflow_probes/util/url"
) )
const ( const (
probeEntryPoint = "/probe" probeEntryPoint = "/probe"
metricsEntryPoint = "/metrics" fileEntryPoint = "/file"
fileEntryPoint = "/file"
) )
type Probe interface {
commons.Handler
}
type probe struct { type probe struct {
probeEntryURL string *commons.Handlers
metricsEntryURL string probeEntryURL string
fileEntryURL string
metricEntryURL string
probeClient client.Client probeClient client.Client
metricsClient client.Client fileClient client.Client
metricClient client.Client
shutdownChan chan bool collector collector.CollectorHandler
} }
func New() (Probe, error) { func New() (ProbeHandler, error) {
p := &probe{ p := &probe{}
shutdownChan: make(chan bool), p.Handlers = commons.NewHandlers()
}
var err error var err error
if p.probeEntryURL, err = opuu.Join(*config.CFG.Central.URL, probeEntryPoint); nil != err { if p.probeEntryURL, err = opuu.Join(config.CFG.Central.URL, probeEntryPoint); nil != err {
return nil, err return nil, err
} }
if p.metricsEntryURL, err = opuu.Join(*config.CFG.Central.URL, metricsEntryPoint); nil != err { if p.fileEntryURL, err = opuu.Join(config.CFG.Central.URL, fileEntryPoint); nil != err {
return nil, err
}
if c.metricEntryURL, err = opuu.Join(config.CFG.Central.URL, metricEntryPoint); nil != err {
return nil, err
}
p.probeClient = client.New()
p.fileClient = client.New()
c.metricClient = client.New()
if p.collector, err = collector.New(); nil != err {
return nil, err return nil, err
} }
return p, nil return p, nil
} }
func (p *probe) Serve() error { func (p *probe) connectToCentralProbe() error {
var err error var err error
if err = p.connectToCentral(); nil != err {
return err
}
// ListenLoop:
for {
select {
case <-p.shutdownChan:
return errors.New("Shutting down")
}
}
return nil
}
func (p *probe) Shutdown(ctx context.Context) error {
p.shutdownChan <- true
return nil
}
func (p *probe) connectToCentral() error {
header := http.Header{}
header[module.ProbeHeader_ProbeKey] = []string{*config.CFG.Probe.Key}
var res *http.Response var res *http.Response
var err error if res, err = p.ConnectToCentralAsProbe(p.probeClient, p.probeEntryURL); nil != err {
p.probeClient = client.New()
if res, err = p.probeClient.Dial(p.probeEntryURL, header, 4096, 4096); nil != err {
return err return err
} }
logging.Logger.Debug(fmt.Sprintf("Probe: Connect Probe HTTP Status[%s]", res.Status))
encryptionKey := res.Header.Get(module.ProbeHeader_Probe_EncryptionKey)
config.EncryptionKey = &encryptionKey
p.probeClient.OnNotify(p.onNotify) p.probeClient.OnNotify(p.onNotify)
p.metricsClient = client.New() if _, err = p.ConnectToCentralAsProbe(p.metricClient, p.metricEntryURL); nil != err {
if res, err = p.metricsClient.Dial(p.metricsEntryURL, header, 4096, 4096); nil != err {
return err return err
} }
logging.Logger.Debug(fmt.Sprintf("Probe: Connect Metrics HTTP Status[%s]", res.Status))
return nil return nil
} }
func (p *probe) sendNotifyToCentral(method string, params ...string) {
if err := p.probeClient.Notify(method, params); nil != err {
logging.Logger.Error(fmt.Sprintf("Probe notify error: [%v]", err))
}
}

44
probe/probe_handler.go Normal file
View File

@ -0,0 +1,44 @@
package probe
import (
"context"
"errors"
"git.loafle.net/overflow/overflow_probes/central/api/module"
"git.loafle.net/overflow/overflow_probes/commons"
)
type ProbeHandler interface {
commons.Handler
}
func (p *probe) Serve() error {
if err := p.connectToCentralProbe(); nil != err {
return err
}
if err := p.collector.Serve(); nil != err {
return err
}
p.sendNotifyToCentral(module.ProbeService_Started)
var err error
ListenLoop:
for {
select {
case <-p.ShutdownChan:
err = errors.New("Shutting down")
break ListenLoop
}
}
p.sendNotifyToCentral(module.ProbeService_Stopped)
return err
}
func (p *probe) Shutdown(ctx context.Context) error {
p.ShutdownChan <- true
return nil
}