From 0720771a8756111a2f5c25dbbfc73cef8d72ee0c Mon Sep 17 00:00:00 2001 From: crusader Date: Thu, 19 Apr 2018 23:45:39 +0900 Subject: [PATCH] ing --- internal/discoverer/discoverer.go | 12 +++++++++--- server/server.go | 6 ++++++ service/DiscoveryService.go | 13 ++++++++----- 3 files changed, 23 insertions(+), 8 deletions(-) diff --git a/internal/discoverer/discoverer.go b/internal/discoverer/discoverer.go index e75399f..2188d41 100644 --- a/internal/discoverer/discoverer.go +++ b/internal/discoverer/discoverer.go @@ -4,6 +4,8 @@ import ( "sync" "time" + "git.loafle.net/commons/logging-go" + "git.loafle.net/commons/util-go/net/cidr" ocdm "git.loafle.net/overflow/commons-go/discovery/model" ) @@ -150,6 +152,7 @@ func (d *defaultDiscoverer) innerDiscoverZone(wg *sync.WaitGroup, dataChan chan }, func(result interface{}) { z := result.(*ocdm.Zone) + logging.Logger().Debugf("zone discovered: %v", z) dataChan <- retainDiscoveryData(DiscoveryDataTypeZone, time.Now(), z, nil) if nil != dz.DiscoveryHost { cr, _ := cidr.NewCIDRRanger(z.Network) @@ -159,7 +162,7 @@ func (d *defaultDiscoverer) innerDiscoverZone(wg *sync.WaitGroup, dataChan chan DiscoveryPort: dz.DiscoveryHost.DiscoveryPort, } wg.Add(1) - d.innerDiscoverHost(wg, dataChan, z, dh) + go d.innerDiscoverHost(wg, dataChan, z, dh) } }, func(err error) { @@ -179,10 +182,11 @@ func (d *defaultDiscoverer) innerDiscoverHost(wg *sync.WaitGroup, dataChan chan }, func(result interface{}) { h := result.(*ocdm.Host) + logging.Logger().Debugf("host discovered: %v", h) dataChan <- retainDiscoveryData(DiscoveryDataTypeHost, time.Now(), h, nil) if nil != dh.DiscoveryPort { wg.Add(1) - d.innerDiscoverPort(wg, dataChan, h, dh.DiscoveryPort) + go d.innerDiscoverPort(wg, dataChan, h, dh.DiscoveryPort) } }, func(err error) { @@ -202,10 +206,11 @@ func (d *defaultDiscoverer) innerDiscoverPort(wg *sync.WaitGroup, dataChan chan }, func(result interface{}) { p := result.(*ocdm.Port) + logging.Logger().Debugf("port discovered: %v", p) dataChan <- retainDiscoveryData(DiscoveryDataTypePort, time.Now(), p, nil) if nil != dp.DiscoveryService { wg.Add(1) - d.innerDiscoverSerice(wg, dataChan, p, dp.DiscoveryService) + go d.innerDiscoverSerice(wg, dataChan, p, dp.DiscoveryService) } }, func(err error) { @@ -225,6 +230,7 @@ func (d *defaultDiscoverer) innerDiscoverSerice(wg *sync.WaitGroup, dataChan cha }, func(result interface{}) { s := result.(*ocdm.Service) + logging.Logger().Debugf("service discovered: %v", s) dataChan <- retainDiscoveryData(DiscoveryDataTypeService, time.Now(), s, nil) }, func(err error) { diff --git a/server/server.go b/server/server.go index e5769a9..a095233 100644 --- a/server/server.go +++ b/server/server.go @@ -3,6 +3,7 @@ package server import ( cdr "git.loafle.net/commons/di-go/registry" logging "git.loafle.net/commons/logging-go" + crpj "git.loafle.net/commons/rpc-go/protocol/json" crr "git.loafle.net/commons/rpc-go/registry" cssn "git.loafle.net/commons/server-go/socket/net" occa "git.loafle.net/overflow/commons-go/core/annotation" @@ -13,7 +14,10 @@ import ( ) func New(pidFilePath string) *cssn.Server { + rpcWriteChan := make(chan *container.RPCNotification, 256) + cdr.RegisterResource(container.CONTAINER_CRAWLERS, crawler.GetCrawlers()) + cdr.RegisterResource(container.CONTAINER_RPC_WRITE_CHAN, rpcWriteChan) services, err := cdr.GetInstancesByAnnotationType(occa.RPCServiceAnnotationType) if nil != err { @@ -25,6 +29,8 @@ func New(pidFilePath string) *cssn.Server { ds := &servlet.DiscoveryServlets{} ds.RPCInvoker = rpcRegistry + ds.RPCWriteChan = rpcWriteChan + ds.RPCServerCodec = crpj.NewServerCodec() sh := &ServerHandlers{} sh.Name = "Container Discovery" diff --git a/service/DiscoveryService.go b/service/DiscoveryService.go index 640ff80..1662bc7 100644 --- a/service/DiscoveryService.go +++ b/service/DiscoveryService.go @@ -7,6 +7,7 @@ import ( cda "git.loafle.net/commons/di-go/annotation" cdr "git.loafle.net/commons/di-go/registry" + "git.loafle.net/commons/logging-go" ocdm "git.loafle.net/overflow/commons-go/discovery/model" ocs "git.loafle.net/overflow/container-go/service" "git.loafle.net/overflow/container_discovery/internal/discoverer" @@ -49,7 +50,7 @@ func (s *DiscoveryService) DestroyService() { } func (s *DiscoveryService) DiscoverZone(requesterID string, dz *ocdm.DiscoveryZone) error { - s.handleDiscovery(requesterID, func(dataChan chan *discoverer.DiscoveryData) { + go s.handleDiscovery(requesterID, func(dataChan chan *discoverer.DiscoveryData) { s.discoverer.DiscoverZone(dataChan, dz) }) @@ -57,7 +58,7 @@ func (s *DiscoveryService) DiscoverZone(requesterID string, dz *ocdm.DiscoveryZo } func (s *DiscoveryService) DiscoverHost(requesterID string, zone *ocdm.Zone, dh *ocdm.DiscoveryHost) error { - s.handleDiscovery(requesterID, func(dataChan chan *discoverer.DiscoveryData) { + go s.handleDiscovery(requesterID, func(dataChan chan *discoverer.DiscoveryData) { s.discoverer.DiscoverHost(dataChan, zone, dh) }) @@ -65,7 +66,7 @@ func (s *DiscoveryService) DiscoverHost(requesterID string, zone *ocdm.Zone, dh } func (s *DiscoveryService) DiscoverPort(requesterID string, host *ocdm.Host, dp *ocdm.DiscoveryPort) error { - s.handleDiscovery(requesterID, func(dataChan chan *discoverer.DiscoveryData) { + go s.handleDiscovery(requesterID, func(dataChan chan *discoverer.DiscoveryData) { s.discoverer.DiscoverPort(dataChan, host, dp) }) @@ -73,7 +74,7 @@ func (s *DiscoveryService) DiscoverPort(requesterID string, host *ocdm.Host, dp } func (s *DiscoveryService) DiscoverService(requesterID string, port *ocdm.Port, ds *ocdm.DiscoveryService) error { - s.handleDiscovery(requesterID, func(dataChan chan *discoverer.DiscoveryData) { + go s.handleDiscovery(requesterID, func(dataChan chan *discoverer.DiscoveryData) { s.discoverer.DiscoverSerice(dataChan, port, ds) }) @@ -116,7 +117,7 @@ func (s *DiscoveryService) handleDiscovery(requesterID string, discoveryFunc fun s.discoverer.Release(dataChan) }() - discoveryFunc(dataChan) + go discoveryFunc(dataChan) for { select { @@ -126,8 +127,10 @@ func (s *DiscoveryService) handleDiscovery(requesterID string, discoveryFunc fun } switch data.Type { case discoverer.DiscoveryDataTypeStart: + logging.Logger().Debugf("DiscoveryService.DiscoveryStart: %s", data.Time.String()) s.ProbeService.Send("DiscoveryService.DiscoveryStart", requesterID, data.Time) case discoverer.DiscoveryDataTypeStop: + logging.Logger().Debugf("DiscoveryService.DiscoveryStop: %s", data.Time.String()) s.ProbeService.Send("DiscoveryService.DiscoveryStop", requesterID, data.Time) data.Release() return nil