This commit is contained in:
crusader 2017-10-12 19:02:38 +09:00
parent 1b8ee93026
commit caaed69010
14 changed files with 220 additions and 241 deletions

View File

@ -1,6 +1,7 @@
package auth package auth
import ( import (
"context"
"fmt" "fmt"
"net/http" "net/http"
"path" "path"
@ -18,26 +19,33 @@ const (
noAuthEntryPoint = "/auth" noAuthEntryPoint = "/auth"
) )
type Auther interface {
commons.EndableStarter
commons.Shutdowner
}
type auth struct { type auth struct {
*commons.Handlers centralClient client.Client
c client.Client entryURL string
entryURL string
noAuthConfigPath string noAuthConfigPath string
noAuthConfig config.NoAuthProbeConfig noAuthConfig config.NoAuthProbeConfig
acceptedChan chan bool endded chan<- error
deniedChan chan error
shutdown chan bool
accepted chan bool
denied chan error
} }
func New() (AuthHandler, error) { func New() (Auther, error) {
var err error var err error
a := &auth{ a := &auth{
acceptedChan: make(chan bool), shutdown: make(chan bool),
deniedChan: make(chan error), accepted: make(chan bool),
denied: make(chan error),
} }
a.Handlers = commons.NewHandlers()
if a.entryURL, err = opuu.Join(config.CFG.Central.URL, noAuthEntryPoint); nil != err { if a.entryURL, err = opuu.Join(config.CFG.Central.URL, noAuthEntryPoint); nil != err {
return nil, err return nil, err
@ -52,12 +60,58 @@ func New() (AuthHandler, error) {
} }
} }
a.c = client.New() a.centralClient = client.New()
a.c.OnNotify(a.onNotify) a.centralClient.OnNotify(a.onNotify)
return a, nil return a, nil
} }
func (a *auth) EndableStart(endded chan<- error) error {
a.endded = endded
return a.start()
}
func (a *auth) start() error {
if nil != config.CFG.Probe.Key || "" != *config.CFG.Probe.Key {
return nil
}
if nil != a.noAuthConfig.DenyDate {
return fmt.Errorf("Cannot start because this probe have been denied from overFlow at %s", a.noAuthConfig.DenyDate.String())
}
var err error
if nil != a.noAuthConfig.TempKey && "" != *a.noAuthConfig.TempKey {
err = a.serveConnect(*a.noAuthConfig.TempKey)
} else {
err = a.serveRegistration()
}
if nil != err {
return err
}
a.listen()
return nil
}
func (a *auth) listen() {
go func() {
for {
select {
case <-a.shutdown:
break
case <-a.accepted:
a.stop(nil)
break
case err := <-a.denied:
a.stop(err)
break
}
}
}()
}
func (a *auth) serveRegistration() error { func (a *auth) serveRegistration() error {
var err error var err error
header := http.Header{} header := http.Header{}
@ -69,7 +123,7 @@ func (a *auth) serveRegistration() error {
header[module.NoAuthProbeHeader_NoAuthRegist] = []string{enc} header[module.NoAuthProbeHeader_NoAuthRegist] = []string{enc}
var res *http.Response var res *http.Response
if res, err = a.c.Dial(a.entryURL, header); nil != err { if res, err = a.centralClient.Dial(a.entryURL, header); nil != err {
return err return err
} }
@ -88,7 +142,7 @@ func (a *auth) serveConnect(noAuthTempKey string) error {
header[module.NoAuthProbeHeader_NoAuthID] = []string{noAuthTempKey} header[module.NoAuthProbeHeader_NoAuthID] = []string{noAuthTempKey}
var res *http.Response var res *http.Response
if res, err = a.c.Dial(a.entryURL, header); nil != err { if res, err = a.centralClient.Dial(a.entryURL, header); nil != err {
return err return err
} }
@ -96,3 +150,28 @@ func (a *auth) serveConnect(noAuthTempKey string) error {
return nil return nil
} }
func (a *auth) Shutdown(ctx context.Context) error {
for {
a.stop(fmt.Errorf("Shutdown"))
select {
case <-ctx.Done():
return ctx.Err()
}
}
}
func (a *auth) stop(err error) {
defer close(a.shutdown)
a.shutdown <- true
close(a.accepted)
close(a.denied)
ctx := context.Background()
if err := a.centralClient.Shutdown(ctx); nil != err {
logging.Logger.Error(fmt.Sprintf("Client of Central: %v", err))
}
a.endded <- err
}

View File

@ -1,55 +0,0 @@
package auth
import (
"context"
"errors"
"fmt"
"git.loafle.net/overflow/overflow_probes/commons"
"git.loafle.net/overflow/overflow_probes/config"
)
type AuthHandler interface {
commons.Handler
}
func (a *auth) Serve() error {
if nil != config.CFG.Probe.Key || "" != *config.CFG.Probe.Key {
return nil
}
if nil != a.noAuthConfig.DenyDate {
return fmt.Errorf("Cannot start because this probe have been denied from overFlow[%s]", a.noAuthConfig.DenyDate.String())
}
var err error
if nil != a.noAuthConfig.TempKey && "" != *a.noAuthConfig.TempKey {
err = a.serveConnect(*a.noAuthConfig.TempKey)
} else {
err = a.serveRegistration()
}
if nil != err {
return err
}
err = nil
ListenLoop:
for {
select {
case <-a.ShutdownChan:
err = errors.New("Shutting down")
break ListenLoop
case <-a.acceptedChan:
break ListenLoop
case err = <-a.deniedChan:
break ListenLoop
}
}
return err
}
func (a *auth) Shutdown(ctx context.Context) error {
a.ShutdownChan <- true
return nil
}

View File

@ -36,11 +36,10 @@ func (a *auth) onNoAuthProbeAccept(params []string) {
if err = lfcc.Save(*config.CFG, *config.ConfigFilePath, true); nil != err { if err = lfcc.Save(*config.CFG, *config.ConfigFilePath, true); nil != err {
logging.Logger.Error(fmt.Sprintf("Auth: Saving of config file[%s] failed error[%v]", *config.ConfigFilePath, err)) logging.Logger.Error(fmt.Sprintf("Auth: Saving of config file[%s] failed error[%v]", *config.ConfigFilePath, err))
a.ShutdownChan <- true
return return
} }
a.acceptedChan <- true a.accepted <- true
} }
func (a *auth) onNoAuthProbeDeny(params []string) { func (a *auth) onNoAuthProbeDeny(params []string) {
@ -48,9 +47,9 @@ func (a *auth) onNoAuthProbeDeny(params []string) {
a.noAuthConfig.DenyDate = &n a.noAuthConfig.DenyDate = &n
if err := lfcc.Save(a.noAuthConfig, a.noAuthConfigPath, true); nil != err { if err := lfcc.Save(a.noAuthConfig, a.noAuthConfigPath, true); nil != err {
logging.Logger.Error(fmt.Sprintf("Auth: Saving of NoAuth config file[%s] failed error[%v]", a.noAuthConfigPath, err)) logging.Logger.Error(fmt.Sprintf("Auth: Saving of NoAuth config file[%s] failed error[%v]", a.noAuthConfigPath, err))
a.ShutdownChan <- true
return return
} }
a.deniedChan <- fmt.Errorf("This probe have been denied from overFlow") a.denied <- fmt.Errorf("This probe have been denied from overFlow")
} }

21
central/client/probe.go Normal file
View File

@ -0,0 +1,21 @@
package client
import (
"net/http"
"git.loafle.net/overflow/overflow_probes/central/api/module"
"git.loafle.net/overflow/overflow_probes/config"
)
func ConnectToCentralAsProbe(c Client, entryURL string) (*http.Response, error) {
header := http.Header{}
header[module.ProbeHeader_ProbeKey] = []string{*config.CFG.Probe.Key}
var res *http.Response
var err error
if res, err = c.Dial(entryURL, header); nil != err {
return nil, err
}
return res, nil
}

View File

@ -1,22 +1,14 @@
package collector package collector
import ( type Collector interface {
"git.loafle.net/overflow/overflow_probes/commons"
)
const (
metricEntryPoint = "/metric"
)
type collector struct {
*commons.Handlers
} }
func New() (CollectorHandler, error) { type collector struct {
var err error }
func New() (Collector, error) {
c := &collector{} c := &collector{}
c.Handlers = commons.NewHandlers()
return c, nil return c, nil
} }

View File

@ -1,37 +0,0 @@
package collector
import (
"context"
"errors"
"git.loafle.net/overflow/overflow_probes/commons"
)
type CollectorHandler interface {
commons.Handler
Start() error
Stop() error
Add() error
Remove() error
Update() error
}
func (c *collector) Serve() error {
var err error
ListenLoop:
for {
select {
case <-c.ShutdownChan:
err = errors.New("Shutting down")
break ListenLoop
}
}
return err
}
func (c *collector) Shutdown(ctx context.Context) error {
c.ShutdownChan <- true
return nil
}

View File

@ -0,0 +1,5 @@
package commons
type EndableStarter interface {
EndableStart(endded chan<- error) error
}

View File

@ -1,42 +0,0 @@
package commons
import (
"context"
"net/http"
"git.loafle.net/overflow/overflow_probes/central/api/module"
"git.loafle.net/overflow/overflow_probes/central/client"
"git.loafle.net/overflow/overflow_probes/config"
)
type Handler interface {
Serve() error
Shutdown(ctx context.Context) error
}
type Handlers struct {
ShutdownChan chan bool
IsRunning bool
}
func NewHandlers() *Handlers {
h := &Handlers{
ShutdownChan: make(chan bool),
IsRunning: false,
}
return h
}
func (h *Handlers) ConnectToCentralAsProbe(c client.Client, entryURL string) (*http.Response, error) {
header := http.Header{}
header[module.ProbeHeader_ProbeKey] = []string{*config.CFG.Probe.Key}
var res *http.Response
var err error
if res, err = c.Dial(entryURL, header); nil != err {
return nil, err
}
return res, nil
}

7
commons/shutdowner.go Normal file
View File

@ -0,0 +1,7 @@
package commons
import "context"
type Shutdowner interface {
Shutdown(ctx context.Context) error
}

5
commons/starter.go Normal file
View File

@ -0,0 +1,5 @@
package commons
type Starter interface {
Start() error
}

36
main.go
View File

@ -10,10 +10,11 @@ import (
"syscall" "syscall"
"time" "time"
"git.loafle.net/overflow/overflow_probes/commons"
lfcc "git.loafle.net/commons_go/config" lfcc "git.loafle.net/commons_go/config"
"git.loafle.net/commons_go/logging" "git.loafle.net/commons_go/logging"
"git.loafle.net/overflow/overflow_probes/auth" "git.loafle.net/overflow/overflow_probes/auth"
"git.loafle.net/overflow/overflow_probes/commons"
"git.loafle.net/overflow/overflow_probes/config" "git.loafle.net/overflow/overflow_probes/config"
"git.loafle.net/overflow/overflow_probes/probe" "git.loafle.net/overflow/overflow_probes/probe"
) )
@ -55,16 +56,16 @@ func init() {
func main() { func main() {
var err error var err error
var handler commons.Handler var instance interface{}
defer logging.Logger.Sync() defer logging.Logger.Sync()
printBanner() printBanner()
if dir, err := lfcc.ABSPathify(*configDir); nil != err { if dir, err := lfcc.ABSPathify(*configDir); nil != err {
logging.Logger.Panic(fmt.Sprintf("Probe: config path[%s] is not valid", *configDir)) logging.Logger.Panic(fmt.Sprintf("Config path[%s] is not valid", *configDir))
} else { } else {
logging.Logger.Debug(fmt.Sprintf("Probe: config path[%s]", dir)) logging.Logger.Debug(fmt.Sprintf("Config path: %s", dir))
config.ConfigDir = &dir config.ConfigDir = &dir
} }
cfp := path.Join(*config.ConfigDir, config.ConfigFileName) cfp := path.Join(*config.ConfigDir, config.ConfigFileName)
@ -73,27 +74,34 @@ func main() {
conf := lfcc.New() conf := lfcc.New()
config.CFG = &config.Config{} config.CFG = &config.Config{}
if err := conf.Load(config.CFG, *config.ConfigFilePath); nil != err { if err := conf.Load(config.CFG, *config.ConfigFilePath); nil != err {
logging.Logger.Panic(fmt.Sprintf("Probe: config is not valid error[%v]", err)) logging.Logger.Panic(fmt.Sprintf("Config is not valid: %v", err))
} }
go func() { go func() {
if handler, err = auth.New(); nil != err { if instance, err = auth.New(); nil != err {
logging.Logger.Error(fmt.Sprintf("Auth: error: %v", err)) logging.Logger.Error(fmt.Sprintf("Auth error: %v", err))
return return
} }
if err := handler.Serve(); err != nil { endded := make(chan error)
logging.Logger.Error(fmt.Sprintf("Auth: Stopped[%v]", err)) defer close(endded)
if err := instance.(commons.EndableStarter).EndableStart(endded); err != nil {
logging.Logger.Error(fmt.Sprintf("Auther error: %v", err))
return
}
if err := <-endded; nil != err {
logging.Logger.Error(fmt.Sprintf("Auther error: %v", err))
return return
} }
if handler, err = probe.New(); nil != err { if instance, err = probe.New(); nil != err {
logging.Logger.Error(fmt.Sprintf("Probe: error: %v", err)) logging.Logger.Error(fmt.Sprintf("Probe error: %v", err))
return return
} }
if err := handler.Serve(); err != nil { if err := instance.(commons.Starter).Start(); err != nil {
logging.Logger.Error(fmt.Sprintf("Probe: error: %v", err)) logging.Logger.Error(fmt.Sprintf("Probe error: %v", err))
return return
} }
}() }()
// // Set up channel on which to send signal notifications. // // Set up channel on which to send signal notifications.
@ -111,7 +119,7 @@ func main() {
<-interrupt <-interrupt
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel() defer cancel()
if err := handler.Shutdown(ctx); err != nil { if err := instance.(commons.Shutdowner).Shutdown(ctx); err != nil {
logging.Logger.Error(fmt.Sprintf("Probe error: %v", err)) logging.Logger.Error(fmt.Sprintf("Probe error: %v", err))
} }

View File

@ -20,7 +20,6 @@ func (p *probe) onNotify(method string, params []string) {
break break
case module.SensorService_Start: case module.SensorService_Start:
err = p.collector.Start()
break break
case module.SensorService_Stop: case module.SensorService_Stop:

View File

@ -1,25 +1,30 @@
package probe package probe
import ( import (
"context"
"fmt" "fmt"
"net/http" "net/http"
"git.loafle.net/commons_go/logging" "git.loafle.net/commons_go/logging"
"git.loafle.net/overflow/overflow_probes/central/api/module" "git.loafle.net/overflow/overflow_probes/central/api/module"
"git.loafle.net/overflow/overflow_probes/central/client" "git.loafle.net/overflow/overflow_probes/central/client"
"git.loafle.net/overflow/overflow_probes/collector"
"git.loafle.net/overflow/overflow_probes/commons" "git.loafle.net/overflow/overflow_probes/commons"
"git.loafle.net/overflow/overflow_probes/config" "git.loafle.net/overflow/overflow_probes/config"
opuu "git.loafle.net/overflow/overflow_probes/util/url" opuu "git.loafle.net/overflow/overflow_probes/util/url"
) )
const ( const (
probeEntryPoint = "/probe" probeEntryPoint = "/probe"
fileEntryPoint = "/file" fileEntryPoint = "/file"
metricEntryPoint = "/metric"
) )
type Prober interface {
commons.Starter
commons.Shutdowner
}
type probe struct { type probe struct {
*commons.Handlers
probeEntryURL string probeEntryURL string
fileEntryURL string fileEntryURL string
metricEntryURL string metricEntryURL string
@ -28,12 +33,13 @@ type probe struct {
fileClient client.Client fileClient client.Client
metricClient client.Client metricClient client.Client
collector collector.CollectorHandler shutdown chan bool
} }
func New() (ProbeHandler, error) { func New() (Prober, error) {
p := &probe{} p := &probe{
p.Handlers = commons.NewHandlers() shutdown: make(chan bool),
}
var err error var err error
@ -43,25 +49,47 @@ func New() (ProbeHandler, error) {
if p.fileEntryURL, err = opuu.Join(config.CFG.Central.URL, fileEntryPoint); nil != err { if p.fileEntryURL, err = opuu.Join(config.CFG.Central.URL, fileEntryPoint); nil != err {
return nil, err return nil, err
} }
if c.metricEntryURL, err = opuu.Join(config.CFG.Central.URL, metricEntryPoint); nil != err { if p.metricEntryURL, err = opuu.Join(config.CFG.Central.URL, metricEntryPoint); nil != err {
return nil, err return nil, err
} }
p.probeClient = client.New() p.probeClient = client.New()
p.fileClient = client.New() p.fileClient = client.New()
c.metricClient = client.New() p.metricClient = client.New()
if p.collector, err = collector.New(); nil != err {
return nil, err
}
return p, nil return p, nil
} }
func (p *probe) connectToCentralProbe() error { func (p *probe) Start() error {
return p.start()
}
func (p *probe) start() error {
if err := p.connectToCentral(); nil != err {
return err
}
p.listen()
return nil
}
func (p *probe) listen() {
go func() {
for {
select {
case <-p.shutdown:
break
}
}
}()
}
func (p *probe) connectToCentral() error {
var err error var err error
var res *http.Response var res *http.Response
if res, err = p.ConnectToCentralAsProbe(p.probeClient, p.probeEntryURL); nil != err { if res, err = client.ConnectToCentralAsProbe(p.probeClient, p.probeEntryURL); nil != err {
return err return err
} }
@ -70,7 +98,7 @@ func (p *probe) connectToCentralProbe() error {
p.probeClient.OnNotify(p.onNotify) p.probeClient.OnNotify(p.onNotify)
if _, err = p.ConnectToCentralAsProbe(p.metricClient, p.metricEntryURL); nil != err { if _, err = client.ConnectToCentralAsProbe(p.metricClient, p.metricEntryURL); nil != err {
return err return err
} }
@ -79,6 +107,20 @@ func (p *probe) connectToCentralProbe() error {
func (p *probe) sendNotifyToCentral(method string, params ...string) { func (p *probe) sendNotifyToCentral(method string, params ...string) {
if err := p.probeClient.Notify(method, params); nil != err { if err := p.probeClient.Notify(method, params); nil != err {
logging.Logger.Error(fmt.Sprintf("Probe notify error: [%v]", err)) logging.Logger.Error(fmt.Sprintf("Probe notify: %v", err))
} }
} }
func (p *probe) Shutdown(ctx context.Context) error {
for {
p.stop(fmt.Errorf("Shutdown"))
select {
case <-ctx.Done():
return ctx.Err()
}
}
}
func (p *probe) stop(err error) {
defer close(p.shutdown)
}

View File

@ -1,44 +0,0 @@
package probe
import (
"context"
"errors"
"git.loafle.net/overflow/overflow_probes/central/api/module"
"git.loafle.net/overflow/overflow_probes/commons"
)
type ProbeHandler interface {
commons.Handler
}
func (p *probe) Serve() error {
if err := p.connectToCentralProbe(); nil != err {
return err
}
if err := p.collector.Serve(); nil != err {
return err
}
p.sendNotifyToCentral(module.ProbeService_Started)
var err error
ListenLoop:
for {
select {
case <-p.ShutdownChan:
err = errors.New("Shutting down")
break ListenLoop
}
}
p.sendNotifyToCentral(module.ProbeService_Stopped)
return err
}
func (p *probe) Shutdown(ctx context.Context) error {
p.ShutdownChan <- true
return nil
}