This commit is contained in:
crusader 2018-09-17 16:20:46 +09:00
parent 9567faa011
commit 10be32e2b4
3 changed files with 26 additions and 16 deletions

View File

@ -44,8 +44,8 @@ type ofDiscoverer struct {
stopChan chan struct{} stopChan chan struct{}
stopWg sync.WaitGroup stopWg sync.WaitGroup
requestIDs map[string]bool requestIDs sync.Map //map[string]bool
processingSessions map[string]session.DiscoverySession processingSessions sync.Map //map[string]session.DiscoverySession
requestQueue chan types.DiscoveryRequest requestQueue chan types.DiscoveryRequest
messageChan chan types.DiscoveryMessage messageChan chan types.DiscoveryMessage
} }
@ -63,18 +63,18 @@ func (d *ofDiscoverer) DiscoverService(requesterID string, port *omd.Port, ds *o
} }
func (d *ofDiscoverer) DiscoverStop(requesterID string, requestID string) { func (d *ofDiscoverer) DiscoverStop(requesterID string, requestID string) {
_, ok := d.requestIDs[requestID] _, ok := d.requestIDs.Load(requestID)
if ok { if ok {
d.requestIDs[requestID] = false d.requestIDs.Store(requestID, true)
return return
} }
s, ok := d.processingSessions[requestID] s, ok := d.processingSessions.Load(requestID)
if !ok { if !ok {
return return
} }
delete(d.processingSessions, requestID) s.(session.DiscoverySession).DiscoveryRequest().(*ofDiscoveryRequest).canceled.Store(true)
s.Shutdown() s.(session.DiscoverySession).Shutdown()
} }
func (d *ofDiscoverer) Message() <-chan types.DiscoveryMessage { func (d *ofDiscoverer) Message() <-chan types.DiscoveryMessage {
@ -82,6 +82,10 @@ func (d *ofDiscoverer) Message() <-chan types.DiscoveryMessage {
} }
func (d *ofDiscoverer) SendMessage(discoveryRequest types.DiscoveryRequest, messageType types.DiscoveryMessageType, datas ...interface{}) { func (d *ofDiscoverer) SendMessage(discoveryRequest types.DiscoveryRequest, messageType types.DiscoveryMessageType, datas ...interface{}) {
if discoveryRequest.(*ofDiscoveryRequest).canceled.Load().(bool) {
return
}
d.messageChan <- types.MakeDiscoveryMessage(discoveryRequest, messageType, datas...) d.messageChan <- types.MakeDiscoveryMessage(discoveryRequest, messageType, datas...)
} }
@ -102,8 +106,6 @@ func (d *ofDiscoverer) Shutdown() {
func (d *ofDiscoverer) start() { func (d *ofDiscoverer) start() {
d.stopChan = make(chan struct{}) d.stopChan = make(chan struct{})
d.requestIDs = make(map[string]bool, 10)
d.processingSessions = make(map[string]session.DiscoverySession, 2)
d.requestQueue = make(chan types.DiscoveryRequest, 10) d.requestQueue = make(chan types.DiscoveryRequest, 10)
d.messageChan = make(chan types.DiscoveryMessage, 256) d.messageChan = make(chan types.DiscoveryMessage, 256)
@ -114,7 +116,7 @@ func (d *ofDiscoverer) start() {
func (d *ofDiscoverer) enqueue(req *ofDiscoveryRequest) { func (d *ofDiscoverer) enqueue(req *ofDiscoveryRequest) {
select { select {
case d.requestQueue <- req: case d.requestQueue <- req:
d.requestIDs[req.RequestID()] = true d.requestIDs.Store(req.RequestID(), false)
d.SendMessage(req, types.DiscoveryMessageTypeQueueing, req.RequestID(), omu.Now()) d.SendMessage(req, types.DiscoveryMessageTypeQueueing, req.RequestID(), omu.Now())
go func() { go func() {
select { select {
@ -147,9 +149,9 @@ LOOP:
return return
} }
canceled, ok := d.requestIDs[req.RequestID()] canceled, ok := d.requestIDs.Load(req.RequestID())
delete(d.requestIDs, req.RequestID()) d.requestIDs.Delete(req.RequestID())
if !ok || !canceled { if !ok || canceled.(bool) {
req.(*ofDiscoveryRequest).release() req.(*ofDiscoveryRequest).release()
continue LOOP continue LOOP
} }
@ -162,15 +164,15 @@ LOOP:
d.SendMessage(req, types.DiscoveryMessageTypeStart, omu.Now()) d.SendMessage(req, types.DiscoveryMessageTypeStart, omu.Now())
s := session.RetainDiscoverySession() s := session.RetainDiscoverySession()
d.processingSessions[req.RequestID()] = s d.processingSessions.Store(req.RequestID(), s)
d.discover(req, s) d.discover(req, s)
select { select {
case <-time.After(time.Second * 1): case <-time.After(time.Second * 1):
} }
d.SendMessage(req, types.DiscoveryMessageTypeStop, omu.Now()) d.SendMessage(req, types.DiscoveryMessageTypeStop, omu.Now())
if _, ok := d.processingSessions[req.RequestID()]; ok { if _, ok := d.processingSessions.Load(req.RequestID()); ok {
delete(d.processingSessions, req.RequestID()) d.processingSessions.Delete(req.RequestID())
s.Shutdown() s.Shutdown()
} }

View File

@ -2,6 +2,7 @@ package discovery
import ( import (
"sync" "sync"
"sync/atomic"
"git.loafle.net/overflow_scanner/probe/discovery/types" "git.loafle.net/overflow_scanner/probe/discovery/types"
uuid "github.com/satori/go.uuid" uuid "github.com/satori/go.uuid"
@ -13,6 +14,8 @@ type ofDiscoveryRequest struct {
requestType types.DiscoveryRequestType requestType types.DiscoveryRequestType
params []interface{} params []interface{}
canceled atomic.Value
dequeue chan bool dequeue chan bool
timeout bool timeout bool
} }
@ -52,6 +55,7 @@ func retainDiscoveryRequest(requesterID string, requestType types.DiscoveryReque
dr.requesterID = requesterID dr.requesterID = requesterID
dr.requestType = requestType dr.requestType = requestType
dr.params = params dr.params = params
dr.canceled.Store(false)
dr.dequeue = make(chan bool, 1) dr.dequeue = make(chan bool, 1)
dr.timeout = false dr.timeout = false

View File

@ -559,6 +559,10 @@ func (ds *ofDiscoverySession) delegate(data interface{}) {
} }
func (ds *ofDiscoverySession) Shutdown() { func (ds *ofDiscoverySession) Shutdown() {
if ds.stopped.Load().(bool) {
return
}
ds.stopped.Store(true) ds.stopped.Store(true)
close(ds.stopChan) close(ds.stopChan)
} }