// Package discoverer drives zone, host, port and service scans and streams
// their results to the caller over a channel of *DiscoveryData.
package discoverer

import (
	"sync"

	"git.loafle.net/commons/util-go/net/cidr"
	"git.loafle.net/overflow/commons-go/core/util"
	ocmd "git.loafle.net/overflow/commons-go/model/discovery"
)

// DiscoveryDataType identifies what a DiscoveryData item carries.
type DiscoveryDataType int

const (
	// DiscoveryDataTypeNone is the zero value; released items are reset to it.
	DiscoveryDataTypeNone DiscoveryDataType = iota
	// DiscoveryDataTypeStart is sent once before a discovery run begins.
	DiscoveryDataTypeStart
	// DiscoveryDataTypeStop is sent once after a discovery run has ended.
	DiscoveryDataTypeStop
	// DiscoveryDataTypeError marks an item whose Error field is set.
	DiscoveryDataTypeError
	// DiscoveryDataTypeZone marks an item whose Result is a *ocmd.Zone.
	DiscoveryDataTypeZone
	// DiscoveryDataTypeHost marks an item whose Result is a *ocmd.Host.
	DiscoveryDataTypeHost
	// DiscoveryDataTypePort marks an item whose Result is a *ocmd.Port.
	DiscoveryDataTypePort
	// DiscoveryDataTypeService marks an item whose Result is a *ocmd.Service.
	DiscoveryDataTypeService
)

// DiscoveryData is a single event emitted on a discovery data channel.
type DiscoveryData struct {
	Type   DiscoveryDataType
	Result interface{}
	Error  error
}

// Release resets dd and returns it to the internal pool. The caller must not
// use dd afterwards.
func (dd *DiscoveryData) Release() {
	releaseDiscoveryData(dd)
}

var _discoverer Discoverer

// GetDiscoverer returns the package-wide Discoverer, creating it on first use.
// The lazy initialisation itself is not synchronised.
func GetDiscoverer() Discoverer {
	if _discoverer == nil {
		_discoverer = &defaultDiscoverer{}
	}
	return _discoverer
}

// Discoverer runs discovery scans and reports results on a data channel.
//
// Each Discover* method blocks until the scan (and every nested scan it
// spawns) has finished or has been stopped. Stop is meant to be called from
// another goroutine while a Discover* call is in progress.
type Discoverer interface {
	Retain() chan *DiscoveryData
	Release(dataChan chan *DiscoveryData)

	Stop()

	DiscoverZone(dataChan chan *DiscoveryData, dz *ocmd.DiscoverZone)
	DiscoverHost(dataChan chan *DiscoveryData, zone *ocmd.Zone, dh *ocmd.DiscoverHost)
	DiscoverPort(dataChan chan *DiscoveryData, host *ocmd.Host, dp *ocmd.DiscoverPort)
	// DiscoverSerice keeps its historical spelling for API compatibility.
	DiscoverSerice(dataChan chan *DiscoveryData, port *ocmd.Port, ds *ocmd.DiscoverService)
}

// defaultDiscoverer is the Discoverer returned by GetDiscoverer.
//
// It holds a single stop channel, so it supports one discovery run at a time.
type defaultDiscoverer struct {
	dataChanPool chan chan *DiscoveryData

	stopChan chan struct{}
	// stopMtx guards stopChan: Stop is called from a different goroutine than
	// the one running a Discover* method, and scan goroutines read the field.
	stopMtx sync.Mutex
}

// Retain hands out the pooled data channel, creating the pool (one channel
// with a buffer of 256 items) on first use. It blocks while the channel is
// held by another caller that has not yet called Release.
func (d *defaultDiscoverer) Retain() chan *DiscoveryData {
	if d.dataChanPool == nil {
		d.dataChanPool = make(chan chan *DiscoveryData, 1)
		d.dataChanPool <- make(chan *DiscoveryData, 256)
	}
	return <-d.dataChanPool
}

// Release puts dataChan back into the pool. It blocks if the pool has not
// been created by Retain yet or if the pool is already full.
func (d *defaultDiscoverer) Release(dataChan chan *DiscoveryData) {
	d.dataChanPool <- dataChan
}

// Stop signals the discovery run in progress, if any, to stop. It is safe to
// call concurrently with a Discover* method and to call more than once.
func (d *defaultDiscoverer) Stop() {
	d.stopMtx.Lock()
	defer d.stopMtx.Unlock()
	d.closeStopLocked()
}

// beginRun installs a fresh stop channel for a new discovery run.
func (d *defaultDiscoverer) beginRun() {
	d.stopMtx.Lock()
	d.stopChan = make(chan struct{})
	d.stopMtx.Unlock()
}

// endRun closes the stop channel if it is still open and clears it.
func (d *defaultDiscoverer) endRun() {
	d.stopMtx.Lock()
	defer d.stopMtx.Unlock()
	d.closeStopLocked()
	d.stopChan = nil
}

// closeStopLocked closes the stop channel exactly once. stopMtx must be held.
func (d *defaultDiscoverer) closeStopLocked() {
	if d.stopChan == nil {
		return
	}
	select {
	case <-d.stopChan:
		// Nothing is ever sent on stopChan, so a successful receive means it
		// is already closed; closing again would panic.
	default:
		close(d.stopChan)
	}
}

// stopSignal returns the stop channel of the current run (nil if none).
func (d *defaultDiscoverer) stopSignal() chan struct{} {
	d.stopMtx.Lock()
	defer d.stopMtx.Unlock()
	return d.stopChan
}

// run is the shared skeleton of every Discover* method: it emits the Start
// item, launches the root scan, waits for it and all nested scans tracked by
// the wait group, tears down the stop channel and emits the Stop item.
func (d *defaultDiscoverer) run(dataChan chan *DiscoveryData, root func(wg *sync.WaitGroup)) {
	var wg sync.WaitGroup

	d.beginRun()
	dataChan <- retainDiscoveryData(DiscoveryDataTypeStart, util.Now(), nil, nil)

	wg.Add(1)
	go root(&wg)
	wg.Wait()

	d.endRun()
	dataChan <- retainDiscoveryData(DiscoveryDataTypeStop, util.Now(), nil, nil)
}

// DiscoverZone scans for zones and, when dz.DiscoverHost is set, continues
// into hosts (and further, depending on the nested options).
func (d *defaultDiscoverer) DiscoverZone(dataChan chan *DiscoveryData, dz *ocmd.DiscoverZone) {
	d.run(dataChan, func(wg *sync.WaitGroup) {
		d.innerDiscoverZone(wg, dataChan, dz)
	})
}

// DiscoverHost scans zone for hosts and, when dh.DiscoverPort is set,
// continues into ports.
func (d *defaultDiscoverer) DiscoverHost(dataChan chan *DiscoveryData, zone *ocmd.Zone, dh *ocmd.DiscoverHost) {
	d.run(dataChan, func(wg *sync.WaitGroup) {
		d.innerDiscoverHost(wg, dataChan, zone, dh)
	})
}

// DiscoverPort scans host for ports and, when dp.DiscoverService is set,
// continues into services.
func (d *defaultDiscoverer) DiscoverPort(dataChan chan *DiscoveryData, host *ocmd.Host, dp *ocmd.DiscoverPort) {
	d.run(dataChan, func(wg *sync.WaitGroup) {
		d.innerDiscoverPort(wg, dataChan, host, dp)
	})
}

// DiscoverSerice scans port for services.
func (d *defaultDiscoverer) DiscoverSerice(dataChan chan *DiscoveryData, port *ocmd.Port, ds *ocmd.DiscoverService) {
	d.run(dataChan, func(wg *sync.WaitGroup) {
		d.innerDiscoverSerice(wg, dataChan, port, ds)
	})
}

// innerDiscoverZone runs one zone scan. Every zone found is published and,
// if host discovery was requested, a host scan over the zone's whole CIDR
// range is started on the same wait group.
func (d *defaultDiscoverer) innerDiscoverZone(wg *sync.WaitGroup, dataChan chan *DiscoveryData, dz *ocmd.DiscoverZone) {
	defer wg.Done()

	taskScan(d,
		func(resultChan chan interface{}, errChan chan error, doneChan chan struct{}, stopChan chan struct{}) {
			scanZone(dz, resultChan, errChan, doneChan, stopChan)
		},
		func(result interface{}) {
			z := result.(*ocmd.Zone)
			dataChan <- retainDiscoveryData(DiscoveryDataTypeZone, util.Now(), z, nil)

			if dz.DiscoverHost == nil {
				return
			}

			cr, err := cidr.NewCIDRRanger(z.Network)
			if err != nil {
				// Without a valid range there is nothing to scan; report the
				// problem instead of dereferencing an unusable ranger.
				dataChan <- retainDiscoveryData(DiscoveryDataTypeError, util.Now(), nil, err)
				return
			}

			dh := &ocmd.DiscoverHost{
				FirstScanRangeV4: cr.First().String(),
				LastScanRangeV4:  cr.Last().String(),
				DiscoverPort:     dz.DiscoverHost.DiscoverPort,
			}
			// Add before spawning so the run cannot complete while the nested
			// scan is still starting up.
			wg.Add(1)
			go d.innerDiscoverHost(wg, dataChan, z, dh)
		},
		func(err error) {
			dataChan <- retainDiscoveryData(DiscoveryDataTypeError, util.Now(), nil, err)
		},
	)
}

// innerDiscoverHost runs one host scan. Every host found is published and,
// if port discovery was requested, a port scan for it is started.
func (d *defaultDiscoverer) innerDiscoverHost(wg *sync.WaitGroup, dataChan chan *DiscoveryData, zone *ocmd.Zone, dh *ocmd.DiscoverHost) {
	defer wg.Done()

	taskScan(d,
		func(resultChan chan interface{}, errChan chan error, doneChan chan struct{}, stopChan chan struct{}) {
			scanHost(zone, dh, resultChan, errChan, doneChan, stopChan)
		},
		func(result interface{}) {
			h := result.(*ocmd.Host)
			dataChan <- retainDiscoveryData(DiscoveryDataTypeHost, util.Now(), h, nil)

			if dh.DiscoverPort != nil {
				wg.Add(1)
				go d.innerDiscoverPort(wg, dataChan, h, dh.DiscoverPort)
			}
		},
		func(err error) {
			dataChan <- retainDiscoveryData(DiscoveryDataTypeError, util.Now(), nil, err)
		},
	)
}

// innerDiscoverPort runs one port scan. Every port found is published and,
// if service discovery was requested, a service scan for it is started.
func (d *defaultDiscoverer) innerDiscoverPort(wg *sync.WaitGroup, dataChan chan *DiscoveryData, host *ocmd.Host, dp *ocmd.DiscoverPort) {
	defer wg.Done()

	taskScan(d,
		func(resultChan chan interface{}, errChan chan error, doneChan chan struct{}, stopChan chan struct{}) {
			scanPort(host, dp, resultChan, errChan, doneChan, stopChan)
		},
		func(result interface{}) {
			p := result.(*ocmd.Port)
			dataChan <- retainDiscoveryData(DiscoveryDataTypePort, util.Now(), p, nil)

			if dp.DiscoverService != nil {
				wg.Add(1)
				go d.innerDiscoverSerice(wg, dataChan, p, dp.DiscoverService)
			}
		},
		func(err error) {
			dataChan <- retainDiscoveryData(DiscoveryDataTypeError, util.Now(), nil, err)
		},
	)
}

// innerDiscoverSerice runs one service scan and publishes every service found.
func (d *defaultDiscoverer) innerDiscoverSerice(wg *sync.WaitGroup, dataChan chan *DiscoveryData, port *ocmd.Port, ds *ocmd.DiscoverService) {
	defer wg.Done()

	taskScan(d,
		func(resultChan chan interface{}, errChan chan error, doneChan chan struct{}, stopChan chan struct{}) {
			scanService(port, ds, resultChan, errChan, doneChan, stopChan)
		},
		func(result interface{}) {
			s := result.(*ocmd.Service)
			dataChan <- retainDiscoveryData(DiscoveryDataTypeService, util.Now(), s, nil)
		},
		func(err error) {
			dataChan <- retainDiscoveryData(DiscoveryDataTypeError, util.Now(), nil, err)
		},
	)
}

// taskScan runs taskFunc in its own goroutine and pumps what it produces:
// values from resultChan go to resultFunc, values from errChan go to
// errorFunc. It returns when the task signals doneChan.
//
// When the discoverer's stop channel fires, the task's private stopChan is
// closed and taskScan waits for the task to signal doneChan; anything the
// task still emits in the meantime is discarded.
func taskScan(d *defaultDiscoverer, taskFunc func(resultChan chan interface{}, errChan chan error, doneChan chan struct{}, stopChan chan struct{}), resultFunc func(result interface{}), errorFunc func(err error)) {
	resultChan := make(chan interface{})
	errChan := make(chan error)
	stopChan := make(chan struct{})
	doneChan := make(chan struct{})

	// Capture the run's stop channel once, under the lock, instead of reading
	// the shared field on every loop iteration.
	stopSignal := d.stopSignal()

	go taskFunc(resultChan, errChan, doneChan, stopChan)

	for {
		select {
		case r := <-resultChan:
			resultFunc(r)
		case err := <-errChan:
			errorFunc(err)
		case <-doneChan:
			return
		case <-stopSignal:
			close(stopChan)
			// Keep receiving so a task that is parked on an unbuffered send
			// can still reach the point where it signals doneChan.
			for {
				select {
				case <-resultChan:
				case <-errChan:
				case <-doneChan:
					return
				}
			}
		}
	}
}

// discoveryDataPool recycles DiscoveryData values between emissions.
var discoveryDataPool = sync.Pool{
	New: func() interface{} { return &DiscoveryData{} },
}

// retainDiscoveryData takes a DiscoveryData from the pool and fills it in.
// The timestamp t is accepted but currently not stored anywhere.
func retainDiscoveryData(discoveryDataType DiscoveryDataType, t util.Timestamp, result interface{}, err error) *DiscoveryData {
	discoveryData := discoveryDataPool.Get().(*DiscoveryData)

	discoveryData.Type = discoveryDataType
	discoveryData.Result = result
	discoveryData.Error = err

	return discoveryData
}

// releaseDiscoveryData clears discoveryData and returns it to the pool.
func releaseDiscoveryData(discoveryData *DiscoveryData) {
	discoveryData.Type = DiscoveryDataTypeNone
	discoveryData.Result = nil
	discoveryData.Error = nil

	discoveryDataPool.Put(discoveryData)
}