// Source path: probe/_discoverer/discoverer.go
// Repository viewer metadata (not part of the program): 294 lines, 7.6 KiB, Go.
// Blame timestamp: 2018-08-13 07:19:59 +00:00

package discoverer
import (
"sync"
omd "git.loafle.net/overflow/model/discovery"
"git.loafle.net/overflow/model/util"
// blame timestamp: 2018-08-23 09:21:48 +00:00
"git.loafle.net/overflow/util-go/net/cidr"
// blame timestamp: 2018-08-13 07:19:59 +00:00
)
// DiscoveryDataType identifies the kind of event carried by a DiscoveryData
// value sent on a discovery data channel.
type DiscoveryDataType int
const (
// DiscoveryDataTypeNone is the zero value; releaseDiscoveryData resets a
// value to this type before returning it to the pool.
DiscoveryDataTypeNone DiscoveryDataType = iota
// DiscoveryDataTypeStart is sent once at the beginning of each Discover* call.
DiscoveryDataTypeStart
// DiscoveryDataTypeStop is sent once at the end of each Discover* call.
DiscoveryDataTypeStop
// DiscoveryDataTypeError marks an event whose Error field holds a scan error.
DiscoveryDataTypeError
// DiscoveryDataTypeZone marks an event whose Result is a *omd.Zone.
DiscoveryDataTypeZone
// DiscoveryDataTypeHost marks an event whose Result is a *omd.Host.
DiscoveryDataTypeHost
// DiscoveryDataTypePort marks an event whose Result is a *omd.Port.
DiscoveryDataTypePort
// DiscoveryDataTypeService marks an event whose Result is a *omd.Service.
DiscoveryDataTypeService
)
// DiscoveryData is a single event published on a discovery data channel.
// Values are obtained from a sync.Pool by retainDiscoveryData and handed back
// with Release.
type DiscoveryData struct {
// Type tells the receiver which of Result or Error is meaningful.
Type DiscoveryDataType
// Result holds the discovered object for Zone/Host/Port/Service events and
// is nil for Start, Stop and Error events.
Result interface{}
// Error holds the scan error for Error events and is nil otherwise.
Error error
}
// Release clears dd and hands it back to the package-level pool by calling
// releaseDiscoveryData. The pool may give the same value to a later
// retainDiscoveryData call, so dd should not be used after Release.
func (dd *DiscoveryData) Release() {
releaseDiscoveryData(dd)
}
// _discoverer is the process-wide Discoverer instance handed out by
// GetDiscoverer.
var _discoverer Discoverer

// discovererOnce guards the lazy creation of _discoverer so that concurrent
// first callers of GetDiscoverer cannot each create their own instance.
var discovererOnce sync.Once

// GetDiscoverer returns the shared Discoverer, creating a defaultDiscoverer on
// first use. It is safe to call from multiple goroutines; every caller
// receives the same instance.
func GetDiscoverer() Discoverer {
	discovererOnce.Do(func() {
		// Keep the nil check so an instance assigned to _discoverer before the
		// first call is preserved, exactly as the unsynchronised version did.
		if nil == _discoverer {
			_discoverer = &defaultDiscoverer{}
		}
	})
	return _discoverer
}
// Discoverer runs network discovery scans and reports progress as
// DiscoveryData events on a caller-supplied channel.
type Discoverer interface {
// Retain hands out a data channel; Release gives it back. In the default
// implementation there is a single pooled channel, so a second Retain
// blocks until Release is called.
Retain() chan *DiscoveryData
Release(dataChan chan *DiscoveryData)
// Stop signals running scans to stop. In the default implementation it
// closes the current stop channel, if any.
Stop()
// The Discover* methods block until the scan (and, in the default
// implementation, any nested scans it starts) has finished. They send a
// Start event first and a Stop event last on dataChan.
DiscoverZone(dataChan chan *DiscoveryData, dz *omd.DiscoverZone)
DiscoverHost(dataChan chan *DiscoveryData, zone *omd.Zone, dh *omd.DiscoverHost)
DiscoverPort(dataChan chan *DiscoveryData, host *omd.Host, dp *omd.DiscoverPort)
// DiscoverSerice discovers services on port. The misspelling (sic) is part
// of the public interface and is kept for compatibility.
DiscoverSerice(dataChan chan *DiscoveryData, port *omd.Port, ds *omd.DiscoverService)
}
// defaultDiscoverer is the Discoverer implementation returned by
// GetDiscoverer.
type defaultDiscoverer struct {
// dataChanPool holds the data channel lent out by Retain; it is created
// lazily on the first Retain call with capacity 1.
dataChanPool chan chan *DiscoveryData
// stopChan is created by each Discover* call and closed (then set to nil)
// by Stop or when the call completes; taskScan watches it to cancel scans.
// NOTE(review): the field is read and written without synchronisation.
stopChan chan struct{}
}
// Retain returns the pooled data channel, building the pool on first use.
//
// The pool has a single slot holding one channel buffered for 256 events, so
// a second Retain blocks until the channel has been handed back via Release.
func (d *defaultDiscoverer) Retain() chan *DiscoveryData {
	if d.dataChanPool == nil {
		pool := make(chan chan *DiscoveryData, 1)
		pool <- make(chan *DiscoveryData, 256)
		d.dataChanPool = pool
	}

	dataChan := <-d.dataChanPool
	return dataChan
}
// Release puts dataChan back into the pool so that a blocked or later Retain
// call can receive it.
// NOTE(review): if Retain has never been called, dataChanPool is nil and this
// send blocks forever; because the pool has capacity 1, it also blocks when
// the slot is already occupied.
func (d *defaultDiscoverer) Release(dataChan chan *DiscoveryData) {
d.dataChanPool <- dataChan
}
// Stop closes the current stop channel, which is what running scans watch to
// cancel themselves, and then forgets it. Calling Stop when no stop channel
// is set is a no-op.
func (d *defaultDiscoverer) Stop() {
	if d.stopChan == nil {
		return
	}
	close(d.stopChan)
	d.stopChan = nil
}
// DiscoverZone runs a zone scan described by dz and blocks until it, and every
// nested scan it spawned, has finished.
//
// A fresh stop channel is installed first, then a Start event is sent on
// dataChan. Once all scan goroutines are done, the stop channel is closed (if
// Stop has not already cleared it) and a Stop event is sent.
func (d *defaultDiscoverer) DiscoverZone(dataChan chan *DiscoveryData, dz *omd.DiscoverZone) {
	d.stopChan = make(chan struct{})
	dataChan <- retainDiscoveryData(DiscoveryDataTypeStart, util.Now(), nil, nil)

	// The wait group is shared with nested scans, which add themselves to it.
	wg := new(sync.WaitGroup)
	wg.Add(1)
	go d.innerDiscoverZone(wg, dataChan, dz)
	wg.Wait()

	if stop := d.stopChan; stop != nil {
		close(stop)
		d.stopChan = nil
	}
	dataChan <- retainDiscoveryData(DiscoveryDataTypeStop, util.Now(), nil, nil)
}
// DiscoverHost runs a host scan of zone described by dh and blocks until it,
// and every nested scan it spawned, has finished.
//
// A fresh stop channel is installed first, then a Start event is sent on
// dataChan. Once all scan goroutines are done, the stop channel is closed (if
// Stop has not already cleared it) and a Stop event is sent.
func (d *defaultDiscoverer) DiscoverHost(dataChan chan *DiscoveryData, zone *omd.Zone, dh *omd.DiscoverHost) {
	d.stopChan = make(chan struct{})
	dataChan <- retainDiscoveryData(DiscoveryDataTypeStart, util.Now(), nil, nil)

	// The wait group is shared with nested scans, which add themselves to it.
	wg := new(sync.WaitGroup)
	wg.Add(1)
	go d.innerDiscoverHost(wg, dataChan, zone, dh)
	wg.Wait()

	if stop := d.stopChan; stop != nil {
		close(stop)
		d.stopChan = nil
	}
	dataChan <- retainDiscoveryData(DiscoveryDataTypeStop, util.Now(), nil, nil)
}
// DiscoverPort runs a port scan of host described by dp and blocks until it,
// and every nested scan it spawned, has finished.
//
// A fresh stop channel is installed first, then a Start event is sent on
// dataChan. Once all scan goroutines are done, the stop channel is closed (if
// Stop has not already cleared it) and a Stop event is sent.
func (d *defaultDiscoverer) DiscoverPort(dataChan chan *DiscoveryData, host *omd.Host, dp *omd.DiscoverPort) {
	d.stopChan = make(chan struct{})
	dataChan <- retainDiscoveryData(DiscoveryDataTypeStart, util.Now(), nil, nil)

	// The wait group is shared with nested scans, which add themselves to it.
	wg := new(sync.WaitGroup)
	wg.Add(1)
	go d.innerDiscoverPort(wg, dataChan, host, dp)
	wg.Wait()

	if stop := d.stopChan; stop != nil {
		close(stop)
		d.stopChan = nil
	}
	dataChan <- retainDiscoveryData(DiscoveryDataTypeStop, util.Now(), nil, nil)
}
// DiscoverSerice runs a service scan of port described by ds and blocks until
// it has finished. The misspelled name is part of the Discoverer interface.
//
// A fresh stop channel is installed first, then a Start event is sent on
// dataChan. Once the scan goroutine is done, the stop channel is closed (if
// Stop has not already cleared it) and a Stop event is sent.
func (d *defaultDiscoverer) DiscoverSerice(dataChan chan *DiscoveryData, port *omd.Port, ds *omd.DiscoverService) {
	d.stopChan = make(chan struct{})
	dataChan <- retainDiscoveryData(DiscoveryDataTypeStart, util.Now(), nil, nil)

	wg := new(sync.WaitGroup)
	wg.Add(1)
	go d.innerDiscoverSerice(wg, dataChan, port, ds)
	wg.Wait()

	if stop := d.stopChan; stop != nil {
		close(stop)
		d.stopChan = nil
	}
	dataChan <- retainDiscoveryData(DiscoveryDataTypeStop, util.Now(), nil, nil)
}
// innerDiscoverZone runs scanZone through taskScan and publishes every zone
// found as a DiscoveryDataTypeZone event on dataChan. Scan errors are published
// as DiscoveryDataTypeError events.
//
// When dz.DiscoverHost is set, each zone also starts a nested host scan that
// covers the zone's whole CIDR range. The nested scan registers itself on wg,
// so the caller's wg.Wait covers it too. wg.Done is called for this scan when
// the function returns.
func (d *defaultDiscoverer) innerDiscoverZone(wg *sync.WaitGroup, dataChan chan *DiscoveryData, dz *omd.DiscoverZone) {
	defer wg.Done()

	taskScan(d,
		func(resultChan chan interface{}, errChan chan error, doneChan chan struct{}, stopChan chan struct{}) {
			scanZone(dz, resultChan, errChan, doneChan, stopChan)
		},
		func(result interface{}) {
			z := result.(*omd.Zone)
			dataChan <- retainDiscoveryData(DiscoveryDataTypeZone, util.Now(), z, nil)
			if nil == dz.DiscoverHost {
				return
			}

			// The ranger error used to be discarded, which meant a zone with an
			// unparsable network dereferenced an unusable ranger below. Report
			// it as an error event and skip host discovery for this zone.
			cr, err := cidr.NewCIDRRanger(z.Network)
			if nil != err {
				dataChan <- retainDiscoveryData(DiscoveryDataTypeError, util.Now(), nil, err)
				return
			}

			dh := &omd.DiscoverHost{
				MetaIPType:     z.MetaIPType,
				FirstScanRange: cr.First().String(),
				LastScanRange:  cr.Last().String(),
				DiscoverPort:   dz.DiscoverHost.DiscoverPort,
			}
			// Add before starting the goroutine; this scan has not called Done
			// yet, so the counter cannot reach zero in between.
			wg.Add(1)
			go d.innerDiscoverHost(wg, dataChan, z, dh)
		},
		func(err error) {
			dataChan <- retainDiscoveryData(DiscoveryDataTypeError, util.Now(), nil, err)
		},
	)
}
// innerDiscoverHost runs scanHost for zone through taskScan and publishes every
// host found as a DiscoveryDataTypeHost event on dataChan. Scan errors are
// published as DiscoveryDataTypeError events.
//
// When dh.DiscoverPort is set, each host also starts a nested port scan that
// registers itself on wg. wg.Done is called for this scan when the function
// returns.
func (d *defaultDiscoverer) innerDiscoverHost(wg *sync.WaitGroup, dataChan chan *DiscoveryData, zone *omd.Zone, dh *omd.DiscoverHost) {
	defer wg.Done()

	runScan := func(resultChan chan interface{}, errChan chan error, doneChan chan struct{}, stopChan chan struct{}) {
		scanHost(zone, dh, resultChan, errChan, doneChan, stopChan)
	}

	onResult := func(result interface{}) {
		host := result.(*omd.Host)
		dataChan <- retainDiscoveryData(DiscoveryDataTypeHost, util.Now(), host, nil)
		if dh.DiscoverPort == nil {
			return
		}
		wg.Add(1)
		go d.innerDiscoverPort(wg, dataChan, host, dh.DiscoverPort)
	}

	onError := func(err error) {
		dataChan <- retainDiscoveryData(DiscoveryDataTypeError, util.Now(), nil, err)
	}

	taskScan(d, runScan, onResult, onError)
}
// innerDiscoverPort runs scanPort for host through taskScan and publishes every
// port found as a DiscoveryDataTypePort event on dataChan. Scan errors are
// published as DiscoveryDataTypeError events.
//
// When dp.DiscoverService is set, each port also starts a nested service scan
// that registers itself on wg. wg.Done is called for this scan when the
// function returns.
func (d *defaultDiscoverer) innerDiscoverPort(wg *sync.WaitGroup, dataChan chan *DiscoveryData, host *omd.Host, dp *omd.DiscoverPort) {
	defer wg.Done()

	runScan := func(resultChan chan interface{}, errChan chan error, doneChan chan struct{}, stopChan chan struct{}) {
		scanPort(host, dp, resultChan, errChan, doneChan, stopChan)
	}

	onResult := func(result interface{}) {
		port := result.(*omd.Port)
		dataChan <- retainDiscoveryData(DiscoveryDataTypePort, util.Now(), port, nil)
		if dp.DiscoverService == nil {
			return
		}
		wg.Add(1)
		go d.innerDiscoverSerice(wg, dataChan, port, dp.DiscoverService)
	}

	onError := func(err error) {
		dataChan <- retainDiscoveryData(DiscoveryDataTypeError, util.Now(), nil, err)
	}

	taskScan(d, runScan, onResult, onError)
}
// innerDiscoverSerice runs scanService for port through taskScan and publishes
// every service found as a DiscoveryDataTypeService event on dataChan. Scan
// errors are published as DiscoveryDataTypeError events. wg.Done is called
// when the function returns.
func (d *defaultDiscoverer) innerDiscoverSerice(wg *sync.WaitGroup, dataChan chan *DiscoveryData, port *omd.Port, ds *omd.DiscoverService) {
	defer wg.Done()

	runScan := func(resultChan chan interface{}, errChan chan error, doneChan chan struct{}, stopChan chan struct{}) {
		scanService(port, ds, resultChan, errChan, doneChan, stopChan)
	}

	onResult := func(result interface{}) {
		service := result.(*omd.Service)
		dataChan <- retainDiscoveryData(DiscoveryDataTypeService, util.Now(), service, nil)
	}

	onError := func(err error) {
		dataChan <- retainDiscoveryData(DiscoveryDataTypeError, util.Now(), nil, err)
	}

	taskScan(d, runScan, onResult, onError)
}
// taskScan runs taskFunc in its own goroutine and pumps its output until the
// task reports completion or the discoverer is stopped.
//
// taskFunc receives four fresh unbuffered channels: it sends results on
// resultChan and errors on errChan, signals completion on doneChan, and should
// watch stopChan, which is closed when the scan is cancelled. Each result is
// passed to resultFunc and each error to errorFunc, both on the calling
// goroutine.
//
// Cancellation is driven by the discoverer's stop channel as it was when
// taskScan was entered. After it fires, the task's stopChan is closed and any
// further results or errors are discarded until the task signals doneChan.
func taskScan(d *defaultDiscoverer,
	taskFunc func(resultChan chan interface{}, errChan chan error, doneChan chan struct{}, stopChan chan struct{}),
	resultFunc func(result interface{}),
	errorFunc func(err error)) {
	resultChan := make(chan interface{})
	errChan := make(chan error)
	stopChan := make(chan struct{})
	doneChan := make(chan struct{})

	// Capture the stop channel once. Stop closes the channel and then sets the
	// field to nil; re-reading the field on every select would turn this case
	// into a receive from a nil channel, which never fires, so a stop issued
	// while a callback was running would be missed.
	discovererStop := d.stopChan

	go taskFunc(resultChan, errChan, doneChan, stopChan)

	for {
		select {
		case r := <-resultChan:
			resultFunc(r)
		case err := <-errChan:
			errorFunc(err)
		case <-doneChan:
			return
		case <-discovererStop:
			close(stopChan)
			// Keep draining while the task winds down: the channels are
			// unbuffered, so a task blocked in a send could otherwise never
			// reach the point where it signals doneChan.
			for {
				select {
				case <-resultChan:
				case <-errChan:
				case <-doneChan:
					return
				}
			}
		}
	}
}
// discoveryDataPool recycles DiscoveryData values between retainDiscoveryData
// and releaseDiscoveryData.
var discoveryDataPool sync.Pool

// retainDiscoveryData returns a DiscoveryData populated with the given type,
// result and error, reusing a pooled value when one is available and
// allocating a new one otherwise.
//
// NOTE(review): the timestamp t is accepted but not stored anywhere;
// DiscoveryData has no field for it.
func retainDiscoveryData(discoveryDataType DiscoveryDataType, t util.Timestamp, result interface{}, err error) *DiscoveryData {
	var dd *DiscoveryData
	if pooled := discoveryDataPool.Get(); pooled != nil {
		dd = pooled.(*DiscoveryData)
	} else {
		dd = new(DiscoveryData)
	}

	dd.Type, dd.Result, dd.Error = discoveryDataType, result, err
	return dd
}
// releaseDiscoveryData resets every field of discoveryData to its zero state
// (Type becomes DiscoveryDataTypeNone) and returns the value to
// discoveryDataPool for reuse. Clearing Result and Error drops the references
// the pooled value would otherwise keep alive.
func releaseDiscoveryData(discoveryData *DiscoveryData) {
	*discoveryData = DiscoveryData{
		Type:   DiscoveryDataTypeNone,
		Result: nil,
		Error:  nil,
	}
	discoveryDataPool.Put(discoveryData)
}