367 lines
8.3 KiB
Go
367 lines
8.3 KiB
Go
package discovery
|
|
|
|
import (
|
|
"log"
|
|
"sync"
|
|
"time"
|
|
|
|
omd "git.loafle.net/overflow/model/discovery"
|
|
omu "git.loafle.net/overflow/model/util"
|
|
"git.loafle.net/overflow_scanner/probe/discovery/protocol/mdns"
|
|
"git.loafle.net/overflow_scanner/probe/discovery/protocol/snmp"
|
|
"git.loafle.net/overflow_scanner/probe/discovery/protocol/upnp"
|
|
"git.loafle.net/overflow_scanner/probe/discovery/session"
|
|
"git.loafle.net/overflow_scanner/probe/discovery/target/host"
|
|
"git.loafle.net/overflow_scanner/probe/discovery/target/port"
|
|
"git.loafle.net/overflow_scanner/probe/discovery/target/service"
|
|
"git.loafle.net/overflow_scanner/probe/discovery/types"
|
|
)
|
|
|
|
type Discoverer interface {
|
|
DiscoverHost(requesterID string, zone *omd.Zone, dh *omd.DiscoverHost)
|
|
DiscoverPort(requesterID string, host *omd.Host, dp *omd.DiscoverPort)
|
|
DiscoverService(requesterID string, port *omd.Port, ds *omd.DiscoverService)
|
|
DiscoverStop(requesterID string, requestID string)
|
|
Message() <-chan types.DiscoveryMessage
|
|
|
|
Shutdown()
|
|
}
|
|
|
|
func Instance() Discoverer {
|
|
_once.Do(func() {
|
|
i := &ofDiscoverer{}
|
|
i.start()
|
|
|
|
_instance = i
|
|
})
|
|
return _instance
|
|
}
|
|
|
|
var _instance Discoverer
|
|
var _once sync.Once
|
|
|
|
type ofDiscoverer struct {
|
|
stopChan chan struct{}
|
|
stopWg sync.WaitGroup
|
|
|
|
requestIDs sync.Map //map[string]bool
|
|
processingSessions sync.Map //map[string]session.DiscoverySession
|
|
requestQueue chan types.DiscoveryRequest
|
|
messageChan chan types.DiscoveryMessage
|
|
}
|
|
|
|
func (d *ofDiscoverer) DiscoverHost(requesterID string, zone *omd.Zone, dh *omd.DiscoverHost) {
|
|
d.enqueue(retainDiscoveryRequest(requesterID, types.DiscoveryRequestTypeHost, zone, dh))
|
|
}
|
|
|
|
func (d *ofDiscoverer) DiscoverPort(requesterID string, host *omd.Host, dp *omd.DiscoverPort) {
|
|
d.enqueue(retainDiscoveryRequest(requesterID, types.DiscoveryRequestTypePort, host, dp))
|
|
}
|
|
|
|
func (d *ofDiscoverer) DiscoverService(requesterID string, port *omd.Port, ds *omd.DiscoverService) {
|
|
d.enqueue(retainDiscoveryRequest(requesterID, types.DiscoveryRequestTypeService, port, ds))
|
|
}
|
|
|
|
func (d *ofDiscoverer) DiscoverStop(requesterID string, requestID string) {
|
|
_, ok := d.requestIDs.Load(requestID)
|
|
if ok {
|
|
d.requestIDs.Store(requestID, true)
|
|
return
|
|
}
|
|
|
|
s, ok := d.processingSessions.Load(requestID)
|
|
if !ok {
|
|
return
|
|
}
|
|
|
|
s.(session.DiscoverySession).Shutdown()
|
|
}
|
|
|
|
func (d *ofDiscoverer) Message() <-chan types.DiscoveryMessage {
|
|
return d.messageChan
|
|
}
|
|
|
|
func (d *ofDiscoverer) SendMessage(discoveryRequest types.DiscoveryRequest, messageType types.DiscoveryMessageType, datas ...interface{}) {
|
|
d.messageChan <- types.MakeDiscoveryMessage(discoveryRequest, messageType, datas...)
|
|
}
|
|
|
|
func (d *ofDiscoverer) Shutdown() {
|
|
if d.stopChan == nil {
|
|
return
|
|
}
|
|
|
|
close(d.requestQueue)
|
|
close(d.messageChan)
|
|
|
|
close(d.stopChan)
|
|
d.stopWg.Wait()
|
|
|
|
d.stopChan = nil
|
|
}
|
|
|
|
func (d *ofDiscoverer) start() {
|
|
d.stopChan = make(chan struct{})
|
|
|
|
d.requestQueue = make(chan types.DiscoveryRequest, 10)
|
|
d.messageChan = make(chan types.DiscoveryMessage, 256)
|
|
|
|
d.stopWg.Add(1)
|
|
go d.handleRequest()
|
|
}
|
|
|
|
func (d *ofDiscoverer) enqueue(req *ofDiscoveryRequest) {
|
|
select {
|
|
case d.requestQueue <- req:
|
|
d.requestIDs.Store(req.RequestID(), false)
|
|
d.SendMessage(req, types.DiscoveryMessageTypeQueueing, req.RequestID(), omu.Now())
|
|
go func() {
|
|
select {
|
|
case <-req.dequeue:
|
|
case <-time.After(20 * time.Second):
|
|
req.timeout = true
|
|
d.SendMessage(req, types.DiscoveryMessageTypeQueueingTimeout, omu.Now())
|
|
}
|
|
}()
|
|
default:
|
|
d.SendMessage(req, types.DiscoveryMessageTypeQueueingFailed, omu.Now())
|
|
select {
|
|
case <-time.After(time.Microsecond * 500):
|
|
}
|
|
|
|
req.release()
|
|
}
|
|
}
|
|
|
|
func (d *ofDiscoverer) handleRequest() {
|
|
defer func() {
|
|
d.stopWg.Done()
|
|
}()
|
|
|
|
LOOP:
|
|
for {
|
|
select {
|
|
case req, ok := <-d.requestQueue:
|
|
if !ok {
|
|
return
|
|
}
|
|
|
|
canceled, ok := d.requestIDs.Load(req.RequestID())
|
|
d.requestIDs.Delete(req.RequestID())
|
|
if !ok || canceled.(bool) {
|
|
req.(*ofDiscoveryRequest).release()
|
|
continue LOOP
|
|
}
|
|
|
|
if req.(*ofDiscoveryRequest).timeout {
|
|
req.(*ofDiscoveryRequest).release()
|
|
continue LOOP
|
|
}
|
|
req.(*ofDiscoveryRequest).dequeue <- true
|
|
|
|
d.SendMessage(req, types.DiscoveryMessageTypeStart, omu.Now())
|
|
s := session.RetainDiscoverySession()
|
|
d.processingSessions.Store(req.RequestID(), s)
|
|
d.discover(req, s)
|
|
select {
|
|
case <-time.After(time.Millisecond * 500):
|
|
}
|
|
d.SendMessage(req, types.DiscoveryMessageTypeStop, omu.Now())
|
|
|
|
if _, ok := d.processingSessions.Load(req.RequestID()); ok {
|
|
d.processingSessions.Delete(req.RequestID())
|
|
s.Shutdown()
|
|
}
|
|
|
|
select {
|
|
case <-time.After(time.Millisecond * 500):
|
|
}
|
|
|
|
session.ReleaseDiscoverySession(s)
|
|
req.(*ofDiscoveryRequest).release()
|
|
|
|
log.Print("Discovery Session complete")
|
|
case <-d.stopChan:
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (d *ofDiscoverer) discover(req types.DiscoveryRequest, s session.DiscoverySession) {
|
|
if types.DiscoveryRequestTypeNone == req.RequestType() {
|
|
return
|
|
}
|
|
|
|
if err := s.InitWithRequest(req); nil != err {
|
|
d.SendMessage(req, types.DiscoveryMessageTypeError, err)
|
|
return
|
|
}
|
|
|
|
if s.Privileged() {
|
|
d.SendMessage(req, types.DiscoveryMessageTypeMode, omd.DiscoveryModeTypePrivileged.String())
|
|
} else {
|
|
d.SendMessage(req, types.DiscoveryMessageTypeMode, omd.DiscoveryModeTypeUnprivileged.String())
|
|
}
|
|
|
|
d.complexDiscover(s)
|
|
d.hierarchyDiscover(s)
|
|
}
|
|
|
|
func (d *ofDiscoverer) complexDiscover(s session.DiscoverySession) {
|
|
var wg sync.WaitGroup
|
|
|
|
discoveredChan := make(chan interface{})
|
|
s.SetDiscoveryDelegator(discoveredChan)
|
|
defer func() {
|
|
s.SetDiscoveryDelegator(nil)
|
|
close(discoveredChan)
|
|
}()
|
|
go func() {
|
|
for {
|
|
select {
|
|
case target, ok := <-discoveredChan:
|
|
if !ok {
|
|
return
|
|
}
|
|
switch target.(type) {
|
|
case *omd.Host:
|
|
d.SendMessage(s.DiscoveryRequest(), types.DiscoveryMessageTypeHost, target)
|
|
case *omd.Port:
|
|
d.SendMessage(s.DiscoveryRequest(), types.DiscoveryMessageTypePort, target)
|
|
case *omd.Service:
|
|
d.SendMessage(s.DiscoveryRequest(), types.DiscoveryMessageTypeService, target)
|
|
default:
|
|
}
|
|
}
|
|
}
|
|
}()
|
|
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
err := upnp.Scan(s)
|
|
if nil != err {
|
|
log.Printf("UPnP %v", err)
|
|
}
|
|
}()
|
|
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
err := mdns.Scan(s)
|
|
if nil != err {
|
|
log.Printf("mDNS %v", err)
|
|
}
|
|
}()
|
|
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
err := snmp.Scan(s)
|
|
if nil != err {
|
|
log.Printf("SNMP %v", err)
|
|
}
|
|
}()
|
|
|
|
wg.Wait()
|
|
}
|
|
|
|
func (d *ofDiscoverer) hierarchyDiscover(s session.DiscoverySession) {
|
|
var wg sync.WaitGroup
|
|
|
|
discoveredChan := make(chan interface{})
|
|
s.SetDiscoveryDelegator(discoveredChan)
|
|
defer func() {
|
|
s.SetDiscoveryDelegator(nil)
|
|
close(discoveredChan)
|
|
}()
|
|
go func() {
|
|
for {
|
|
select {
|
|
case target, ok := <-discoveredChan:
|
|
if !ok {
|
|
return
|
|
}
|
|
switch target.(type) {
|
|
case *omd.Host:
|
|
d.SendMessage(s.DiscoveryRequest(), types.DiscoveryMessageTypeHost, target)
|
|
if nil != s.DiscoverPort() {
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
port.Scan(s, target.(*omd.Host))
|
|
}()
|
|
}
|
|
case *omd.Port:
|
|
d.SendMessage(s.DiscoveryRequest(), types.DiscoveryMessageTypePort, target)
|
|
if nil != s.DiscoverService() {
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
service.Scan(s, target.(*omd.Port))
|
|
}()
|
|
}
|
|
case *omd.Service:
|
|
d.SendMessage(s.DiscoveryRequest(), types.DiscoveryMessageTypeService, target)
|
|
default:
|
|
}
|
|
}
|
|
}
|
|
}()
|
|
|
|
// post complexDiscover
|
|
if nil != s.DiscoverPort() {
|
|
discoveredHosts := s.DiscoveredAllHosts(true)
|
|
for _, h := range discoveredHosts {
|
|
wg.Add(1)
|
|
go func(h *omd.Host) {
|
|
defer wg.Done()
|
|
port.Scan(s, h)
|
|
}(h)
|
|
}
|
|
}
|
|
wg.Wait()
|
|
|
|
if nil != s.DiscoverService() {
|
|
discoveredPorts := s.DiscoveredAllPorts()
|
|
for _, hostPorts := range discoveredPorts {
|
|
for _, ports := range hostPorts {
|
|
for _, p := range ports {
|
|
wg.Add(1)
|
|
go func(p *omd.Port) {
|
|
defer wg.Done()
|
|
service.Scan(s, p)
|
|
}(p)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
wg.Wait()
|
|
|
|
// exec hierarchy Discovery
|
|
if nil != s.DiscoverHost() {
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
host.Scan(s)
|
|
}()
|
|
} else if nil != s.DiscoverPort() {
|
|
if nil != s.Host() {
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
port.Scan(s, s.Host())
|
|
}()
|
|
}
|
|
} else if nil != s.DiscoverService() {
|
|
if nil != s.Port() {
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
service.Scan(s, s.Port())
|
|
}()
|
|
}
|
|
}
|
|
|
|
wg.Wait()
|
|
}
|