container_discovery/internal/discoverer/discoverer.go
crusader c40d8eb004 ing
2018-04-19 20:36:56 +09:00

294 lines
7.7 KiB
Go

package discoverer
import (
"sync"
"time"
"git.loafle.net/commons/util-go/net/cidr"
ocdm "git.loafle.net/overflow/commons-go/discovery/model"
)
// DiscoveryDataType identifies the kind of event carried by a DiscoveryData.
type DiscoveryDataType int

// Event kinds published on a discovery data channel.
const (
	// DiscoveryDataTypeNone is the zero value; a released DiscoveryData is
	// reset to it.
	DiscoveryDataTypeNone DiscoveryDataType = iota
	// DiscoveryDataTypeStart is emitted when a discovery run begins.
	DiscoveryDataTypeStart
	// DiscoveryDataTypeStop is emitted when a discovery run has finished.
	DiscoveryDataTypeStop
	// DiscoveryDataTypeError marks an event whose Error field is set.
	DiscoveryDataTypeError
	// DiscoveryDataTypeZone marks an event whose Result is a discovered zone.
	DiscoveryDataTypeZone
	// DiscoveryDataTypeHost marks an event whose Result is a discovered host.
	DiscoveryDataTypeHost
	// DiscoveryDataTypePort marks an event whose Result is a discovered port.
	DiscoveryDataTypePort
	// DiscoveryDataTypeService marks an event whose Result is a discovered
	// service.
	DiscoveryDataTypeService
)
// DiscoveryData is a single event delivered on a discovery data channel.
// Values are recycled through a pool; call Release once an event has been
// consumed.
type DiscoveryData struct {
	// Type tells which kind of event this is.
	Type DiscoveryDataType
	// Result carries the discovered item (zone, host, port or service) for
	// result events and is nil for start, stop and error events.
	Result interface{}
	// Error carries the failure for DiscoveryDataTypeError events.
	Error error
	// Time is the timestamp supplied when the event was created.
	Time time.Time
}
// Release clears the event and hands it back to the shared pool for reuse.
// The event must not be accessed after Release has been called.
func (data *DiscoveryData) Release() {
	releaseDiscoveryData(data)
}
var (
	// _discoverer is the process-wide Discoverer handed out by GetDiscoverer.
	_discoverer Discoverer
	// discovererOnce makes the lazy creation of _discoverer happen exactly
	// once, even when GetDiscoverer is first called from several goroutines.
	discovererOnce sync.Once
)

// GetDiscoverer returns the shared Discoverer, creating the default
// implementation on first use. It is safe for concurrent use: every caller
// receives the same instance.
func GetDiscoverer() Discoverer {
	discovererOnce.Do(func() {
		// Keep an instance that was installed before the first call.
		if _discoverer == nil {
			_discoverer = &defaultDiscoverer{}
		}
	})
	return _discoverer
}
// Discoverer runs discovery scans and streams what they find to the caller as
// DiscoveryData events on a data channel.
type Discoverer interface {
	// Retain returns the channel on which discovery events are delivered.
	Retain() chan *DiscoveryData
	// Release gives back a channel previously obtained from Retain.
	Release(dataChan chan *DiscoveryData)

	// DiscoverZone scans for zones as described by dz and reports the
	// findings on dataChan.
	DiscoverZone(dataChan chan *DiscoveryData, dz *ocdm.DiscoveryZone)
	// DiscoverHost scans zone for hosts as described by dh and reports the
	// findings on dataChan.
	DiscoverHost(dataChan chan *DiscoveryData, zone *ocdm.Zone, dh *ocdm.DiscoveryHost)
	// DiscoverPort scans host for ports as described by dp and reports the
	// findings on dataChan.
	DiscoverPort(dataChan chan *DiscoveryData, host *ocdm.Host, dp *ocdm.DiscoveryPort)
	// DiscoverSerice scans port for services as described by ds and reports
	// the findings on dataChan. The spelling of the method name is part of
	// the public API and is therefore kept as is.
	DiscoverSerice(dataChan chan *DiscoveryData, port *ocdm.Port, ds *ocdm.DiscoveryService)
}
// defaultDiscoverer is the Discoverer returned by GetDiscoverer.
type defaultDiscoverer struct {
	// dataChanPool holds the data channel while nobody has retained it. It
	// is created lazily by Retain.
	dataChanPool chan chan *DiscoveryData
	// stopChan is created at the start of every Discover* call and closed to
	// ask running scans to stop; it is nil while no discovery is active.
	stopChan chan struct{}
}
// Retain takes the data channel out of the pool and returns it. On first use
// the pool is created holding a single data channel buffered for 256 events.
// While the channel is handed out, further calls block until Release puts it
// back.
func (d *defaultDiscoverer) Retain() chan *DiscoveryData {
	if d.dataChanPool == nil {
		pool := make(chan chan *DiscoveryData, 1)
		pool <- make(chan *DiscoveryData, 256)
		d.dataChanPool = pool
	}
	return <-d.dataChanPool
}
// Release puts dataChan back into the pool so that a later Retain can hand it
// out again.
func (d *defaultDiscoverer) Release(dataChan chan *DiscoveryData) {
	pool := d.dataChanPool
	pool <- dataChan
}
// Stop asks the active discovery, if there is one, to stop by closing the
// stop channel and clearing it. It does nothing when no stop channel is set.
// The dataChan argument is not used.
func (d *defaultDiscoverer) Stop(dataChan chan *DiscoveryData) {
	if d.stopChan == nil {
		return
	}
	close(d.stopChan)
	d.stopChan = nil
}
// DiscoverZone runs a zone discovery described by dz and blocks until it has
// finished. It publishes a Start event on dataChan, then whatever the scan
// produces, and finally a Stop event.
func (d *defaultDiscoverer) DiscoverZone(dataChan chan *DiscoveryData, dz *ocdm.DiscoveryZone) {
	d.stopChan = make(chan struct{})
	dataChan <- retainDiscoveryData(DiscoveryDataTypeStart, time.Now(), nil, nil)

	// The scan adds to this group for every nested scan it starts.
	pending := new(sync.WaitGroup)
	pending.Add(1)
	go d.innerDiscoverZone(pending, dataChan, dz)
	pending.Wait()

	// Tear down the stop channel unless it has already been cleared.
	if stop := d.stopChan; stop != nil {
		close(stop)
		d.stopChan = nil
	}
	dataChan <- retainDiscoveryData(DiscoveryDataTypeStop, time.Now(), nil, nil)
}
// DiscoverHost runs a host discovery in zone as described by dh and blocks
// until it has finished. It publishes a Start event on dataChan, then whatever
// the scan produces, and finally a Stop event.
func (d *defaultDiscoverer) DiscoverHost(dataChan chan *DiscoveryData, zone *ocdm.Zone, dh *ocdm.DiscoveryHost) {
	d.stopChan = make(chan struct{})
	dataChan <- retainDiscoveryData(DiscoveryDataTypeStart, time.Now(), nil, nil)

	// The scan adds to this group for every nested scan it starts.
	pending := new(sync.WaitGroup)
	pending.Add(1)
	go d.innerDiscoverHost(pending, dataChan, zone, dh)
	pending.Wait()

	// Tear down the stop channel unless it has already been cleared.
	if stop := d.stopChan; stop != nil {
		close(stop)
		d.stopChan = nil
	}
	dataChan <- retainDiscoveryData(DiscoveryDataTypeStop, time.Now(), nil, nil)
}
// DiscoverPort runs a port discovery on host as described by dp and blocks
// until it has finished. It publishes a Start event on dataChan, then whatever
// the scan produces, and finally a Stop event.
func (d *defaultDiscoverer) DiscoverPort(dataChan chan *DiscoveryData, host *ocdm.Host, dp *ocdm.DiscoveryPort) {
	d.stopChan = make(chan struct{})
	dataChan <- retainDiscoveryData(DiscoveryDataTypeStart, time.Now(), nil, nil)

	// The scan adds to this group for every nested scan it starts.
	pending := new(sync.WaitGroup)
	pending.Add(1)
	go d.innerDiscoverPort(pending, dataChan, host, dp)
	pending.Wait()

	// Tear down the stop channel unless it has already been cleared.
	if stop := d.stopChan; stop != nil {
		close(stop)
		d.stopChan = nil
	}
	dataChan <- retainDiscoveryData(DiscoveryDataTypeStop, time.Now(), nil, nil)
}
// DiscoverSerice runs a service discovery on port as described by ds and
// blocks until it has finished. It publishes a Start event on dataChan, then
// whatever the scan produces, and finally a Stop event.
func (d *defaultDiscoverer) DiscoverSerice(dataChan chan *DiscoveryData, port *ocdm.Port, ds *ocdm.DiscoveryService) {
	d.stopChan = make(chan struct{})
	dataChan <- retainDiscoveryData(DiscoveryDataTypeStart, time.Now(), nil, nil)

	pending := new(sync.WaitGroup)
	pending.Add(1)
	go d.innerDiscoverSerice(pending, dataChan, port, ds)
	pending.Wait()

	// Tear down the stop channel unless it has already been cleared.
	if stop := d.stopChan; stop != nil {
		close(stop)
		d.stopChan = nil
	}
	dataChan <- retainDiscoveryData(DiscoveryDataTypeStop, time.Now(), nil, nil)
}
// innerDiscoverZone runs scanZone for dz through taskScan and publishes every
// zone it reports as a DiscoveryDataTypeZone event on dataChan. When dz asks
// for host discovery, each zone is followed up synchronously with a host scan
// that covers the zone's whole network range.
//
// wg must already account for this call; it is marked done on return. Every
// nested scan is added to wg before it starts.
func (d *defaultDiscoverer) innerDiscoverZone(wg *sync.WaitGroup, dataChan chan *DiscoveryData, dz *ocdm.DiscoveryZone) {
	defer wg.Done()

	taskScan(d,
		func(resultChan chan interface{}, errChan chan error, doneChan chan struct{}, stopChan chan struct{}) {
			scanZone(dz, resultChan, errChan, doneChan, stopChan)
		},
		func(result interface{}) {
			z := result.(*ocdm.Zone)
			dataChan <- retainDiscoveryData(DiscoveryDataTypeZone, time.Now(), z, nil)

			if dz.DiscoveryHost == nil {
				return
			}

			cr, err := cidr.NewCIDRRanger(z.Network)
			if err != nil {
				// Without a usable range there is nothing to scan in this
				// zone: report the problem instead of dereferencing cr, and
				// carry on with the remaining zones.
				dataChan <- retainDiscoveryData(DiscoveryDataTypeError, time.Now(), nil, err)
				return
			}

			dh := &ocdm.DiscoveryHost{
				FirstScanRange: cr.First().String(),
				LastScanRange:  cr.Last().String(),
				DiscoveryPort:  dz.DiscoveryHost.DiscoveryPort,
			}
			// innerDiscoverHost calls wg.Done itself, so register it first.
			wg.Add(1)
			d.innerDiscoverHost(wg, dataChan, z, dh)
		},
		func(err error) {
			dataChan <- retainDiscoveryData(DiscoveryDataTypeError, time.Now(), nil, err)
		},
	)
}
// innerDiscoverHost runs scanHost for zone and dh through taskScan and
// publishes every host it reports as a DiscoveryDataTypeHost event on
// dataChan. When dh carries a port discovery request, each host is followed up
// synchronously with a port scan.
//
// wg must already account for this call; it is marked done on return.
func (d *defaultDiscoverer) innerDiscoverHost(wg *sync.WaitGroup, dataChan chan *DiscoveryData, zone *ocdm.Zone, dh *ocdm.DiscoveryHost) {
	defer wg.Done()

	runScan := func(resultChan chan interface{}, errChan chan error, doneChan chan struct{}, stopChan chan struct{}) {
		scanHost(zone, dh, resultChan, errChan, doneChan, stopChan)
	}
	onHost := func(result interface{}) {
		host := result.(*ocdm.Host)
		dataChan <- retainDiscoveryData(DiscoveryDataTypeHost, time.Now(), host, nil)
		if dh.DiscoveryPort == nil {
			return
		}
		// innerDiscoverPort calls wg.Done itself, so register it first.
		wg.Add(1)
		d.innerDiscoverPort(wg, dataChan, host, dh.DiscoveryPort)
	}
	onError := func(err error) {
		dataChan <- retainDiscoveryData(DiscoveryDataTypeError, time.Now(), nil, err)
	}

	taskScan(d, runScan, onHost, onError)
}
// innerDiscoverPort runs scanPort for host and dp through taskScan and
// publishes every port it reports as a DiscoveryDataTypePort event on
// dataChan. When dp carries a service discovery request, each port is followed
// up synchronously with a service scan.
//
// wg must already account for this call; it is marked done on return.
func (d *defaultDiscoverer) innerDiscoverPort(wg *sync.WaitGroup, dataChan chan *DiscoveryData, host *ocdm.Host, dp *ocdm.DiscoveryPort) {
	defer wg.Done()

	runScan := func(resultChan chan interface{}, errChan chan error, doneChan chan struct{}, stopChan chan struct{}) {
		scanPort(host, dp, resultChan, errChan, doneChan, stopChan)
	}
	onPort := func(result interface{}) {
		port := result.(*ocdm.Port)
		dataChan <- retainDiscoveryData(DiscoveryDataTypePort, time.Now(), port, nil)
		if dp.DiscoveryService == nil {
			return
		}
		// innerDiscoverSerice calls wg.Done itself, so register it first.
		wg.Add(1)
		d.innerDiscoverSerice(wg, dataChan, port, dp.DiscoveryService)
	}
	onError := func(err error) {
		dataChan <- retainDiscoveryData(DiscoveryDataTypeError, time.Now(), nil, err)
	}

	taskScan(d, runScan, onPort, onError)
}
// innerDiscoverSerice runs scanService for port and ds through taskScan and
// publishes every service it reports as a DiscoveryDataTypeService event on
// dataChan. Scan errors are published as DiscoveryDataTypeError events.
//
// wg must already account for this call; it is marked done on return.
func (d *defaultDiscoverer) innerDiscoverSerice(wg *sync.WaitGroup, dataChan chan *DiscoveryData, port *ocdm.Port, ds *ocdm.DiscoveryService) {
	defer wg.Done()

	runScan := func(resultChan chan interface{}, errChan chan error, doneChan chan struct{}, stopChan chan struct{}) {
		scanService(port, ds, resultChan, errChan, doneChan, stopChan)
	}
	onService := func(result interface{}) {
		service := result.(*ocdm.Service)
		dataChan <- retainDiscoveryData(DiscoveryDataTypeService, time.Now(), service, nil)
	}
	onError := func(err error) {
		dataChan <- retainDiscoveryData(DiscoveryDataTypeError, time.Now(), nil, err)
	}

	taskScan(d, runScan, onService, onError)
}
// taskScan starts taskFunc in its own goroutine and forwards what it produces
// to the callbacks: every value received on resultChan goes to resultFunc and
// every error received on errChan goes to errorFunc. Both callbacks run on the
// calling goroutine. taskScan returns once the task signals doneChan.
//
// When the discoverer's stop channel is closed, the task's private stopChan is
// closed and taskScan waits for the task to signal doneChan. Results and
// errors the task sends while shutting down are discarded.
func taskScan(d *defaultDiscoverer,
	taskFunc func(resultChan chan interface{}, errChan chan error, doneChan chan struct{}, stopChan chan struct{}),
	resultFunc func(result interface{}),
	errorFunc func(err error)) {
	resultChan := make(chan interface{})
	errChan := make(chan error)
	stopChan := make(chan struct{})
	doneChan := make(chan struct{})

	// Take the stop channel once. The field is reset to nil as soon as a stop
	// has been requested; re-reading it on every iteration would make the
	// enclosing scans wait on a nil channel and miss the stop entirely.
	stop := d.stopChan

	go taskFunc(resultChan, errChan, doneChan, stopChan)

	for {
		select {
		case r := <-resultChan:
			resultFunc(r)
		case err := <-errChan:
			errorFunc(err)
		case <-doneChan:
			return
		case <-stop:
			close(stopChan)
			// Keep receiving until the task is done so that a task blocked
			// on sending a result or an error cannot deadlock the shutdown.
			for {
				select {
				case <-resultChan:
				case <-errChan:
				case <-doneChan:
					return
				}
			}
		}
	}
}
// discoveryDataPool recycles DiscoveryData values between events.
var discoveryDataPool sync.Pool

// retainDiscoveryData returns a DiscoveryData filled with the given type,
// timestamp, result and error. The value is taken from the pool when one is
// available and freshly allocated otherwise.
func retainDiscoveryData(discoveryDataType DiscoveryDataType, t time.Time, result interface{}, err error) *DiscoveryData {
	pooled := discoveryDataPool.Get()
	if pooled == nil {
		pooled = &DiscoveryData{}
	}

	dd := pooled.(*DiscoveryData)
	*dd = DiscoveryData{
		Type:   discoveryDataType,
		Result: result,
		Error:  err,
		Time:   t,
	}
	return dd
}
// releaseDiscoveryData resets every field of discoveryData and puts it back
// into the pool. Clearing Result and Error keeps a pooled value from holding
// on to data of an event that has already been consumed.
func releaseDiscoveryData(discoveryData *DiscoveryData) {
	*discoveryData = DiscoveryData{
		Type:   DiscoveryDataTypeNone,
		Result: nil,
		Error:  nil,
		Time:   time.Time{},
	}
	discoveryDataPool.Put(discoveryData)
}