// Source path: probe/discovery/discoverer.go
// (Repository web-view header: 178 lines, 3.8 KiB, Go; blame timestamp 2018-08-29 12:04:23 +00:00.)

package discovery
import (
	"sync"
	"time"

	omd "git.loafle.net/overflow/model/discovery"
	omu "git.loafle.net/overflow/model/util"
	"git.loafle.net/overflow_scanner/probe/discovery/protocol/mdns"
	"git.loafle.net/overflow_scanner/probe/discovery/protocol/upnp"
	"git.loafle.net/overflow_scanner/probe/discovery/session"
	"git.loafle.net/overflow_scanner/probe/discovery/types"
)
// Discoverer accepts discovery requests and exposes the messages they
// produce.
type Discoverer interface {
	// DiscoverHost submits a host-type discovery request carrying zone and
	// dh on behalf of requesterID.
	DiscoverHost(requesterID string, zone *omd.Zone, dh *omd.DiscoverHost)

	// DiscoverPort submits a port-type discovery request carrying host and
	// dp on behalf of requesterID.
	DiscoverPort(requesterID string, host *omd.Host, dp *omd.DiscoverPort)

	// DiscoverService submits a service-type discovery request carrying
	// port and ds on behalf of requesterID.
	DiscoverService(requesterID string, port *omd.Port, ds *omd.DiscoverService)

	// Message returns the receive-only channel on which discovery messages
	// are published.
	Message() <-chan types.DiscoveryMessage

	// Shutdown stops the discoverer.
	Shutdown()
}
// Instance returns the package-wide Discoverer, creating it with
// newDiscoverer on the first call. sync.Once guarantees the constructor
// runs at most once even when Instance is called concurrently.
func Instance() Discoverer {
	create := func() {
		_instance = newDiscoverer()
	}
	_once.Do(create)

	return _instance
}
// Singleton state backing Instance.
var (
	// _instance holds the shared Discoverer once it has been created.
	_instance Discoverer
	// _once guards the one-time creation of _instance.
	_once sync.Once
)
// newDiscoverer allocates an ofDiscoverer, starts it, and returns it as a
// Discoverer.
func newDiscoverer() Discoverer {
	d := new(ofDiscoverer)
	d.start()

	return d
}
// ofDiscoverer is the Discoverer implementation. Requests are queued on
// requestQueue and handled one at a time by a single worker goroutine.
type ofDiscoverer struct {
	// stopChan is closed by Shutdown to tell the worker to exit; it is nil
	// once the discoverer has been shut down.
	stopChan chan struct{}
	// stopWg tracks the worker goroutine so Shutdown can wait for it.
	stopWg sync.WaitGroup

	// requestQueue buffers pending discovery requests.
	requestQueue chan types.DiscoveryRequest
	// messageChan is the channel returned by Message.
	messageChan chan types.DiscoveryMessage
}
// DiscoverHost builds a host-type discovery request for requesterID from
// zone and dh and places it on the request queue.
func (d *ofDiscoverer) DiscoverHost(requesterID string, zone *omd.Zone, dh *omd.DiscoverHost) {
	hostReq := retainDiscoveryRequest(d.messageChan, requesterID, types.DiscoveryRequestTypeHost, zone, dh)
	d.enqueue(hostReq)
}
// DiscoverPort builds a port-type discovery request for requesterID from
// host and dp and places it on the request queue.
func (d *ofDiscoverer) DiscoverPort(requesterID string, host *omd.Host, dp *omd.DiscoverPort) {
	portReq := retainDiscoveryRequest(d.messageChan, requesterID, types.DiscoveryRequestTypePort, host, dp)
	d.enqueue(portReq)
}
// DiscoverService builds a service-type discovery request for requesterID
// from port and ds and places it on the request queue.
func (d *ofDiscoverer) DiscoverService(requesterID string, port *omd.Port, ds *omd.DiscoverService) {
	serviceReq := retainDiscoveryRequest(d.messageChan, requesterID, types.DiscoveryRequestTypeService, port, ds)
	d.enqueue(serviceReq)
}
// Message exposes the discoverer's message channel as receive-only.
func (d *ofDiscoverer) Message() <-chan types.DiscoveryMessage {
	var out <-chan types.DiscoveryMessage = d.messageChan
	return out
}
// Shutdown stops the worker goroutine and then closes the request queue and
// the message channel. Calling it again after it has completed is a no-op.
//
// The worker is stopped and awaited BEFORE the channels are closed. The
// request handed to the worker is created with messageChan, so its
// SendMessage calls presumably deliver on that channel (verify against the
// request implementation). Closing the channel while the worker is still
// mid-request would therefore risk a "send on closed channel" panic.
//
// Shutdown blocks until the request currently being handled has finished.
// Consumers should keep draining Message() until it is closed so a worker
// blocked on a full message channel can make progress.
//
// NOTE(review): enqueue and its timeout goroutines can still touch the
// channels after Shutdown returns; callers must not submit requests
// concurrently with, or after, Shutdown.
func (d *ofDiscoverer) Shutdown() {
	if d.stopChan == nil {
		return
	}

	// Signal the worker and wait until it has fully exited.
	close(d.stopChan)
	d.stopWg.Wait()
	d.stopChan = nil

	// No worker is running any more; closing is now safe with respect to it.
	close(d.requestQueue)
	close(d.messageChan)
}
// start allocates the discoverer's channels and launches the single worker
// goroutine (handleRequest), registering it with stopWg first so Shutdown
// can wait for it.
func (d *ofDiscoverer) start() {
	// Up to 10 requests may wait in the queue; up to 256 messages may be
	// buffered for the consumer.
	d.requestQueue = make(chan types.DiscoveryRequest, 10)
	d.messageChan = make(chan types.DiscoveryMessage, 256)
	d.stopChan = make(chan struct{})

	d.stopWg.Add(1)
	go d.handleRequest()
}
// enqueue tries to place req on the request queue without blocking.
//
// If the queue is full, a FailedQueueing message is sent and req is released
// immediately. Otherwise a Queueing message is sent and a watcher goroutine
// is started. The watcher ends when the worker signals req.dequeue. If that
// does not happen within 10 seconds it marks the request as timed out, sends
// a Timeout message, and releases it.
//
// NOTE(review): req.timeout is written by the watcher and read by
// handleRequest with no synchronisation visible here. A timed-out request is
// also released while it may still be sitting in requestQueue. Both look
// like hazards; confirm against the ofDiscoveryRequest implementation.
func (d *ofDiscoverer) enqueue(req *ofDiscoveryRequest) {
	select {
	case d.requestQueue <- req:
	default:
		// Queue is full: reject instead of blocking the caller.
		req.SendMessage(types.DiscoveryMessageTypeFailedQueueing, omu.Now(), nil)
		req.release()
		return
	}

	req.SendMessage(types.DiscoveryMessageTypeQueueing, omu.Now(), nil)

	go func() {
		// An explicit timer is used so it can be stopped as soon as the
		// request is dequeued, rather than lingering until it fires as a
		// time.After timer would.
		timer := time.NewTimer(10 * time.Second)
		defer timer.Stop()

		select {
		case <-req.dequeue:
		case <-timer.C:
			req.timeout = true
			req.SendMessage(types.DiscoveryMessageTypeTimeout, omu.Now(), nil)
			req.release()
		}
	}()
}
// handleRequest is the worker loop launched by start. It handles queued
// requests one at a time until the request queue is closed or stopChan is
// signalled, and marks stopWg done on exit.
//
// For each request that has not been flagged as timed out it signals the
// request's dequeue channel, sends a Start message, runs discover, sends a
// Stop message, and finally releases the request.
func (d *ofDiscoverer) handleRequest() {
	defer d.stopWg.Done()

	for {
		select {
		case <-d.stopChan:
			return

		case queued, open := <-d.requestQueue:
			if !open {
				return
			}

			// Every queued request is expected to be an *ofDiscoveryRequest;
			// any other dynamic type panics here.
			r := queued.(*ofDiscoveryRequest)
			if r.timeout {
				// Skip requests that were flagged as timed out while queued.
				continue
			}

			// Tell the watcher goroutine that the request left the queue.
			r.dequeue <- true

			queued.SendMessage(types.DiscoveryMessageTypeStart, omu.Now(), nil)
			d.discover(queued)
			queued.SendMessage(types.DiscoveryMessageTypeStop, omu.Now(), nil)

			r.release()
		}
	}
}
func (d *ofDiscoverer) discover(req types.DiscoveryRequest) {
if types.DiscoveryRequestTypeNone == req.RequestType() {
return
}
s := session.RetainDiscoverySession()
defer func() {
session.ReleaseDiscoverySession(s)
}()
2018-08-29 18:20:51 +00:00
if err := s.InitWithRequest(req); nil != err {
req.SendMessage(types.DiscoveryMessageTypeError, nil, err)
return
2018-08-29 12:04:23 +00:00
}
d.preDiscovery(s)
d.layerDiscovery(s)
}
func (d *ofDiscoverer) preDiscovery(s session.DiscoverySession) {
2018-08-29 13:23:41 +00:00
var wg sync.WaitGroup
2018-08-29 12:04:23 +00:00
2018-08-29 13:23:41 +00:00
wg.Add(1)
go func() {
defer wg.Done()
upnp.Scan(s)
}()
wg.Add(1)
go func() {
defer wg.Done()
mdns.Scan(s)
}()
wg.Wait()
2018-08-29 12:04:23 +00:00
}
// layerDiscovery is currently a placeholder and does nothing with s.
func (d *ofDiscoverer) layerDiscovery(s session.DiscoverySession) {
	// Not implemented yet. Protocols listed here by the original author:
	//   - SNMP
	//   - mDNS
	//   - UPnP
}