diff --git a/crawler/crawler.go b/crawler/crawler.go new file mode 100644 index 0000000..f6dee11 --- /dev/null +++ b/crawler/crawler.go @@ -0,0 +1,23 @@ +package crawler + +import ( + "git.loafle.net/overflow/crawler-go" + + "git.loafle.net/overflow/container_discovery/crawler/discovery" +) + +var crawlers map[string]crawler.Crawler + +func init() { + crawlers = make(map[string]crawler.Crawler, 0) +} + +func addCrawler(c crawler.Crawler) { + crawlers[c.Name()] = c + + addCrawler(discovery.NewCrawler()) +} + +func GetCrawlers() map[string]crawler.Crawler { + return crawlers +} diff --git a/crawler/discovery/discovery-crawler.go b/crawler/discovery/discovery-crawler.go new file mode 100644 index 0000000..94b771a --- /dev/null +++ b/crawler/discovery/discovery-crawler.go @@ -0,0 +1,32 @@ +package discovery + +import ( + ocsm "git.loafle.net/overflow/commons-go/sensorconfig/model" + "git.loafle.net/overflow/crawler-go" +) + +type DiscoveryCrawler struct { +} + +func (c *DiscoveryCrawler) Name() string { + return "DISCOVERY" +} + +func (c *DiscoveryCrawler) String() string { + return "Discovery" +} + +func (c *DiscoveryCrawler) Auth(auth map[string]string) error { + return nil +} + +func (c *DiscoveryCrawler) Get(sensorConfig *ocsm.SensorConfig) (map[string]string, error) { + + return nil, nil +} + +func NewCrawler() crawler.Crawler { + c := &DiscoveryCrawler{} + + return c +} diff --git a/glide.yaml b/glide.yaml new file mode 100644 index 0000000..3f068f6 --- /dev/null +++ b/glide.yaml @@ -0,0 +1,8 @@ +package: git.loafle.net/overflow/container_discovery +import: +- package: git.loafle.net/commons/logging-go +- package: git.loafle.net/commons/server-go +- package: git.loafle.net/commons/rpc-go +- package: git.loafle.net/commons/di-go +- package: git.loafle.net/overflow/commons-go +- package: git.loafle.net/overflow/container-go diff --git a/internal/discoverer/discoverer.go b/internal/discoverer/discoverer.go new file mode 100644 index 0000000..e75399f --- /dev/null +++ b/internal/discoverer/discoverer.go @@ -0,0 +1,293 @@ +package discoverer + +import ( + "sync" + "time" + + "git.loafle.net/commons/util-go/net/cidr" + ocdm "git.loafle.net/overflow/commons-go/discovery/model" +) + +type DiscoveryDataType int + +const ( + DiscoveryDataTypeNone DiscoveryDataType = iota + DiscoveryDataTypeStart + DiscoveryDataTypeStop + DiscoveryDataTypeError + DiscoveryDataTypeZone + DiscoveryDataTypeHost + DiscoveryDataTypePort + DiscoveryDataTypeService +) + +type DiscoveryData struct { + Type DiscoveryDataType + Result interface{} + Error error + Time time.Time +} + +func (dd *DiscoveryData) Release() { + releaseDiscoveryData(dd) +} + +var _discoverer Discoverer + +func GetDiscoverer() Discoverer { + if nil == _discoverer { + _discoverer = &defaultDiscoverer{} + } + return _discoverer +} + +type Discoverer interface { + Retain() chan *DiscoveryData + Release(dataChan chan *DiscoveryData) + DiscoverZone(dataChan chan *DiscoveryData, dz *ocdm.DiscoveryZone) + DiscoverHost(dataChan chan *DiscoveryData, zone *ocdm.Zone, dh *ocdm.DiscoveryHost) + DiscoverPort(dataChan chan *DiscoveryData, host *ocdm.Host, dp *ocdm.DiscoveryPort) + DiscoverSerice(dataChan chan *DiscoveryData, port *ocdm.Port, ds *ocdm.DiscoveryService) +} + +type defaultDiscoverer struct { + dataChanPool chan chan *DiscoveryData + + stopChan chan struct{} +} + +func (d *defaultDiscoverer) Retain() chan *DiscoveryData { + if nil == d.dataChanPool { + d.dataChanPool = make(chan chan *DiscoveryData, 1) + d.dataChanPool <- make(chan *DiscoveryData, 256) + } + return <-d.dataChanPool +} + +func (d *defaultDiscoverer) Release(dataChan chan *DiscoveryData) { + d.dataChanPool <- dataChan +} + +func (d *defaultDiscoverer) Stop(dataChan chan *DiscoveryData) { + if nil != d.stopChan { + close(d.stopChan) + d.stopChan = nil + } +} + +func (d *defaultDiscoverer) DiscoverZone(dataChan chan *DiscoveryData, dz *ocdm.DiscoveryZone) { + var wg sync.WaitGroup + d.stopChan = make(chan struct{}) + + dataChan <- retainDiscoveryData(DiscoveryDataTypeStart, time.Now(), nil, nil) + wg.Add(1) + go d.innerDiscoverZone(&wg, dataChan, dz) + wg.Wait() + if nil != d.stopChan { + close(d.stopChan) + d.stopChan = nil + } + + dataChan <- retainDiscoveryData(DiscoveryDataTypeStop, time.Now(), nil, nil) +} + +func (d *defaultDiscoverer) DiscoverHost(dataChan chan *DiscoveryData, zone *ocdm.Zone, dh *ocdm.DiscoveryHost) { + var wg sync.WaitGroup + d.stopChan = make(chan struct{}) + + dataChan <- retainDiscoveryData(DiscoveryDataTypeStart, time.Now(), nil, nil) + wg.Add(1) + go d.innerDiscoverHost(&wg, dataChan, zone, dh) + wg.Wait() + if nil != d.stopChan { + close(d.stopChan) + d.stopChan = nil + } + + dataChan <- retainDiscoveryData(DiscoveryDataTypeStop, time.Now(), nil, nil) +} + +func (d *defaultDiscoverer) DiscoverPort(dataChan chan *DiscoveryData, host *ocdm.Host, dp *ocdm.DiscoveryPort) { + var wg sync.WaitGroup + d.stopChan = make(chan struct{}) + + dataChan <- retainDiscoveryData(DiscoveryDataTypeStart, time.Now(), nil, nil) + wg.Add(1) + go d.innerDiscoverPort(&wg, dataChan, host, dp) + wg.Wait() + if nil != d.stopChan { + close(d.stopChan) + d.stopChan = nil + } + + dataChan <- retainDiscoveryData(DiscoveryDataTypeStop, time.Now(), nil, nil) +} + +func (d *defaultDiscoverer) DiscoverSerice(dataChan chan *DiscoveryData, port *ocdm.Port, ds *ocdm.DiscoveryService) { + var wg sync.WaitGroup + d.stopChan = make(chan struct{}) + + dataChan <- retainDiscoveryData(DiscoveryDataTypeStart, time.Now(), nil, nil) + wg.Add(1) + go d.innerDiscoverSerice(&wg, dataChan, port, ds) + wg.Wait() + if nil != d.stopChan { + close(d.stopChan) + d.stopChan = nil + } + + dataChan <- retainDiscoveryData(DiscoveryDataTypeStop, time.Now(), nil, nil) +} + +func (d *defaultDiscoverer) innerDiscoverZone(wg *sync.WaitGroup, dataChan chan *DiscoveryData, dz *ocdm.DiscoveryZone) { + defer func() { + wg.Done() + }() + + taskScan(d, + func(resultChan chan interface{}, errChan chan error, doneChan chan struct{}, stopChan chan struct{}) { + scanZone(dz, resultChan, errChan, doneChan, stopChan) + }, + func(result interface{}) { + z := result.(*ocdm.Zone) + dataChan <- retainDiscoveryData(DiscoveryDataTypeZone, time.Now(), z, nil) + if nil != dz.DiscoveryHost { + cr, _ := cidr.NewCIDRRanger(z.Network) + dh := &ocdm.DiscoveryHost{ + FirstScanRange: cr.First().String(), + LastScanRange: cr.Last().String(), + DiscoveryPort: dz.DiscoveryHost.DiscoveryPort, + } + wg.Add(1) + d.innerDiscoverHost(wg, dataChan, z, dh) + } + }, + func(err error) { + dataChan <- retainDiscoveryData(DiscoveryDataTypeError, time.Now(), nil, err) + }, + ) +} + +func (d *defaultDiscoverer) innerDiscoverHost(wg *sync.WaitGroup, dataChan chan *DiscoveryData, zone *ocdm.Zone, dh *ocdm.DiscoveryHost) { + defer func() { + wg.Done() + }() + + taskScan(d, + func(resultChan chan interface{}, errChan chan error, doneChan chan struct{}, stopChan chan struct{}) { + scanHost(zone, dh, resultChan, errChan, doneChan, stopChan) + }, + func(result interface{}) { + h := result.(*ocdm.Host) + dataChan <- retainDiscoveryData(DiscoveryDataTypeHost, time.Now(), h, nil) + if nil != dh.DiscoveryPort { + wg.Add(1) + d.innerDiscoverPort(wg, dataChan, h, dh.DiscoveryPort) + } + }, + func(err error) { + dataChan <- retainDiscoveryData(DiscoveryDataTypeError, time.Now(), nil, err) + }, + ) +} + +func (d *defaultDiscoverer) innerDiscoverPort(wg *sync.WaitGroup, dataChan chan *DiscoveryData, host *ocdm.Host, dp *ocdm.DiscoveryPort) { + defer func() { + wg.Done() + }() + + taskScan(d, + func(resultChan chan interface{}, errChan chan error, doneChan chan struct{}, stopChan chan struct{}) { + scanPort(host, dp, resultChan, errChan, doneChan, stopChan) + }, + func(result interface{}) { + p := result.(*ocdm.Port) + dataChan <- retainDiscoveryData(DiscoveryDataTypePort, time.Now(), p, nil) + if nil != dp.DiscoveryService { + wg.Add(1) + d.innerDiscoverSerice(wg, dataChan, p, dp.DiscoveryService) + } + }, + func(err error) { + dataChan <- retainDiscoveryData(DiscoveryDataTypeError, time.Now(), nil, err) + }, + ) +} + +func (d *defaultDiscoverer) innerDiscoverSerice(wg *sync.WaitGroup, dataChan chan *DiscoveryData, port *ocdm.Port, ds *ocdm.DiscoveryService) { + defer func() { + wg.Done() + }() + + taskScan(d, + func(resultChan chan interface{}, errChan chan error, doneChan chan struct{}, stopChan chan struct{}) { + scanService(port, ds, resultChan, errChan, doneChan, stopChan) + }, + func(result interface{}) { + s := result.(*ocdm.Service) + dataChan <- retainDiscoveryData(DiscoveryDataTypeService, time.Now(), s, nil) + }, + func(err error) { + dataChan <- retainDiscoveryData(DiscoveryDataTypeError, time.Now(), nil, err) + }, + ) +} + +func taskScan(d *defaultDiscoverer, + taskFunc func(resultChan chan interface{}, errChan chan error, doneChan chan struct{}, stopChan chan struct{}), + reesultFunc func(result interface{}), + errorFunc func(err error)) { + + resultChan := make(chan interface{}) + errChan := make(chan error) + stopChan := make(chan struct{}) + doneChan := make(chan struct{}) + + defer func() { + }() + + go taskFunc(resultChan, errChan, doneChan, stopChan) + + for { + select { + case r := <-resultChan: + reesultFunc(r) + case err := <-errChan: + errorFunc(err) + case <-doneChan: + return + case <-d.stopChan: + close(stopChan) + <-doneChan + return + } + } +} + +var discoveryDataPool sync.Pool + +func retainDiscoveryData(discoveryDataType DiscoveryDataType, t time.Time, result interface{}, err error) *DiscoveryData { + v := discoveryDataPool.Get() + var discoveryData *DiscoveryData + if v == nil { + discoveryData = &DiscoveryData{} + } else { + discoveryData = v.(*DiscoveryData) + } + + discoveryData.Type = discoveryDataType + discoveryData.Time = t + discoveryData.Result = result + discoveryData.Error = err + + return discoveryData +} + +func releaseDiscoveryData(discoveryData *DiscoveryData) { + discoveryData.Type = DiscoveryDataTypeNone + discoveryData.Result = nil + discoveryData.Error = nil + discoveryData.Time = time.Time{} + + discoveryDataPool.Put(discoveryData) +} diff --git a/internal/discoverer/host.go b/internal/discoverer/host.go new file mode 100644 index 0000000..2fa8781 --- /dev/null +++ b/internal/discoverer/host.go @@ -0,0 +1,32 @@ +package discoverer + +import ( + "fmt" + "net" + + ocdm "git.loafle.net/overflow/commons-go/discovery/model" + "git.loafle.net/overflow/container_discovery/internal/discoverer/ipv4" + "git.loafle.net/overflow/container_discovery/internal/discoverer/ipv6" +) + +func scanHost(zone *ocdm.Zone, dh *ocdm.DiscoveryHost, resultChan chan interface{}, errChan chan error, doneChan chan<- struct{}, stopChan chan struct{}) { + defer func() { + doneChan <- struct{}{} + }() + + _, ipNet, err := net.ParseCIDR(zone.Network) + if nil != err { + errChan <- err + return + } + switch len(ipNet.IP) { + case net.IPv4len: + ipv4.ScanHost(zone, dh, resultChan, errChan, stopChan) + case net.IPv6len: + ipv6.ScanHost(zone, dh, resultChan, errChan, stopChan) + + default: + errChan <- fmt.Errorf("Discovery: Not supported ip length") + return + } +} diff --git a/internal/discoverer/ipv4/host.go b/internal/discoverer/ipv4/host.go new file mode 100644 index 0000000..2013650 --- /dev/null +++ b/internal/discoverer/ipv4/host.go @@ -0,0 +1,206 @@ +package ipv4 + +import ( + "fmt" + "net" + "time" + + "git.loafle.net/commons/logging-go" + "git.loafle.net/commons/util-go/net/cidr" + "github.com/google/gopacket" + "github.com/google/gopacket/layers" + + ocdm "git.loafle.net/overflow/commons-go/discovery/model" + "git.loafle.net/overflow/container_discovery/internal/pcap" +) + +func ScanHost(zone *ocdm.Zone, dh *ocdm.DiscoveryHost, resultChan chan interface{}, errChan chan error, stopChan chan struct{}) { + ps, err := pcap.RetainScanner(zone) + if nil != err { + errChan <- fmt.Errorf("Discovery: Cannot retain pcap instance %v", err) + return + } + + defer func() { + pcap.ReleaseScanner(zone) + }() + + cr, err := cidr.NewCIDRRanger(zone.Network) + if nil != err { + errChan <- err + return + } + + hostRanges, err := getTargetHostRange(dh, cr) + if nil != err { + errChan <- err + return + } + + arpChan := ps.OpenARP() + defer func() { + ps.CloseARP(arpChan) + }() + + go func() { + hosts := make(map[string]*ocdm.Host) + for { + select { + case packet, ok := <-arpChan: + if !ok { + logging.Logger().Debugf("Discovery: arp channel is closed") + return + } + if h := handlePacketARP(zone, hostRanges, hosts, packet); nil != h { + resultChan <- h + } + case <-stopChan: + return + } + } + }() + + if err := sendARP(ps, zone, hostRanges, stopChan); nil != err { + errChan <- err + return + } + + timer := time.NewTimer(10 * time.Second) + + select { + case <-stopChan: + return + case <-timer.C: + return + } + +} + +func sendARP(ps pcap.PCapScanner, zone *ocdm.Zone, hostRanges []net.IP, stopChan chan struct{}) error { + hwAddr, err := net.ParseMAC(zone.Mac) + if nil != err { + return err + } + ip := net.ParseIP(zone.IP) + if nil == ip { + return fmt.Errorf("Discovery: IP(%s) of zone is not valid", zone.IP) + } + + ethPacket := makePacketEthernet(hwAddr) + arpPacket := makePacketARP(hwAddr, ip.To4()) + opts := gopacket.SerializeOptions{FixLengths: true, ComputeChecksums: true} + buf := gopacket.NewSerializeBuffer() + + for _, targetHost := range hostRanges { + arpPacket.DstProtAddress = []byte(targetHost) + // log.Printf("ARP:%v", arpPacket) + gopacket.SerializeLayers(buf, opts, ðPacket, &arpPacket) + if err := ps.WritePacketData(buf.Bytes()); err != nil { + return err + } + + timer := time.NewTimer(time.Microsecond * 100) + + select { + case <-stopChan: + return nil + case <-timer.C: + } + + } + return nil +} + +func handlePacketARP(zone *ocdm.Zone, hostRanges []net.IP, hosts map[string]*ocdm.Host, packet *layers.ARP) *ocdm.Host { + if packet.Operation != layers.ARPReply { + return nil + } + + ip := net.IP(packet.SourceProtAddress) + if _, ok := hosts[ip.String()]; ok { + return nil + } + + inRange := false + for _, h := range hostRanges { + if h.Equal(ip) { + inRange = true + break + } + } + if !inRange { + return nil + } + + h := &ocdm.Host{} + h.IP = ip.String() + h.Mac = net.HardwareAddr(packet.SourceHwAddress).String() + h.Zone = zone + + hosts[ip.String()] = h + + return h +} + +func getTargetHostRange(dh *ocdm.DiscoveryHost, cr cidr.CIDRRanger) ([]net.IP, error) { + var firstIP net.IP + if "" != dh.FirstScanRange { + firstIP = net.ParseIP(dh.FirstScanRange) + if nil == firstIP { + return nil, fmt.Errorf("Discovery: IP(%v) of FirstScanRange host is not valid", firstIP) + } + } + var lastIP net.IP + if "" != dh.LastScanRange { + lastIP = net.ParseIP(dh.LastScanRange) + if nil == lastIP { + return nil, fmt.Errorf("Discovery: IP(%v) of LastScanRange host is not valid", lastIP) + } + } + + includeIPs := make([]net.IP, 0) + for _, iHost := range dh.IncludeHosts { + iIP := net.ParseIP(iHost) + if nil == iIP { + return nil, fmt.Errorf("Discovery: IP(%v) of include host is not valid", iHost) + } + includeIPs = append(includeIPs, iIP) + } + + excludeIPs := make([]net.IP, 0) + for _, eHost := range dh.ExcludeHosts { + eIP := net.ParseIP(eHost) + if nil == eIP { + return nil, fmt.Errorf("Discovery: IP(%v) of exclude host is not valid", eHost) + } + excludeIPs = append(excludeIPs, eIP) + } + + ranges, err := cr.Ranges(firstIP, lastIP, includeIPs, excludeIPs) + if nil != err { + return nil, err + } + + return ranges, nil +} + +func makePacketEthernet(hw net.HardwareAddr) layers.Ethernet { + return layers.Ethernet{ + SrcMAC: hw, + DstMAC: net.HardwareAddr{0xff, 0xff, 0xff, 0xff, 0xff, 0xff}, + EthernetType: layers.EthernetTypeARP, + } +} + +func makePacketARP(hw net.HardwareAddr, ip net.IP) layers.ARP { + return layers.ARP{ + AddrType: layers.LinkTypeEthernet, + Protocol: layers.EthernetTypeIPv4, + HwAddressSize: 6, + ProtAddressSize: 4, + Operation: layers.ARPRequest, + SourceHwAddress: []byte(hw), + SourceProtAddress: []byte(ip), + DstHwAddress: []byte{0, 0, 0, 0, 0, 0}, + } +} diff --git a/internal/discoverer/ipv4/port.go b/internal/discoverer/ipv4/port.go new file mode 100644 index 0000000..09ac386 --- /dev/null +++ b/internal/discoverer/ipv4/port.go @@ -0,0 +1,23 @@ +package ipv4 + +import ( + "sync" + + ocdm "git.loafle.net/overflow/commons-go/discovery/model" +) + +func ScanPort(host *ocdm.Host, dp *ocdm.DiscoveryPort, resultChan chan interface{}, errChan chan error, stopChan chan struct{}) { + var wg sync.WaitGroup + + if dp.IncludeTCP { + wg.Add(1) + go scanPortTCP(host, dp, resultChan, errChan, stopChan, &wg) + } + + if dp.IncludeUDP { + wg.Add(1) + go scanPortUDP(host, dp, resultChan, errChan, stopChan, &wg) + } + + wg.Wait() +} diff --git a/internal/discoverer/ipv4/port_tcp.go b/internal/discoverer/ipv4/port_tcp.go new file mode 100644 index 0000000..3169102 --- /dev/null +++ b/internal/discoverer/ipv4/port_tcp.go @@ -0,0 +1,187 @@ +package ipv4 + +import ( + "encoding/json" + "fmt" + "net" + "strconv" + "sync" + "time" + + "git.loafle.net/commons/logging-go" + occc "git.loafle.net/overflow/commons-go/core/constants" + ocdm "git.loafle.net/overflow/commons-go/discovery/model" + "git.loafle.net/overflow/container_discovery/internal/pcap" + + "github.com/google/gopacket" + "github.com/google/gopacket/layers" +) + +func scanPortTCP(host *ocdm.Host, dp *ocdm.DiscoveryPort, resultChan chan interface{}, errChan chan error, stopChan chan struct{}, wg *sync.WaitGroup) { + defer func() { + wg.Done() + }() + + ps, err := pcap.RetainScanner(host.Zone) + if nil != err { + errChan <- fmt.Errorf("Discovery: Cannot retain pcap instance %v", err) + return + } + defer func() { + pcap.ReleaseScanner(host.Zone) + }() + + tcpChan := ps.OpenTCP(host.IP) + defer func() { + ps.CloseTCP(host.IP, tcpChan) + }() + + go func() { + ports := make(map[int]*ocdm.Port) + + for { + select { + case packet, ok := <-tcpChan: + if !ok { + logging.Logger().Debugf("Discovery: tcp channel is closed") + return + } + if p := handlePacketTCP(host, dp, ports, packet); nil != p { + resultChan <- p + } + case <-stopChan: + return + } + } + }() + + if err := sendTCP(host, dp, stopChan); nil != err { + errChan <- err + return + } + + timer := time.NewTimer(20 * time.Second) + + select { + case <-stopChan: + return + case <-timer.C: + return + } +} + +func sendTCP(host *ocdm.Host, dp *ocdm.DiscoveryPort, stopChan chan struct{}) error { + tcpPacket, err := makePacketPortTCP(host) + if nil != err { + return err + } + defer func() { + tcpPacket.PacketConn.Close() + }() + + buf := gopacket.NewSerializeBuffer() + +Loop: + for portNumber := dp.FirstScanRange; portNumber < dp.LastScanRange; portNumber++ { + if nil != dp.ExcludePorts { + for _, exPortNumber := range dp.ExcludePorts { + if portNumber == exPortNumber { + continue Loop + } + } + } + tcpPacket.TCP.DstPort = layers.TCPPort(portNumber) + tcpPacket.TCP.SetNetworkLayerForChecksum(tcpPacket.IP) + + if err := gopacket.SerializeLayers(buf, tcpPacket.Opts, tcpPacket.TCP); err != nil { + return err + } + if _, err := tcpPacket.PacketConn.WriteTo(buf.Bytes(), &net.IPAddr{IP: tcpPacket.IP.DstIP}); err != nil { + return err + } + + timer := time.NewTimer(time.Microsecond * 100) + + select { + case <-stopChan: + return nil + case <-timer.C: + } + } + + return nil +} + +func handlePacketTCP(host *ocdm.Host, dp *ocdm.DiscoveryPort, ports map[int]*ocdm.Port, packet *layers.TCP) *ocdm.Port { + if nil == packet || packet.DstPort != 60000 { + return nil + } + + if packet.RST { + return nil + } + + port := int(packet.SrcPort) + logging.Logger().Debugf("Discovery: IP of TCP(%d) src %s", port, host.IP) + + if _, ok := ports[port]; ok || !dp.Contains(port) { + return nil + } + + p := &ocdm.Port{ + PortType: occc.PortTypeTCP, + PortNumber: json.Number(strconv.Itoa(port)), + } + p.Host = host + + ports[port] = p + + return p +} + +type PortPacketTCP struct { + IP *layers.IPv4 + TCP *layers.TCP + Opts gopacket.SerializeOptions + PacketConn net.PacketConn +} + +func makePacketPortTCP(host *ocdm.Host) (*PortPacketTCP, error) { + packetTCP := &PortPacketTCP{} + + srcIP := net.ParseIP(host.Zone.IP) + if nil == srcIP { + return nil, fmt.Errorf("Discovery: IP(%s) of zone is not valid", host.Zone.IP) + } + dstIP := net.ParseIP(host.IP) + if nil == dstIP { + return nil, fmt.Errorf("Discovery: IP(%s) of host is not valid", host.IP) + } + + packetTCP.IP = &layers.IPv4{ + SrcIP: srcIP.To4(), + DstIP: dstIP.To4(), + Version: 4, + TTL: 64, + Protocol: layers.IPProtocolTCP, + } + packetTCP.TCP = &layers.TCP{ + SrcPort: 60000, + DstPort: 0, // will be incremented during the scan + SYN: true, + Seq: 0, + } + packetTCP.Opts = gopacket.SerializeOptions{ + ComputeChecksums: true, + FixLengths: true, + } + + conn, err := net.ListenPacket("ip4:tcp", "0.0.0.0") + if err != nil { + return nil, fmt.Errorf("Discovery: SYN create socket error %v", err) + } + + packetTCP.PacketConn = conn + + return packetTCP, nil +} diff --git a/internal/discoverer/ipv4/port_udp.go b/internal/discoverer/ipv4/port_udp.go new file mode 100644 index 0000000..a3ab708 --- /dev/null +++ b/internal/discoverer/ipv4/port_udp.go @@ -0,0 +1,158 @@ +package ipv4 + +import ( + "encoding/json" + "fmt" + "net" + "strconv" + "sync" + "time" + + "git.loafle.net/commons/logging-go" + occc "git.loafle.net/overflow/commons-go/core/constants" + ocdm "git.loafle.net/overflow/commons-go/discovery/model" + "git.loafle.net/overflow/container_discovery/internal/pcap" + + "git.loafle.net/overflow/container_discovery/internal/matcher" + "github.com/google/gopacket" + "github.com/google/gopacket/layers" +) + +func scanPortUDP(host *ocdm.Host, dp *ocdm.DiscoveryPort, resultChan chan interface{}, errChan chan error, stopChan chan struct{}, wg *sync.WaitGroup) { + defer func() { + wg.Done() + }() + + ps, err := pcap.RetainScanner(host.Zone) + if nil != err { + errChan <- fmt.Errorf("Discovery: Cannot retain pcap instance %v", err) + return + } + defer func() { + pcap.ReleaseScanner(host.Zone) + }() + + udpChan := ps.OpenUDP(host.IP) + defer func() { + ps.CloseUDP(host.IP, udpChan) + }() + + go func() { + ports := make(map[int]*ocdm.Port) + + for { + select { + case packet, ok := <-udpChan: + if !ok { + logging.Logger().Debugf("Discovery: udp channel is closed") + return + } + if p := handlePacketUDP(host, dp, ports, packet); nil != p { + resultChan <- p + } + case <-stopChan: + return + } + } + }() + + if err := sendUDP(host, dp, stopChan); nil != err { + errChan <- err + return + } + + timer := time.NewTimer(3 * time.Second) + + select { + case <-stopChan: + return + case <-timer.C: + return + } +} + +func sendUDP(host *ocdm.Host, dp *ocdm.DiscoveryPort, stopChan chan struct{}) error { + ip := net.ParseIP(host.IP) + if nil == ip { + return fmt.Errorf("Discovery: IP(%s) of host is not valid", host.IP) + } + + ms := matcher.GetUDPMatchers() + + conn, err := net.ListenUDP("udp", &net.UDPAddr{IP: net.IPv4zero, Port: 0}) + if err != nil { + return err + } + defer func() { + conn.Close() + }() + + for indexI := 0; indexI < len(ms); indexI++ { + m := ms[indexI] + + Loop: + for portNumber := dp.FirstScanRange; portNumber < dp.LastScanRange; portNumber++ { + if nil != dp.ExcludePorts { + for _, exPortNumber := range dp.ExcludePorts { + if portNumber == exPortNumber { + continue Loop + } + } + } + + if !m.IsSend(portNumber) { + continue + } + + addr := &net.UDPAddr{IP: ip.To4(), Port: portNumber} + for i := 0; i < m.PacketCount(); i++ { + p := m.Packet(i) + if _, err := conn.WriteToUDP(p.Buffer, addr); err != nil { + logging.Logger().Errorf("Discovery: UDP write error %v", err) + } + } + + timer := time.NewTimer(time.Microsecond * 200) + + select { + case <-stopChan: + return nil + case <-timer.C: + } + } + } + + return nil +} + +func handlePacketUDP(host *ocdm.Host, dp *ocdm.DiscoveryPort, ports map[int]*ocdm.Port, packet gopacket.Packet) *ocdm.Port { + ipLayer := packet.Layer(layers.LayerTypeIPv4) + + if ipLayer.(*layers.IPv4).SrcIP.String() == host.Zone.IP { + return nil + } + + if net := packet.NetworkLayer(); net == nil { + } else if udpLayer := packet.Layer(layers.LayerTypeUDP); udpLayer == nil { + } else if udp, ok := udpLayer.(*layers.UDP); ok { + + srcIP := ipLayer.(*layers.IPv4).SrcIP + port := int(udp.SrcPort) + logging.Logger().Debugf("Discovery: IP of UDP(%d) src %v", port, srcIP) + if _, ok := ports[port]; ok || !dp.Contains(port) { + return nil + } + + p := &ocdm.Port{ + PortType: occc.PortTypeUDP, + PortNumber: json.Number(strconv.Itoa(port)), + UDPLayer: udpLayer, + } + p.Host = host + ports[port] = p + + return p + } + + return nil +} diff --git a/internal/discoverer/ipv4/service.go b/internal/discoverer/ipv4/service.go new file mode 100644 index 0000000..5772ad1 --- /dev/null +++ b/internal/discoverer/ipv4/service.go @@ -0,0 +1,46 @@ +package ipv4 + +import ( + "fmt" + + cuej "git.loafle.net/commons/util-go/encoding/json" + occc "git.loafle.net/overflow/commons-go/core/constants" + ocdm "git.loafle.net/overflow/commons-go/discovery/model" + "github.com/google/gopacket/layers" +) + +func ScanService(port *ocdm.Port, ds *ocdm.DiscoveryService, resultChan chan interface{}, errChan chan error, stopChan chan struct{}) { + portNumber, err := cuej.NumberToInt(port.PortNumber) + if err != nil { + errChan <- fmt.Errorf("Discovery: Service scan port[%s] error %v ", port.PortNumber, err) + return + } + + switch port.PortType { + case occc.PortTypeTCP: + if !scanServiceTCP(port, ds, resultChan, errChan, stopChan) { + + if dName, ok := layers.TCPPortNames[layers.TCPPort(portNumber)]; ok { + sName := fmt.Sprintf("Not Supported Service. Perhaps %s[%d]", dName, portNumber) + s := &ocdm.Service{ + ServiceName: sName, + } + s.Port = port + resultChan <- s + } + } + + case occc.PortTypeUDP: + if !scanServiceUDP(port, ds, resultChan, errChan, stopChan) { + if dName, ok := layers.UDPPortNames[layers.UDPPort(portNumber)]; ok { + sName := fmt.Sprintf("Not Supported Service. Perhaps %s[%d]", dName, portNumber) + s := &ocdm.Service{ + ServiceName: sName, + } + s.Port = port + resultChan <- s + } + } + } + +} diff --git a/internal/discoverer/ipv4/service_conn.go b/internal/discoverer/ipv4/service_conn.go new file mode 100644 index 0000000..45d58a1 --- /dev/null +++ b/internal/discoverer/ipv4/service_conn.go @@ -0,0 +1,68 @@ +package ipv4 + +import ( + "crypto/tls" + "fmt" + "net" + "time" +) + +type serviceConnector interface { + Type() string + Dial(ip string, port int) (net.Conn, error) +} + +type normalServiceConn struct { + t string +} + +func (nsc *normalServiceConn) Type() string { + return nsc.t +} + +func (nsc *normalServiceConn) Dial(ip string, port int) (net.Conn, error) { + addr := fmt.Sprintf("%s:%d", ip, port) + conn, err := net.DialTimeout("tcp", addr, time.Duration(3)*time.Second) + if err != nil { + return nil, err + } + + err = conn.SetDeadline(time.Now().Add(3 * time.Second)) + if err != nil { + return nil, err + } + return conn, err +} + +type tlsServiceConn struct { + t string +} + +func (tsc *tlsServiceConn) Type() string { + return tsc.t +} + +func (tsc *tlsServiceConn) Dial(ip string, port int) (net.Conn, error) { + addr := fmt.Sprintf("%s:%d", ip, port) + dialer := &net.Dialer{ + Timeout: 3 * time.Second, + } + conn, err := tls.DialWithDialer( + dialer, + "tcp", + addr, + &tls.Config{ + InsecureSkipVerify: true, + ServerName: ip, + }, + ) + if err != nil { + return nil, err + } + + err = conn.SetDeadline(time.Now().Add(3 * time.Second)) + if err != nil { + return nil, err + } + return conn, err +} diff --git a/internal/discoverer/ipv4/service_tcp.go b/internal/discoverer/ipv4/service_tcp.go new file mode 100644 index 0000000..5584712 --- /dev/null +++ b/internal/discoverer/ipv4/service_tcp.go @@ -0,0 +1,221 @@ +package ipv4 + +import ( + "fmt" + "net" + + "git.loafle.net/commons/logging-go" + csm "git.loafle.net/commons/service_matcher-go" + cuej "git.loafle.net/commons/util-go/encoding/json" + occc "git.loafle.net/overflow/commons-go/core/constants" + ocdm "git.loafle.net/overflow/commons-go/discovery/model" + "git.loafle.net/overflow/container_discovery/internal/matcher" +) + +func scanServiceTCP(port *ocdm.Port, ds *ocdm.DiscoveryService, resultChan chan interface{}, errChan chan error, stopChan chan struct{}) bool { + + hostIP := port.Host.IP + portNumber, err := cuej.NumberToInt(port.PortNumber) + if err != nil { + errChan <- fmt.Errorf("Discovery: Service scan on %s:%s error has occurred %v ", hostIP, port.PortNumber, err) + return false + } + + info := csm.NewMatchInfo(hostIP, portNumber) + var s *ocdm.Service + + scs := []serviceConnector{ + &normalServiceConn{ + t: occc.PortTypeTCP.String(), + }, + &tlsServiceConn{ + t: occc.CryptoTypeTLS.String(), + }, + } + + for i := 0; i < len(scs); i++ { + sc := scs[i] + + conn, err := sc.Dial(hostIP, portNumber) + if err != nil { + errChan <- fmt.Errorf("Discovery: Service scan[%s] on %s:%d error has occurred %v ", sc.Type(), hostIP, portNumber, err) + break + } + logging.Logger().Debugf("Discovery: Service scan connected[%s:%d] %s", hostIP, portNumber, sc.Type()) + + buf := make([]byte, 1024) + rn, err := conn.Read(buf) + if err != nil { + rn = 0 + } + if rn != 0 { + s = hadlePrePacket(info, sc, conn, csm.NewPacket(buf, rn)) + } else { + conn.Close() + s = hadlePostPacket(info, sc) + } + + if nil != s { + break + } + + select { + case <-stopChan: + return true + } + } + + if nil != s { + s.Port = port + resultChan <- s + return true + } + + return false +} + +func hadlePrePacket(info csm.MatchInfo, sc serviceConnector, conn net.Conn, packet *csm.Packet) *ocdm.Service { + defer func() { + conn.Close() + }() + + // logging.Logger().Debugf("Discovery: Service scan pre packet length[%d], buf[%v]", packet.Len, packet.Buffer) + logging.Logger().Debugf("Discovery: Service scan[%s] on %s:%d pre packet length[%d]", sc.Type(), info.IP(), info.Port(), packet.Len) + + ms := matcher.GetTCPMatchers(true) + buf := make([]byte, 1024) + var s *ocdm.Service + +Loop: + for i := 0; i < len(ms); i++ { + m := ms[i] + + if m.Match(info, 0, packet) { + packetCount := m.PacketCount() + + if 0 == packetCount { + s = &ocdm.Service{ + ServiceName: m.Name(), + CryptoType: occc.ToCryptoType(sc.Type()), + } + break Loop + } + + found := false + + for j := 0; j < packetCount; j++ { + tPacket := m.Packet(j) + // logging.Logger().Debugf("Discovery: Service scan send packet length[%d], buf[%v]", tPacket.Len, tPacket.Buffer) + logging.Logger().Debugf("Discovery: Service scan[%s] on %s:%d send packet length[%d]", sc.Type(), info.IP(), info.Port(), tPacket.Len) + wn, err := conn.Write(tPacket.Buffer) + if nil != err { + logging.Logger().Debugf("Discovery: Service scan[%s] on %s:%d send packet error %v", sc.Type(), info.IP(), info.Port(), err) + break + } + if wn != tPacket.Len { + logging.Logger().Debugf("Discovery: Service scan[%s] on %s:%d send packet length[%d] not same with %d", sc.Type(), info.IP(), info.Port(), wn, tPacket.Len) + break + } + + rn, err := conn.Read(buf) + if nil != err { + logging.Logger().Debugf("Discovery: Service scan[%s] on %s:%d receive packet error %v", sc.Type(), info.IP(), info.Port(), err) + break + } + + if m.Match(info, j+1, csm.NewPacket(buf, rn)) { + // logging.Logger().Debugf("Discovery: Service scan receive match length[%d], buf[%v]", rn, buf) + logging.Logger().Debugf("Discovery: Service scan[%s] on %s:%d receive match length[%d]", sc.Type(), info.IP(), info.Port(), rn) + found = true + } else { + // logging.Logger().Debugf("Discovery: Service scan receive not match length[%d], buf[%v]", rn, buf) + logging.Logger().Debugf("Discovery: Service scan[%s] on %s:%d receive not match length[%d]", sc.Type(), info.IP(), info.Port(), rn) + found = false + break + } + } + + if found { + s = &ocdm.Service{ + ServiceName: m.Name(), + CryptoType: occc.ToCryptoType(sc.Type()), + } + break Loop + } + } + } + + return s +} + +func hadlePostPacket(info csm.MatchInfo, sc serviceConnector) *ocdm.Service { + ms := matcher.GetTCPMatchers(false) + buf := make([]byte, 1024) + var s *ocdm.Service + +Loop: + for i := 0; i < len(ms); i++ { + m := ms[i] + + conn, err := sc.Dial(info.IP(), info.Port()) + if err != nil { + logging.Logger().Debugf("Discovery: Service scan[%s] on %s:%d socket dial error %v", sc.Type(), info.IP(), info.Port(), err) + break Loop + } + + packetCount := m.PacketCount() + for j := 0; j < packetCount; j++ { + tPacket := m.Packet(j) + // logging.Logger().Debugf("Discovery: Service scan send packet length[%d], buf[%v]", tPacket.Len, tPacket.Buffer) + logging.Logger().Debugf("Discovery: Service scan[%s] on %s:%d send packet length[%d]", sc.Type(), info.IP(), info.Port(), tPacket.Len) + wn, err := conn.Write(tPacket.Buffer) + if nil != err { + logging.Logger().Debugf("Discovery: Service scan[%s] on %s:%d send packet error %v", sc.Type(), info.IP(), info.Port(), err) + break + } + if wn != tPacket.Len { + logging.Logger().Debugf("Discovery: Service scan[%s] on %s:%d send packet length[%d] not same with %d", sc.Type(), info.IP(), info.Port(), wn, tPacket.Len) + break + } + + rn, err := conn.Read(buf) + if nil != err { + if !m.HasResponse(j) { + s = &ocdm.Service{ + ServiceName: m.Name(), + CryptoType: occc.ToCryptoType(sc.Type()), + } + break + } + + logging.Logger().Debugf("Discovery: Service scan[%s] on %s:%d receive packet error %v", sc.Type(), info.IP(), info.Port(), err) + break + } + + if m.Match(info, j, csm.NewPacket(buf, rn)) { + if packetCount-1 == j { + s = &ocdm.Service{ + ServiceName: m.Name(), + CryptoType: occc.ToCryptoType(sc.Type()), + } + break + } + + // logging.Logger().Debugf("Discovery: Service scan receive match length[%d], buf[%v]", rn, buf) + logging.Logger().Debugf("Discovery: Service scan[%s] on %s:%d receive match length[%d]", sc.Type(), info.IP(), info.Port(), rn) + continue + } else { + // logging.Logger().Debugf("Discovery: Service scan receive not match length[%d], buf[%v]", rn, buf) + logging.Logger().Debugf("Discovery: Service scan[%s] on %s:%d receive not match length[%d]", sc.Type(), info.IP(), info.Port(), rn) + break + } + } + conn.Close() + + if nil != s { + break Loop + } + } + + return s +} diff --git a/internal/discoverer/ipv4/service_udp.go b/internal/discoverer/ipv4/service_udp.go new file mode 100644 index 0000000..3cdeecd --- /dev/null +++ b/internal/discoverer/ipv4/service_udp.go @@ -0,0 +1,37 @@ +package ipv4 + +import ( + "fmt" + + csm "git.loafle.net/commons/service_matcher-go" + cuej "git.loafle.net/commons/util-go/encoding/json" + ocdm "git.loafle.net/overflow/commons-go/discovery/model" + "git.loafle.net/overflow/container_discovery/internal/matcher" +) + +func scanServiceUDP(port *ocdm.Port, ds *ocdm.DiscoveryService, resultChan chan interface{}, errChan chan error, stopChan chan struct{}) bool { + portNumber, err := cuej.NumberToInt(port.PortNumber) + if err != nil { + errChan <- fmt.Errorf("Discovery: Service scan port[%s] error %v ", port.PortNumber, err) + return false + } + + ms := matcher.GetUDPMatchers() + mi := csm.NewMatchInfo(port.Host.IP, portNumber) + + for i := 0; i < len(ms); i++ { + m := ms[i] + p := csm.NewPacket(port.UDPLayer.LayerPayload(), len(port.UDPLayer.LayerPayload())) + + if m.Match(mi, 0, p) { + s := &ocdm.Service{ + ServiceName: m.Name(), + } + s.Port = port + resultChan <- s + return true + } + } + + return false +} diff --git a/internal/discoverer/ipv6/host.go b/internal/discoverer/ipv6/host.go new file mode 100644 index 0000000..3b4672a --- /dev/null +++ b/internal/discoverer/ipv6/host.go @@ -0,0 +1,9 @@ +package ipv6 + +import ( + ocdm "git.loafle.net/overflow/commons-go/discovery/model" +) + +func ScanHost(zone *ocdm.Zone, dh *ocdm.DiscoveryHost, resultChan chan interface{}, errChan chan error, stopChan chan struct{}) { + +} diff --git a/internal/discoverer/ipv6/port.go b/internal/discoverer/ipv6/port.go new file mode 100644 index 0000000..025fb5b --- /dev/null +++ b/internal/discoverer/ipv6/port.go @@ -0,0 +1,9 @@ +package ipv6 + +import ( + ocdm "git.loafle.net/overflow/commons-go/discovery/model" +) + +func ScanPort(host *ocdm.Host, dp *ocdm.DiscoveryPort, resultChan chan interface{}, errChan chan error, stopChan chan struct{}) { + +} diff --git a/internal/discoverer/ipv6/service.go b/internal/discoverer/ipv6/service.go new file mode 100644 index 0000000..a5b1167 --- /dev/null +++ b/internal/discoverer/ipv6/service.go @@ -0,0 +1,9 @@ +package ipv6 + +import ( + ocdm "git.loafle.net/overflow/commons-go/discovery/model" +) + +func ScanService(port *ocdm.Port, ds *ocdm.DiscoveryService, resultChan chan interface{}, errChan chan error, stopChan chan struct{}) { + +} diff --git a/internal/discoverer/port.go b/internal/discoverer/port.go new file mode 100644 index 0000000..0042f60 --- /dev/null +++ b/internal/discoverer/port.go @@ -0,0 +1,32 @@ +package discoverer + +import ( + "fmt" + "net" + + ocdm "git.loafle.net/overflow/commons-go/discovery/model" + "git.loafle.net/overflow/container_discovery/internal/discoverer/ipv4" + "git.loafle.net/overflow/container_discovery/internal/discoverer/ipv6" +) + +func scanPort(host *ocdm.Host, dp *ocdm.DiscoveryPort, resultChan chan interface{}, errChan chan error, doneChan chan<- struct{}, stopChan chan struct{}) { + defer func() { + doneChan <- struct{}{} + }() + + _, ipNet, err := net.ParseCIDR(host.Zone.Network) + if nil != err { + errChan <- err + return + } + switch len(ipNet.IP) { + case net.IPv4len: + ipv4.ScanPort(host, dp, resultChan, errChan, stopChan) + case net.IPv6len: + ipv6.ScanPort(host, dp, resultChan, errChan, stopChan) + + default: + errChan <- fmt.Errorf("Discovery: Not supported ip length") + return + } +} diff --git a/internal/discoverer/service.go b/internal/discoverer/service.go new file mode 100644 index 0000000..df3a1b0 --- /dev/null +++ b/internal/discoverer/service.go @@ -0,0 +1,33 @@ +package discoverer + +import ( + "fmt" + "net" + + ocdm "git.loafle.net/overflow/commons-go/discovery/model" + "git.loafle.net/overflow/container_discovery/internal/discoverer/ipv4" + "git.loafle.net/overflow/container_discovery/internal/discoverer/ipv6" +) + +func scanService(port *ocdm.Port, ds *ocdm.DiscoveryService, resultChan chan interface{}, errChan chan error, doneChan chan<- struct{}, stopChan chan struct{}) { + defer func() { + doneChan <- struct{}{} + }() + + _, ipNet, err := net.ParseCIDR(port.Host.Zone.Network) + if nil != err { + errChan <- err + return + } + + switch len(ipNet.IP) { + case net.IPv4len: + ipv4.ScanService(port, ds, resultChan, errChan, stopChan) + case net.IPv6len: + ipv6.ScanService(port, ds, resultChan, errChan, stopChan) + + default: + errChan <- fmt.Errorf("Discovery: Not supported ip length") + return + } +} diff --git a/internal/discoverer/zone.go b/internal/discoverer/zone.go new file mode 100644 index 0000000..758b120 --- /dev/null +++ b/internal/discoverer/zone.go @@ -0,0 +1,88 @@ +package discoverer + +import ( + "net" + "regexp" + "strings" + + ocdm "git.loafle.net/overflow/commons-go/discovery/model" +) + +func scanZone(dz *ocdm.DiscoveryZone, resultChan chan interface{}, errChan chan error, doneChan chan<- struct{}, stopChan chan struct{}) { + defer func() { + doneChan <- struct{}{} + }() + + var err error + var ifaces []net.Interface + var addrs []net.Addr + var ipnet *net.IPNet + var zones []*net.IPNet + // var gwIP net.IP + // var gwIFace string + + // if gwIP, gwIFace, err = gateway.DiscoverGateway(); nil != err { + // logChan <- err + // return + // } + + if ifaces, err = net.Interfaces(); nil != err { + errChan <- err + return + } + + zones = make([]*net.IPNet, 0) + + for _, i := range ifaces { + + if addrs, err = i.Addrs(); nil != err { + errChan <- err + continue + } + + for _, addr := range addrs { + + if _, ipnet, err = net.ParseCIDR(addr.String()); nil != err { + errChan <- err + continue + } + if ipnet.IP.IsLoopback() || checkSameZone(zones, ipnet) || checkExclude(dz.ExcludePatterns, i.Name) { + continue + } + + zones = append(zones, ipnet) + + z := &ocdm.Zone{ + Network: ipnet.String(), + Iface: i.Name, + Mac: i.HardwareAddr.String(), + IP: strings.Split(addr.String(), "/")[0], + } + + resultChan <- z + } + } +} + +func checkExclude(ep []string, iface string) bool { + var r *regexp.Regexp + var err error + for _, p := range ep { + if r, err = regexp.Compile(p); nil != err { + return false + } + if r.MatchString(iface) { + return true + } + } + return false +} + +func checkSameZone(zones []*net.IPNet, ipnet *net.IPNet) bool { + for _, i := range zones { + if i.Contains(ipnet.IP) { + return true + } + } + return false +} diff --git a/internal/matcher/matcher.go b/internal/matcher/matcher.go new file mode 100644 index 0000000..70fd71a --- /dev/null +++ b/internal/matcher/matcher.go @@ -0,0 +1,106 @@ +package matcher + +import ( + csm "git.loafle.net/commons/service_matcher-go" + + "git.loafle.net/commons/service_matcher-go/activedirectory" + "git.loafle.net/commons/service_matcher-go/cassandra" + "git.loafle.net/commons/service_matcher-go/dns" + "git.loafle.net/commons/service_matcher-go/ftp" + "git.loafle.net/commons/service_matcher-go/http" + "git.loafle.net/commons/service_matcher-go/imap" + "git.loafle.net/commons/service_matcher-go/ldap" + "git.loafle.net/commons/service_matcher-go/mongodb" + "git.loafle.net/commons/service_matcher-go/mysql" + "git.loafle.net/commons/service_matcher-go/netbios" + "git.loafle.net/commons/service_matcher-go/oracle" + "git.loafle.net/commons/service_matcher-go/pop" + "git.loafle.net/commons/service_matcher-go/postgresql" + "git.loafle.net/commons/service_matcher-go/redis" + redis_protected "git.loafle.net/commons/service_matcher-go/redis/protected" + "git.loafle.net/commons/service_matcher-go/rmi" + "git.loafle.net/commons/service_matcher-go/smb" + "git.loafle.net/commons/service_matcher-go/smtp" + snmp_v2 "git.loafle.net/commons/service_matcher-go/snmp/v2" + snmp_v3 "git.loafle.net/commons/service_matcher-go/snmp/v3" + "git.loafle.net/commons/service_matcher-go/sqlserver" + "git.loafle.net/commons/service_matcher-go/ssh" + "git.loafle.net/commons/service_matcher-go/telnet" + "git.loafle.net/commons/service_matcher-go/wmi" +) + +var ( + AllMatchers []csm.Matcher + + TCPMatchers []csm.Matcher + UDPMatchers []csm.UDPMatcher + + TCPPrePacketMatchers []csm.Matcher + TCPNotPrePacketMatchers []csm.Matcher +) + +func init() { + //TCP + addTCPMatcher(smtp.NewMatcher()) + addTCPMatcher(ldap.NewMatcher()) + addTCPMatcher(activedirectory.NewMatcher()) + addTCPMatcher(mongodb.NewMatcher()) + addTCPMatcher(mysql.NewMatcher()) + addTCPMatcher(sqlserver.NewMatcher()) + addTCPMatcher(redis.NewMatcher()) + addTCPMatcher(redis_protected.NewMatcher()) + addTCPMatcher(netbios.NewMatcher()) + addTCPMatcher(smb.NewMatcher()) + addTCPMatcher(cassandra.NewMatcher()) + addTCPMatcher(imap.NewMatcher()) + addTCPMatcher(oracle.NewMatcher()) + addTCPMatcher(postgresql.NewMatcher()) + addTCPMatcher(pop.NewMatcher()) + addTCPMatcher(wmi.NewMatcher()) + addTCPMatcher(ftp.NewMatcher()) + addTCPMatcher(http.NewMatcher()) + addTCPMatcher(rmi.NewMatcher()) + addTCPMatcher(ssh.NewMatcher()) + addTCPMatcher(telnet.NewMatcher()) + + // UDP + addUDPMatcher(dns.NewMatcher()) + addUDPMatcher(snmp_v2.NewMatcher()) + addUDPMatcher(snmp_v3.NewMatcher()) +} + +func addTCPMatcher(m csm.Matcher) { + AllMatchers = append(AllMatchers, m) + TCPMatchers = append(TCPMatchers, m) + if m.IsPrePacket() { + TCPPrePacketMatchers = append(TCPPrePacketMatchers, m) + } else { + TCPNotPrePacketMatchers = append(TCPNotPrePacketMatchers, m) + } +} + +func addUDPMatcher(m csm.UDPMatcher) { + AllMatchers = append(AllMatchers, m) + UDPMatchers = append(UDPMatchers, m) +} + +func GetTCPMatchers(isPrePacket bool) []csm.Matcher { + if isPrePacket { + return TCPPrePacketMatchers + } + + return TCPNotPrePacketMatchers +} + +func GetUDPMatchers() []csm.UDPMatcher { + return UDPMatchers +} + +func GetMatcherByName(name string) csm.Matcher { + for _, m := range AllMatchers { + if m.Name() == name { + return m + } + } + return nil +} diff --git a/internal/pcap/packet.go b/internal/pcap/packet.go new file mode 100644 index 0000000..03d7675 --- /dev/null +++ b/internal/pcap/packet.go @@ -0,0 +1,106 @@ +package pcap + +import ( + "github.com/google/gopacket" + "github.com/google/gopacket/layers" +) + +type PacketType int + +const ( + PacketTypeUnknown PacketType = iota + PacketTypeARP + PacketTypeTCP + PacketTypeUDP +) + +func getPacketType(packet gopacket.Packet) PacketType { + if packet == nil { + return PacketTypeUnknown + } + layer := packet.Layer(layers.LayerTypeARP) + if layer != nil { + return PacketTypeARP + } + + layer = packet.Layer(layers.LayerTypeTCP) + if layer != nil { + if _, ok := layer.(*layers.TCP); ok { + return PacketTypeTCP + } + } + + layer = packet.Layer(layers.LayerTypeUDP) + if layer != nil { + if _, ok := layer.(*layers.UDP); ok { + return PacketTypeUDP + } + } + return PacketTypeUnknown +} + +func handlePacket(ps *pCapScan, packet gopacket.Packet) { + switch getPacketType(packet) { + case PacketTypeARP: + handlePacketARP(ps, packet) + case PacketTypeTCP: + handlePacketTCP(ps, packet) + case PacketTypeUDP: + handlePacketUDP(ps, packet) + default: + } +} + +func handlePacketARP(ps *pCapScan, packet gopacket.Packet) { + ps.arpListenerChanMtx.RLock() + defer ps.arpListenerChanMtx.RUnlock() + + arpLayer := packet.Layer(layers.LayerTypeARP) + arp := arpLayer.(*layers.ARP) + + for _, ch := range ps.arpListenerChans { + ch <- arp + } +} + +func handlePacketTCP(ps *pCapScan, packet gopacket.Packet) { + ipLayer := packet.Layer(layers.LayerTypeIPv4) + if nil == ipLayer { + return + } + + ip := ipLayer.(*layers.IPv4).SrcIP.String() + + ps.tcpListenerChanMtx.RLock() + defer ps.tcpListenerChanMtx.RUnlock() + + chs, ok := ps.tcpListenerChans[ip] + if ok { + + layer := packet.Layer(layers.LayerTypeTCP) + tcp, _ := layer.(*layers.TCP) + + for _, ch := range chs { + ch <- tcp + } + } +} + +func handlePacketUDP(ps *pCapScan, packet gopacket.Packet) { + ipLayer := packet.Layer(layers.LayerTypeIPv4) + if nil == ipLayer { + return + } + + ip := ipLayer.(*layers.IPv4).SrcIP.String() + + ps.udpListenerChanMtx.RLock() + defer ps.udpListenerChanMtx.RUnlock() + + chs, ok := ps.udpListenerChans[ip] + if ok { + for _, ch := range chs { + ch <- packet + } + } +} diff --git a/internal/pcap/pcap.go b/internal/pcap/pcap.go new file mode 100644 index 0000000..722bd4b --- /dev/null +++ b/internal/pcap/pcap.go @@ -0,0 +1,60 @@ +package pcap + +import ( + "sync" + "time" + + ocdm "git.loafle.net/overflow/commons-go/discovery/model" +) + +var mtx sync.Mutex +var instances map[string]PCapScanner + +func init() { + instances = make(map[string]PCapScanner, 0) +} + +func RetainScanner(zone *ocdm.Zone) (PCapScanner, error) { + mtx.Lock() + defer mtx.Unlock() + + var ps PCapScanner + var ok bool + if ps, ok = instances[zone.Network]; !ok { + ps = newPCapScanner(zone) + if err := ps.start(); nil != err { + return nil, err + } + instances[zone.Network] = ps + } + ps.retain() + + return ps, nil +} + +func ReleaseScanner(zone *ocdm.Zone) { + + go func() { + time.Sleep(2 * time.Second) + + mtx.Lock() + defer mtx.Unlock() + + if ps, ok := instances[zone.Network]; ok { + if ps.release() { + ps.stop() + delete(instances, zone.Network) + } + } + }() +} + +func ReleaseScannerAll() { + mtx.Lock() + defer mtx.Unlock() + + for k, ps := range instances { + ps.stop() + delete(instances, k) + } +} diff --git a/internal/pcap/pcap_scan.go b/internal/pcap/pcap_scan.go new file mode 100644 index 0000000..6495ea5 --- /dev/null +++ b/internal/pcap/pcap_scan.go @@ -0,0 +1,249 @@ +package pcap + +import ( + "fmt" + "sort" + "sync" + + ocdm "git.loafle.net/overflow/commons-go/discovery/model" + "github.com/google/gopacket" + "github.com/google/gopacket/layers" + "github.com/google/gopacket/pcap" +) + +func newPCapScanner(zone *ocdm.Zone) PCapScanner { + ps := &pCapScan{ + zone: zone, + } + + return ps +} + +type PCapScanner interface { + OpenARP() chan *layers.ARP + CloseARP(ch chan *layers.ARP) + + OpenTCP(ip string) chan *layers.TCP + CloseTCP(ip string, ch chan *layers.TCP) + + OpenUDP(ip string) chan gopacket.Packet + CloseUDP(ip string, ch chan gopacket.Packet) + + WritePacketData(data []byte) (err error) + + start() error + stop() + + retain() + release() bool +} + +type pCapScan struct { + pCapHandle *pcap.Handle + zone *ocdm.Zone + + arpListenerChanMtx sync.RWMutex + arpListenerChans []chan *layers.ARP + + tcpListenerChanMtx sync.RWMutex + tcpListenerChans map[string][]chan *layers.TCP + + udpListenerChanMtx sync.RWMutex + udpListenerChans map[string][]chan gopacket.Packet + + refCount int + stopChan chan struct{} + stopWg sync.WaitGroup +} + +func (ps *pCapScan) start() error { + if ps.stopChan != nil { + return fmt.Errorf("PCapScanner: already running. Stop it before starting it again") + } + + // new pcap handle + var h *pcap.Handle + var err error + if h, err = pcap.OpenLive(ps.zone.Iface, 65536, true, pcap.BlockForever); nil != err { + return err + } + // set filter + // todo add tcp, udp filter + if err = h.SetBPFFilter("arp and src net " + ps.zone.Network + " or (((tcp[tcpflags] & (tcp-syn|tcp-ack) != 0) or (tcp[tcpflags] & (tcp-rst) != 0)) and port 60000) or udp "); nil != err { + h.Close() + return err + } + ps.pCapHandle = h + + ps.arpListenerChans = make([]chan *layers.ARP, 0) + ps.tcpListenerChans = make(map[string][]chan *layers.TCP, 0) + ps.udpListenerChans = make(map[string][]chan gopacket.Packet, 0) + + ps.stopChan = make(chan struct{}) + + ps.stopWg.Add(1) + go handleReceive(ps) + + return nil +} + +func (ps *pCapScan) stop() { + if ps.stopChan == nil { + panic("PCapScanner: must be started before stopping it") + } + close(ps.stopChan) + ps.stopWg.Wait() + ps.stopChan = nil +} + +func (ps *pCapScan) OpenARP() chan *layers.ARP { + ps.arpListenerChanMtx.Lock() + defer ps.arpListenerChanMtx.Unlock() + + c := make(chan *layers.ARP, 0) + ps.arpListenerChans = append(ps.arpListenerChans, c) + return c +} +func (ps *pCapScan) CloseARP(ch chan *layers.ARP) { + ps.arpListenerChanMtx.Lock() + defer ps.arpListenerChanMtx.Unlock() + + i := sort.Search(len(ps.arpListenerChans), func(i int) bool { + return ch == ps.arpListenerChans[i] + }) + + if -1 != i { + close(ch) + ps.arpListenerChans = append(ps.arpListenerChans[:i], ps.arpListenerChans[i+1:]...) + } +} + +func (ps *pCapScan) OpenTCP(ip string) chan *layers.TCP { + ps.tcpListenerChanMtx.Lock() + defer ps.tcpListenerChanMtx.Unlock() + + if _, ok := ps.tcpListenerChans[ip]; !ok { + ps.tcpListenerChans[ip] = make([]chan *layers.TCP, 0) + } + + ch := make(chan *layers.TCP, 0) + ps.tcpListenerChans[ip] = append(ps.tcpListenerChans[ip], ch) + + return ch +} +func (ps *pCapScan) CloseTCP(ip string, ch chan *layers.TCP) { + ps.tcpListenerChanMtx.Lock() + defer ps.tcpListenerChanMtx.Unlock() + + if _, ok := ps.tcpListenerChans[ip]; !ok { + return + } + + chs := ps.tcpListenerChans[ip] + i := sort.Search(len(chs), func(i int) bool { + return ch == chs[i] + }) + + if -1 != i { + close(ch) + ps.tcpListenerChans[ip] = append(ps.tcpListenerChans[ip][:i], ps.tcpListenerChans[ip][i+1:]...) + } + +} + +func (ps *pCapScan) OpenUDP(ip string) chan gopacket.Packet { + ps.udpListenerChanMtx.Lock() + defer ps.udpListenerChanMtx.Unlock() + + if _, ok := ps.udpListenerChans[ip]; !ok { + ps.udpListenerChans[ip] = make([]chan gopacket.Packet, 0) + } + + ch := make(chan gopacket.Packet, 0) + ps.udpListenerChans[ip] = append(ps.udpListenerChans[ip], ch) + + return ch + +} +func (ps *pCapScan) CloseUDP(ip string, ch chan gopacket.Packet) { + ps.udpListenerChanMtx.Lock() + defer ps.udpListenerChanMtx.Unlock() + + if _, ok := ps.udpListenerChans[ip]; !ok { + return + } + + chs := ps.udpListenerChans[ip] + i := sort.Search(len(chs), func(i int) bool { + return ch == chs[i] + }) + + if -1 != i { + close(ch) + ps.udpListenerChans[ip] = append(ps.udpListenerChans[ip][:i], ps.udpListenerChans[ip][i+1:]...) + } +} + +func (ps *pCapScan) WritePacketData(data []byte) (err error) { + return ps.pCapHandle.WritePacketData(data) +} + +func (ps *pCapScan) retain() { + ps.refCount++ +} +func (ps *pCapScan) release() bool { + ps.refCount-- + if 0 > ps.refCount { + ps.refCount = 0 + } + + return 0 == ps.refCount +} + +func (ps *pCapScan) destroy() { + ps.tcpListenerChanMtx.Lock() + for k, v := range ps.tcpListenerChans { + for _, ch := range v { + close(ch) + } + v = v[:0] + delete(ps.tcpListenerChans, k) + } + ps.tcpListenerChanMtx.Unlock() + + ps.udpListenerChanMtx.Lock() + for k, v := range ps.udpListenerChans { + for _, ch := range v { + close(ch) + } + v = v[:0] + delete(ps.udpListenerChans, k) + } + ps.udpListenerChanMtx.Unlock() + + ps.arpListenerChanMtx.Lock() + for _, v := range ps.arpListenerChans { + close(v) + } + ps.arpListenerChans = ps.arpListenerChans[:0] + ps.arpListenerChanMtx.Unlock() + + ps.pCapHandle.Close() +} + +func handleReceive(ps *pCapScan) { + defer ps.stopWg.Done() + + pSrc := gopacket.NewPacketSource(ps.pCapHandle, layers.LayerTypeEthernet) + inPacket := pSrc.Packets() + + for { + select { + case packet := <-inPacket: + handlePacket(ps, packet) + case <-ps.stopChan: + ps.destroy() + return + } + } +} diff --git a/main.go b/main.go new file mode 100644 index 0000000..ac00cd1 --- /dev/null +++ b/main.go @@ -0,0 +1,57 @@ +package main + +import ( + "context" + "flag" + "log" + "os" + "os/signal" + "syscall" + "time" + + "git.loafle.net/commons/logging-go" + ocpcc "git.loafle.net/overflow/commons-go/probe/constants" + "git.loafle.net/overflow/container_discovery/server" +) + +var ( + pidFilePath *string +) + +func init() { + pidFilePath = flag.String(ocpcc.FlagPidFilePathName, "./dist/discovery.pid", "PID file path") + loggingConfigFilePath := flag.String(ocpcc.FlagLoggingConfigFilePathName, "", "logging config path") + flag.Parse() + + logging.InitializeLogger(*loggingConfigFilePath) +} + +func main() { + defer logging.Logger().Sync() + + s := server.New(*pidFilePath) + + go func() { + err := s.ListenAndServe() + if nil != err { + log.Printf("err: %v", err) + } + os.Exit(1) + }() + + interrupt := make(chan os.Signal, 1) + signal.Notify(interrupt, + syscall.SIGKILL, + syscall.SIGSTOP, + syscall.SIGHUP, + syscall.SIGINT, + syscall.SIGTERM, + syscall.SIGQUIT) + + <-interrupt + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + if err := s.Shutdown(ctx); err != nil { + logging.Logger().Errorf("error: %v", err) + } +} diff --git a/server/server-handler.go b/server/server-handler.go new file mode 100644 index 0000000..e74bc91 --- /dev/null +++ b/server/server-handler.go @@ -0,0 +1,48 @@ +package server + +import ( + cs "git.loafle.net/commons/server-go" + ocs "git.loafle.net/overflow/container-go/server" +) + +type ServerHandler interface { + ocs.ServerHandler +} + +type ServerHandlers struct { + ocs.ServerHandlers +} + +func (sh *ServerHandlers) Init(serverCtx cs.ServerCtx) error { + if err := sh.ServerHandlers.Init(serverCtx); nil != err { + return err + } + + return nil +} + +func (sh *ServerHandlers) OnStart(serverCtx cs.ServerCtx) error { + if err := sh.ServerHandlers.OnStart(serverCtx); nil != err { + return err + } + + return nil +} + +func (sh *ServerHandlers) OnStop(serverCtx cs.ServerCtx) { + + sh.ServerHandlers.OnStop(serverCtx) +} + +func (sh *ServerHandlers) Destroy(serverCtx cs.ServerCtx) { + + sh.ServerHandlers.Destroy(serverCtx) +} + +func (sh *ServerHandlers) Validate() error { + if err := sh.ServerHandlers.Validate(); nil != err { + return err + } + + return nil +} diff --git a/server/server.go b/server/server.go new file mode 100644 index 0000000..e5769a9 --- /dev/null +++ b/server/server.go @@ -0,0 +1,42 @@ +package server + +import ( + cdr "git.loafle.net/commons/di-go/registry" + logging "git.loafle.net/commons/logging-go" + crr "git.loafle.net/commons/rpc-go/registry" + cssn "git.loafle.net/commons/server-go/socket/net" + occa "git.loafle.net/overflow/commons-go/core/annotation" + "git.loafle.net/overflow/container-go" + "git.loafle.net/overflow/container_discovery/crawler" + "git.loafle.net/overflow/container_discovery/service" + "git.loafle.net/overflow/container_discovery/servlet" +) + +func New(pidFilePath string) *cssn.Server { + cdr.RegisterResource(container.CONTAINER_CRAWLERS, crawler.GetCrawlers()) + + services, err := cdr.GetInstancesByAnnotationType(occa.RPCServiceAnnotationType) + if nil != err { + logging.Logger().Panic(err) + } + + rpcRegistry := crr.NewRPCRegistry() + rpcRegistry.RegisterServices(services...) + + ds := &servlet.DiscoveryServlets{} + ds.RPCInvoker = rpcRegistry + + sh := &ServerHandlers{} + sh.Name = "Container Discovery" + sh.PIDFilePath = pidFilePath + sh.Services = services + sh.OrderedServices = service.OrderedServices + + sh.RegisterServlet(ds) + + s := &cssn.Server{ + ServerHandler: sh, + } + + return s +} diff --git a/service/DiscoveryService.go b/service/DiscoveryService.go new file mode 100644 index 0000000..640ff80 --- /dev/null +++ b/service/DiscoveryService.go @@ -0,0 +1,150 @@ +package service + +import ( + "fmt" + "reflect" + "sync" + + cda "git.loafle.net/commons/di-go/annotation" + cdr "git.loafle.net/commons/di-go/registry" + ocdm "git.loafle.net/overflow/commons-go/discovery/model" + ocs "git.loafle.net/overflow/container-go/service" + "git.loafle.net/overflow/container_discovery/internal/discoverer" + + // For annotation + _ "git.loafle.net/overflow/commons-go/core/annotation" +) + +var DiscoveryServiceType = reflect.TypeOf((*DiscoveryService)(nil)) + +func init() { + cdr.RegisterType(DiscoveryServiceType) +} + +type DiscoveryService struct { + cda.TypeAnnotation `annotation:"@overflow:RPCService()"` + + ProbeService *ocs.ProbeService `annotation:"@Inject()"` + + pendingDiscovery sync.Map + discoverer discoverer.Discoverer +} + +func (s *DiscoveryService) InitService() error { + s.discoverer = discoverer.GetDiscoverer() + return nil +} + +func (s *DiscoveryService) StartService() error { + + return nil +} + +func (s *DiscoveryService) StopService() { + +} + +func (s *DiscoveryService) DestroyService() { + +} + +func (s *DiscoveryService) DiscoverZone(requesterID string, dz *ocdm.DiscoveryZone) error { + s.handleDiscovery(requesterID, func(dataChan chan *discoverer.DiscoveryData) { + s.discoverer.DiscoverZone(dataChan, dz) + }) + + return nil +} + +func (s *DiscoveryService) DiscoverHost(requesterID string, zone *ocdm.Zone, dh *ocdm.DiscoveryHost) error { + s.handleDiscovery(requesterID, func(dataChan chan *discoverer.DiscoveryData) { + s.discoverer.DiscoverHost(dataChan, zone, dh) + }) + + return nil +} + +func (s *DiscoveryService) DiscoverPort(requesterID string, host *ocdm.Host, dp *ocdm.DiscoveryPort) error { + s.handleDiscovery(requesterID, func(dataChan chan *discoverer.DiscoveryData) { + s.discoverer.DiscoverPort(dataChan, host, dp) + }) + + return nil +} + +func (s *DiscoveryService) DiscoverService(requesterID string, port *ocdm.Port, ds *ocdm.DiscoveryService) error { + s.handleDiscovery(requesterID, func(dataChan chan *discoverer.DiscoveryData) { + s.discoverer.DiscoverSerice(dataChan, port, ds) + }) + + return nil +} + +func (s *DiscoveryService) StopRequest(requesterID string) error { + _stopChan, ok := s.pendingDiscovery.Load(requesterID) + if !ok { + return fmt.Errorf("discovery request for [%s] is not exist", requesterID) + } + stopChan := _stopChan.(chan struct{}) + + close(stopChan) + + return nil +} + +func (s *DiscoveryService) handleDiscovery(requesterID string, discoveryFunc func(dataChan chan *discoverer.DiscoveryData)) error { + stopChan := make(chan struct{}) + s.pendingDiscovery.Store(requesterID, stopChan) + defer func() { + s.pendingDiscovery.Delete(requesterID) + }() + + var dataChan chan *discoverer.DiscoveryData + retainChan := make(chan struct{}) + + go func() { + dataChan = s.discoverer.Retain() + close(retainChan) + }() + + select { + case <-stopChan: + return nil + case <-retainChan: + } + defer func() { + s.discoverer.Release(dataChan) + }() + + discoveryFunc(dataChan) + + for { + select { + case data, ok := <-dataChan: + if !ok { + return nil + } + switch data.Type { + case discoverer.DiscoveryDataTypeStart: + s.ProbeService.Send("DiscoveryService.DiscoveryStart", requesterID, data.Time) + case discoverer.DiscoveryDataTypeStop: + s.ProbeService.Send("DiscoveryService.DiscoveryStop", requesterID, data.Time) + data.Release() + return nil + case discoverer.DiscoveryDataTypeError: + s.ProbeService.Send("DiscoveryService.DiscoveryError", requesterID, data.Error) + case discoverer.DiscoveryDataTypeZone: + s.ProbeService.Send("DiscoveryService.DiscoveredZone", requesterID, data.Result) + case discoverer.DiscoveryDataTypeHost: + s.ProbeService.Send("DiscoveryService.DiscoveredHost", requesterID, data.Result) + case discoverer.DiscoveryDataTypePort: + s.ProbeService.Send("DiscoveryService.DiscoveredPort", requesterID, data.Result) + case discoverer.DiscoveryDataTypeService: + s.ProbeService.Send("DiscoveryService.DiscoveredService", requesterID, data.Result) + } + data.Release() + case <-stopChan: + return nil + } + } +} diff --git a/service/service.go b/service/service.go new file mode 100644 index 0000000..4f3f200 --- /dev/null +++ b/service/service.go @@ -0,0 +1,21 @@ +package service + +import ( + ocs "git.loafle.net/overflow/container-go/service" +) + +var ( + OrderedServices = append(ocs.OrderedServices, DiscoveryServiceType) +) + +func InitPackage() { +} + +func StartPackage() { +} + +func StopPackage() { +} + +func DestroyPackage() { +} diff --git a/servlet/discovery-servlet.go b/servlet/discovery-servlet.go new file mode 100644 index 0000000..7614327 --- /dev/null +++ b/servlet/discovery-servlet.go @@ -0,0 +1,55 @@ +package servlet + +import ( + "net" + + "git.loafle.net/commons/server-go" + css "git.loafle.net/commons/server-go/socket" + ocs "git.loafle.net/overflow/container-go/servlet" +) + +type DiscoveryServlet interface { + ocs.RPCServlet +} + +type DiscoveryServlets struct { + ocs.RPCServlets +} + +func (s *DiscoveryServlets) Init(serverCtx server.ServerCtx) error { + if err := s.RPCServlets.Init(serverCtx); nil != err { + return err + } + + return nil +} + +func (s *DiscoveryServlets) OnStart(serverCtx server.ServerCtx) error { + if err := s.RPCServlets.OnStart(serverCtx); nil != err { + return err + } + + return nil +} + +func (s *DiscoveryServlets) OnStop(serverCtx server.ServerCtx) { + + s.RPCServlets.OnStop(serverCtx) +} + +func (s *DiscoveryServlets) Destroy(serverCtx server.ServerCtx) { + + s.RPCServlets.Destroy(serverCtx) +} + +func (s *DiscoveryServlets) Handshake(servletCtx server.ServletCtx, conn net.Conn) error { + return nil +} + +func (s *DiscoveryServlets) OnConnect(servletCtx server.ServletCtx, conn css.Conn) { + s.RPCServlets.OnConnect(servletCtx, conn) +} + +func (s *DiscoveryServlets) OnDisconnect(servletCtx server.ServletCtx) { + s.RPCServlets.OnDisconnect(servletCtx) +}