217 lines
4.6 KiB
Go
217 lines
4.6 KiB
Go
package discovery
|
|
|
|
import (
|
|
"fmt"
|
|
"sync"
|
|
|
|
"git.loafle.net/commons_go/logging"
|
|
"git.loafle.net/overflow/overflow_discovery/api/module/discovery/model"
|
|
)
|
|
|
|
var discoverer *discovery
|
|
|
|
func DiscoveryInit() {
|
|
discoverer = &discovery{}
|
|
discoverer.start()
|
|
}
|
|
|
|
func DiscoveryDestroy() {
|
|
discoverer.stop()
|
|
discoverer = nil
|
|
}
|
|
|
|
func DiscoverZone(dz *model.DiscoveryZone) {
|
|
discoverer.discoverZone(dz)
|
|
}
|
|
|
|
func DiscoverHost(dh *model.DiscoveryHost) {
|
|
discoverer.discoverHost(dh.Zone, dh)
|
|
}
|
|
|
|
func DiscoverPort(dp *model.DiscoveryPort) {
|
|
discoverer.discoverPort(dp.Host, dp)
|
|
}
|
|
|
|
func DiscoverService(ds *model.DiscoveryService) {
|
|
discoverer.discoverService(ds.Port, ds)
|
|
}
|
|
|
|
func Stop() {
|
|
|
|
}
|
|
|
|
type discovery struct {
|
|
sendChan chan interface{}
|
|
errChan chan error
|
|
|
|
stopChan chan struct{}
|
|
stopWg sync.WaitGroup
|
|
}
|
|
|
|
func (d *discovery) start() {
|
|
if d.stopChan != nil {
|
|
panic("Discovery: discovery is already running. Stop it before starting it again")
|
|
}
|
|
|
|
d.stopChan = make(chan struct{})
|
|
d.sendChan = make(chan interface{})
|
|
d.errChan = make(chan error)
|
|
logging.Logger().Info(fmt.Sprintf("Discovery: discovery is started"))
|
|
}
|
|
|
|
func (d *discovery) stop() {
|
|
if d.stopChan == nil {
|
|
panic("Discovery: discovery must be started before stopping it")
|
|
}
|
|
close(d.stopChan)
|
|
d.stopWg.Wait()
|
|
d.stopChan = nil
|
|
|
|
logging.Logger().Info(fmt.Sprintf("Discovery: discovery is stopped"))
|
|
}
|
|
|
|
func (d *discovery) discoverZone(dz *model.DiscoveryZone) {
|
|
logging.Logger().Debug(fmt.Sprintf("Discovery: DiscoverZone is start"))
|
|
|
|
d.stopWg.Add(1)
|
|
resultChan := make(chan *model.Zone)
|
|
errChan := make(chan error)
|
|
doneChan := make(chan struct{})
|
|
|
|
defer func() {
|
|
close(resultChan)
|
|
close(errChan)
|
|
close(doneChan)
|
|
d.stopWg.Done()
|
|
}()
|
|
|
|
go scanZone(dz, resultChan, errChan, doneChan)
|
|
|
|
for {
|
|
select {
|
|
case z := <-resultChan:
|
|
logging.Logger().Info(fmt.Sprintf("zone: %v", z))
|
|
if nil != dz.DiscoveryHost {
|
|
d.discoverHost(z, dz.DiscoveryHost)
|
|
}
|
|
case err := <-errChan:
|
|
logging.Logger().Info(fmt.Sprintf("zone err: %v", err))
|
|
case <-d.stopChan:
|
|
return
|
|
case <-doneChan:
|
|
logging.Logger().Debug(fmt.Sprintf("Discovery: DiscoverZone is complete"))
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (d *discovery) discoverHost(zone *model.Zone, dh *model.DiscoveryHost) {
|
|
logging.Logger().Debug(fmt.Sprintf("Discovery: DiscoverHost is start"))
|
|
|
|
d.stopWg.Add(1)
|
|
resultChan := make(chan *model.Host)
|
|
errChan := make(chan error)
|
|
doneChan := make(chan struct{})
|
|
|
|
defer func() {
|
|
close(resultChan)
|
|
close(errChan)
|
|
close(doneChan)
|
|
d.stopWg.Done()
|
|
}()
|
|
|
|
go scanHost(zone, dh, resultChan, errChan, doneChan)
|
|
|
|
for {
|
|
select {
|
|
case h := <-resultChan:
|
|
logging.Logger().Info(fmt.Sprintf("host: %v", h))
|
|
if nil != dh.DiscoveryPort {
|
|
d.discoverPort(h, dh.DiscoveryPort)
|
|
}
|
|
case err := <-errChan:
|
|
logging.Logger().Info(fmt.Sprintf("host err: %v", err))
|
|
case <-d.stopChan:
|
|
return
|
|
case <-doneChan:
|
|
logging.Logger().Debug(fmt.Sprintf("Discovery: DiscoverHost is complete"))
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (d *discovery) discoverPort(host *model.Host, dp *model.DiscoveryPort) {
|
|
logging.Logger().Debug(fmt.Sprintf("Discovery: DiscoverPort is start"))
|
|
|
|
d.stopWg.Add(1)
|
|
resultChan := make(chan *model.Port)
|
|
errChan := make(chan error)
|
|
doneChan := make(chan struct{})
|
|
|
|
defer func() {
|
|
close(resultChan)
|
|
close(errChan)
|
|
close(doneChan)
|
|
d.stopWg.Done()
|
|
}()
|
|
|
|
go scanPort(host, dp, resultChan, errChan, doneChan)
|
|
|
|
for {
|
|
select {
|
|
case p := <-resultChan:
|
|
logging.Logger().Info(fmt.Sprintf("port: %v", p))
|
|
if nil != dp.DiscoveryService {
|
|
d.discoverService(p, dp.DiscoveryService)
|
|
}
|
|
case err := <-errChan:
|
|
logging.Logger().Info(fmt.Sprintf("port err: %v", err))
|
|
case <-d.stopChan:
|
|
return
|
|
case <-doneChan:
|
|
logging.Logger().Debug(fmt.Sprintf("Discovery: DiscoverPort is complete"))
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (d *discovery) discoverService(port *model.Port, ds *model.DiscoveryService) {
|
|
logging.Logger().Debug(fmt.Sprintf("Discovery: DiscoverService is start"))
|
|
|
|
d.stopWg.Add(1)
|
|
resultChan := make(chan *model.Service)
|
|
errChan := make(chan error)
|
|
doneChan := make(chan struct{})
|
|
|
|
defer func() {
|
|
close(resultChan)
|
|
close(errChan)
|
|
close(doneChan)
|
|
d.stopWg.Done()
|
|
}()
|
|
|
|
go scanService(port, ds, resultChan, errChan, doneChan)
|
|
|
|
for {
|
|
select {
|
|
case s := <-resultChan:
|
|
logging.Logger().Info(fmt.Sprintf("service: %v", s))
|
|
case err := <-errChan:
|
|
logging.Logger().Info(fmt.Sprintf("service err: %v", err))
|
|
case <-d.stopChan:
|
|
return
|
|
case <-doneChan:
|
|
logging.Logger().Debug(fmt.Sprintf("Discovery: DiscoverService is complete"))
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (d *discovery) sendResult() {
|
|
|
|
}
|
|
|
|
func (d *discovery) sendError() {
|
|
|
|
}
|