diff --git a/auth.json b/auth.json new file mode 100644 index 0000000..5a126c0 --- /dev/null +++ b/auth.json @@ -0,0 +1,3 @@ +{ + "tempKey": "6a9937503f1111e8b0460242ac120002" +} \ No newline at end of file diff --git a/auth/authenticator.go b/auth/authenticator.go index 5a5b409..9dffd95 100644 --- a/auth/authenticator.go +++ b/auth/authenticator.go @@ -53,8 +53,8 @@ func (a *Authenticator) EndableStart() (<-chan error, error) { } } - if nil != a.authConfig.DenyDate { - return nil, fmt.Errorf("cannot start because this probe have been denied from overFlow at %s", a.authConfig.DenyDate.String()) + if nil != a.authConfig.DeniedDate { + return nil, fmt.Errorf("cannot start because this probe have been denied from overFlow at %s", a.authConfig.DeniedDate.String()) } a.accpetedChan = make(chan string, 1) @@ -95,7 +95,7 @@ func (a *Authenticator) logHeader() string { } func (a *Authenticator) initService() (crr.RPCInvoker, error) { - cdr.RegisterResource("AccpetedChan", a.accpetedChan) + cdr.RegisterResource("AcceptedChan", a.accpetedChan) cdr.RegisterResource("DeniedChan", a.deniedChan) services, err := cdr.GetInstancesByAnnotationType(occa.RPCServiceAnnotationType) @@ -128,9 +128,12 @@ func (a *Authenticator) initClient() error { header[ocncc.HTTPRequestHeaderKey_NoAuthProbe_Info] = []string{rh} } - centralURL := fmt.Sprintf("ws://%s:%d", a.Config.Central.Host, a.Config.Central.Port) + centralURL := fmt.Sprintf("ws://%s:%d%s", a.Config.Central.Host, a.Config.Central.Port, ocncc.HTTPEntry_Auth) - connector.URL = path.Join(centralURL, ocncc.HTTPEntry_Auth) + logging.Logger().Debugf("%s central %s", a.logHeader(), centralURL) + + connector.Name = "Authenticator" + connector.URL = centralURL connector.RequestHeader = header connector.ResponseHandler = func(res *http.Response) { switch a.authConfig.State() { @@ -175,6 +178,14 @@ func (a *Authenticator) handleAuthenticator(endChan chan<- error) { return } logging.Logger().Infof("%s accepted by central", a.logHeader()) + + n := time.Now() + a.authConfig.AcceptedDate = &n + err = configuration.Save(a.authConfig, path.Join(a.ConfigDir, ocnc.ConfigFileName), true) + if nil != err { + logging.Logger().Errorf("%s %v", a.logHeader(), err) + } + a.Config.Probe.Key = &probeKey err = configuration.Save(a.Config, path.Join(a.ConfigDir, ocpc.ConfigFileName), true) if nil != err { @@ -187,7 +198,7 @@ func (a *Authenticator) handleAuthenticator(endChan chan<- error) { } logging.Logger().Infof("%s denied by central", a.logHeader()) n := time.Now() - a.authConfig.DenyDate = &n + a.authConfig.DeniedDate = &n err = configuration.Save(a.authConfig, path.Join(a.ConfigDir, ocnc.ConfigFileName), true) if nil != err { logging.Logger().Errorf("%s %v", a.logHeader(), err) diff --git a/auth/service/NoAuthProbeService.go b/auth/service/NoAuthProbeService.go new file mode 100644 index 0000000..4335299 --- /dev/null +++ b/auth/service/NoAuthProbeService.go @@ -0,0 +1,32 @@ +package service + +import ( + "reflect" + + cda "git.loafle.net/commons/di-go/annotation" + cdr "git.loafle.net/commons/di-go/registry" + _ "git.loafle.net/overflow/commons-go/core/annotation" +) + +var NoAuthProbeServiceType = reflect.TypeOf((*NoAuthProbeService)(nil)) + +func init() { + cdr.RegisterType(NoAuthProbeServiceType) +} + +type NoAuthProbeService struct { + cda.TypeAnnotation `annotation:"@overflow:RPCService()"` + + AcceptedChan chan<- string `annotation:"@Resource(name='AcceptedChan')"` + DeniedChan chan<- struct{} `annotation:"@Resource(name='DeniedChan')"` +} + +func (s *NoAuthProbeService) Accept(probeKey string) error { + s.AcceptedChan <- probeKey + return nil +} + +func (s *NoAuthProbeService) Deny() error { + s.DeniedChan <- struct{}{} + return nil +} diff --git a/auth/service/auth-service.go b/auth/service/auth-service.go deleted file mode 100644 index cafde8c..0000000 --- a/auth/service/auth-service.go +++ /dev/null @@ -1,32 +0,0 @@ -package service - -import ( - "reflect" - - cda "git.loafle.net/commons/di-go/annotation" - cdr "git.loafle.net/commons/di-go/registry" - _ "git.loafle.net/overflow/commons-go/core/annotation" -) - -var AuthServiceType = reflect.TypeOf((*AuthService)(nil)) - -func init() { - cdr.RegisterType(AuthServiceType) -} - -type AuthService struct { - cda.TypeAnnotation `annotation:"@overflow:RPCService()"` - - AcceptedChan chan<- string - DeniedChan chan<- struct{} -} - -func (as *AuthService) Accept(probeKey string) error { - as.AcceptedChan <- probeKey - return nil -} - -func (as *AuthService) Deny() error { - as.DeniedChan <- struct{}{} - return nil -} diff --git a/config.json b/config.json index f175db1..03189af 100644 --- a/config.json +++ b/config.json @@ -1,37 +1,44 @@ { - "account": { - "name": "domain1", - "apiKey": "52abd6fd57e511e7ac52080027658d13" - }, + "account": { + "name": "domain1", + "apiKey": "52abd6fd57e511e7ac52080027658d13" + }, "central": { - "host": "127.0.0.1", - "port": 19190, - "connector": { - "handshakeTimeout": 0, - "reconnectInterval": 5, - "reconnectTryTime": 10, - "maxMessageSize": 4096, - "readBufferSize": 4096, - "writeBufferSize": 4096, - "readTimeout": 0, - "writeTimeout": 0, - "pongTimeout": 60, - "pingTimeout": 10, - "pingPeriod": 9, - "enableCompression": false - }, - "proxy": { - "host": "", - "port": 9090, - "useAuth": true, - "user": "", - "password": "" - } + "host": "127.0.0.1", + "port": 19091, + "connector": { + "name": "", + "network": "", + "address": "", + "concurrency": 0, + "keepAlive": 0, + "handshakeTimeout": 0, + "reconnectInterval": 5, + "reconnectTryTime": 10, + "maxMessageSize": 4096, + "readBufferSize": 4096, + "writeBufferSize": 4096, + "readTimeout": 0, + "writeTimeout": 0, + "pongTimeout": 60, + "pingTimeout": 10, + "pingPeriod": 9, + "enableCompression": false, + "url": "", + "subprotocols": null + }, + "proxy": { + "host": "", + "port": 9090, + "useAuth": true, + "user": "", + "password": "" + } }, "probe": { - "key": "866f9be0333311e8b7230242ac120004" + "key": "7691f1a13f1111e8b0460242ac120002" }, "paths": { "root": "/project/overFlow/probe" } -} +} \ No newline at end of file diff --git a/main.go b/main.go index 6f51430..958773d 100644 --- a/main.go +++ b/main.go @@ -53,6 +53,7 @@ func main() { if nil != err { logging.Logger().Panic(err) } + } // err := s.ListenAndServe() diff --git a/probe/probe.go b/probe/probe.go new file mode 100644 index 0000000..0dda66e --- /dev/null +++ b/probe/probe.go @@ -0,0 +1,187 @@ +package probe + +import ( + "context" + "fmt" + "net/http" + "path" + "sync" + "time" + + "git.loafle.net/commons/configuration-go" + cdr "git.loafle.net/commons/di-go/registry" + logging "git.loafle.net/commons/logging-go" + crc "git.loafle.net/commons/rpc-go/client" + crpj "git.loafle.net/commons/rpc-go/protocol/json" + crr "git.loafle.net/commons/rpc-go/registry" + csswc "git.loafle.net/commons/server-go/socket/web/client" + occa "git.loafle.net/overflow/commons-go/core/annotation" + ocnc "git.loafle.net/overflow/commons-go/noauthprobe/config" + ocncc "git.loafle.net/overflow/commons-go/noauthprobe/constants" + ocpc "git.loafle.net/overflow/commons-go/probe/config" + "git.loafle.net/overflow/probe/auth/info" + _ "git.loafle.net/overflow/probe/auth/service" + "git.loafle.net/overflow/probe/config" +) + +type Probe struct { + Config *config.Config + ConfigDir string + + client *crc.Client + + stopChan chan struct{} + stopWg sync.WaitGroup +} + +func (p *Probe) Start() error { + if p.stopChan != nil { + return fmt.Errorf("already running. Stop it before starting it again") + } + + if err := p.initClient(); nil != err { + return err + } + + if err := p.client.Start(); nil != err { + return err + } + + p.stopChan = make(chan struct{}) + + p.stopWg.Add(1) + go p.handleProbe() + + return nil +} + +func (p *Probe) Stop(ctx context.Context) error { + if p.stopChan == nil { + return fmt.Errorf("must be started before stopping it") + } + close(p.stopChan) + p.stopWg.Wait() + + p.stopChan = nil + + return nil +} + +func (p *Probe) logHeader() string { + return "Probe:" +} + +func (p *Probe) initService() (crr.RPCInvoker, error) { + services, err := cdr.GetInstancesByAnnotationType(occa.RPCServiceAnnotationType) + if nil != err { + return nil, err + } + + rpcRegistry := crr.NewRPCRegistry() + rpcRegistry.RegisterServices(services...) + + return rpcRegistry, nil +} + +func (p *Probe) initClient() error { + _connector := p.Config.Central.Connector.Clone() + connector := _connector.(*csswc.Connectors) + + header := make(map[string][]string) + + switch p.authConfig.State() { + case ocnc.AuthStateTypeRegisterd: + header[ocncc.HTTPRequestHeaderKey_NoAuthProbe_Method] = []string{ocncc.HTTPRequestHeaderValue_NoAuthProbe_Method_Connect} + header[ocncc.HTTPRequestHeaderKey_NoAuthProbe_TempProbeKey] = []string{*p.authConfig.TempKey} + default: + rh, err := info.GetRegistHeader(p.Config.Account.APIKey) + if nil != err { + return err + } + header[ocncc.HTTPRequestHeaderKey_NoAuthProbe_Method] = []string{ocncc.HTTPRequestHeaderValue_NoAuthProbe_Method_Regist} + header[ocncc.HTTPRequestHeaderKey_NoAuthProbe_Info] = []string{rh} + } + + centralURL := fmt.Sprintf("ws://%s:%d%s", p.Config.Central.Host, p.Config.Central.Port, ocncc.HTTPEntry_Auth) + + logging.Logger().Debugf("%s central %s", p.logHeader(), centralURL) + + connector.URL = centralURL + connector.RequestHeader = header + connector.ResponseHandler = func(res *http.Response) { + switch p.authConfig.State() { + case ocnc.AuthStateTypeNotRegisterd: + tempProbeKey := res.Header.Get(ocncc.HTTPResponseHeaderKey_NoAuthProbe_SetTempProbeKey) + p.tempKeyChan <- tempProbeKey + default: + } + } + + rpcInvoker, err := p.initService() + if nil != err { + return err + } + codec := crpj.NewClientCodec() + + p.client = &crc.Client{ + Connector: connector, + Codec: codec, + RPCInvoker: rpcInvoker, + Name: "Authenticator", + } + + return nil +} + +func (p *Probe) handleProbe(endChan chan<- error) { + var err error + defer func() { + if nil != p.client { + err = p.client.Stop(context.Background()) + } + + p.stopWg.Done() + endChan <- err + }() + + for { + select { + case probeKey, ok := <-p.accpetedChan: + if !ok { + return + } + logging.Logger().Infof("%s accepted by central", p.logHeader()) + p.Config.Probe.Key = &probeKey + err = configuration.Save(p.Config, path.Join(p.ConfigDir, ocpc.ConfigFileName), true) + if nil != err { + logging.Logger().Errorf("%s %v", p.logHeader(), err) + } + return + case _, ok := <-p.deniedChan: + if !ok { + return + } + logging.Logger().Infof("%s denied by central", p.logHeader()) + n := time.Now() + p.authConfig.DenyDate = &n + err = configuration.Save(p.authConfig, path.Join(p.ConfigDir, ocnc.ConfigFileName), true) + if nil != err { + logging.Logger().Errorf("%s %v", p.logHeader(), err) + } + return + case tempKey, ok := <-p.tempKeyChan: + if !ok { + return + } + logging.Logger().Infof("%s registered by central", p.logHeader()) + p.authConfig.TempKey = &tempKey + err = configuration.Save(p.authConfig, path.Join(p.ConfigDir, ocnc.ConfigFileName), true) + if nil != err { + logging.Logger().Errorf("%s %v", p.logHeader(), err) + return + } + case <-p.stopChan: + return + } + } +} diff --git a/service/ContainerService.go b/service/ContainerService.go new file mode 100644 index 0000000..9ca0f81 --- /dev/null +++ b/service/ContainerService.go @@ -0,0 +1,24 @@ +package service + +import ( + "reflect" + + cda "git.loafle.net/commons/di-go/annotation" + cdr "git.loafle.net/commons/di-go/registry" + _ "git.loafle.net/overflow/commons-go/core/annotation" +) + +var ContainerServiceType = reflect.TypeOf((*ContainerService)(nil)) + +func init() { + cdr.RegisterType(ContainerServiceType) +} + +type ContainerService struct { + cda.TypeAnnotation `annotation:"@overflow:RPCService()"` +} + +func (cs *ContainerService) Accept() error { + + return nil +} diff --git a/service/CrawlerService.go b/service/CrawlerService.go new file mode 100644 index 0000000..ffd04bd --- /dev/null +++ b/service/CrawlerService.go @@ -0,0 +1,24 @@ +package service + +import ( + "reflect" + + cda "git.loafle.net/commons/di-go/annotation" + cdr "git.loafle.net/commons/di-go/registry" + _ "git.loafle.net/overflow/commons-go/core/annotation" +) + +var CrawlerServiceType = reflect.TypeOf((*CrawlerService)(nil)) + +func init() { + cdr.RegisterType(CrawlerServiceType) +} + +type CrawlerService struct { + cda.TypeAnnotation `annotation:"@overflow:RPCService()"` +} + +func (cs *CrawlerService) Accept() error { + + return nil +} diff --git a/service/DiscoveryService.go b/service/DiscoveryService.go new file mode 100644 index 0000000..5e6ed2d --- /dev/null +++ b/service/DiscoveryService.go @@ -0,0 +1,24 @@ +package service + +import ( + "reflect" + + cda "git.loafle.net/commons/di-go/annotation" + cdr "git.loafle.net/commons/di-go/registry" + _ "git.loafle.net/overflow/commons-go/core/annotation" +) + +var DiscoveryServiceType = reflect.TypeOf((*DiscoveryService)(nil)) + +func init() { + cdr.RegisterType(DiscoveryServiceType) +} + +type DiscoveryService struct { + cda.TypeAnnotation `annotation:"@overflow:RPCService()"` +} + +func (cs *DiscoveryService) Accept() error { + + return nil +} diff --git a/service/ProbeService.go b/service/ProbeService.go new file mode 100644 index 0000000..883fcdd --- /dev/null +++ b/service/ProbeService.go @@ -0,0 +1,24 @@ +package service + +import ( + "reflect" + + cda "git.loafle.net/commons/di-go/annotation" + cdr "git.loafle.net/commons/di-go/registry" + _ "git.loafle.net/overflow/commons-go/core/annotation" +) + +var ProbeServiceType = reflect.TypeOf((*ProbeService)(nil)) + +func init() { + cdr.RegisterType(ProbeServiceType) +} + +type ProbeService struct { + cda.TypeAnnotation `annotation:"@overflow:RPCService()"` +} + +func (cs *ProbeService) Accept() error { + + return nil +} diff --git a/service/service.go b/service/service.go new file mode 100644 index 0000000..3713b70 --- /dev/null +++ b/service/service.go @@ -0,0 +1,13 @@ +package service + +func InitPackage() { +} + +func StartPackage() { +} + +func StopPackage() { +} + +func DestroyPackage() { +}