This commit is contained in:
crusader 2018-03-23 01:11:03 +09:00
parent 3ceb82880a
commit 9589b92095
7 changed files with 49 additions and 65 deletions

View File

@ -1,22 +0,0 @@
package annotation
// @Service()
// inherit @Component
import (
"reflect"
cda "git.loafle.net/commons_go/di/annotation"
cdia "git.loafle.net/commons_go/di/injection/annotation"
)
const (
ServiceTag = "@overFlow:Service"
)
func init() {
cda.RegisterAnnotation(ServiceTag, reflect.TypeOf((*Service)(nil)))
}
type Service struct {
cdia.Component
}

View File

@ -7,6 +7,7 @@ import (
"git.loafle.net/commons_go/rpc" "git.loafle.net/commons_go/rpc"
"git.loafle.net/commons_go/util/net/cidr" "git.loafle.net/commons_go/util/net/cidr"
discoveryM "git.loafle.net/overflow/overflow_commons_go/modules/discovery/model" discoveryM "git.loafle.net/overflow/overflow_commons_go/modules/discovery/model"
oopcs "git.loafle.net/overflow/overflow_probe_container/service"
) )
var RPCServlet rpc.Servlet var RPCServlet rpc.Servlet
@ -23,20 +24,20 @@ func DiscoveryDestroy() {
discoverer = nil discoverer = nil
} }
func DiscoverZone(requesterID string, dz *discoveryM.DiscoveryZone) { func DiscoverZone(probeService *oopcs.ProbeService, requesterID string, dz *discoveryM.DiscoveryZone) {
discoverer.discoverZone(requesterID, dz) discoverer.discoverZone(probeService, requesterID, dz)
} }
func DiscoverHost(requesterID string, zone *discoveryM.Zone, dh *discoveryM.DiscoveryHost) { func DiscoverHost(probeService *oopcs.ProbeService, requesterID string, zone *discoveryM.Zone, dh *discoveryM.DiscoveryHost) {
discoverer.discoverHost(requesterID, zone, dh) discoverer.discoverHost(probeService, requesterID, zone, dh)
} }
func DiscoverPort(requesterID string, host *discoveryM.Host, dp *discoveryM.DiscoveryPort) { func DiscoverPort(probeService *oopcs.ProbeService, requesterID string, host *discoveryM.Host, dp *discoveryM.DiscoveryPort) {
discoverer.discoverPort(requesterID, host, dp) discoverer.discoverPort(probeService, requesterID, host, dp)
} }
func DiscoverService(requesterID string, port *discoveryM.Port, ds *discoveryM.DiscoveryService) { func DiscoverService(probeService *oopcs.ProbeService, requesterID string, port *discoveryM.Port, ds *discoveryM.DiscoveryService) {
discoverer.discoverService(requesterID, port, ds) discoverer.discoverService(probeService, requesterID, port, ds)
} }
func Stop() { func Stop() {
@ -73,7 +74,7 @@ func (d *discovery) stop() {
logging.Logger().Infof("Discovery: discovery is stopped") logging.Logger().Infof("Discovery: discovery is stopped")
} }
func (d *discovery) discoverZone(requesterID string, dz *discoveryM.DiscoveryZone) { func (d *discovery) discoverZone(probeService *oopcs.ProbeService, requesterID string, dz *discoveryM.DiscoveryZone) {
go taskScan(d, go taskScan(d,
func(resultChan chan interface{}, errChan chan error, doneChan chan struct{}, stopChan chan struct{}) { func(resultChan chan interface{}, errChan chan error, doneChan chan struct{}, stopChan chan struct{}) {
scanZone(dz, resultChan, errChan, doneChan, stopChan) scanZone(dz, resultChan, errChan, doneChan, stopChan)
@ -81,7 +82,7 @@ func (d *discovery) discoverZone(requesterID string, dz *discoveryM.DiscoveryZon
func(result interface{}) { func(result interface{}) {
z := result.(*discoveryM.Zone) z := result.(*discoveryM.Zone)
logging.Logger().Debugf("zone: %v", z) logging.Logger().Debugf("zone: %v", z)
d.sendResult("DiscoveryService.DiscoveredZone", requesterID, z) probeService.Send("DiscoveryService.DiscoveredZone", requesterID, z)
if nil != dz.DiscoveryHost { if nil != dz.DiscoveryHost {
cr, _ := cidr.NewCIDRRanger(z.Network) cr, _ := cidr.NewCIDRRanger(z.Network)
dh := &discoveryM.DiscoveryHost{ dh := &discoveryM.DiscoveryHost{
@ -89,13 +90,13 @@ func (d *discovery) discoverZone(requesterID string, dz *discoveryM.DiscoveryZon
LastScanRange: cr.Last().String(), LastScanRange: cr.Last().String(),
DiscoveryPort: dz.DiscoveryHost.DiscoveryPort, DiscoveryPort: dz.DiscoveryHost.DiscoveryPort,
} }
d.discoverHost(requesterID, z, dh) d.discoverHost(probeService, requesterID, z, dh)
} }
}, },
) )
} }
func (d *discovery) discoverHost(requesterID string, zone *discoveryM.Zone, dh *discoveryM.DiscoveryHost) { func (d *discovery) discoverHost(probeService *oopcs.ProbeService, requesterID string, zone *discoveryM.Zone, dh *discoveryM.DiscoveryHost) {
go taskScan(d, go taskScan(d,
func(resultChan chan interface{}, errChan chan error, doneChan chan struct{}, stopChan chan struct{}) { func(resultChan chan interface{}, errChan chan error, doneChan chan struct{}, stopChan chan struct{}) {
scanHost(zone, dh, resultChan, errChan, doneChan, stopChan) scanHost(zone, dh, resultChan, errChan, doneChan, stopChan)
@ -103,15 +104,15 @@ func (d *discovery) discoverHost(requesterID string, zone *discoveryM.Zone, dh *
func(result interface{}) { func(result interface{}) {
h := result.(*discoveryM.Host) h := result.(*discoveryM.Host)
logging.Logger().Debugf("host: %v", h) logging.Logger().Debugf("host: %v", h)
d.sendResult("DiscoveryService.DiscoveredHost", requesterID, h) probeService.Send("DiscoveryService.DiscoveredHost", requesterID, h)
if nil != dh.DiscoveryPort { if nil != dh.DiscoveryPort {
d.discoverPort(requesterID, h, dh.DiscoveryPort) d.discoverPort(probeService, requesterID, h, dh.DiscoveryPort)
} }
}, },
) )
} }
func (d *discovery) discoverPort(requesterID string, host *discoveryM.Host, dp *discoveryM.DiscoveryPort) { func (d *discovery) discoverPort(probeService *oopcs.ProbeService, requesterID string, host *discoveryM.Host, dp *discoveryM.DiscoveryPort) {
go taskScan(d, go taskScan(d,
func(resultChan chan interface{}, errChan chan error, doneChan chan struct{}, stopChan chan struct{}) { func(resultChan chan interface{}, errChan chan error, doneChan chan struct{}, stopChan chan struct{}) {
scanPort(host, dp, resultChan, errChan, doneChan, stopChan) scanPort(host, dp, resultChan, errChan, doneChan, stopChan)
@ -119,36 +120,27 @@ func (d *discovery) discoverPort(requesterID string, host *discoveryM.Host, dp *
func(result interface{}) { func(result interface{}) {
p := result.(*discoveryM.Port) p := result.(*discoveryM.Port)
logging.Logger().Debugf("port: %v", p) logging.Logger().Debugf("port: %v", p)
d.sendResult("DiscoveryService.DiscoveredPort", requesterID, p) probeService.Send("DiscoveryService.DiscoveredPort", requesterID, p)
if nil != dp.DiscoveryService { if nil != dp.DiscoveryService {
d.discoverService(requesterID, p, dp.DiscoveryService) d.discoverService(probeService, requesterID, p, dp.DiscoveryService)
} }
}, },
) )
} }
func (d *discovery) discoverService(requesterID string, port *discoveryM.Port, ds *discoveryM.DiscoveryService) { func (d *discovery) discoverService(probeService *oopcs.ProbeService, requesterID string, port *discoveryM.Port, ds *discoveryM.DiscoveryService) {
go taskScan(d, go taskScan(d,
func(resultChan chan interface{}, errChan chan error, doneChan chan struct{}, stopChan chan struct{}) { func(resultChan chan interface{}, errChan chan error, doneChan chan struct{}, stopChan chan struct{}) {
scanService(port, ds, resultChan, errChan, doneChan, stopChan) scanService(port, ds, resultChan, errChan, doneChan, stopChan)
}, },
func(result interface{}) { func(result interface{}) {
s := result.(*discoveryM.Service) s := result.(*discoveryM.Service)
d.sendResult("DiscoveryService.DiscoveredService", requesterID, s) probeService.Send("DiscoveryService.DiscoveredService", requesterID, s)
logging.Logger().Debugf("service: %s(%s)[%s:%s]", s.ServiceName, s.CryptoType, port.Host.IP, port.PortNumber) logging.Logger().Debugf("service: %s(%s)[%s:%s]", s.ServiceName, s.CryptoType, port.Host.IP, port.PortNumber)
}, },
) )
} }
func (d *discovery) sendResult(method string, args ...interface{}) {
go RPCServlet.Send(method, args...)
// go notify.Notifier.Notify(method, args...)
}
func (d *discovery) sendError() {
}
func taskScan(d *discovery, task func(resultChan chan interface{}, errChan chan error, doneChan chan struct{}, stopChan chan struct{}), onResult func(result interface{})) { func taskScan(d *discovery, task func(resultChan chan interface{}, errChan chan error, doneChan chan struct{}, stopChan chan struct{}), onResult func(result interface{})) {
d.stopWg.Add(1) d.stopWg.Add(1)
resultChan := make(chan interface{}) resultChan := make(chan interface{})

View File

@ -3,8 +3,8 @@ import:
- package: git.loafle.net/commons_go/util - package: git.loafle.net/commons_go/util
- package: github.com/google/gopacket - package: github.com/google/gopacket
version: v1.1.14 version: v1.1.14
- package: git.loafle.net/commons_go/local_socket.git
- package: git.loafle.net/commons_go/server - package: git.loafle.net/commons_go/server
- package: git.loafle.net/commons_go/rpc - package: git.loafle.net/commons_go/rpc
- package: gopkg.in/natefinch/npipe.v2 - package: gopkg.in/natefinch/npipe.v2
- package: git.loafle.net/commons_go/di - package: git.loafle.net/commons_go/di
- package: git.loafle.net/overflow/overflow_commons_go

View File

@ -1,25 +1,29 @@
package server package server
import ( import (
"reflect"
cdr "git.loafle.net/commons_go/di/registry" cdr "git.loafle.net/commons_go/di/registry"
"git.loafle.net/commons_go/logging" "git.loafle.net/commons_go/logging"
crr "git.loafle.net/commons_go/rpc/registry" crr "git.loafle.net/commons_go/rpc/registry"
"git.loafle.net/commons_go/server" "git.loafle.net/commons_go/server"
oodca "git.loafle.net/overflow/overflow_discovery/commons/annotation" ooca "git.loafle.net/overflow/overflow_commons_go/annotation"
oods "git.loafle.net/overflow/overflow_discovery/service" oods "git.loafle.net/overflow/overflow_discovery/service"
oopcs "git.loafle.net/overflow/overflow_probe_container/server" oopcs "git.loafle.net/overflow/overflow_probe_container/server"
oopcService "git.loafle.net/overflow/overflow_probe_container/service"
) )
func New(pidPath string) server.Server { func New(pidPath string) server.Server {
var ( var (
services []interface{} services []interface{}
err error probeService *oopcService.ProbeService
err error
) )
oods.InitService() oods.InitService()
rpcRegistry := crr.NewRPCRegistry() rpcRegistry := crr.NewRPCRegistry()
if services, err = cdr.GetInstancesByAnnotationName(oodca.ServiceTag); nil != err { if services, err = cdr.GetInstancesByAnnotationName(ooca.ServiceTag); nil != err {
logging.Logger().Panic(err) logging.Logger().Panic(err)
} }
@ -27,8 +31,14 @@ func New(pidPath string) server.Server {
rpcRegistry.RegisterService(s, "") rpcRegistry.RegisterService(s, "")
} }
if _probeService, err := cdr.GetInstance(reflect.TypeOf((*oopcService.ProbeService)(nil))); nil != err {
logging.Logger().Panic(err)
} else {
probeService = _probeService.(*oopcService.ProbeService)
}
rpcSH := oopcs.NewRPCServletHandler(rpcRegistry) rpcSH := oopcs.NewRPCServletHandler(rpcRegistry)
socketHandler := newSocketHandler(rpcSH) socketHandler := newSocketHandler(rpcSH, probeService)
sh := newServerHandler(pidPath, socketHandler) sh := newServerHandler(pidPath, socketHandler)
s := oopcs.New(sh) s := oopcs.New(sh)

View File

@ -3,16 +3,14 @@ package server
import ( import (
"net" "net"
cRPC "git.loafle.net/commons_go/rpc"
"git.loafle.net/commons_go/server" "git.loafle.net/commons_go/server"
"git.loafle.net/overflow/overflow_discovery/discovery"
oopc "git.loafle.net/overflow/overflow_probe_container"
oopcs "git.loafle.net/overflow/overflow_probe_container/server" oopcs "git.loafle.net/overflow/overflow_probe_container/server"
oopcService "git.loafle.net/overflow/overflow_probe_container/service"
) )
func newSocketHandler(rpcSH oopcs.RPCServletHandler) SocketHandler { func newSocketHandler(rpcSH oopcs.RPCServletHandler, probeService *oopcService.ProbeService) SocketHandler {
sh := &SocketHandlers{} sh := &SocketHandlers{}
sh.SocketHandler = oopcs.NewSocketHandler(rpcSH) sh.SocketHandler = oopcs.NewSocketHandler(rpcSH, probeService)
return sh return sh
} }
@ -36,11 +34,9 @@ func (sh *SocketHandlers) Handshake(socketCTX server.SocketContext, conn net.Con
func (sh *SocketHandlers) OnConnect(soc server.Socket) { func (sh *SocketHandlers) OnConnect(soc server.Socket) {
sh.SocketHandler.OnConnect(soc) sh.SocketHandler.OnConnect(soc)
discovery.RPCServlet = soc.Context().GetAttribute(oopc.RPCServletKey).(cRPC.Servlet)
} }
func (sh *SocketHandlers) OnDisconnect(soc server.Socket) { func (sh *SocketHandlers) OnDisconnect(soc server.Socket) {
discovery.RPCServlet = nil
sh.SocketHandler.OnDisconnect(soc) sh.SocketHandler.OnDisconnect(soc)
} }

View File

@ -7,6 +7,7 @@ import (
cdr "git.loafle.net/commons_go/di/registry" cdr "git.loafle.net/commons_go/di/registry"
discoveryM "git.loafle.net/overflow/overflow_commons_go/modules/discovery/model" discoveryM "git.loafle.net/overflow/overflow_commons_go/modules/discovery/model"
"git.loafle.net/overflow/overflow_discovery/discovery" "git.loafle.net/overflow/overflow_discovery/discovery"
oopcs "git.loafle.net/overflow/overflow_probe_container/service"
) )
func init() { func init() {
@ -15,11 +16,13 @@ func init() {
type DiscoveryService struct { type DiscoveryService struct {
cda.TypeAnnotation `annotation:"@overFlow:Service()"` cda.TypeAnnotation `annotation:"@overFlow:Service()"`
ProbeService *oopcs.ProbeService `annotation:"@Inject()"`
} }
func (ds *DiscoveryService) DiscoverZone(requesterID string, dz *discoveryM.DiscoveryZone) error { func (ds *DiscoveryService) DiscoverZone(requesterID string, dz *discoveryM.DiscoveryZone) error {
discovery.DiscoverZone(requesterID, dz) discovery.DiscoverZone(ds.ProbeService, requesterID, dz)
return nil return nil
} }

View File

@ -1,8 +1,13 @@
package service package service
import (
oopcs "git.loafle.net/overflow/overflow_probe_container/service"
)
func InitService() { func InitService() {
oopcs.InitService()
} }
func DestroyService() { func DestroyService() {
oopcs.DestroyService()
} }