This commit is contained in:
crusader 2018-03-21 19:34:40 +09:00
parent c78d3b60ea
commit 3ceb82880a
8 changed files with 57 additions and 48 deletions

View File

@ -1,5 +1,5 @@
rm ./dist/* rm ./dist/*
CGO_ENABLED=1 go build -a --installsuffix cgo --ldflags="-s" -o ./dist/overflow_probe_container_discovery CGO_ENABLED=1 go build -a --installsuffix cgo --ldflags="-s" -o ./dist/overflow_probe_container_discovery
chmod u+s ./dist/overflow_probe_container_discovery cp ./dist/overflow_probe_container_discovery /project/overFlow/probe/bin/
cp ./dist/overflow_probe_container_discovery /project/overFlow/ chmod u+s /project/overFlow/probe/bin/overflow_probe_container_discovery

View File

@ -1,7 +1,6 @@
package discovery package discovery
import ( import (
"fmt"
"sync" "sync"
"git.loafle.net/commons_go/logging" "git.loafle.net/commons_go/logging"
@ -54,24 +53,24 @@ type discovery struct {
func (d *discovery) start() { func (d *discovery) start() {
if d.stopChan != nil { if d.stopChan != nil {
panic("Discovery: discovery is already running. Stop it before starting it again") logging.Logger().Panicf("Discovery: discovery is already running. Stop it before starting it again")
} }
d.stopChan = make(chan struct{}) d.stopChan = make(chan struct{})
d.sendChan = make(chan interface{}) d.sendChan = make(chan interface{})
d.errChan = make(chan error) d.errChan = make(chan error)
logging.Logger().Info(fmt.Sprintf("Discovery: discovery is started")) logging.Logger().Infof("Discovery: discovery is started")
} }
func (d *discovery) stop() { func (d *discovery) stop() {
if d.stopChan == nil { if d.stopChan == nil {
panic("Discovery: discovery must be started before stopping it") logging.Logger().Panicf("Discovery: discovery must be started before stopping it")
} }
close(d.stopChan) close(d.stopChan)
d.stopWg.Wait() d.stopWg.Wait()
d.stopChan = nil d.stopChan = nil
logging.Logger().Info(fmt.Sprintf("Discovery: discovery is stopped")) logging.Logger().Infof("Discovery: discovery is stopped")
} }
func (d *discovery) discoverZone(requesterID string, dz *discoveryM.DiscoveryZone) { func (d *discovery) discoverZone(requesterID string, dz *discoveryM.DiscoveryZone) {
@ -81,7 +80,7 @@ func (d *discovery) discoverZone(requesterID string, dz *discoveryM.DiscoveryZon
}, },
func(result interface{}) { func(result interface{}) {
z := result.(*discoveryM.Zone) z := result.(*discoveryM.Zone)
logging.Logger().Info(fmt.Sprintf("zone: %v", z)) logging.Logger().Debugf("zone: %v", z)
d.sendResult("DiscoveryService.DiscoveredZone", requesterID, z) d.sendResult("DiscoveryService.DiscoveredZone", requesterID, z)
if nil != dz.DiscoveryHost { if nil != dz.DiscoveryHost {
cr, _ := cidr.NewCIDRRanger(z.Network) cr, _ := cidr.NewCIDRRanger(z.Network)
@ -103,7 +102,7 @@ func (d *discovery) discoverHost(requesterID string, zone *discoveryM.Zone, dh *
}, },
func(result interface{}) { func(result interface{}) {
h := result.(*discoveryM.Host) h := result.(*discoveryM.Host)
logging.Logger().Info(fmt.Sprintf("host: %v", h)) logging.Logger().Debugf("host: %v", h)
d.sendResult("DiscoveryService.DiscoveredHost", requesterID, h) d.sendResult("DiscoveryService.DiscoveredHost", requesterID, h)
if nil != dh.DiscoveryPort { if nil != dh.DiscoveryPort {
d.discoverPort(requesterID, h, dh.DiscoveryPort) d.discoverPort(requesterID, h, dh.DiscoveryPort)
@ -119,7 +118,7 @@ func (d *discovery) discoverPort(requesterID string, host *discoveryM.Host, dp *
}, },
func(result interface{}) { func(result interface{}) {
p := result.(*discoveryM.Port) p := result.(*discoveryM.Port)
logging.Logger().Info(fmt.Sprintf("port: %v", p)) logging.Logger().Debugf("port: %v", p)
d.sendResult("DiscoveryService.DiscoveredPort", requesterID, p) d.sendResult("DiscoveryService.DiscoveredPort", requesterID, p)
if nil != dp.DiscoveryService { if nil != dp.DiscoveryService {
d.discoverService(requesterID, p, dp.DiscoveryService) d.discoverService(requesterID, p, dp.DiscoveryService)
@ -136,7 +135,7 @@ func (d *discovery) discoverService(requesterID string, port *discoveryM.Port, d
func(result interface{}) { func(result interface{}) {
s := result.(*discoveryM.Service) s := result.(*discoveryM.Service)
d.sendResult("DiscoveryService.DiscoveredService", requesterID, s) d.sendResult("DiscoveryService.DiscoveredService", requesterID, s)
logging.Logger().Info(fmt.Sprintf("service: %s(%s)[%s:%s]", s.ServiceName, s.CryptoType, port.Host.IP, port.PortNumber)) logging.Logger().Debugf("service: %s(%s)[%s:%s]", s.ServiceName, s.CryptoType, port.Host.IP, port.PortNumber)
}, },
) )
} }
@ -172,9 +171,9 @@ func taskScan(d *discovery, task func(resultChan chan interface{}, errChan chan
case r := <-resultChan: case r := <-resultChan:
onResult(r) onResult(r)
case err := <-errChan: case err := <-errChan:
logging.Logger().Info(fmt.Sprintf("task err: %v", err)) logging.Logger().Infof("task err: %v", err)
case <-doneChan: case <-doneChan:
logging.Logger().Debug(fmt.Sprintf("Discovery: task is complete")) logging.Logger().Debugf("Discovery: task is complete")
return return
case <-d.stopChan: case <-d.stopChan:
close(stopChan) close(stopChan)

View File

@ -48,7 +48,7 @@ func ScanHost(zone *discoveryM.Zone, dh *discoveryM.DiscoveryHost, resultChan ch
select { select {
case packet, ok := <-arpChan: case packet, ok := <-arpChan:
if !ok { if !ok {
logging.Logger().Debug(fmt.Sprintf("Discovery: arp channel is closed")) logging.Logger().Debugf("Discovery: arp channel is closed")
return return
} }
if h := handlePacketARP(zone, hostRanges, hosts, packet); nil != h { if h := handlePacketARP(zone, hostRanges, hosts, packet); nil != h {

View File

@ -42,7 +42,7 @@ func scanPortTCP(host *discoveryM.Host, dp *discoveryM.DiscoveryPort, resultChan
select { select {
case packet, ok := <-tcpChan: case packet, ok := <-tcpChan:
if !ok { if !ok {
logging.Logger().Debug(fmt.Sprintf("Discovery: tcp channel is closed")) logging.Logger().Debugf("Discovery: tcp channel is closed")
return return
} }
if p := handlePacketTCP(host, dp, ports, packet); nil != p { if p := handlePacketTCP(host, dp, ports, packet); nil != p {
@ -121,7 +121,7 @@ func handlePacketTCP(host *discoveryM.Host, dp *discoveryM.DiscoveryPort, ports
} }
port := int(packet.SrcPort) port := int(packet.SrcPort)
logging.Logger().Debug(fmt.Sprintf("Discovery: IP of TCP(%d) src %s", port, host.IP)) logging.Logger().Debugf("Discovery: IP of TCP(%d) src %s", port, host.IP)
if _, ok := ports[port]; ok || !dp.Contains(port) { if _, ok := ports[port]; ok || !dp.Contains(port) {
return nil return nil

View File

@ -43,7 +43,7 @@ func scanPortUDP(host *discoveryM.Host, dp *discoveryM.DiscoveryPort, resultChan
select { select {
case packet, ok := <-udpChan: case packet, ok := <-udpChan:
if !ok { if !ok {
logging.Logger().Debug(fmt.Sprintf("Discovery: udp channel is closed")) logging.Logger().Debugf("Discovery: udp channel is closed")
return return
} }
if p := handlePacketUDP(host, dp, ports, packet); nil != p { if p := handlePacketUDP(host, dp, ports, packet); nil != p {
@ -107,7 +107,7 @@ func sendUDP(host *discoveryM.Host, dp *discoveryM.DiscoveryPort, stopChan chan
for i := 0; i < m.PacketCount(); i++ { for i := 0; i < m.PacketCount(); i++ {
p := m.Packet(i) p := m.Packet(i)
if _, err := conn.WriteToUDP(p.Buffer, addr); err != nil { if _, err := conn.WriteToUDP(p.Buffer, addr); err != nil {
logging.Logger().Error(fmt.Sprintf("Discovery: UDP write error %v", err)) logging.Logger().Errorf("Discovery: UDP write error %v", err)
} }
} }
@ -137,7 +137,7 @@ func handlePacketUDP(host *discoveryM.Host, dp *discoveryM.DiscoveryPort, ports
srcIP := ipLayer.(*layers.IPv4).SrcIP srcIP := ipLayer.(*layers.IPv4).SrcIP
port := int(udp.SrcPort) port := int(udp.SrcPort)
logging.Logger().Debug(fmt.Sprintf("Discovery: IP of UDP(%d) src %v", port, srcIP)) logging.Logger().Debugf("Discovery: IP of UDP(%d) src %v", port, srcIP)
if _, ok := ports[port]; ok || !dp.Contains(port) { if _, ok := ports[port]; ok || !dp.Contains(port) {
return nil return nil
} }

View File

@ -41,7 +41,7 @@ func scanServiceTCP(port *discoveryM.Port, ds *discoveryM.DiscoveryService, resu
errChan <- fmt.Errorf("Discovery: Service scan[%s] on %s:%d error has occurred %v ", sc.Type(), hostIP, portNumber, err) errChan <- fmt.Errorf("Discovery: Service scan[%s] on %s:%d error has occurred %v ", sc.Type(), hostIP, portNumber, err)
break break
} }
logging.Logger().Debug(fmt.Sprintf("Discovery: Service scan connected[%s:%d] %s", hostIP, portNumber, sc.Type())) logging.Logger().Debugf("Discovery: Service scan connected[%s:%d] %s", hostIP, portNumber, sc.Type())
buf := make([]byte, 1024) buf := make([]byte, 1024)
rn, err := conn.Read(buf) rn, err := conn.Read(buf)
@ -79,8 +79,8 @@ func hadlePrePacket(info cnsm.MatchInfo, sc serviceConnector, conn net.Conn, pac
conn.Close() conn.Close()
}() }()
// logging.Logger().Debug(fmt.Sprintf("Discovery: Service scan pre packet length[%d], buf[%v]", packet.Len, packet.Buffer)) // logging.Logger().Debugf("Discovery: Service scan pre packet length[%d], buf[%v]", packet.Len, packet.Buffer)
logging.Logger().Debug(fmt.Sprintf("Discovery: Service scan[%s] on %s:%d pre packet length[%d]", sc.Type(), info.IP(), info.Port(), packet.Len)) logging.Logger().Debugf("Discovery: Service scan[%s] on %s:%d pre packet length[%d]", sc.Type(), info.IP(), info.Port(), packet.Len)
ms := matcher.GetTCPMatchers(true) ms := matcher.GetTCPMatchers(true)
buf := make([]byte, 1024) buf := make([]byte, 1024)
@ -105,31 +105,31 @@ Loop:
for j := 0; j < packetCount; j++ { for j := 0; j < packetCount; j++ {
tPacket := m.Packet(j) tPacket := m.Packet(j)
// logging.Logger().Debug(fmt.Sprintf("Discovery: Service scan send packet length[%d], buf[%v]", tPacket.Len, tPacket.Buffer)) // logging.Logger().Debugf("Discovery: Service scan send packet length[%d], buf[%v]", tPacket.Len, tPacket.Buffer)
logging.Logger().Debug(fmt.Sprintf("Discovery: Service scan[%s] on %s:%d send packet length[%d]", sc.Type(), info.IP(), info.Port(), tPacket.Len)) logging.Logger().Debugf("Discovery: Service scan[%s] on %s:%d send packet length[%d]", sc.Type(), info.IP(), info.Port(), tPacket.Len)
wn, err := conn.Write(tPacket.Buffer) wn, err := conn.Write(tPacket.Buffer)
if nil != err { if nil != err {
logging.Logger().Debug(fmt.Sprintf("Discovery: Service scan[%s] on %s:%d send packet error %v", sc.Type(), info.IP(), info.Port(), err)) logging.Logger().Debugf("Discovery: Service scan[%s] on %s:%d send packet error %v", sc.Type(), info.IP(), info.Port(), err)
break break
} }
if wn != tPacket.Len { if wn != tPacket.Len {
logging.Logger().Debug(fmt.Sprintf("Discovery: Service scan[%s] on %s:%d send packet length[%d] not same with %d", sc.Type(), info.IP(), info.Port(), wn, tPacket.Len)) logging.Logger().Debugf("Discovery: Service scan[%s] on %s:%d send packet length[%d] not same with %d", sc.Type(), info.IP(), info.Port(), wn, tPacket.Len)
break break
} }
rn, err := conn.Read(buf) rn, err := conn.Read(buf)
if nil != err { if nil != err {
logging.Logger().Debug(fmt.Sprintf("Discovery: Service scan[%s] on %s:%d receive packet error %v", sc.Type(), info.IP(), info.Port(), err)) logging.Logger().Debugf("Discovery: Service scan[%s] on %s:%d receive packet error %v", sc.Type(), info.IP(), info.Port(), err)
break break
} }
if m.Match(info, j+1, cnsm.NewPacket(buf, rn)) { if m.Match(info, j+1, cnsm.NewPacket(buf, rn)) {
// logging.Logger().Debug(fmt.Sprintf("Discovery: Service scan receive match length[%d], buf[%v]", rn, buf)) // logging.Logger().Debugf("Discovery: Service scan receive match length[%d], buf[%v]", rn, buf)
logging.Logger().Debug(fmt.Sprintf("Discovery: Service scan[%s] on %s:%d receive match length[%d]", sc.Type(), info.IP(), info.Port(), rn)) logging.Logger().Debugf("Discovery: Service scan[%s] on %s:%d receive match length[%d]", sc.Type(), info.IP(), info.Port(), rn)
found = true found = true
} else { } else {
// logging.Logger().Debug(fmt.Sprintf("Discovery: Service scan receive not match length[%d], buf[%v]", rn, buf)) // logging.Logger().Debugf("Discovery: Service scan receive not match length[%d], buf[%v]", rn, buf)
logging.Logger().Debug(fmt.Sprintf("Discovery: Service scan[%s] on %s:%d receive not match length[%d]", sc.Type(), info.IP(), info.Port(), rn)) logging.Logger().Debugf("Discovery: Service scan[%s] on %s:%d receive not match length[%d]", sc.Type(), info.IP(), info.Port(), rn)
found = false found = false
break break
} }
@ -159,22 +159,22 @@ Loop:
conn, err := sc.Dial(info.IP(), info.Port()) conn, err := sc.Dial(info.IP(), info.Port())
if err != nil { if err != nil {
logging.Logger().Debug(fmt.Sprintf("Discovery: Service scan[%s] on %s:%d socket dial error %v", sc.Type(), info.IP(), info.Port(), err)) logging.Logger().Debugf("Discovery: Service scan[%s] on %s:%d socket dial error %v", sc.Type(), info.IP(), info.Port(), err)
break Loop break Loop
} }
packetCount := m.PacketCount() packetCount := m.PacketCount()
for j := 0; j < packetCount; j++ { for j := 0; j < packetCount; j++ {
tPacket := m.Packet(j) tPacket := m.Packet(j)
// logging.Logger().Debug(fmt.Sprintf("Discovery: Service scan send packet length[%d], buf[%v]", tPacket.Len, tPacket.Buffer)) // logging.Logger().Debugf("Discovery: Service scan send packet length[%d], buf[%v]", tPacket.Len, tPacket.Buffer)
logging.Logger().Debug(fmt.Sprintf("Discovery: Service scan[%s] on %s:%d send packet length[%d]", sc.Type(), info.IP(), info.Port(), tPacket.Len)) logging.Logger().Debugf("Discovery: Service scan[%s] on %s:%d send packet length[%d]", sc.Type(), info.IP(), info.Port(), tPacket.Len)
wn, err := conn.Write(tPacket.Buffer) wn, err := conn.Write(tPacket.Buffer)
if nil != err { if nil != err {
logging.Logger().Debug(fmt.Sprintf("Discovery: Service scan[%s] on %s:%d send packet error %v", sc.Type(), info.IP(), info.Port(), err)) logging.Logger().Debugf("Discovery: Service scan[%s] on %s:%d send packet error %v", sc.Type(), info.IP(), info.Port(), err)
break break
} }
if wn != tPacket.Len { if wn != tPacket.Len {
logging.Logger().Debug(fmt.Sprintf("Discovery: Service scan[%s] on %s:%d send packet length[%d] not same with %d", sc.Type(), info.IP(), info.Port(), wn, tPacket.Len)) logging.Logger().Debugf("Discovery: Service scan[%s] on %s:%d send packet length[%d] not same with %d", sc.Type(), info.IP(), info.Port(), wn, tPacket.Len)
break break
} }
@ -188,7 +188,7 @@ Loop:
break break
} }
logging.Logger().Debug(fmt.Sprintf("Discovery: Service scan[%s] on %s:%d receive packet error %v", sc.Type(), info.IP(), info.Port(), err)) logging.Logger().Debugf("Discovery: Service scan[%s] on %s:%d receive packet error %v", sc.Type(), info.IP(), info.Port(), err)
break break
} }
@ -201,12 +201,12 @@ Loop:
break break
} }
// logging.Logger().Debug(fmt.Sprintf("Discovery: Service scan receive match length[%d], buf[%v]", rn, buf)) // logging.Logger().Debugf("Discovery: Service scan receive match length[%d], buf[%v]", rn, buf)
logging.Logger().Debug(fmt.Sprintf("Discovery: Service scan[%s] on %s:%d receive match length[%d]", sc.Type(), info.IP(), info.Port(), rn)) logging.Logger().Debugf("Discovery: Service scan[%s] on %s:%d receive match length[%d]", sc.Type(), info.IP(), info.Port(), rn)
continue continue
} else { } else {
// logging.Logger().Debug(fmt.Sprintf("Discovery: Service scan receive not match length[%d], buf[%v]", rn, buf)) // logging.Logger().Debugf("Discovery: Service scan receive not match length[%d], buf[%v]", rn, buf)
logging.Logger().Debug(fmt.Sprintf("Discovery: Service scan[%s] on %s:%d receive not match length[%d]", sc.Type(), info.IP(), info.Port(), rn)) logging.Logger().Debugf("Discovery: Service scan[%s] on %s:%d receive not match length[%d]", sc.Type(), info.IP(), info.Port(), rn)
break break
} }
} }

13
main.go
View File

@ -2,23 +2,26 @@ package main
import ( import (
"flag" "flag"
"fmt"
"log"
"os" "os"
"os/signal" "os/signal"
"syscall" "syscall"
"git.loafle.net/commons_go/logging" "git.loafle.net/commons_go/logging"
oocc "git.loafle.net/overflow/overflow_commons_go/config"
"git.loafle.net/overflow/overflow_discovery/server" "git.loafle.net/overflow/overflow_discovery/server"
) )
var ( var (
pidPath *string pidPath *string
logConfigPath *string
) )
func init() { func init() {
pidPath = flag.String("pid-path", "/tmp/discovery-container.pid", "The path of pid file") pidPath = oocc.FlagPidFilePath("./dist/discovery.pid")
logConfigPath = oocc.FlagLogConfigFilePath("")
flag.Parse() flag.Parse()
logging.InitializeLogger(*logConfigPath)
} }
func main() { func main() {
@ -31,14 +34,14 @@ func main() {
go func() { go func() {
if err := s.Start(); nil != err { if err := s.Start(); nil != err {
log.Printf("Server: Start error %v", err) logging.Logger().Infof("Server: Start error %v", err)
return return
} }
}() }()
select { select {
case signal := <-stop: case signal := <-stop:
fmt.Printf("Got signal: %v\n", signal) logging.Logger().Infof("Got signal: %v\n", signal)
} }
s.Stop() s.Stop()

View File

@ -2,6 +2,7 @@ package server
import ( import (
cdr "git.loafle.net/commons_go/di/registry" cdr "git.loafle.net/commons_go/di/registry"
"git.loafle.net/commons_go/logging"
crr "git.loafle.net/commons_go/rpc/registry" crr "git.loafle.net/commons_go/rpc/registry"
"git.loafle.net/commons_go/server" "git.loafle.net/commons_go/server"
oodca "git.loafle.net/overflow/overflow_discovery/commons/annotation" oodca "git.loafle.net/overflow/overflow_discovery/commons/annotation"
@ -10,11 +11,17 @@ import (
) )
func New(pidPath string) server.Server { func New(pidPath string) server.Server {
var (
services []interface{}
err error
)
oods.InitService() oods.InitService()
rpcRegistry := crr.NewRPCRegistry() rpcRegistry := crr.NewRPCRegistry()
services := cdr.GetInstancesByAnnotationName(oodca.ServiceTag) if services, err = cdr.GetInstancesByAnnotationName(oodca.ServiceTag); nil != err {
logging.Logger().Panic(err)
}
for _, s := range services { for _, s := range services {
rpcRegistry.RegisterService(s, "") rpcRegistry.RegisterService(s, "")