overflow_discovery/discovery/discovery.go

186 lines
5.1 KiB
Go
Raw Normal View History

2017-11-15 12:09:38 +00:00
package discovery
import (
"fmt"
"sync"
"git.loafle.net/commons_go/logging"
2017-11-29 05:46:47 +00:00
"git.loafle.net/commons_go/rpc"
2017-11-23 09:34:07 +00:00
"git.loafle.net/commons_go/util/net/cidr"
2017-12-04 09:37:39 +00:00
discoveryM "git.loafle.net/overflow/overflow_commons_go/modules/discovery/model"
2017-11-15 12:09:38 +00:00
)
2017-11-29 05:46:47 +00:00
// RPCServlet is the servlet sendResult uses to push discovery results via its
// Send method. Nothing in this file assigns it.
// NOTE(review): presumably set by the embedding application before any
// discovery is started — confirm; a nil value would make sendResult panic.
var RPCServlet rpc.Servlet
2017-11-15 12:09:38 +00:00
// discoverer is the package-level discovery instance. DiscoveryInit creates
// and starts it; DiscoveryDestroy stops it and resets it to nil. The exported
// Discover* functions delegate to it.
var discoverer *discovery
func DiscoveryInit() {
discoverer = &discovery{}
discoverer.start()
}
// DiscoveryDestroy stops the package-level discoverer and then clears it.
// It dereferences discoverer, so it must only be called after DiscoveryInit.
func DiscoveryDestroy() {
	d := discoverer
	d.stop()
	discoverer = nil
}
2018-03-16 06:43:19 +00:00
func DiscoverZone(requesterID string, dz *discoveryM.DiscoveryZone) {
discoverer.discoverZone(requesterID, dz)
2017-11-15 12:09:38 +00:00
}
2018-03-16 06:43:19 +00:00
func DiscoverHost(requesterID string, zone *discoveryM.Zone, dh *discoveryM.DiscoveryHost) {
discoverer.discoverHost(requesterID, zone, dh)
2017-11-15 12:09:38 +00:00
}
2018-03-16 06:43:19 +00:00
func DiscoverPort(requesterID string, host *discoveryM.Host, dp *discoveryM.DiscoveryPort) {
discoverer.discoverPort(requesterID, host, dp)
2017-11-15 12:09:38 +00:00
}
2018-03-16 06:43:19 +00:00
func DiscoverService(requesterID string, port *discoveryM.Port, ds *discoveryM.DiscoveryService) {
discoverer.discoverService(requesterID, port, ds)
2017-11-15 12:09:38 +00:00
}
// Stop currently does nothing: its body is empty.
// NOTE(review): looks like a placeholder; DiscoveryDestroy is what actually
// stops the discoverer — confirm whether this should delegate or be removed.
func Stop() {
}
// discovery holds the lifecycle state shared by all running scan tasks.
type discovery struct {
	// sendChan is allocated in start; no other code in this file uses it.
	sendChan chan interface{}
	// errChan is allocated in start; no other code in this file uses it.
	errChan chan error
	// stopChan is non-nil while the discovery is running. stop closes it to
	// tell every taskScan loop to shut down, then resets it to nil.
	stopChan chan struct{}
	// stopWg counts running taskScan invocations; stop waits on it.
	stopWg sync.WaitGroup
}
// start marks the discovery as running by allocating its channels.
// It panics if the discovery is already running (stopChan is non-nil).
func (d *discovery) start() {
	const startedMsg = "Discovery: discovery is started"

	if nil != d.stopChan {
		panic("Discovery: discovery is already running. Stop it before starting it again")
	}

	d.stopChan = make(chan struct{})
	d.sendChan = make(chan interface{})
	d.errChan = make(chan error)

	logging.Logger().Info(startedMsg)
}
// stop signals every running scan task to shut down, waits for all of them
// to finish and marks the discovery as stopped.
// It panics if the discovery has not been started (stopChan is nil).
func (d *discovery) stop() {
	const stoppedMsg = "Discovery: discovery is stopped"

	if nil == d.stopChan {
		panic("Discovery: discovery must be started before stopping it")
	}

	// Closing stopChan wakes the stop case of every taskScan select loop.
	close(d.stopChan)
	d.stopWg.Wait()
	// A nil stopChan is what start uses to recognise a stopped discovery.
	d.stopChan = nil

	logging.Logger().Info(stoppedMsg)
}
2018-03-16 06:43:19 +00:00
func (d *discovery) discoverZone(requesterID string, dz *discoveryM.DiscoveryZone) {
2017-11-18 12:08:08 +00:00
go taskScan(d,
func(resultChan chan interface{}, errChan chan error, doneChan chan struct{}, stopChan chan struct{}) {
scanZone(dz, resultChan, errChan, doneChan, stopChan)
2017-11-16 11:01:42 +00:00
},
func(result interface{}) {
2017-12-04 09:37:39 +00:00
z := result.(*discoveryM.Zone)
2017-11-15 12:33:46 +00:00
logging.Logger().Info(fmt.Sprintf("zone: %v", z))
2018-03-16 06:43:19 +00:00
d.sendResult("DiscoveryService.DiscoveredZone", requesterID, z)
2017-11-15 12:33:46 +00:00
if nil != dz.DiscoveryHost {
2017-11-17 15:12:38 +00:00
cr, _ := cidr.NewCIDRRanger(z.Network)
2017-12-04 09:37:39 +00:00
dh := &discoveryM.DiscoveryHost{
2017-11-17 15:12:38 +00:00
FirstScanRange: cr.First().String(),
LastScanRange: cr.Last().String(),
DiscoveryPort: dz.DiscoveryHost.DiscoveryPort,
}
2018-03-16 06:43:19 +00:00
d.discoverHost(requesterID, z, dh)
2017-11-15 12:33:46 +00:00
}
2017-11-16 11:01:42 +00:00
},
)
2017-11-15 12:09:38 +00:00
}
2018-03-16 06:43:19 +00:00
func (d *discovery) discoverHost(requesterID string, zone *discoveryM.Zone, dh *discoveryM.DiscoveryHost) {
2017-11-18 12:08:08 +00:00
go taskScan(d,
func(resultChan chan interface{}, errChan chan error, doneChan chan struct{}, stopChan chan struct{}) {
scanHost(zone, dh, resultChan, errChan, doneChan, stopChan)
2017-11-16 11:01:42 +00:00
},
func(result interface{}) {
2017-12-04 09:37:39 +00:00
h := result.(*discoveryM.Host)
2017-11-15 12:33:46 +00:00
logging.Logger().Info(fmt.Sprintf("host: %v", h))
2018-03-16 06:43:19 +00:00
d.sendResult("DiscoveryService.DiscoveredHost", requesterID, h)
2017-11-15 12:33:46 +00:00
if nil != dh.DiscoveryPort {
2018-03-16 06:43:19 +00:00
d.discoverPort(requesterID, h, dh.DiscoveryPort)
2017-11-15 12:33:46 +00:00
}
2017-11-16 11:01:42 +00:00
},
)
2017-11-15 12:09:38 +00:00
}
2018-03-16 06:43:19 +00:00
func (d *discovery) discoverPort(requesterID string, host *discoveryM.Host, dp *discoveryM.DiscoveryPort) {
2017-11-18 12:08:08 +00:00
go taskScan(d,
func(resultChan chan interface{}, errChan chan error, doneChan chan struct{}, stopChan chan struct{}) {
scanPort(host, dp, resultChan, errChan, doneChan, stopChan)
2017-11-16 11:01:42 +00:00
},
func(result interface{}) {
2017-12-04 09:37:39 +00:00
p := result.(*discoveryM.Port)
2017-11-15 12:33:46 +00:00
logging.Logger().Info(fmt.Sprintf("port: %v", p))
2018-03-16 06:43:19 +00:00
d.sendResult("DiscoveryService.DiscoveredPort", requesterID, p)
2017-11-15 12:33:46 +00:00
if nil != dp.DiscoveryService {
2018-03-16 06:43:19 +00:00
d.discoverService(requesterID, p, dp.DiscoveryService)
2017-11-15 12:33:46 +00:00
}
2017-11-16 11:01:42 +00:00
},
)
2017-11-15 12:09:38 +00:00
}
2018-03-16 06:43:19 +00:00
func (d *discovery) discoverService(requesterID string, port *discoveryM.Port, ds *discoveryM.DiscoveryService) {
2017-11-18 12:08:08 +00:00
go taskScan(d,
func(resultChan chan interface{}, errChan chan error, doneChan chan struct{}, stopChan chan struct{}) {
scanService(port, ds, resultChan, errChan, doneChan, stopChan)
2017-11-16 11:01:42 +00:00
},
func(result interface{}) {
2017-12-04 09:37:39 +00:00
s := result.(*discoveryM.Service)
2018-03-16 06:43:19 +00:00
d.sendResult("DiscoveryService.DiscoveredService", requesterID, s)
2017-12-04 09:37:39 +00:00
logging.Logger().Info(fmt.Sprintf("service: %s(%s)[%s:%s]", s.ServiceName, s.CryptoType, port.Host.IP, port.PortNumber))
2017-11-16 11:01:42 +00:00
},
)
}
2017-11-23 09:34:07 +00:00
func (d *discovery) sendResult(method string, args ...interface{}) {
2017-11-29 05:46:47 +00:00
go RPCServlet.Send(method, args...)
2017-11-29 00:36:19 +00:00
// go notify.Notifier.Notify(method, args...)
2017-11-16 11:01:42 +00:00
}
// sendError currently does nothing: its body is empty and nothing in this
// file calls it.
// NOTE(review): looks like a placeholder for reporting task errors to the
// requester — confirm intent.
func (d *discovery) sendError() {
}
2017-11-18 12:08:08 +00:00
func taskScan(d *discovery, task func(resultChan chan interface{}, errChan chan error, doneChan chan struct{}, stopChan chan struct{}), onResult func(result interface{})) {
2017-11-15 12:33:46 +00:00
d.stopWg.Add(1)
2017-11-16 11:01:42 +00:00
resultChan := make(chan interface{})
2017-11-15 12:33:46 +00:00
errChan := make(chan error)
2017-11-18 12:08:08 +00:00
stopChan := make(chan struct{})
2017-11-15 12:33:46 +00:00
doneChan := make(chan struct{})
defer func() {
close(resultChan)
close(errChan)
2017-11-18 12:08:08 +00:00
close(stopChan)
2017-11-15 12:33:46 +00:00
close(doneChan)
d.stopWg.Done()
}()
2017-11-18 12:08:08 +00:00
go task(resultChan, errChan, doneChan, stopChan)
2017-11-15 12:33:46 +00:00
for {
select {
2017-11-16 11:01:42 +00:00
case r := <-resultChan:
onResult(r)
2017-11-15 12:33:46 +00:00
case err := <-errChan:
2017-11-16 11:01:42 +00:00
logging.Logger().Info(fmt.Sprintf("task err: %v", err))
2017-11-15 12:33:46 +00:00
case <-doneChan:
2017-11-16 11:01:42 +00:00
logging.Logger().Debug(fmt.Sprintf("Discovery: task is complete"))
return
case <-d.stopChan:
2017-11-18 12:08:08 +00:00
close(stopChan)
<-doneChan
2017-11-15 12:33:46 +00:00
return
}
}
2017-11-15 12:09:38 +00:00
}