This commit is contained in:
crusader 2018-09-14 19:08:14 +09:00
parent 7db69d4551
commit 0d082439e3
3 changed files with 19 additions and 3 deletions

View File

@ -73,6 +73,7 @@ func (d *ofDiscoverer) DiscoverStop(requesterID string, requestID string) {
if !ok { if !ok {
return return
} }
delete(d.processingSessions, requestID)
s.Shutdown() s.Shutdown()
} }
@ -168,13 +169,17 @@ LOOP:
} }
d.SendMessage(req, types.DiscoveryMessageTypeStop, omu.Now()) d.SendMessage(req, types.DiscoveryMessageTypeStop, omu.Now())
if _, ok := d.processingSessions[req.RequestID()]; ok {
delete(d.processingSessions, req.RequestID())
s.Shutdown()
}
select { select {
case <-time.After(time.Millisecond * 500): case <-time.After(time.Millisecond * 500):
} }
session.ReleaseDiscoverySession(s) session.ReleaseDiscoverySession(s)
req.(*ofDiscoveryRequest).release() req.(*ofDiscoveryRequest).release()
delete(d.processingSessions, req.RequestID())
log.Print("Discovery Session complete") log.Print("Discovery Session complete")
case <-d.stopChan: case <-d.stopChan:
@ -301,7 +306,7 @@ func (d *ofDiscoverer) hierarchyDiscover(s session.DiscoverySession) {
// post complexDiscover // post complexDiscover
if nil != s.DiscoverPort() { if nil != s.DiscoverPort() {
discoveredHosts := s.DiscoveredAllHosts(false) discoveredHosts := s.DiscoveredAllHosts(true)
for _, h := range discoveredHosts { for _, h := range discoveredHosts {
wg.Add(1) wg.Add(1)
go func(h *omd.Host) { go func(h *omd.Host) {

View File

@ -8,6 +8,7 @@ import (
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
"sync/atomic"
omd "git.loafle.net/overflow/model/discovery" omd "git.loafle.net/overflow/model/discovery"
omm "git.loafle.net/overflow/model/meta" omm "git.loafle.net/overflow/model/meta"
@ -85,6 +86,7 @@ type ofDiscoverySession struct {
ports map[*omd.Host]map[json.Number]map[string]*omd.Port ports map[*omd.Host]map[json.Number]map[string]*omd.Port
services map[*omd.Port]map[string]*omd.Service services map[*omd.Port]map[string]*omd.Service
stopped atomic.Value
stopChan chan struct{} stopChan chan struct{}
} }
@ -106,6 +108,7 @@ func (ds *ofDiscoverySession) init(request types.DiscoveryRequest) {
ds.ports = make(map[*omd.Host]map[json.Number]map[string]*omd.Port) ds.ports = make(map[*omd.Host]map[json.Number]map[string]*omd.Port)
ds.services = make(map[*omd.Port]map[string]*omd.Service) ds.services = make(map[*omd.Port]map[string]*omd.Service)
ds.stopped.Store(false)
ds.stopChan = make(chan struct{}) ds.stopChan = make(chan struct{})
} }
@ -521,6 +524,10 @@ func (ds *ofDiscoverySession) StopChan() <-chan struct{} {
} }
func (ds *ofDiscoverySession) delegate(data interface{}) { func (ds *ofDiscoverySession) delegate(data interface{}) {
if true == ds.stopped.Load().(bool) {
return
}
switch v := data.(type) { switch v := data.(type) {
case *omd.Host: case *omd.Host:
if !ds.IsTargetHost(v) { if !ds.IsTargetHost(v) {
@ -544,6 +551,7 @@ func (ds *ofDiscoverySession) delegate(data interface{}) {
} }
func (ds *ofDiscoverySession) Shutdown() { func (ds *ofDiscoverySession) Shutdown() {
ds.stopped.Store(true)
close(ds.stopChan) close(ds.stopChan)
} }
@ -956,7 +964,7 @@ func RetainDiscoverySession() *ofDiscoverySession {
} }
func ReleaseDiscoverySession(ds *ofDiscoverySession) { func ReleaseDiscoverySession(ds *ofDiscoverySession) {
close(ds.stopChan) ds.stopChan = nil
if nil != ds.pCapScanner { if nil != ds.pCapScanner {
ds.pCapScanner.Stop() ds.pCapScanner.Stop()

View File

@ -163,5 +163,8 @@ func (s *DiscoveryService) DiscoverService(requesterID string, port *omd.Port, d
func (s *DiscoveryService) DiscoverStop(requesterID string, requestID string) error { func (s *DiscoveryService) DiscoverStop(requesterID string, requestID string) error {
log.Print("requestID ", requestID) log.Print("requestID ", requestID)
s.Discoverer.DiscoverStop(requesterID, requestID)
return nil return nil
} }