overflow_discovery/discovery/discovery.go
crusader 348cb63eac ing
2018-03-27 00:33:01 +09:00

227 lines
6.2 KiB
Go

package discovery
import (
"sync"
"git.loafle.net/commons_go/logging"
"git.loafle.net/commons_go/rpc"
"git.loafle.net/commons_go/util/net/cidr"
discoveryM "git.loafle.net/overflow/overflow_commons_go/modules/discovery/model"
oopcs "git.loafle.net/overflow/overflow_probe_container/service"
)
var (
	// RPCServlet is the package-level RPC servlet handle. It is only declared
	// here; nothing in this file assigns or reads it.
	RPCServlet rpc.Servlet

	// discoverer is the package-wide discovery instance. It is installed by
	// DiscoveryInit and reset to nil by DiscoveryDestroy.
	discoverer *discovery
)
func DiscoveryInit() {
discoverer = &discovery{
_isRunning: false,
}
discoverer.start()
}
// DiscoveryDestroy stops the package-level discoverer and releases it.
//
// Calling it when no discoverer is installed (before DiscoveryInit, or a
// second time in a row) is reported with a warning and otherwise ignored,
// instead of crashing with a nil pointer dereference.
func DiscoveryDestroy() {
	if discoverer == nil {
		logging.Logger().Warnf("Discovery: discovery is not initialized")
		return
	}

	discoverer.stop()
	discoverer = nil
}
// DiscoverZone runs a zone discovery described by dz and reports results
// through probeService on behalf of requesterID.
//
// Only one discovery may run at a time: if another one is in progress the
// request is dropped with a warning. Otherwise the call blocks until the
// discoverer's wait group drains, then marks the discoverer idle again.
func DiscoverZone(probeService *oopcs.ProbeService, requesterID string, dz *discoveryM.DiscoveryZone) {
	if acquired := discoverer.canRunOrNot(); !acquired {
		logging.Logger().Warnf("Discovery: Discovery is running already")
		return
	}

	discoverer.discoverZone(probeService, requesterID, dz)

	// Block until the scan tasks counted on stopWg have finished.
	discoverer.stopWg.Wait()
	discoverer.complete()
}
// DiscoverHost runs a host discovery described by dh inside zone and reports
// results through probeService on behalf of requesterID.
//
// Only one discovery may run at a time: if another one is in progress the
// request is dropped with a warning. Otherwise the call blocks until the
// discoverer's wait group drains, then marks the discoverer idle again.
func DiscoverHost(probeService *oopcs.ProbeService, requesterID string, zone *discoveryM.Zone, dh *discoveryM.DiscoveryHost) {
	if acquired := discoverer.canRunOrNot(); !acquired {
		logging.Logger().Warnf("Discovery: Discovery is running already")
		return
	}

	discoverer.discoverHost(probeService, requesterID, zone, dh)

	// Block until the scan tasks counted on stopWg have finished.
	discoverer.stopWg.Wait()
	discoverer.complete()
}
// DiscoverPort runs a port discovery described by dp against host and reports
// results through probeService on behalf of requesterID.
//
// Only one discovery may run at a time: if another one is in progress the
// request is dropped with a warning. Otherwise the call blocks until the
// discoverer's wait group drains, then marks the discoverer idle again.
func DiscoverPort(probeService *oopcs.ProbeService, requesterID string, host *discoveryM.Host, dp *discoveryM.DiscoveryPort) {
	if acquired := discoverer.canRunOrNot(); !acquired {
		logging.Logger().Warnf("Discovery: Discovery is running already")
		return
	}

	discoverer.discoverPort(probeService, requesterID, host, dp)

	// Block until the scan tasks counted on stopWg have finished.
	discoverer.stopWg.Wait()
	discoverer.complete()
}
// DiscoverService runs a service discovery described by ds against port and
// reports results through probeService on behalf of requesterID.
//
// Only one discovery may run at a time: if another one is in progress the
// request is dropped with a warning. Otherwise the call blocks until the
// discoverer's wait group drains, then marks the discoverer idle again.
func DiscoverService(probeService *oopcs.ProbeService, requesterID string, port *discoveryM.Port, ds *discoveryM.DiscoveryService) {
	if acquired := discoverer.canRunOrNot(); !acquired {
		logging.Logger().Warnf("Discovery: Discovery is running already")
		return
	}

	discoverer.discoverService(probeService, requesterID, port, ds)

	// Block until the scan tasks counted on stopWg have finished.
	discoverer.stopWg.Wait()
	discoverer.complete()
}
// Stop currently does nothing: its body is empty.
// NOTE(review): presumably meant to cancel an in-progress discovery — confirm
// the intended behavior before relying on it.
func Stop() {
}
// discovery holds the state shared by all scan tasks of one discoverer.
type discovery struct {
	// Channels created by start(). stopChan is closed by stop() to tell
	// running scan tasks to shut down; sendChan and errChan are allocated
	// but not otherwise used in this file.
	stopChan chan struct{}
	sendChan chan interface{}
	errChan  chan error

	// stopWg counts the scan tasks that are still running; the public
	// Discover* entry points and stop() wait on it.
	stopWg sync.WaitGroup

	// _statusMtx guards _isRunning, which is true while a discovery
	// request owns the discoverer.
	_statusMtx sync.Mutex
	_isRunning bool
}
// canRunOrNot atomically claims the discoverer for a new discovery request.
// It returns true when the discoverer was idle (and is now marked running)
// and false when a discovery is already in progress.
func (d *discovery) canRunOrNot() bool {
	d._statusMtx.Lock()
	defer d._statusMtx.Unlock()

	// Test-and-set: the flag ends up true either way; only the previous
	// value decides whether the caller obtained the slot.
	wasIdle := !d._isRunning
	d._isRunning = true
	return wasIdle
}
// complete marks the discoverer idle again so that a later canRunOrNot call
// can succeed.
func (d *discovery) complete() {
	d._statusMtx.Lock()
	d._isRunning = false
	d._statusMtx.Unlock()
}
// start allocates the discoverer's channels. A non-nil stopChan means the
// discoverer was already started and not stopped since, which is reported
// through the logger's Panicf.
func (d *discovery) start() {
	if nil != d.stopChan {
		logging.Logger().Panicf("Discovery: discovery is already running. Stop it before starting it again")
	}

	// All three channels are unbuffered.
	d.stopChan = make(chan struct{})
	d.sendChan = make(chan interface{})
	d.errChan = make(chan error)

	logging.Logger().Infof("Discovery: discovery is started")
}
// stop signals every running scan task to shut down and waits for them.
// A nil stopChan means the discoverer was never started (or was already
// stopped), which is reported through the logger's Panicf.
func (d *discovery) stop() {
	if nil == d.stopChan {
		logging.Logger().Panicf("Discovery: discovery must be started before stopping it")
	}

	// Closing stopChan wakes every task selecting on it; Wait then blocks
	// until all of them have called Done.
	close(d.stopChan)
	d.stopWg.Wait()

	// Reset to nil so that start() may be called again.
	d.stopChan = nil

	logging.Logger().Infof("Discovery: discovery is stopped")
}
// discoverZone starts a background zone scan for dz.
//
// Every discovered zone is sent to the requester via probeService as
// "DiscoveryService.DiscoveredZone". When dz carries a DiscoveryHost, a host
// scan over the whole range of the zone's network is started for each zone.
//
// The method returns immediately; completion is observable through d.stopWg.
func (d *discovery) discoverZone(probeService *oopcs.ProbeService, requesterID string, dz *discoveryM.DiscoveryZone) {
	scan := func(resultChan chan interface{}, errChan chan error, doneChan chan struct{}, stopChan chan struct{}) {
		scanZone(dz, resultChan, errChan, doneChan, stopChan)
	}

	onZone := func(result interface{}) {
		z := result.(*discoveryM.Zone)
		logging.Logger().Debugf("zone: %v", z)
		probeService.Send("DiscoveryService.DiscoveredZone", requesterID, z)

		if nil == dz.DiscoveryHost {
			return
		}

		// The host range is derived from the zone's network; without a
		// valid ranger there is nothing to scan, so skip the host scan
		// instead of dereferencing an unusable ranger.
		cr, err := cidr.NewCIDRRanger(z.Network)
		if nil != err {
			logging.Logger().Warnf("Discovery: cannot determine host range of network %v: %v", z.Network, err)
			return
		}

		dh := &discoveryM.DiscoveryHost{
			FirstScanRange: cr.First().String(),
			LastScanRange:  cr.Last().String(),
			DiscoveryPort:  dz.DiscoveryHost.DiscoveryPort,
		}
		d.discoverHost(probeService, requesterID, z, dh)
	}

	// Register with stopWg before the goroutine is spawned. taskScan only
	// calls Add once it is already running, which would allow a concurrent
	// stopWg.Wait() to return before this task has been counted.
	d.stopWg.Add(1)
	go func() {
		defer d.stopWg.Done()
		taskScan(d, scan, onZone)
	}()
}
// discoverHost starts a background host scan for dh inside zone.
//
// Every discovered host is sent to the requester via probeService as
// "DiscoveryService.DiscoveredHost". When dh carries a DiscoveryPort, a port
// scan is started for each discovered host.
//
// The method returns immediately; completion is observable through d.stopWg.
func (d *discovery) discoverHost(probeService *oopcs.ProbeService, requesterID string, zone *discoveryM.Zone, dh *discoveryM.DiscoveryHost) {
	scan := func(resultChan chan interface{}, errChan chan error, doneChan chan struct{}, stopChan chan struct{}) {
		scanHost(zone, dh, resultChan, errChan, doneChan, stopChan)
	}

	onHost := func(result interface{}) {
		h := result.(*discoveryM.Host)
		logging.Logger().Debugf("host: %v", h)
		probeService.Send("DiscoveryService.DiscoveredHost", requesterID, h)

		if nil != dh.DiscoveryPort {
			d.discoverPort(probeService, requesterID, h, dh.DiscoveryPort)
		}
	}

	// Register with stopWg before the goroutine is spawned. taskScan only
	// calls Add once it is already running, which would allow a concurrent
	// stopWg.Wait() to return before this task has been counted.
	d.stopWg.Add(1)
	go func() {
		defer d.stopWg.Done()
		taskScan(d, scan, onHost)
	}()
}
// discoverPort starts a background port scan for dp against host.
//
// Every discovered port is sent to the requester via probeService as
// "DiscoveryService.DiscoveredPort". When dp carries a DiscoveryService, a
// service scan is started for each discovered port.
//
// The method returns immediately; completion is observable through d.stopWg.
func (d *discovery) discoverPort(probeService *oopcs.ProbeService, requesterID string, host *discoveryM.Host, dp *discoveryM.DiscoveryPort) {
	scan := func(resultChan chan interface{}, errChan chan error, doneChan chan struct{}, stopChan chan struct{}) {
		scanPort(host, dp, resultChan, errChan, doneChan, stopChan)
	}

	onPort := func(result interface{}) {
		p := result.(*discoveryM.Port)
		logging.Logger().Debugf("port: %v", p)
		probeService.Send("DiscoveryService.DiscoveredPort", requesterID, p)

		if nil != dp.DiscoveryService {
			d.discoverService(probeService, requesterID, p, dp.DiscoveryService)
		}
	}

	// Register with stopWg before the goroutine is spawned. taskScan only
	// calls Add once it is already running, which would allow a concurrent
	// stopWg.Wait() to return before this task has been counted.
	d.stopWg.Add(1)
	go func() {
		defer d.stopWg.Done()
		taskScan(d, scan, onPort)
	}()
}
// discoverService starts a background service scan for ds against port.
//
// Every discovered service is sent to the requester via probeService as
// "DiscoveryService.DiscoveredService" and logged at debug level.
//
// The method returns immediately; completion is observable through d.stopWg.
func (d *discovery) discoverService(probeService *oopcs.ProbeService, requesterID string, port *discoveryM.Port, ds *discoveryM.DiscoveryService) {
	scan := func(resultChan chan interface{}, errChan chan error, doneChan chan struct{}, stopChan chan struct{}) {
		scanService(port, ds, resultChan, errChan, doneChan, stopChan)
	}

	onService := func(result interface{}) {
		s := result.(*discoveryM.Service)
		probeService.Send("DiscoveryService.DiscoveredService", requesterID, s)
		logging.Logger().Debugf("service: %s(%s)[%s:%s]", s.ServiceName, s.CryptoType, port.Host.IP, port.PortNumber)
	}

	// Register with stopWg before the goroutine is spawned. taskScan only
	// calls Add once it is already running, which would allow a concurrent
	// stopWg.Wait() to return before this task has been counted.
	d.stopWg.Add(1)
	go func() {
		defer d.stopWg.Done()
		taskScan(d, scan, onService)
	}()
}
// taskScan runs task in its own goroutine and pumps its output until the task
// reports completion or the discoverer is stopped.
//
//   - d:        discoverer whose stopChan cancels the task and whose stopWg
//     tracks it.
//   - task:     the scan to run; it receives the result, error, done and stop
//     channels created here.
//   - onResult: invoked, on the taskScan goroutine, for every value received
//     on the result channel.
//
// Errors received from the task are only logged. When d.stopChan fires, the
// task's stop channel is closed and taskScan waits for the task to signal
// done before returning.
//
// NOTE: the stopWg.Add below runs on whatever goroutine executes taskScan.
// When taskScan is launched with `go`, a caller that needs stopWg.Wait() to
// observe the task must register with the wait group itself before spawning.
//
// NOTE(review): assumes the task signals completion by sending on doneChan
// rather than closing it, since doneChan is closed here on exit — confirm
// against the scan implementations.
func taskScan(d *discovery, task func(resultChan chan interface{}, errChan chan error, doneChan chan struct{}, stopChan chan struct{}), onResult func(result interface{})) {
	d.stopWg.Add(1)

	resultChan := make(chan interface{})
	errChan := make(chan error)
	stopChan := make(chan struct{})
	doneChan := make(chan struct{})

	// stopSignalled records that stopChan was already closed on the stop
	// path, so the cleanup below does not close it a second time (which
	// would panic with "close of closed channel").
	stopSignalled := false

	defer func() {
		close(resultChan)
		close(errChan)
		if !stopSignalled {
			close(stopChan)
		}
		close(doneChan)
		d.stopWg.Done()
	}()

	go task(resultChan, errChan, doneChan, stopChan)

	for {
		select {
		case r := <-resultChan:
			onResult(r)
		case err := <-errChan:
			logging.Logger().Infof("task err: %v", err)
		case <-doneChan:
			logging.Logger().Debugf("Discovery: task is complete")
			return
		case <-d.stopChan:
			close(stopChan)
			stopSignalled = true

			// Keep receiving (and discarding) results and errors while
			// waiting for the task to finish: the channels are
			// unbuffered, so a task blocked in a send could otherwise
			// never reach the point where it signals done.
			for {
				select {
				case <-resultChan:
				case <-errChan:
				case <-doneChan:
					return
				}
			}
		}
	}
}