This commit is contained in:
crusader 2018-09-14 02:14:57 +09:00
parent 57cb5cf1a9
commit 7db69d4551
4 changed files with 35 additions and 10 deletions

View File

@ -21,6 +21,7 @@ type Discoverer interface {
DiscoverHost(requesterID string, zone *omd.Zone, dh *omd.DiscoverHost) DiscoverHost(requesterID string, zone *omd.Zone, dh *omd.DiscoverHost)
DiscoverPort(requesterID string, host *omd.Host, dp *omd.DiscoverPort) DiscoverPort(requesterID string, host *omd.Host, dp *omd.DiscoverPort)
DiscoverService(requesterID string, port *omd.Port, ds *omd.DiscoverService) DiscoverService(requesterID string, port *omd.Port, ds *omd.DiscoverService)
DiscoverStop(requesterID string, requestID string)
Message() <-chan types.DiscoveryMessage Message() <-chan types.DiscoveryMessage
Shutdown() Shutdown()
@ -43,7 +44,8 @@ type ofDiscoverer struct {
stopChan chan struct{} stopChan chan struct{}
stopWg sync.WaitGroup stopWg sync.WaitGroup
requestIds map[string]bool requestIDs map[string]bool
processingSessions map[string]session.DiscoverySession
requestQueue chan types.DiscoveryRequest requestQueue chan types.DiscoveryRequest
messageChan chan types.DiscoveryMessage messageChan chan types.DiscoveryMessage
} }
@ -60,6 +62,20 @@ func (d *ofDiscoverer) DiscoverService(requesterID string, port *omd.Port, ds *o
d.enqueue(retainDiscoveryRequest(requesterID, types.DiscoveryRequestTypeService, port, ds)) d.enqueue(retainDiscoveryRequest(requesterID, types.DiscoveryRequestTypeService, port, ds))
} }
func (d *ofDiscoverer) DiscoverStop(requesterID string, requestID string) {
_, ok := d.requestIDs[requestID]
if ok {
d.requestIDs[requestID] = false
return
}
s, ok := d.processingSessions[requestID]
if !ok {
return
}
s.Shutdown()
}
func (d *ofDiscoverer) Message() <-chan types.DiscoveryMessage { func (d *ofDiscoverer) Message() <-chan types.DiscoveryMessage {
return d.messageChan return d.messageChan
} }
@ -85,7 +101,8 @@ func (d *ofDiscoverer) Shutdown() {
func (d *ofDiscoverer) start() { func (d *ofDiscoverer) start() {
d.stopChan = make(chan struct{}) d.stopChan = make(chan struct{})
d.requestIds = make(map[string]bool, 10) d.requestIDs = make(map[string]bool, 10)
d.processingSessions = make(map[string]session.DiscoverySession, 2)
d.requestQueue = make(chan types.DiscoveryRequest, 10) d.requestQueue = make(chan types.DiscoveryRequest, 10)
d.messageChan = make(chan types.DiscoveryMessage, 256) d.messageChan = make(chan types.DiscoveryMessage, 256)
@ -96,7 +113,7 @@ func (d *ofDiscoverer) start() {
func (d *ofDiscoverer) enqueue(req *ofDiscoveryRequest) { func (d *ofDiscoverer) enqueue(req *ofDiscoveryRequest) {
select { select {
case d.requestQueue <- req: case d.requestQueue <- req:
d.requestIds[req.RequestID()] = true d.requestIDs[req.RequestID()] = true
d.SendMessage(req, types.DiscoveryMessageTypeQueueing, req.RequestID(), omu.Now()) d.SendMessage(req, types.DiscoveryMessageTypeQueueing, req.RequestID(), omu.Now())
go func() { go func() {
select { select {
@ -129,8 +146,8 @@ LOOP:
return return
} }
canceled, ok := d.requestIds[req.RequestID()] canceled, ok := d.requestIDs[req.RequestID()]
delete(d.requestIds, req.RequestID()) delete(d.requestIDs, req.RequestID())
if !ok || !canceled { if !ok || !canceled {
req.(*ofDiscoveryRequest).release() req.(*ofDiscoveryRequest).release()
continue LOOP continue LOOP
@ -144,6 +161,7 @@ LOOP:
d.SendMessage(req, types.DiscoveryMessageTypeStart, omu.Now()) d.SendMessage(req, types.DiscoveryMessageTypeStart, omu.Now())
s := session.RetainDiscoverySession() s := session.RetainDiscoverySession()
d.processingSessions[req.RequestID()] = s
d.discover(req, s) d.discover(req, s)
select { select {
case <-time.After(time.Second * 1): case <-time.After(time.Second * 1):
@ -156,6 +174,7 @@ LOOP:
session.ReleaseDiscoverySession(s) session.ReleaseDiscoverySession(s)
req.(*ofDiscoveryRequest).release() req.(*ofDiscoveryRequest).release()
delete(d.processingSessions, req.RequestID())
log.Print("Discovery Session complete") log.Print("Discovery Session complete")
case <-d.stopChan: case <-d.stopChan:

View File

@ -55,6 +55,7 @@ type DiscoverySession interface {
DiscoveredServices(port *omd.Port) map[string]*omd.Service DiscoveredServices(port *omd.Port) map[string]*omd.Service
DiscoveredAllServices() map[*omd.Port]map[string]*omd.Service DiscoveredAllServices() map[*omd.Port]map[string]*omd.Service
Shutdown()
StopChan() <-chan struct{} StopChan() <-chan struct{}
} }
@ -542,6 +543,10 @@ func (ds *ofDiscoverySession) delegate(data interface{}) {
} }
} }
func (ds *ofDiscoverySession) Shutdown() {
close(ds.stopChan)
}
func (ds *ofDiscoverySession) findHost(host *omd.Host) (h *omd.Host, modified bool) { func (ds *ofDiscoverySession) findHost(host *omd.Host) (h *omd.Host, modified bool) {
modified = false modified = false
var ok bool var ok bool

View File

@ -135,8 +135,7 @@ func (s *ScannerServlets) Handle(servletCtx server.ServletCtx,
} }
if !s.RPCInvoker.HasMethod(src.Method()) { if !s.RPCInvoker.HasMethod(src.Method()) {
olog.Logger().Error(err.Error()) s.writeError(src, writeChan, orp.E_NO_METHOD, "", fmt.Errorf("%s is not exist", src.Method()))
s.writeError(src, writeChan, orp.E_NO_METHOD, "", err)
break break
} }

View File

@ -1,6 +1,7 @@
package service package service
import ( import (
"log"
"reflect" "reflect"
"git.loafle.net/overflow_scanner/probe/internal/pubsub" "git.loafle.net/overflow_scanner/probe/internal/pubsub"
@ -160,6 +161,7 @@ func (s *DiscoveryService) DiscoverService(requesterID string, port *omd.Port, d
return nil return nil
} }
func (s *DiscoveryService) StopDiscover() error { func (s *DiscoveryService) DiscoverStop(requesterID string, requestID string) error {
log.Print("requestID ", requestID)
return nil return nil
} }