probe/discovery/discoverer.go

383 lines
8.5 KiB
Go
Raw Normal View History

2018-08-29 12:04:23 +00:00
package discovery
import (
2018-08-31 05:15:46 +00:00
"log"
2018-08-29 12:04:23 +00:00
"sync"
"time"
omd "git.loafle.net/overflow/model/discovery"
omu "git.loafle.net/overflow/model/util"
2018-08-30 12:56:32 +00:00
"git.loafle.net/overflow_scanner/probe/discovery/protocol/mdns"
"git.loafle.net/overflow_scanner/probe/discovery/protocol/snmp"
"git.loafle.net/overflow_scanner/probe/discovery/protocol/upnp"
2018-08-29 12:04:23 +00:00
"git.loafle.net/overflow_scanner/probe/discovery/session"
2018-08-31 14:00:08 +00:00
"git.loafle.net/overflow_scanner/probe/discovery/target/host"
"git.loafle.net/overflow_scanner/probe/discovery/target/port"
"git.loafle.net/overflow_scanner/probe/discovery/target/service"
2018-08-29 12:04:23 +00:00
"git.loafle.net/overflow_scanner/probe/discovery/types"
)
type Discoverer interface {
DiscoverHost(requesterID string, zone *omd.Zone, dh *omd.DiscoverHost)
DiscoverPort(requesterID string, host *omd.Host, dp *omd.DiscoverPort)
DiscoverService(requesterID string, port *omd.Port, ds *omd.DiscoverService)
2018-09-13 17:14:57 +00:00
DiscoverStop(requesterID string, requestID string)
2018-08-29 12:04:23 +00:00
Message() <-chan types.DiscoveryMessage
Shutdown()
}
func Instance() Discoverer {
_once.Do(func() {
2018-09-01 13:09:33 +00:00
i := &ofDiscoverer{}
i.start()
_instance = i
2018-08-29 12:04:23 +00:00
})
return _instance
}
var _instance Discoverer
var _once sync.Once
type ofDiscoverer struct {
stopChan chan struct{}
stopWg sync.WaitGroup
2018-09-17 07:20:46 +00:00
requestIDs sync.Map //map[string]bool
processingSessions sync.Map //map[string]session.DiscoverySession
2018-09-13 17:14:57 +00:00
requestQueue chan types.DiscoveryRequest
messageChan chan types.DiscoveryMessage
2018-08-29 12:04:23 +00:00
}
func (d *ofDiscoverer) DiscoverHost(requesterID string, zone *omd.Zone, dh *omd.DiscoverHost) {
2018-08-30 12:56:32 +00:00
d.enqueue(retainDiscoveryRequest(requesterID, types.DiscoveryRequestTypeHost, zone, dh))
2018-08-29 12:04:23 +00:00
}
func (d *ofDiscoverer) DiscoverPort(requesterID string, host *omd.Host, dp *omd.DiscoverPort) {
2018-08-30 12:56:32 +00:00
d.enqueue(retainDiscoveryRequest(requesterID, types.DiscoveryRequestTypePort, host, dp))
2018-08-29 12:04:23 +00:00
}
func (d *ofDiscoverer) DiscoverService(requesterID string, port *omd.Port, ds *omd.DiscoverService) {
2018-08-30 12:56:32 +00:00
d.enqueue(retainDiscoveryRequest(requesterID, types.DiscoveryRequestTypeService, port, ds))
2018-08-29 12:04:23 +00:00
}
2018-09-13 17:14:57 +00:00
func (d *ofDiscoverer) DiscoverStop(requesterID string, requestID string) {
2018-09-17 07:20:46 +00:00
_, ok := d.requestIDs.Load(requestID)
2018-09-13 17:14:57 +00:00
if ok {
2018-09-17 07:20:46 +00:00
d.requestIDs.Store(requestID, true)
2018-09-13 17:14:57 +00:00
return
}
2018-09-17 07:20:46 +00:00
s, ok := d.processingSessions.Load(requestID)
2018-09-13 17:14:57 +00:00
if !ok {
return
}
2018-09-19 02:05:53 +00:00
2018-09-17 07:20:46 +00:00
s.(session.DiscoverySession).Shutdown()
2018-09-13 17:14:57 +00:00
}
2018-08-29 12:04:23 +00:00
func (d *ofDiscoverer) Message() <-chan types.DiscoveryMessage {
return d.messageChan
}
2018-09-13 13:05:05 +00:00
func (d *ofDiscoverer) SendMessage(discoveryRequest types.DiscoveryRequest, messageType types.DiscoveryMessageType, datas ...interface{}) {
d.messageChan <- types.MakeDiscoveryMessage(discoveryRequest, messageType, datas...)
2018-08-30 12:56:32 +00:00
}
2018-08-29 12:04:23 +00:00
func (d *ofDiscoverer) Shutdown() {
if d.stopChan == nil {
return
}
close(d.requestQueue)
close(d.messageChan)
close(d.stopChan)
d.stopWg.Wait()
d.stopChan = nil
}
func (d *ofDiscoverer) start() {
d.stopChan = make(chan struct{})
d.requestQueue = make(chan types.DiscoveryRequest, 10)
d.messageChan = make(chan types.DiscoveryMessage, 256)
d.stopWg.Add(1)
go d.handleRequest()
}
func (d *ofDiscoverer) enqueue(req *ofDiscoveryRequest) {
select {
case d.requestQueue <- req:
2018-09-17 07:20:46 +00:00
d.requestIDs.Store(req.RequestID(), false)
2018-09-13 13:05:05 +00:00
d.SendMessage(req, types.DiscoveryMessageTypeQueueing, req.RequestID(), omu.Now())
2018-08-29 12:04:23 +00:00
go func() {
select {
case <-req.dequeue:
2018-09-04 19:12:14 +00:00
case <-time.After(20 * time.Second):
2018-08-29 12:04:23 +00:00
req.timeout = true
2018-09-13 13:05:05 +00:00
d.SendMessage(req, types.DiscoveryMessageTypeQueueingTimeout, omu.Now())
2018-08-29 12:04:23 +00:00
}
}()
default:
2018-09-13 13:05:05 +00:00
d.SendMessage(req, types.DiscoveryMessageTypeQueueingFailed, omu.Now())
2018-09-04 19:12:14 +00:00
select {
case <-time.After(time.Microsecond * 500):
}
2018-08-29 12:04:23 +00:00
req.release()
}
}
func (d *ofDiscoverer) handleRequest() {
defer func() {
d.stopWg.Done()
}()
LOOP:
for {
select {
case req, ok := <-d.requestQueue:
if !ok {
return
}
2018-09-13 13:05:05 +00:00
2018-09-17 07:20:46 +00:00
canceled, ok := d.requestIDs.Load(req.RequestID())
d.requestIDs.Delete(req.RequestID())
if !ok || canceled.(bool) {
2018-09-13 13:05:05 +00:00
req.(*ofDiscoveryRequest).release()
continue LOOP
}
2018-08-29 12:04:23 +00:00
if req.(*ofDiscoveryRequest).timeout {
2018-09-04 19:12:14 +00:00
req.(*ofDiscoveryRequest).release()
2018-08-29 12:04:23 +00:00
continue LOOP
}
req.(*ofDiscoveryRequest).dequeue <- true
2018-09-13 13:05:05 +00:00
d.SendMessage(req, types.DiscoveryMessageTypeStart, omu.Now())
2018-09-04 10:09:15 +00:00
s := session.RetainDiscoverySession()
2018-09-17 07:20:46 +00:00
d.processingSessions.Store(req.RequestID(), s)
2018-09-04 10:09:15 +00:00
d.discover(req, s)
2018-09-28 07:13:45 +00:00
log.Print("Discover complete")
2018-09-04 19:37:56 +00:00
select {
2018-09-19 02:05:53 +00:00
case <-time.After(time.Millisecond * 500):
2018-09-04 19:37:56 +00:00
}
2018-09-13 13:05:05 +00:00
d.SendMessage(req, types.DiscoveryMessageTypeStop, omu.Now())
2018-09-04 10:09:15 +00:00
2018-09-17 07:20:46 +00:00
if _, ok := d.processingSessions.Load(req.RequestID()); ok {
d.processingSessions.Delete(req.RequestID())
2018-09-14 10:08:14 +00:00
s.Shutdown()
}
2018-09-28 07:13:45 +00:00
log.Print("Discovery Session Shutdowm")
2018-09-14 10:08:14 +00:00
2018-09-04 10:09:15 +00:00
select {
2018-09-10 14:53:55 +00:00
case <-time.After(time.Millisecond * 500):
2018-09-04 10:09:15 +00:00
}
2018-09-10 14:53:55 +00:00
session.ReleaseDiscoverySession(s)
req.(*ofDiscoveryRequest).release()
2018-09-28 07:13:45 +00:00
log.Print("Discovery complete")
2018-09-10 14:53:55 +00:00
2018-08-29 12:04:23 +00:00
case <-d.stopChan:
return
}
}
}
2018-09-04 10:09:15 +00:00
func (d *ofDiscoverer) discover(req types.DiscoveryRequest, s session.DiscoverySession) {
2018-08-29 12:04:23 +00:00
if types.DiscoveryRequestTypeNone == req.RequestType() {
return
}
2018-08-29 18:20:51 +00:00
if err := s.InitWithRequest(req); nil != err {
2018-09-13 13:05:05 +00:00
d.SendMessage(req, types.DiscoveryMessageTypeError, err)
2018-08-29 18:20:51 +00:00
return
2018-08-29 12:04:23 +00:00
}
2018-09-14 07:44:09 +00:00
if s.Privileged() {
2018-09-17 06:39:59 +00:00
d.SendMessage(req, types.DiscoveryMessageTypeMode, omd.DiscoveryModeTypePrivileged.String())
2018-09-14 07:44:09 +00:00
} else {
2018-09-17 06:39:59 +00:00
d.SendMessage(req, types.DiscoveryMessageTypeMode, omd.DiscoveryModeTypeUnprivileged.String())
2018-09-14 07:44:09 +00:00
}
2018-08-30 12:56:32 +00:00
d.complexDiscover(s)
d.hierarchyDiscover(s)
2018-08-29 12:04:23 +00:00
}
2018-08-30 12:56:32 +00:00
func (d *ofDiscoverer) complexDiscover(s session.DiscoverySession) {
2018-08-29 13:23:41 +00:00
var wg sync.WaitGroup
2018-08-29 12:04:23 +00:00
2018-08-30 12:56:32 +00:00
discoveredChan := make(chan interface{})
s.SetDiscoveryDelegator(discoveredChan)
defer func() {
s.SetDiscoveryDelegator(nil)
close(discoveredChan)
}()
go func() {
for {
select {
case target, ok := <-discoveredChan:
if !ok {
return
}
switch target.(type) {
case *omd.Host:
2018-09-13 13:05:05 +00:00
d.SendMessage(s.DiscoveryRequest(), types.DiscoveryMessageTypeHost, target)
2018-08-30 12:56:32 +00:00
case *omd.Port:
2018-09-13 13:05:05 +00:00
d.SendMessage(s.DiscoveryRequest(), types.DiscoveryMessageTypePort, target)
2018-08-30 12:56:32 +00:00
case *omd.Service:
2018-09-13 13:05:05 +00:00
d.SendMessage(s.DiscoveryRequest(), types.DiscoveryMessageTypeService, target)
2018-08-30 12:56:32 +00:00
default:
}
}
}
}()
2018-08-29 13:23:41 +00:00
wg.Add(1)
go func() {
defer wg.Done()
2018-08-31 05:15:46 +00:00
err := upnp.Scan(s)
if nil != err {
log.Printf("UPnP %v", err)
}
2018-08-29 13:23:41 +00:00
}()
wg.Add(1)
go func() {
defer wg.Done()
2018-08-31 05:15:46 +00:00
err := mdns.Scan(s)
if nil != err {
log.Printf("mDNS %v", err)
}
2018-08-29 13:23:41 +00:00
}()
2018-08-30 12:56:32 +00:00
wg.Add(1)
go func() {
defer wg.Done()
2018-08-31 05:15:46 +00:00
err := snmp.Scan(s)
if nil != err {
log.Printf("SNMP %v", err)
}
2018-08-30 12:56:32 +00:00
}()
2018-08-29 13:23:41 +00:00
wg.Wait()
2018-08-29 12:04:23 +00:00
}
2018-08-30 12:56:32 +00:00
func (d *ofDiscoverer) hierarchyDiscover(s session.DiscoverySession) {
2018-08-31 14:00:08 +00:00
var wg sync.WaitGroup
discoveredChan := make(chan interface{})
s.SetDiscoveryDelegator(discoveredChan)
defer func() {
s.SetDiscoveryDelegator(nil)
close(discoveredChan)
}()
go func() {
for {
select {
case target, ok := <-discoveredChan:
if !ok {
return
}
switch target.(type) {
case *omd.Host:
2018-09-13 13:05:05 +00:00
d.SendMessage(s.DiscoveryRequest(), types.DiscoveryMessageTypeHost, target)
2018-08-31 14:00:08 +00:00
if nil != s.DiscoverPort() {
2018-09-28 07:13:45 +00:00
h := target.(*omd.Host)
2018-08-31 14:00:08 +00:00
wg.Add(1)
go func() {
2018-09-28 07:13:45 +00:00
defer func() {
wg.Done()
}()
port.Scan(s, h)
2018-08-31 14:00:08 +00:00
}()
}
case *omd.Port:
2018-09-13 13:05:05 +00:00
d.SendMessage(s.DiscoveryRequest(), types.DiscoveryMessageTypePort, target)
2018-08-31 14:00:08 +00:00
if nil != s.DiscoverService() {
2018-09-28 07:13:45 +00:00
p := target.(*omd.Port)
2018-08-31 14:00:08 +00:00
wg.Add(1)
go func() {
2018-09-28 07:13:45 +00:00
defer func() {
wg.Done()
}()
service.Scan(s, p)
2018-08-31 14:00:08 +00:00
}()
}
case *omd.Service:
2018-09-13 13:05:05 +00:00
d.SendMessage(s.DiscoveryRequest(), types.DiscoveryMessageTypeService, target)
2018-08-31 14:00:08 +00:00
default:
}
}
}
}()
2018-09-01 06:07:15 +00:00
// post complexDiscover
if nil != s.DiscoverPort() {
2018-09-14 10:08:14 +00:00
discoveredHosts := s.DiscoveredAllHosts(true)
2018-09-01 06:07:15 +00:00
for _, h := range discoveredHosts {
wg.Add(1)
go func(h *omd.Host) {
defer wg.Done()
port.Scan(s, h)
}(h)
}
}
wg.Wait()
if nil != s.DiscoverService() {
discoveredPorts := s.DiscoveredAllPorts()
for _, hostPorts := range discoveredPorts {
for _, ports := range hostPorts {
for _, p := range ports {
wg.Add(1)
go func(p *omd.Port) {
defer wg.Done()
service.Scan(s, p)
}(p)
}
}
}
}
wg.Wait()
// exec hierarchy Discovery
2018-08-31 14:00:08 +00:00
if nil != s.DiscoverHost() {
wg.Add(1)
go func() {
2018-09-28 07:13:45 +00:00
defer func() {
wg.Done()
}()
2018-08-31 14:00:08 +00:00
host.Scan(s)
}()
} else if nil != s.DiscoverPort() {
if nil != s.Host() {
wg.Add(1)
go func() {
2018-09-28 07:13:45 +00:00
defer func() {
wg.Done()
}()
2018-08-31 14:00:08 +00:00
port.Scan(s, s.Host())
}()
}
} else if nil != s.DiscoverService() {
if nil != s.Port() {
wg.Add(1)
go func() {
2018-09-28 07:13:45 +00:00
defer func() {
wg.Done()
}()
2018-08-31 14:00:08 +00:00
service.Scan(s, s.Port())
}()
}
}
2018-08-30 12:56:32 +00:00
2018-08-31 14:00:08 +00:00
wg.Wait()
2018-08-29 12:04:23 +00:00
}