224 lines
5.2 KiB
Go
224 lines
5.2 KiB
Go
package discovery
|
|
|
|
import (
|
|
"fmt"
|
|
"sync"
|
|
"time"
|
|
|
|
omd "git.loafle.net/overflow/model/discovery"
|
|
omu "git.loafle.net/overflow/model/util"
|
|
"git.loafle.net/overflow_scanner/probe/discovery/session"
|
|
"git.loafle.net/overflow_scanner/probe/discovery/types"
|
|
)
|
|
|
|
type Discoverer interface {
|
|
DiscoverHost(requesterID string, zone *omd.Zone, dh *omd.DiscoverHost)
|
|
DiscoverPort(requesterID string, host *omd.Host, dp *omd.DiscoverPort)
|
|
DiscoverService(requesterID string, port *omd.Port, ds *omd.DiscoverService)
|
|
Message() <-chan types.DiscoveryMessage
|
|
|
|
Shutdown()
|
|
}
|
|
|
|
func Instance() Discoverer {
|
|
_once.Do(func() {
|
|
_instance = newDiscoverer()
|
|
})
|
|
return _instance
|
|
}
|
|
|
|
var _instance Discoverer
|
|
var _once sync.Once
|
|
|
|
func newDiscoverer() Discoverer {
|
|
i := &ofDiscoverer{}
|
|
i.start()
|
|
|
|
return i
|
|
}
|
|
|
|
type ofDiscoverer struct {
|
|
stopChan chan struct{}
|
|
stopWg sync.WaitGroup
|
|
|
|
requestQueue chan types.DiscoveryRequest
|
|
messageChan chan types.DiscoveryMessage
|
|
}
|
|
|
|
func (d *ofDiscoverer) DiscoverHost(requesterID string, zone *omd.Zone, dh *omd.DiscoverHost) {
|
|
d.enqueue(retainDiscoveryRequest(d.messageChan, requesterID, types.DiscoveryRequestTypeHost, zone, dh))
|
|
}
|
|
|
|
func (d *ofDiscoverer) DiscoverPort(requesterID string, host *omd.Host, dp *omd.DiscoverPort) {
|
|
d.enqueue(retainDiscoveryRequest(d.messageChan, requesterID, types.DiscoveryRequestTypePort, host, dp))
|
|
}
|
|
|
|
func (d *ofDiscoverer) DiscoverService(requesterID string, port *omd.Port, ds *omd.DiscoverService) {
|
|
d.enqueue(retainDiscoveryRequest(d.messageChan, requesterID, types.DiscoveryRequestTypeService, port, ds))
|
|
}
|
|
|
|
func (d *ofDiscoverer) Message() <-chan types.DiscoveryMessage {
|
|
return d.messageChan
|
|
}
|
|
|
|
func (d *ofDiscoverer) Shutdown() {
|
|
if d.stopChan == nil {
|
|
return
|
|
}
|
|
|
|
close(d.requestQueue)
|
|
close(d.messageChan)
|
|
|
|
close(d.stopChan)
|
|
d.stopWg.Wait()
|
|
|
|
d.stopChan = nil
|
|
}
|
|
|
|
func (d *ofDiscoverer) start() {
|
|
d.stopChan = make(chan struct{})
|
|
|
|
d.requestQueue = make(chan types.DiscoveryRequest, 10)
|
|
d.messageChan = make(chan types.DiscoveryMessage, 256)
|
|
|
|
d.stopWg.Add(1)
|
|
go d.handleRequest()
|
|
}
|
|
|
|
func (d *ofDiscoverer) enqueue(req *ofDiscoveryRequest) {
|
|
select {
|
|
case d.requestQueue <- req:
|
|
req.SendMessage(types.DiscoveryMessageTypeQueueing, omu.Now(), nil)
|
|
go func() {
|
|
select {
|
|
case <-req.dequeue:
|
|
case <-time.After(10 * time.Second):
|
|
req.timeout = true
|
|
req.SendMessage(types.DiscoveryMessageTypeTimeout, omu.Now(), nil)
|
|
req.release()
|
|
}
|
|
}()
|
|
default:
|
|
req.SendMessage(types.DiscoveryMessageTypeFailedQueueing, omu.Now(), nil)
|
|
req.release()
|
|
}
|
|
}
|
|
|
|
func (d *ofDiscoverer) handleRequest() {
|
|
defer func() {
|
|
d.stopWg.Done()
|
|
}()
|
|
|
|
LOOP:
|
|
for {
|
|
select {
|
|
case req, ok := <-d.requestQueue:
|
|
if !ok {
|
|
return
|
|
}
|
|
if req.(*ofDiscoveryRequest).timeout {
|
|
continue LOOP
|
|
}
|
|
req.(*ofDiscoveryRequest).dequeue <- true
|
|
|
|
req.SendMessage(types.DiscoveryMessageTypeStart, omu.Now(), nil)
|
|
d.discover(req)
|
|
req.SendMessage(types.DiscoveryMessageTypeStop, omu.Now(), nil)
|
|
|
|
req.(*ofDiscoveryRequest).release()
|
|
case <-d.stopChan:
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (d *ofDiscoverer) discover(req types.DiscoveryRequest) {
|
|
if types.DiscoveryRequestTypeNone == req.RequestType() {
|
|
return
|
|
}
|
|
|
|
s := session.RetainDiscoverySession()
|
|
defer func() {
|
|
session.ReleaseDiscoverySession(s)
|
|
}()
|
|
|
|
params := req.Params()
|
|
|
|
switch req.RequestType() {
|
|
case types.DiscoveryRequestTypeHost:
|
|
if nil == params || 2 != len(params) {
|
|
req.SendMessage(types.DiscoveryMessageTypeError, nil, fmt.Errorf("Parameter is not valid"))
|
|
break
|
|
}
|
|
|
|
zone, ok := params[0].(*omd.Zone)
|
|
if !ok {
|
|
req.SendMessage(types.DiscoveryMessageTypeError, nil, fmt.Errorf("Zone of parameter is not valid"))
|
|
break
|
|
}
|
|
|
|
dh, ok := params[1].(*omd.DiscoverHost)
|
|
if !ok {
|
|
req.SendMessage(types.DiscoveryMessageTypeError, nil, fmt.Errorf("DiscoverHost of parameter is not valid"))
|
|
break
|
|
}
|
|
|
|
s.InitWithDiscoverHost(req, zone, dh)
|
|
case types.DiscoveryRequestTypePort:
|
|
if nil == params || 2 != len(params) {
|
|
req.SendMessage(types.DiscoveryMessageTypeError, nil, fmt.Errorf("Parameter is not valid"))
|
|
break
|
|
}
|
|
|
|
host, ok := params[0].(*omd.Host)
|
|
if !ok {
|
|
req.SendMessage(types.DiscoveryMessageTypeError, nil, fmt.Errorf("Host of parameter is not valid"))
|
|
break
|
|
}
|
|
|
|
dp, ok := params[1].(*omd.DiscoverPort)
|
|
if !ok {
|
|
req.SendMessage(types.DiscoveryMessageTypeError, nil, fmt.Errorf("DiscoverPort of parameter is not valid"))
|
|
break
|
|
}
|
|
|
|
s.InitWithDiscoverPort(req, host, dp)
|
|
case types.DiscoveryRequestTypeService:
|
|
if nil == params || 2 != len(params) {
|
|
req.SendMessage(types.DiscoveryMessageTypeError, nil, fmt.Errorf("Parameter is not valid"))
|
|
break
|
|
}
|
|
|
|
port, ok := params[0].(*omd.Port)
|
|
if !ok {
|
|
req.SendMessage(types.DiscoveryMessageTypeError, nil, fmt.Errorf("Port of parameter is not valid"))
|
|
break
|
|
}
|
|
|
|
ds, ok := params[1].(*omd.DiscoverService)
|
|
if !ok {
|
|
req.SendMessage(types.DiscoveryMessageTypeError, nil, fmt.Errorf("DiscoverService of parameter is not valid"))
|
|
break
|
|
}
|
|
|
|
s.InitWithDiscoverService(req, port, ds)
|
|
}
|
|
|
|
d.preDiscovery(s)
|
|
|
|
d.layerDiscovery(s)
|
|
}
|
|
|
|
func (d *ofDiscoverer) preDiscovery(s session.DiscoverySession) {
|
|
|
|
// SNMP
|
|
// mDNS
|
|
// UPnP
|
|
}
|
|
|
|
func (d *ofDiscoverer) layerDiscovery(s session.DiscoverySession) {
|
|
// SNMP
|
|
// mDNS
|
|
// UPnP
|
|
}
|