This commit is contained in:
crusader 2018-04-19 23:45:39 +09:00
parent de8a590fd5
commit 0720771a87
3 changed files with 23 additions and 8 deletions

View File

@ -4,6 +4,8 @@ import (
"sync" "sync"
"time" "time"
"git.loafle.net/commons/logging-go"
"git.loafle.net/commons/util-go/net/cidr" "git.loafle.net/commons/util-go/net/cidr"
ocdm "git.loafle.net/overflow/commons-go/discovery/model" ocdm "git.loafle.net/overflow/commons-go/discovery/model"
) )
@ -150,6 +152,7 @@ func (d *defaultDiscoverer) innerDiscoverZone(wg *sync.WaitGroup, dataChan chan
}, },
func(result interface{}) { func(result interface{}) {
z := result.(*ocdm.Zone) z := result.(*ocdm.Zone)
logging.Logger().Debugf("zone discovered: %v", z)
dataChan <- retainDiscoveryData(DiscoveryDataTypeZone, time.Now(), z, nil) dataChan <- retainDiscoveryData(DiscoveryDataTypeZone, time.Now(), z, nil)
if nil != dz.DiscoveryHost { if nil != dz.DiscoveryHost {
cr, _ := cidr.NewCIDRRanger(z.Network) cr, _ := cidr.NewCIDRRanger(z.Network)
@ -159,7 +162,7 @@ func (d *defaultDiscoverer) innerDiscoverZone(wg *sync.WaitGroup, dataChan chan
DiscoveryPort: dz.DiscoveryHost.DiscoveryPort, DiscoveryPort: dz.DiscoveryHost.DiscoveryPort,
} }
wg.Add(1) wg.Add(1)
d.innerDiscoverHost(wg, dataChan, z, dh) go d.innerDiscoverHost(wg, dataChan, z, dh)
} }
}, },
func(err error) { func(err error) {
@ -179,10 +182,11 @@ func (d *defaultDiscoverer) innerDiscoverHost(wg *sync.WaitGroup, dataChan chan
}, },
func(result interface{}) { func(result interface{}) {
h := result.(*ocdm.Host) h := result.(*ocdm.Host)
logging.Logger().Debugf("host discovered: %v", h)
dataChan <- retainDiscoveryData(DiscoveryDataTypeHost, time.Now(), h, nil) dataChan <- retainDiscoveryData(DiscoveryDataTypeHost, time.Now(), h, nil)
if nil != dh.DiscoveryPort { if nil != dh.DiscoveryPort {
wg.Add(1) wg.Add(1)
d.innerDiscoverPort(wg, dataChan, h, dh.DiscoveryPort) go d.innerDiscoverPort(wg, dataChan, h, dh.DiscoveryPort)
} }
}, },
func(err error) { func(err error) {
@ -202,10 +206,11 @@ func (d *defaultDiscoverer) innerDiscoverPort(wg *sync.WaitGroup, dataChan chan
}, },
func(result interface{}) { func(result interface{}) {
p := result.(*ocdm.Port) p := result.(*ocdm.Port)
logging.Logger().Debugf("port discovered: %v", p)
dataChan <- retainDiscoveryData(DiscoveryDataTypePort, time.Now(), p, nil) dataChan <- retainDiscoveryData(DiscoveryDataTypePort, time.Now(), p, nil)
if nil != dp.DiscoveryService { if nil != dp.DiscoveryService {
wg.Add(1) wg.Add(1)
d.innerDiscoverSerice(wg, dataChan, p, dp.DiscoveryService) go d.innerDiscoverSerice(wg, dataChan, p, dp.DiscoveryService)
} }
}, },
func(err error) { func(err error) {
@ -225,6 +230,7 @@ func (d *defaultDiscoverer) innerDiscoverSerice(wg *sync.WaitGroup, dataChan cha
}, },
func(result interface{}) { func(result interface{}) {
s := result.(*ocdm.Service) s := result.(*ocdm.Service)
logging.Logger().Debugf("service discovered: %v", s)
dataChan <- retainDiscoveryData(DiscoveryDataTypeService, time.Now(), s, nil) dataChan <- retainDiscoveryData(DiscoveryDataTypeService, time.Now(), s, nil)
}, },
func(err error) { func(err error) {

View File

@ -3,6 +3,7 @@ package server
import ( import (
cdr "git.loafle.net/commons/di-go/registry" cdr "git.loafle.net/commons/di-go/registry"
logging "git.loafle.net/commons/logging-go" logging "git.loafle.net/commons/logging-go"
crpj "git.loafle.net/commons/rpc-go/protocol/json"
crr "git.loafle.net/commons/rpc-go/registry" crr "git.loafle.net/commons/rpc-go/registry"
cssn "git.loafle.net/commons/server-go/socket/net" cssn "git.loafle.net/commons/server-go/socket/net"
occa "git.loafle.net/overflow/commons-go/core/annotation" occa "git.loafle.net/overflow/commons-go/core/annotation"
@ -13,7 +14,10 @@ import (
) )
func New(pidFilePath string) *cssn.Server { func New(pidFilePath string) *cssn.Server {
rpcWriteChan := make(chan *container.RPCNotification, 256)
cdr.RegisterResource(container.CONTAINER_CRAWLERS, crawler.GetCrawlers()) cdr.RegisterResource(container.CONTAINER_CRAWLERS, crawler.GetCrawlers())
cdr.RegisterResource(container.CONTAINER_RPC_WRITE_CHAN, rpcWriteChan)
services, err := cdr.GetInstancesByAnnotationType(occa.RPCServiceAnnotationType) services, err := cdr.GetInstancesByAnnotationType(occa.RPCServiceAnnotationType)
if nil != err { if nil != err {
@ -25,6 +29,8 @@ func New(pidFilePath string) *cssn.Server {
ds := &servlet.DiscoveryServlets{} ds := &servlet.DiscoveryServlets{}
ds.RPCInvoker = rpcRegistry ds.RPCInvoker = rpcRegistry
ds.RPCWriteChan = rpcWriteChan
ds.RPCServerCodec = crpj.NewServerCodec()
sh := &ServerHandlers{} sh := &ServerHandlers{}
sh.Name = "Container Discovery" sh.Name = "Container Discovery"

View File

@ -7,6 +7,7 @@ import (
cda "git.loafle.net/commons/di-go/annotation" cda "git.loafle.net/commons/di-go/annotation"
cdr "git.loafle.net/commons/di-go/registry" cdr "git.loafle.net/commons/di-go/registry"
"git.loafle.net/commons/logging-go"
ocdm "git.loafle.net/overflow/commons-go/discovery/model" ocdm "git.loafle.net/overflow/commons-go/discovery/model"
ocs "git.loafle.net/overflow/container-go/service" ocs "git.loafle.net/overflow/container-go/service"
"git.loafle.net/overflow/container_discovery/internal/discoverer" "git.loafle.net/overflow/container_discovery/internal/discoverer"
@ -49,7 +50,7 @@ func (s *DiscoveryService) DestroyService() {
} }
func (s *DiscoveryService) DiscoverZone(requesterID string, dz *ocdm.DiscoveryZone) error { func (s *DiscoveryService) DiscoverZone(requesterID string, dz *ocdm.DiscoveryZone) error {
s.handleDiscovery(requesterID, func(dataChan chan *discoverer.DiscoveryData) { go s.handleDiscovery(requesterID, func(dataChan chan *discoverer.DiscoveryData) {
s.discoverer.DiscoverZone(dataChan, dz) s.discoverer.DiscoverZone(dataChan, dz)
}) })
@ -57,7 +58,7 @@ func (s *DiscoveryService) DiscoverZone(requesterID string, dz *ocdm.DiscoveryZo
} }
func (s *DiscoveryService) DiscoverHost(requesterID string, zone *ocdm.Zone, dh *ocdm.DiscoveryHost) error { func (s *DiscoveryService) DiscoverHost(requesterID string, zone *ocdm.Zone, dh *ocdm.DiscoveryHost) error {
s.handleDiscovery(requesterID, func(dataChan chan *discoverer.DiscoveryData) { go s.handleDiscovery(requesterID, func(dataChan chan *discoverer.DiscoveryData) {
s.discoverer.DiscoverHost(dataChan, zone, dh) s.discoverer.DiscoverHost(dataChan, zone, dh)
}) })
@ -65,7 +66,7 @@ func (s *DiscoveryService) DiscoverHost(requesterID string, zone *ocdm.Zone, dh
} }
func (s *DiscoveryService) DiscoverPort(requesterID string, host *ocdm.Host, dp *ocdm.DiscoveryPort) error { func (s *DiscoveryService) DiscoverPort(requesterID string, host *ocdm.Host, dp *ocdm.DiscoveryPort) error {
s.handleDiscovery(requesterID, func(dataChan chan *discoverer.DiscoveryData) { go s.handleDiscovery(requesterID, func(dataChan chan *discoverer.DiscoveryData) {
s.discoverer.DiscoverPort(dataChan, host, dp) s.discoverer.DiscoverPort(dataChan, host, dp)
}) })
@ -73,7 +74,7 @@ func (s *DiscoveryService) DiscoverPort(requesterID string, host *ocdm.Host, dp
} }
func (s *DiscoveryService) DiscoverService(requesterID string, port *ocdm.Port, ds *ocdm.DiscoveryService) error { func (s *DiscoveryService) DiscoverService(requesterID string, port *ocdm.Port, ds *ocdm.DiscoveryService) error {
s.handleDiscovery(requesterID, func(dataChan chan *discoverer.DiscoveryData) { go s.handleDiscovery(requesterID, func(dataChan chan *discoverer.DiscoveryData) {
s.discoverer.DiscoverSerice(dataChan, port, ds) s.discoverer.DiscoverSerice(dataChan, port, ds)
}) })
@ -116,7 +117,7 @@ func (s *DiscoveryService) handleDiscovery(requesterID string, discoveryFunc fun
s.discoverer.Release(dataChan) s.discoverer.Release(dataChan)
}() }()
discoveryFunc(dataChan) go discoveryFunc(dataChan)
for { for {
select { select {
@ -126,8 +127,10 @@ func (s *DiscoveryService) handleDiscovery(requesterID string, discoveryFunc fun
} }
switch data.Type { switch data.Type {
case discoverer.DiscoveryDataTypeStart: case discoverer.DiscoveryDataTypeStart:
logging.Logger().Debugf("DiscoveryService.DiscoveryStart: %s", data.Time.String())
s.ProbeService.Send("DiscoveryService.DiscoveryStart", requesterID, data.Time) s.ProbeService.Send("DiscoveryService.DiscoveryStart", requesterID, data.Time)
case discoverer.DiscoveryDataTypeStop: case discoverer.DiscoveryDataTypeStop:
logging.Logger().Debugf("DiscoveryService.DiscoveryStop: %s", data.Time.String())
s.ProbeService.Send("DiscoveryService.DiscoveryStop", requesterID, data.Time) s.ProbeService.Send("DiscoveryService.DiscoveryStop", requesterID, data.Time)
data.Release() data.Release()
return nil return nil