This commit is contained in:
crusader 2018-09-13 22:05:05 +09:00
parent 337f493251
commit a84f4eab12
6 changed files with 55 additions and 31 deletions

View File

@ -43,6 +43,7 @@ type ofDiscoverer struct {
stopChan chan struct{} stopChan chan struct{}
stopWg sync.WaitGroup stopWg sync.WaitGroup
requestIds map[string]bool
requestQueue chan types.DiscoveryRequest requestQueue chan types.DiscoveryRequest
messageChan chan types.DiscoveryMessage messageChan chan types.DiscoveryMessage
} }
@ -63,8 +64,8 @@ func (d *ofDiscoverer) Message() <-chan types.DiscoveryMessage {
return d.messageChan return d.messageChan
} }
func (d *ofDiscoverer) SendMessage(discoveryRequest types.DiscoveryRequest, messageType types.DiscoveryMessageType, data interface{}, err error) { func (d *ofDiscoverer) SendMessage(discoveryRequest types.DiscoveryRequest, messageType types.DiscoveryMessageType, datas ...interface{}) {
d.messageChan <- types.MakeDiscoveryMessage(discoveryRequest, messageType, data, err) d.messageChan <- types.MakeDiscoveryMessage(discoveryRequest, messageType, datas...)
} }
func (d *ofDiscoverer) Shutdown() { func (d *ofDiscoverer) Shutdown() {
@ -84,6 +85,7 @@ func (d *ofDiscoverer) Shutdown() {
func (d *ofDiscoverer) start() { func (d *ofDiscoverer) start() {
d.stopChan = make(chan struct{}) d.stopChan = make(chan struct{})
d.requestIds = make(map[string]bool, 10)
d.requestQueue = make(chan types.DiscoveryRequest, 10) d.requestQueue = make(chan types.DiscoveryRequest, 10)
d.messageChan = make(chan types.DiscoveryMessage, 256) d.messageChan = make(chan types.DiscoveryMessage, 256)
@ -94,17 +96,18 @@ func (d *ofDiscoverer) start() {
func (d *ofDiscoverer) enqueue(req *ofDiscoveryRequest) { func (d *ofDiscoverer) enqueue(req *ofDiscoveryRequest) {
select { select {
case d.requestQueue <- req: case d.requestQueue <- req:
d.SendMessage(req, types.DiscoveryMessageTypeQueueing, omu.Now(), nil) d.requestIds[req.RequestID()] = true
d.SendMessage(req, types.DiscoveryMessageTypeQueueing, req.RequestID(), omu.Now())
go func() { go func() {
select { select {
case <-req.dequeue: case <-req.dequeue:
case <-time.After(20 * time.Second): case <-time.After(20 * time.Second):
req.timeout = true req.timeout = true
d.SendMessage(req, types.DiscoveryMessageTypeQueueingTimeout, omu.Now(), nil) d.SendMessage(req, types.DiscoveryMessageTypeQueueingTimeout, omu.Now())
} }
}() }()
default: default:
d.SendMessage(req, types.DiscoveryMessageTypeQueueingFailed, omu.Now(), nil) d.SendMessage(req, types.DiscoveryMessageTypeQueueingFailed, omu.Now())
select { select {
case <-time.After(time.Microsecond * 500): case <-time.After(time.Microsecond * 500):
} }
@ -125,19 +128,27 @@ LOOP:
if !ok { if !ok {
return return
} }
canceled, ok := d.requestIds[req.RequestID()]
delete(d.requestIds, req.RequestID())
if !ok || canceled {
req.(*ofDiscoveryRequest).release()
continue LOOP
}
if req.(*ofDiscoveryRequest).timeout { if req.(*ofDiscoveryRequest).timeout {
req.(*ofDiscoveryRequest).release() req.(*ofDiscoveryRequest).release()
continue LOOP continue LOOP
} }
req.(*ofDiscoveryRequest).dequeue <- true req.(*ofDiscoveryRequest).dequeue <- true
d.SendMessage(req, types.DiscoveryMessageTypeStart, omu.Now(), nil) d.SendMessage(req, types.DiscoveryMessageTypeStart, omu.Now())
s := session.RetainDiscoverySession() s := session.RetainDiscoverySession()
d.discover(req, s) d.discover(req, s)
select { select {
case <-time.After(time.Second * 1): case <-time.After(time.Second * 1):
} }
d.SendMessage(req, types.DiscoveryMessageTypeStop, omu.Now(), nil) d.SendMessage(req, types.DiscoveryMessageTypeStop, omu.Now())
select { select {
case <-time.After(time.Millisecond * 500): case <-time.After(time.Millisecond * 500):
@ -159,7 +170,7 @@ func (d *ofDiscoverer) discover(req types.DiscoveryRequest, s session.DiscoveryS
} }
if err := s.InitWithRequest(req); nil != err { if err := s.InitWithRequest(req); nil != err {
d.SendMessage(req, types.DiscoveryMessageTypeError, nil, err) d.SendMessage(req, types.DiscoveryMessageTypeError, err)
return return
} }
@ -185,11 +196,11 @@ func (d *ofDiscoverer) complexDiscover(s session.DiscoverySession) {
} }
switch target.(type) { switch target.(type) {
case *omd.Host: case *omd.Host:
d.SendMessage(s.DiscoveryRequest(), types.DiscoveryMessageTypeHost, target, nil) d.SendMessage(s.DiscoveryRequest(), types.DiscoveryMessageTypeHost, target)
case *omd.Port: case *omd.Port:
d.SendMessage(s.DiscoveryRequest(), types.DiscoveryMessageTypePort, target, nil) d.SendMessage(s.DiscoveryRequest(), types.DiscoveryMessageTypePort, target)
case *omd.Service: case *omd.Service:
d.SendMessage(s.DiscoveryRequest(), types.DiscoveryMessageTypeService, target, nil) d.SendMessage(s.DiscoveryRequest(), types.DiscoveryMessageTypeService, target)
default: default:
} }
} }
@ -244,7 +255,7 @@ func (d *ofDiscoverer) hierarchyDiscover(s session.DiscoverySession) {
} }
switch target.(type) { switch target.(type) {
case *omd.Host: case *omd.Host:
d.SendMessage(s.DiscoveryRequest(), types.DiscoveryMessageTypeHost, target, nil) d.SendMessage(s.DiscoveryRequest(), types.DiscoveryMessageTypeHost, target)
if nil != s.DiscoverPort() { if nil != s.DiscoverPort() {
wg.Add(1) wg.Add(1)
go func() { go func() {
@ -253,7 +264,7 @@ func (d *ofDiscoverer) hierarchyDiscover(s session.DiscoverySession) {
}() }()
} }
case *omd.Port: case *omd.Port:
d.SendMessage(s.DiscoveryRequest(), types.DiscoveryMessageTypePort, target, nil) d.SendMessage(s.DiscoveryRequest(), types.DiscoveryMessageTypePort, target)
if nil != s.DiscoverService() { if nil != s.DiscoverService() {
wg.Add(1) wg.Add(1)
go func() { go func() {
@ -262,7 +273,7 @@ func (d *ofDiscoverer) hierarchyDiscover(s session.DiscoverySession) {
}() }()
} }
case *omd.Service: case *omd.Service:
d.SendMessage(s.DiscoveryRequest(), types.DiscoveryMessageTypeService, target, nil) d.SendMessage(s.DiscoveryRequest(), types.DiscoveryMessageTypeService, target)
default: default:
} }
} }

View File

@ -256,8 +256,7 @@ func Test_ofDiscoverer_SendMessage(t *testing.T) {
type args struct { type args struct {
discoveryRequest types.DiscoveryRequest discoveryRequest types.DiscoveryRequest
messageType types.DiscoveryMessageType messageType types.DiscoveryMessageType
data interface{} datas []interface{}
err error
} }
tests := []struct { tests := []struct {
name string name string
@ -274,7 +273,7 @@ func Test_ofDiscoverer_SendMessage(t *testing.T) {
requestQueue: tt.fields.requestQueue, requestQueue: tt.fields.requestQueue,
messageChan: tt.fields.messageChan, messageChan: tt.fields.messageChan,
} }
d.SendMessage(tt.args.discoveryRequest, tt.args.messageType, tt.args.data, tt.args.err) d.SendMessage(tt.args.discoveryRequest, tt.args.messageType, tt.args.datas...)
}) })
} }
} }

View File

@ -4,9 +4,11 @@ import (
"sync" "sync"
"git.loafle.net/overflow_scanner/probe/discovery/types" "git.loafle.net/overflow_scanner/probe/discovery/types"
uuid "github.com/satori/go.uuid"
) )
type ofDiscoveryRequest struct { type ofDiscoveryRequest struct {
requestID string
requesterID string requesterID string
requestType types.DiscoveryRequestType requestType types.DiscoveryRequestType
params []interface{} params []interface{}
@ -15,6 +17,10 @@ type ofDiscoveryRequest struct {
timeout bool timeout bool
} }
func (dr *ofDiscoveryRequest) RequestID() string {
return dr.requestID
}
func (dr *ofDiscoveryRequest) RequesterID() string { func (dr *ofDiscoveryRequest) RequesterID() string {
return dr.requesterID return dr.requesterID
} }
@ -42,6 +48,7 @@ func retainDiscoveryRequest(requesterID string, requestType types.DiscoveryReque
dr = v.(*ofDiscoveryRequest) dr = v.(*ofDiscoveryRequest)
} }
dr.requestID = uuid.NewV4().String()
dr.requesterID = requesterID dr.requesterID = requesterID
dr.requestType = requestType dr.requestType = requestType
dr.params = params dr.params = params
@ -52,6 +59,7 @@ func retainDiscoveryRequest(requesterID string, requestType types.DiscoveryReque
} }
func releaseDiscoveryRequest(dr *ofDiscoveryRequest) { func releaseDiscoveryRequest(dr *ofDiscoveryRequest) {
dr.requestID = ""
dr.requesterID = "" dr.requesterID = ""
dr.requestType = types.DiscoveryRequestTypeNone dr.requestType = types.DiscoveryRequestTypeNone
dr.params = nil dr.params = nil

View File

@ -9,11 +9,16 @@ func NewMockDiscoveryRequest(requesterID string, requestType DiscoveryRequestTyp
} }
type MockDiscoveryRequest struct { type MockDiscoveryRequest struct {
requestID string
requesterID string requesterID string
requestType DiscoveryRequestType requestType DiscoveryRequestType
params []interface{} params []interface{}
} }
func (dr *MockDiscoveryRequest) RequestID() string {
return dr.requestID
}
func (dr *MockDiscoveryRequest) RequesterID() string { func (dr *MockDiscoveryRequest) RequesterID() string {
return dr.requesterID return dr.requesterID
} }

View File

@ -24,15 +24,16 @@ const (
DiscoveryRequestTypeService DiscoveryRequestTypeService
) )
type DiscoveryMessage func() (request DiscoveryRequest, messageType DiscoveryMessageType, data interface{}, err error) type DiscoveryMessage func() (request DiscoveryRequest, messageType DiscoveryMessageType, datas []interface{})
func MakeDiscoveryMessage(request DiscoveryRequest, messageType DiscoveryMessageType, data interface{}, err error) DiscoveryMessage { func MakeDiscoveryMessage(request DiscoveryRequest, messageType DiscoveryMessageType, datas ...interface{}) DiscoveryMessage {
return func() (DiscoveryRequest, DiscoveryMessageType, interface{}, error) { return func() (DiscoveryRequest, DiscoveryMessageType, []interface{}) {
return request, messageType, data, err return request, messageType, datas
} }
} }
type DiscoveryRequest interface { type DiscoveryRequest interface {
RequestID() string
RequesterID() string RequesterID() string
RequestType() DiscoveryRequestType RequestType() DiscoveryRequestType
Params() []interface{} Params() []interface{}

View File

@ -39,7 +39,7 @@ func (s *DiscoveryService) InitService() {
return return
} }
request, messageType, data, err := msg() request, messageType, datas := msg()
switch messageType { switch messageType {
case types.DiscoveryMessageTypeStart: case types.DiscoveryMessageTypeStart:
@ -47,7 +47,7 @@ func (s *DiscoveryService) InitService() {
rpc.MakeRPCMessage( rpc.MakeRPCMessage(
[]string{request.RequesterID()}, []string{request.RequesterID()},
"DiscoveryService.DiscoveryStart", "DiscoveryService.DiscoveryStart",
[]interface{}{data}, datas,
), ),
"/scanner", "/scanner",
) )
@ -57,7 +57,7 @@ func (s *DiscoveryService) InitService() {
rpc.MakeRPCMessage( rpc.MakeRPCMessage(
[]string{request.RequesterID()}, []string{request.RequesterID()},
"DiscoveryService.DiscoveryStop", "DiscoveryService.DiscoveryStop",
[]interface{}{data}, datas,
), ),
"/scanner", "/scanner",
) )
@ -67,7 +67,7 @@ func (s *DiscoveryService) InitService() {
rpc.MakeRPCMessage( rpc.MakeRPCMessage(
[]string{request.RequesterID()}, []string{request.RequesterID()},
"DiscoveryService.Queueing", "DiscoveryService.Queueing",
[]interface{}{data}, datas,
), ),
"/scanner", "/scanner",
) )
@ -77,7 +77,7 @@ func (s *DiscoveryService) InitService() {
rpc.MakeRPCMessage( rpc.MakeRPCMessage(
[]string{request.RequesterID()}, []string{request.RequesterID()},
"DiscoveryService.QueueingFailed", "DiscoveryService.QueueingFailed",
[]interface{}{data}, datas,
), ),
"/scanner", "/scanner",
) )
@ -87,7 +87,7 @@ func (s *DiscoveryService) InitService() {
rpc.MakeRPCMessage( rpc.MakeRPCMessage(
[]string{request.RequesterID()}, []string{request.RequesterID()},
"DiscoveryService.QueueingTimeout", "DiscoveryService.QueueingTimeout",
[]interface{}{data}, datas,
), ),
"/scanner", "/scanner",
) )
@ -97,7 +97,7 @@ func (s *DiscoveryService) InitService() {
rpc.MakeRPCMessage( rpc.MakeRPCMessage(
[]string{request.RequesterID()}, []string{request.RequesterID()},
"DiscoveryService.DiscoveryError", "DiscoveryService.DiscoveryError",
[]interface{}{&orp.Error{Code: orp.E_INTERNAL, Message: err.Error()}}, []interface{}{&orp.Error{Code: orp.E_INTERNAL, Message: datas[0].(error).Error()}},
), ),
"/scanner", "/scanner",
) )
@ -107,7 +107,7 @@ func (s *DiscoveryService) InitService() {
rpc.MakeRPCMessage( rpc.MakeRPCMessage(
[]string{request.RequesterID()}, []string{request.RequesterID()},
"DiscoveryService.DiscoveredHost", "DiscoveryService.DiscoveredHost",
[]interface{}{data}, datas,
), ),
"/scanner", "/scanner",
) )
@ -117,7 +117,7 @@ func (s *DiscoveryService) InitService() {
rpc.MakeRPCMessage( rpc.MakeRPCMessage(
[]string{request.RequesterID()}, []string{request.RequesterID()},
"DiscoveryService.DiscoveredPort", "DiscoveryService.DiscoveredPort",
[]interface{}{data}, datas,
), ),
"/scanner", "/scanner",
) )
@ -127,7 +127,7 @@ func (s *DiscoveryService) InitService() {
rpc.MakeRPCMessage( rpc.MakeRPCMessage(
[]string{request.RequesterID()}, []string{request.RequesterID()},
"DiscoveryService.DiscoveredService", "DiscoveryService.DiscoveredService",
[]interface{}{data}, datas,
), ),
"/scanner", "/scanner",
) )