container_discovery/internal/discoverer/discoverer.go
crusader 26da7a2e03 ing
2018-04-28 01:56:47 +09:00

294 lines
7.8 KiB
Go

package discoverer
import (
"sync"
"git.loafle.net/commons/util-go/net/cidr"
"git.loafle.net/overflow/commons-go/core/util"
ocmd "git.loafle.net/overflow/commons-go/model/discovery"
)
// DiscoveryDataType identifies the kind of event carried by a DiscoveryData.
type DiscoveryDataType int
// Event kinds, numbered sequentially from zero by iota.
const (
// DiscoveryDataTypeNone is the zero value; released DiscoveryData items are
// reset to it before going back to the pool.
DiscoveryDataTypeNone DiscoveryDataType = iota
// DiscoveryDataTypeStart is sent once at the beginning of a Discover* call.
DiscoveryDataTypeStart
// DiscoveryDataTypeStop is sent once at the end of a Discover* call.
DiscoveryDataTypeStop
// DiscoveryDataTypeError marks an event whose Error field carries a scan error.
DiscoveryDataTypeError
// DiscoveryDataTypeZone marks an event whose Result is a *ocmd.Zone.
DiscoveryDataTypeZone
// DiscoveryDataTypeHost marks an event whose Result is a *ocmd.Host.
DiscoveryDataTypeHost
// DiscoveryDataTypePort marks an event whose Result is a *ocmd.Port.
DiscoveryDataTypePort
// DiscoveryDataTypeService marks an event whose Result is a *ocmd.Service.
DiscoveryDataTypeService
)
// DiscoveryData is one event delivered on a discovery data channel.
// Instances are recycled through a sync.Pool; call Release once an event has
// been consumed.
type DiscoveryData struct {
// Type tells which kind of event this is.
Type DiscoveryDataType
// Result holds the discovered item (*ocmd.Zone, *ocmd.Host, *ocmd.Port or
// *ocmd.Service) for result events and is nil for all other events.
Result interface{}
// Error holds the scan error for DiscoveryDataTypeError events, nil otherwise.
Error error
// Time is the timestamp passed in when the event was created.
Time util.Timestamp
}
// Release clears dd and returns it to the package-level pool for reuse.
// dd must not be used after this call.
func (dd *DiscoveryData) Release() {
releaseDiscoveryData(dd)
}
var (
	// _discoverer is the process-wide Discoverer handed out by GetDiscoverer.
	_discoverer Discoverer
	// _discovererOnce guards the lazy creation of _discoverer.
	_discovererOnce sync.Once
)

// GetDiscoverer returns the shared Discoverer, creating the default
// implementation on first use.
//
// Creation goes through sync.Once so that concurrent first callers cannot
// race on the package variable or end up with different instances.
func GetDiscoverer() Discoverer {
	_discovererOnce.Do(func() {
		// Keep an instance that was installed before the first call.
		if nil == _discoverer {
			_discoverer = &defaultDiscoverer{}
		}
	})
	return _discoverer
}
// Discoverer runs discovery scans and streams what it finds as DiscoveryData
// events on a channel obtained from Retain.
type Discoverer interface {
// Retain returns the data channel on which discovery events are delivered.
Retain() chan *DiscoveryData
// Release hands a channel obtained from Retain back to the discoverer.
Release(dataChan chan *DiscoveryData)
// DiscoverZone scans for zones as described by dz and reports on dataChan.
DiscoverZone(dataChan chan *DiscoveryData, dz *ocmd.DiscoverZone)
// DiscoverHost scans zone for hosts as described by dh and reports on dataChan.
DiscoverHost(dataChan chan *DiscoveryData, zone *ocmd.Zone, dh *ocmd.DiscoverHost)
// DiscoverPort scans host for ports as described by dp and reports on dataChan.
DiscoverPort(dataChan chan *DiscoveryData, host *ocmd.Host, dp *ocmd.DiscoverPort)
// DiscoverSerice scans port for services as described by ds and reports on
// dataChan. NOTE(review): "Serice" looks like a typo for "Service"; renaming
// it would break implementers and callers of this interface.
DiscoverSerice(dataChan chan *DiscoveryData, port *ocmd.Port, ds *ocmd.DiscoverService)
}
// defaultDiscoverer is the Discoverer implementation returned by GetDiscoverer.
type defaultDiscoverer struct {
// dataChanPool holds the data channel handed out by Retain and given back by
// Release; it is created lazily on the first Retain call.
dataChanPool chan chan *DiscoveryData
// stopChan is created at the start of every Discover* call and is closed,
// then set to nil, to tell running scans to stop.
stopChan chan struct{}
}
// Retain checks the data channel out of the pool and returns it.
//
// On the first call the pool is created with room for one entry and seeded
// with a single channel buffered for 256 events. While that channel is checked
// out, further calls block until Release puts it back. The lazy creation is
// not synchronized.
func (d *defaultDiscoverer) Retain() chan *DiscoveryData {
	if d.dataChanPool == nil {
		pool := make(chan chan *DiscoveryData, 1)
		pool <- make(chan *DiscoveryData, 256)
		d.dataChanPool = pool
	}

	dataChan := <-d.dataChanPool
	return dataChan
}
// Release returns dataChan to the pool so a later Retain can hand it out again.
//
// The call never blocks. A nil dataChan is ignored. If the pool has not been
// created yet (Release before any Retain) or already holds its channel (double
// release), dataChan is dropped instead of parking the caller forever on the
// send.
func (d *defaultDiscoverer) Release(dataChan chan *DiscoveryData) {
	if nil == dataChan {
		return
	}

	select {
	case d.dataChanPool <- dataChan:
	default:
		// A send on a nil or full pool would block forever; drop the channel.
	}
}
// Stop signals the running scans to stop by closing the current stop channel
// and clearing the field. It does nothing when no stop channel is set, so
// repeated calls are harmless. dataChan is not used.
func (d *defaultDiscoverer) Stop(dataChan chan *DiscoveryData) {
	if d.stopChan == nil {
		return
	}

	close(d.stopChan)
	d.stopChan = nil
}
// DiscoverZone runs a zone discovery and blocks until it, and every nested
// host/port/service scan it spawned, has finished.
//
// A Start event is sent on dataChan before scanning begins and a Stop event
// after all scans have returned; zone, nested and error events arrive in
// between.
func (d *defaultDiscoverer) DiscoverZone(dataChan chan *DiscoveryData, dz *ocmd.DiscoverZone) {
	d.stopChan = make(chan struct{})
	dataChan <- retainDiscoveryData(DiscoveryDataTypeStart, util.Now(), nil, nil)

	// Nested scans register themselves on the same WaitGroup.
	scanners := new(sync.WaitGroup)
	scanners.Add(1)
	go d.innerDiscoverZone(scanners, dataChan, dz)
	scanners.Wait()

	// Closes and clears the stop channel unless Stop already did.
	d.Stop(dataChan)
	dataChan <- retainDiscoveryData(DiscoveryDataTypeStop, util.Now(), nil, nil)
}
// DiscoverHost runs a host discovery inside zone and blocks until it, and
// every nested port/service scan it spawned, has finished.
//
// A Start event is sent on dataChan before scanning begins and a Stop event
// after all scans have returned; host, nested and error events arrive in
// between.
func (d *defaultDiscoverer) DiscoverHost(dataChan chan *DiscoveryData, zone *ocmd.Zone, dh *ocmd.DiscoverHost) {
	d.stopChan = make(chan struct{})
	dataChan <- retainDiscoveryData(DiscoveryDataTypeStart, util.Now(), nil, nil)

	// Nested scans register themselves on the same WaitGroup.
	scanners := new(sync.WaitGroup)
	scanners.Add(1)
	go d.innerDiscoverHost(scanners, dataChan, zone, dh)
	scanners.Wait()

	// Closes and clears the stop channel unless Stop already did.
	d.Stop(dataChan)
	dataChan <- retainDiscoveryData(DiscoveryDataTypeStop, util.Now(), nil, nil)
}
// DiscoverPort runs a port discovery on host and blocks until it, and every
// nested service scan it spawned, has finished.
//
// A Start event is sent on dataChan before scanning begins and a Stop event
// after all scans have returned; port, nested and error events arrive in
// between.
func (d *defaultDiscoverer) DiscoverPort(dataChan chan *DiscoveryData, host *ocmd.Host, dp *ocmd.DiscoverPort) {
	d.stopChan = make(chan struct{})
	dataChan <- retainDiscoveryData(DiscoveryDataTypeStart, util.Now(), nil, nil)

	// Nested scans register themselves on the same WaitGroup.
	scanners := new(sync.WaitGroup)
	scanners.Add(1)
	go d.innerDiscoverPort(scanners, dataChan, host, dp)
	scanners.Wait()

	// Closes and clears the stop channel unless Stop already did.
	d.Stop(dataChan)
	dataChan <- retainDiscoveryData(DiscoveryDataTypeStop, util.Now(), nil, nil)
}
// DiscoverSerice runs a service discovery on port and blocks until the scan
// has finished.
//
// A Start event is sent on dataChan before scanning begins and a Stop event
// after the scan has returned; service and error events arrive in between.
func (d *defaultDiscoverer) DiscoverSerice(dataChan chan *DiscoveryData, port *ocmd.Port, ds *ocmd.DiscoverService) {
	d.stopChan = make(chan struct{})
	dataChan <- retainDiscoveryData(DiscoveryDataTypeStart, util.Now(), nil, nil)

	scanners := new(sync.WaitGroup)
	scanners.Add(1)
	go d.innerDiscoverSerice(scanners, dataChan, port, ds)
	scanners.Wait()

	// Closes and clears the stop channel unless Stop already did.
	d.Stop(dataChan)
	dataChan <- retainDiscoveryData(DiscoveryDataTypeStop, util.Now(), nil, nil)
}
// innerDiscoverZone runs one zone scan and forwards its output to dataChan.
//
// Every zone found is sent as a Zone event. When dz.DiscoverHost is set, a
// host scan covering the zone's whole network range is started for each zone
// on the same WaitGroup. Scan errors are sent as Error events. wg.Done is
// called when the zone scan itself returns.
func (d *defaultDiscoverer) innerDiscoverZone(wg *sync.WaitGroup, dataChan chan *DiscoveryData, dz *ocmd.DiscoverZone) {
	defer wg.Done()

	taskScan(d,
		func(resultChan chan interface{}, errChan chan error, doneChan chan struct{}, stopChan chan struct{}) {
			scanZone(dz, resultChan, errChan, doneChan, stopChan)
		},
		func(result interface{}) {
			z := result.(*ocmd.Zone)
			dataChan <- retainDiscoveryData(DiscoveryDataTypeZone, util.Now(), z, nil)

			if nil == dz.DiscoverHost {
				return
			}

			// The range of the nested host scan is derived from the zone's
			// network. Report an unusable network instead of dereferencing
			// the ranger that could not be built.
			cr, err := cidr.NewCIDRRanger(z.Network)
			if nil != err {
				dataChan <- retainDiscoveryData(DiscoveryDataTypeError, util.Now(), nil, err)
				return
			}

			dh := &ocmd.DiscoverHost{
				FirstScanRangeV4: cr.First().String(),
				LastScanRangeV4:  cr.Last().String(),
				DiscoverPort:     dz.DiscoverHost.DiscoverPort,
			}

			// Register the nested scan before this one can finish so the
			// caller's Wait also covers it.
			wg.Add(1)
			go d.innerDiscoverHost(wg, dataChan, z, dh)
		},
		func(err error) {
			dataChan <- retainDiscoveryData(DiscoveryDataTypeError, util.Now(), nil, err)
		},
	)
}
// innerDiscoverHost runs one host scan over zone and forwards its output to
// dataChan.
//
// Every host found is sent as a Host event; when dh.DiscoverPort is set, a
// port scan is started for that host on the same WaitGroup. Scan errors are
// sent as Error events. wg.Done is called when the host scan itself returns.
func (d *defaultDiscoverer) innerDiscoverHost(wg *sync.WaitGroup, dataChan chan *DiscoveryData, zone *ocmd.Zone, dh *ocmd.DiscoverHost) {
	defer wg.Done()

	scan := func(resultChan chan interface{}, errChan chan error, doneChan chan struct{}, stopChan chan struct{}) {
		scanHost(zone, dh, resultChan, errChan, doneChan, stopChan)
	}

	onResult := func(result interface{}) {
		found := result.(*ocmd.Host)
		dataChan <- retainDiscoveryData(DiscoveryDataTypeHost, util.Now(), found, nil)

		if dh.DiscoverPort == nil {
			return
		}
		// Register the nested scan before this one can finish.
		wg.Add(1)
		go d.innerDiscoverPort(wg, dataChan, found, dh.DiscoverPort)
	}

	onError := func(err error) {
		dataChan <- retainDiscoveryData(DiscoveryDataTypeError, util.Now(), nil, err)
	}

	taskScan(d, scan, onResult, onError)
}
// innerDiscoverPort runs one port scan on host and forwards its output to
// dataChan.
//
// Every port found is sent as a Port event; when dp.DiscoverService is set, a
// service scan is started for that port on the same WaitGroup. Scan errors are
// sent as Error events. wg.Done is called when the port scan itself returns.
func (d *defaultDiscoverer) innerDiscoverPort(wg *sync.WaitGroup, dataChan chan *DiscoveryData, host *ocmd.Host, dp *ocmd.DiscoverPort) {
	defer wg.Done()

	scan := func(resultChan chan interface{}, errChan chan error, doneChan chan struct{}, stopChan chan struct{}) {
		scanPort(host, dp, resultChan, errChan, doneChan, stopChan)
	}

	onResult := func(result interface{}) {
		found := result.(*ocmd.Port)
		dataChan <- retainDiscoveryData(DiscoveryDataTypePort, util.Now(), found, nil)

		if dp.DiscoverService == nil {
			return
		}
		// Register the nested scan before this one can finish.
		wg.Add(1)
		go d.innerDiscoverSerice(wg, dataChan, found, dp.DiscoverService)
	}

	onError := func(err error) {
		dataChan <- retainDiscoveryData(DiscoveryDataTypeError, util.Now(), nil, err)
	}

	taskScan(d, scan, onResult, onError)
}
// innerDiscoverSerice runs one service scan on port and forwards its output to
// dataChan: services as Service events, scan errors as Error events. wg.Done
// is called when the scan returns.
func (d *defaultDiscoverer) innerDiscoverSerice(wg *sync.WaitGroup, dataChan chan *DiscoveryData, port *ocmd.Port, ds *ocmd.DiscoverService) {
	defer wg.Done()

	scan := func(resultChan chan interface{}, errChan chan error, doneChan chan struct{}, stopChan chan struct{}) {
		scanService(port, ds, resultChan, errChan, doneChan, stopChan)
	}

	onResult := func(result interface{}) {
		found := result.(*ocmd.Service)
		dataChan <- retainDiscoveryData(DiscoveryDataTypeService, util.Now(), found, nil)
	}

	onError := func(err error) {
		dataChan <- retainDiscoveryData(DiscoveryDataTypeError, util.Now(), nil, err)
	}

	taskScan(d, scan, onResult, onError)
}
// taskScan runs taskFunc in its own goroutine and pumps its output until the
// task signals completion on doneChan.
//
// Values received on the task's result channel are passed to resultFunc and
// values on its error channel to errorFunc, both on the calling goroutine.
// When the discoverer's stop channel fires, the task's own stop channel is
// closed and taskScan waits for the task to signal done. While waiting it
// discards anything the task still sends, so a task blocked on one of the
// unbuffered channels can always reach its done signal.
//
// The discoverer's stop channel is read once, on entry. A stop issued after
// that is still seen even though Stop sets the field to nil afterwards.
func taskScan(d *defaultDiscoverer,
	taskFunc func(resultChan chan interface{}, errChan chan error, doneChan chan struct{}, stopChan chan struct{}),
	resultFunc func(result interface{}),
	errorFunc func(err error)) {
	resultChan := make(chan interface{})
	errChan := make(chan error)
	stopChan := make(chan struct{})
	doneChan := make(chan struct{})

	// A nil stop channel simply never fires in the select below.
	stopRequested := d.stopChan

	go taskFunc(resultChan, errChan, doneChan, stopChan)

	for {
		select {
		case r := <-resultChan:
			resultFunc(r)
		case err := <-errChan:
			errorFunc(err)
		case <-doneChan:
			return
		case <-stopRequested:
			close(stopChan)
			// Drain until the task acknowledges the stop.
			for {
				select {
				case <-resultChan:
				case <-errChan:
				case <-doneChan:
					return
				}
			}
		}
	}
}
// discoveryDataPool recycles DiscoveryData values between events.
var discoveryDataPool = sync.Pool{
	New: func() interface{} { return &DiscoveryData{} },
}

// retainDiscoveryData takes a DiscoveryData from the pool, allocating one when
// the pool is empty, fills in all four fields from the arguments and returns
// it.
func retainDiscoveryData(discoveryDataType DiscoveryDataType, t util.Timestamp, result interface{}, err error) *DiscoveryData {
	dd := discoveryDataPool.Get().(*DiscoveryData)

	dd.Type, dd.Time = discoveryDataType, t
	dd.Result, dd.Error = result, err

	return dd
}
// releaseDiscoveryData resets every field of discoveryData and puts it back
// into the pool. Clearing Result and Error keeps the pool from holding on to
// the values they referenced.
func releaseDiscoveryData(discoveryData *DiscoveryData) {
	*discoveryData = DiscoveryData{
		Type: DiscoveryDataTypeNone,
		Time: util.Timestamp{},
	}
	discoveryDataPool.Put(discoveryData)
}