overflow_discovery/discovery/discovery.go
crusader 3579d26e52 ing
2017-11-16 20:01:42 +09:00

167 lines
3.7 KiB
Go

package discovery
import (
"fmt"
"sync"
"git.loafle.net/commons_go/logging"
"git.loafle.net/overflow/overflow_discovery/api/module/discovery/model"
)
var discoverer *discovery
func DiscoveryInit() {
discoverer = &discovery{}
discoverer.start()
}
func DiscoveryDestroy() {
discoverer.stop()
discoverer = nil
}
func DiscoverZone(dz *model.DiscoveryZone) {
discoverer.discoverZone(dz)
}
func DiscoverHost(dh *model.DiscoveryHost) {
discoverer.discoverHost(dh.Zone, dh)
}
func DiscoverPort(dp *model.DiscoveryPort) {
discoverer.discoverPort(dp.Host, dp)
}
func DiscoverService(ds *model.DiscoveryService) {
discoverer.discoverService(ds.Port, ds)
}
func Stop() {
}
type discovery struct {
sendChan chan interface{}
errChan chan error
stopChan chan struct{}
stopWg sync.WaitGroup
}
func (d *discovery) start() {
if d.stopChan != nil {
panic("Discovery: discovery is already running. Stop it before starting it again")
}
d.stopChan = make(chan struct{})
d.sendChan = make(chan interface{})
d.errChan = make(chan error)
logging.Logger().Info(fmt.Sprintf("Discovery: discovery is started"))
}
func (d *discovery) stop() {
if d.stopChan == nil {
panic("Discovery: discovery must be started before stopping it")
}
close(d.stopChan)
d.stopWg.Wait()
d.stopChan = nil
logging.Logger().Info(fmt.Sprintf("Discovery: discovery is stopped"))
}
func (d *discovery) discoverZone(dz *model.DiscoveryZone) {
taskScan(d,
func(resultChan chan interface{}, errChan chan error, doneChan chan struct{}) {
scanZone(dz, resultChan, errChan, doneChan)
},
func(result interface{}) {
z := result.(*model.Zone)
logging.Logger().Info(fmt.Sprintf("zone: %v", z))
if nil != dz.DiscoveryHost {
d.discoverHost(z, dz.DiscoveryHost)
}
},
)
}
func (d *discovery) discoverHost(zone *model.Zone, dh *model.DiscoveryHost) {
taskScan(d,
func(resultChan chan interface{}, errChan chan error, doneChan chan struct{}) {
scanHost(zone, dh, resultChan, errChan, doneChan)
},
func(result interface{}) {
h := result.(*model.Host)
logging.Logger().Info(fmt.Sprintf("host: %v", h))
if nil != dh.DiscoveryPort {
d.discoverPort(h, dh.DiscoveryPort)
}
},
)
}
func (d *discovery) discoverPort(host *model.Host, dp *model.DiscoveryPort) {
taskScan(d,
func(resultChan chan interface{}, errChan chan error, doneChan chan struct{}) {
scanPort(host, dp, resultChan, errChan, doneChan)
},
func(result interface{}) {
p := result.(*model.Port)
logging.Logger().Info(fmt.Sprintf("port: %v", p))
if nil != dp.DiscoveryService {
d.discoverService(p, dp.DiscoveryService)
}
},
)
}
func (d *discovery) discoverService(port *model.Port, ds *model.DiscoveryService) {
taskScan(d,
func(resultChan chan interface{}, errChan chan error, doneChan chan struct{}) {
scanService(port, ds, resultChan, errChan, doneChan)
},
func(result interface{}) {
s := result.(*model.Service)
logging.Logger().Info(fmt.Sprintf("service: %v", s))
},
)
}
func (d *discovery) sendResult() {
}
func (d *discovery) sendError() {
}
func taskScan(d *discovery, task func(resultChan chan interface{}, errChan chan error, doneChan chan struct{}), onResult func(result interface{})) {
d.stopWg.Add(1)
resultChan := make(chan interface{})
errChan := make(chan error)
doneChan := make(chan struct{})
defer func() {
close(resultChan)
close(errChan)
close(doneChan)
d.stopWg.Done()
}()
go task(resultChan, errChan, doneChan)
for {
select {
case r := <-resultChan:
onResult(r)
case err := <-errChan:
logging.Logger().Info(fmt.Sprintf("task err: %v", err))
case <-doneChan:
logging.Logger().Debug(fmt.Sprintf("Discovery: task is complete"))
return
case <-d.stopChan:
return
}
}
}