// Package discovery queues host, port and service discovery requests,
// processes them sequentially on a single worker goroutine and reports
// progress through a message channel.
package discovery

import (
	"log"
	"sync"
	"time"

	omd "git.loafle.net/overflow/model/discovery"
	omu "git.loafle.net/overflow/model/util"
	"git.loafle.net/overflow_scanner/probe/discovery/protocol/mdns"
	"git.loafle.net/overflow_scanner/probe/discovery/protocol/snmp"
	"git.loafle.net/overflow_scanner/probe/discovery/protocol/upnp"
	"git.loafle.net/overflow_scanner/probe/discovery/session"
	"git.loafle.net/overflow_scanner/probe/discovery/target/host"
	"git.loafle.net/overflow_scanner/probe/discovery/target/port"
	"git.loafle.net/overflow_scanner/probe/discovery/target/service"
	"git.loafle.net/overflow_scanner/probe/discovery/types"
)

// Discoverer is the entry point for submitting discovery requests, stopping
// them, and consuming the messages they produce.
type Discoverer interface {
	// DiscoverHost queues a host discovery request for zone.
	DiscoverHost(requesterID string, zone *omd.Zone, dh *omd.DiscoverHost)
	// DiscoverPort queues a port discovery request for host.
	DiscoverPort(requesterID string, host *omd.Host, dp *omd.DiscoverPort)
	// DiscoverService queues a service discovery request for port.
	DiscoverService(requesterID string, port *omd.Port, ds *omd.DiscoverService)
	// DiscoverStop cancels the request identified by requestID, whether it is
	// still queued or already being processed.
	DiscoverStop(requesterID string, requestID string)
	// Message returns the receive side of the discovery message channel.
	Message() <-chan types.DiscoveryMessage
	// Shutdown stops the worker goroutine and closes the internal channels.
	Shutdown()
}

// Instance returns the process-wide Discoverer, creating and starting it on
// the first call.
func Instance() Discoverer {
	_once.Do(func() {
		i := &ofDiscoverer{}
		i.start()
		_instance = i
	})

	return _instance
}

var (
	_instance Discoverer
	_once     sync.Once
)

// ofDiscoverer is the Discoverer implementation returned by Instance.
type ofDiscoverer struct {
	stopChan chan struct{}
	stopWg   sync.WaitGroup

	requestIDs         sync.Map // map[string]bool: request ID -> canceled while queued
	processingSessions sync.Map // map[string]session.DiscoverySession

	requestQueue chan types.DiscoveryRequest
	messageChan  chan types.DiscoveryMessage
}

// DiscoverHost queues a host discovery request for zone.
func (d *ofDiscoverer) DiscoverHost(requesterID string, zone *omd.Zone, dh *omd.DiscoverHost) {
	d.enqueue(retainDiscoveryRequest(requesterID, types.DiscoveryRequestTypeHost, zone, dh))
}

// DiscoverPort queues a port discovery request for host.
func (d *ofDiscoverer) DiscoverPort(requesterID string, host *omd.Host, dp *omd.DiscoverPort) {
	d.enqueue(retainDiscoveryRequest(requesterID, types.DiscoveryRequestTypePort, host, dp))
}

// DiscoverService queues a service discovery request for port.
func (d *ofDiscoverer) DiscoverService(requesterID string, port *omd.Port, ds *omd.DiscoverService) {
	d.enqueue(retainDiscoveryRequest(requesterID, types.DiscoveryRequestTypeService, port, ds))
}

// DiscoverStop cancels the request identified by requestID. A request that is
// still queued is only flagged as canceled, so handleRequest drops it when it
// is dequeued. A request that is already running has its canceled flag set and
// its session shut down. Unknown IDs are ignored.
//
// NOTE(review): the Load/Store pair on requestIDs is not atomic; a request
// dequeued between the two calls would leave a stale entry behind — confirm
// whether that window matters in practice.
func (d *ofDiscoverer) DiscoverStop(requesterID string, requestID string) {
	if _, queued := d.requestIDs.Load(requestID); queued {
		d.requestIDs.Store(requestID, true)
		return
	}

	v, ok := d.processingSessions.Load(requestID)
	if !ok {
		return
	}

	s := v.(session.DiscoverySession)
	s.DiscoveryRequest().(*ofDiscoveryRequest).canceled.Store(true)
	s.Shutdown()
}

// Message returns the receive side of the discovery message channel.
func (d *ofDiscoverer) Message() <-chan types.DiscoveryMessage {
	return d.messageChan
}

// SendMessage publishes a message for discoveryRequest on the message channel.
// Messages for a request whose canceled flag is set are silently dropped.
func (d *ofDiscoverer) SendMessage(discoveryRequest types.DiscoveryRequest, messageType types.DiscoveryMessageType, datas ...interface{}) {
	if discoveryRequest.(*ofDiscoveryRequest).canceled.Load().(bool) {
		return
	}

	d.messageChan <- types.MakeDiscoveryMessage(discoveryRequest, messageType, datas...)
}

// Shutdown stops the worker goroutine and closes the internal channels. It is
// a no-op when the discoverer was never started or was already shut down.
func (d *ofDiscoverer) Shutdown() {
	if d.stopChan == nil {
		return
	}

	// Signal the worker first and wait for it to return before closing
	// messageChan: a discovery that is still in flight keeps calling
	// SendMessage, and sending on a closed channel panics.
	close(d.stopChan)
	close(d.requestQueue)
	d.stopWg.Wait()
	close(d.messageChan)

	d.stopChan = nil
}

// start allocates the channels and launches the single worker goroutine.
func (d *ofDiscoverer) start() {
	d.stopChan = make(chan struct{})
	d.requestQueue = make(chan types.DiscoveryRequest, 10)
	d.messageChan = make(chan types.DiscoveryMessage, 256)

	d.stopWg.Add(1)
	go d.handleRequest()
}

// enqueue tries to place req on the request queue without blocking. On success
// a Queueing message is sent and a watcher goroutine marks the request as
// timed out if it is not dequeued within 20 seconds. When the queue is full a
// QueueingFailed message is sent and the request is released.
func (d *ofDiscoverer) enqueue(req *ofDiscoveryRequest) {
	// Register the ID before the request becomes visible to handleRequest.
	// Registering it after the queue send lets the worker dequeue the request,
	// find no entry, and drop a perfectly valid request.
	d.requestIDs.Store(req.RequestID(), false)

	select {
	case d.requestQueue <- req:
		d.SendMessage(req, types.DiscoveryMessageTypeQueueing, req.RequestID(), omu.Now())

		go func() {
			select {
			case <-req.dequeue:
			case <-time.After(20 * time.Second):
				// NOTE(review): timeout is read by the worker goroutine
				// without synchronization — confirm this is acceptable.
				req.timeout = true
				d.SendMessage(req, types.DiscoveryMessageTypeQueueingTimeout, omu.Now())
			}
		}()
	default:
		// The request never entered the queue, so undo the registration.
		d.requestIDs.Delete(req.RequestID())
		d.SendMessage(req, types.DiscoveryMessageTypeQueueingFailed, omu.Now())

		time.Sleep(500 * time.Microsecond)

		req.release()
	}
}

// handleRequest is the worker loop. It processes queued requests one at a time
// until the request queue or the stop channel is closed.
func (d *ofDiscoverer) handleRequest() {
	defer d.stopWg.Done()

	for {
		select {
		case req, ok := <-d.requestQueue:
			if !ok {
				return
			}
			d.runRequest(req)
		case <-d.stopChan:
			return
		}
	}
}

// runRequest executes a single dequeued request: it skips requests that were
// canceled, unregistered or timed out while queued, otherwise runs the
// discovery inside a fresh session and releases the session and request
// afterwards.
func (d *ofDiscoverer) runRequest(req types.DiscoveryRequest) {
	canceled, registered := d.requestIDs.Load(req.RequestID())
	d.requestIDs.Delete(req.RequestID())

	r := req.(*ofDiscoveryRequest)
	if !registered || canceled.(bool) || r.timeout {
		r.release()
		return
	}

	// Tell the queue-timeout watcher started by enqueue that the request has
	// been picked up.
	r.dequeue <- true
	d.SendMessage(req, types.DiscoveryMessageTypeStart, omu.Now())

	s := session.RetainDiscoverySession()
	d.processingSessions.Store(req.RequestID(), s)

	d.discover(req, s)

	time.Sleep(time.Second)
	d.SendMessage(req, types.DiscoveryMessageTypeStop, omu.Now())

	if _, ok := d.processingSessions.Load(req.RequestID()); ok {
		d.processingSessions.Delete(req.RequestID())
		s.Shutdown()
	}

	time.Sleep(500 * time.Millisecond)
	session.ReleaseDiscoverySession(s)
	r.release()

	log.Print("Discovery Session complete")
}

// discover initializes s with req, reports whether the session is privileged,
// and then runs the protocol-based and the hierarchical discovery in order.
// Requests of type None are ignored; an initialization failure is reported as
// an Error message.
func (d *ofDiscoverer) discover(req types.DiscoveryRequest, s session.DiscoverySession) {
	if types.DiscoveryRequestTypeNone == req.RequestType() {
		return
	}

	if err := s.InitWithRequest(req); nil != err {
		d.SendMessage(req, types.DiscoveryMessageTypeError, err)
		return
	}

	mode := omd.DiscoveryModeTypeUnprivileged
	if s.Privileged() {
		mode = omd.DiscoveryModeTypePrivileged
	}
	d.SendMessage(req, types.DiscoveryMessageTypeMode, mode.String())

	d.complexDiscover(s)
	d.hierarchyDiscover(s)
}

// complexDiscover runs the UPnP, mDNS and SNMP scans concurrently and forwards
// every host, port and service delivered on the session's discovery delegator
// as a message. It returns once all three scans have finished; scan errors are
// only logged.
func (d *ofDiscoverer) complexDiscover(s session.DiscoverySession) {
	var wg sync.WaitGroup

	discoveredChan := make(chan interface{})
	s.SetDiscoveryDelegator(discoveredChan)
	defer func() {
		s.SetDiscoveryDelegator(nil)
		close(discoveredChan)
	}()

	// Forward discovered targets until the channel is closed on return.
	go func() {
		for target := range discoveredChan {
			switch target.(type) {
			case *omd.Host:
				d.SendMessage(s.DiscoveryRequest(), types.DiscoveryMessageTypeHost, target)
			case *omd.Port:
				d.SendMessage(s.DiscoveryRequest(), types.DiscoveryMessageTypePort, target)
			case *omd.Service:
				d.SendMessage(s.DiscoveryRequest(), types.DiscoveryMessageTypeService, target)
			default:
			}
		}
	}()

	wg.Add(1)
	go func() {
		defer wg.Done()

		if err := upnp.Scan(s); nil != err {
			log.Printf("UPnP %v", err)
		}
	}()

	wg.Add(1)
	go func() {
		defer wg.Done()

		if err := mdns.Scan(s); nil != err {
			log.Printf("mDNS %v", err)
		}
	}()

	wg.Add(1)
	go func() {
		defer wg.Done()

		if err := snmp.Scan(s); nil != err {
			log.Printf("SNMP %v", err)
		}
	}()

	wg.Wait()
}

// hierarchyDiscover first scans the hosts and ports already recorded in the
// session, then runs the scan matching the request itself (host, port or
// service). Every discovered host triggers a port scan and every discovered
// port a service scan, when the request asks for those levels.
func (d *ofDiscoverer) hierarchyDiscover(s session.DiscoverySession) {
	var wg sync.WaitGroup

	discoveredChan := make(chan interface{})
	s.SetDiscoveryDelegator(discoveredChan)
	defer func() {
		s.SetDiscoveryDelegator(nil)
		close(discoveredChan)
	}()

	// Forward discovered targets and fan out to the next discovery level.
	//
	// NOTE(review): wg.Add is called here while the caller may already be in
	// wg.Wait; if the counter reaches zero between a scanner's Done and this
	// Add, Wait can return early — confirm how the session delivers targets.
	go func() {
		for target := range discoveredChan {
			switch t := target.(type) {
			case *omd.Host:
				d.SendMessage(s.DiscoveryRequest(), types.DiscoveryMessageTypeHost, target)
				if nil != s.DiscoverPort() {
					wg.Add(1)
					go func(h *omd.Host) {
						defer wg.Done()
						port.Scan(s, h)
					}(t)
				}
			case *omd.Port:
				d.SendMessage(s.DiscoveryRequest(), types.DiscoveryMessageTypePort, target)
				if nil != s.DiscoverService() {
					wg.Add(1)
					go func(p *omd.Port) {
						defer wg.Done()
						service.Scan(s, p)
					}(t)
				}
			case *omd.Service:
				d.SendMessage(s.DiscoveryRequest(), types.DiscoveryMessageTypeService, target)
			default:
			}
		}
	}()

	// post complexDiscover: scan ports of the hosts already in the session.
	if nil != s.DiscoverPort() {
		for _, h := range s.DiscoveredAllHosts(true) {
			wg.Add(1)
			go func(h *omd.Host) {
				defer wg.Done()
				port.Scan(s, h)
			}(h)
		}
	}
	wg.Wait()

	// post complexDiscover: scan services of the ports already in the session.
	if nil != s.DiscoverService() {
		for _, hostPorts := range s.DiscoveredAllPorts() {
			for _, ports := range hostPorts {
				for _, p := range ports {
					wg.Add(1)
					go func(p *omd.Port) {
						defer wg.Done()
						service.Scan(s, p)
					}(p)
				}
			}
		}
	}
	wg.Wait()

	// exec hierarchy Discovery: run the scan that matches the request level.
	switch {
	case nil != s.DiscoverHost():
		wg.Add(1)
		go func() {
			defer wg.Done()
			host.Scan(s)
		}()
	case nil != s.DiscoverPort():
		if nil != s.Host() {
			wg.Add(1)
			go func() {
				defer wg.Done()
				port.Scan(s, s.Host())
			}()
		}
	case nil != s.DiscoverService():
		if nil != s.Port() {
			wg.Add(1)
			go func() {
				defer wg.Done()
				service.Scan(s, s.Port())
			}()
		}
	}
	wg.Wait()
}