ing
This commit is contained in:
parent
fdb2635edc
commit
a1ecf315c6
2
.vscode/launch.json
vendored
2
.vscode/launch.json
vendored
|
@ -9,7 +9,7 @@
|
||||||
"remotePath": "",
|
"remotePath": "",
|
||||||
"port": 2345,
|
"port": 2345,
|
||||||
"host": "127.0.0.1",
|
"host": "127.0.0.1",
|
||||||
"program": "${workspaceRoot}/main.go",
|
"program": "${workspaceRoot}/cmd/main.go",
|
||||||
"env": {},
|
"env": {},
|
||||||
"args": [],
|
"args": [],
|
||||||
"showLog": true
|
"showLog": true
|
||||||
|
|
11
const.go
Normal file
11
const.go
Normal file
|
@ -0,0 +1,11 @@
|
||||||
|
package probe
|
||||||
|
|
||||||
|
import (
|
||||||
|
ouc "git.loafle.net/overflow/util-go/ctx"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
SessionIDKey = ouc.CtxKey("SessionID")
|
||||||
|
SessionTargetIDKey = ouc.CtxKey("SessionTargetID")
|
||||||
|
SessionWriteChanKey = ouc.CtxKey("SessionWriteChan")
|
||||||
|
)
|
|
@ -64,7 +64,7 @@ func (d *ofDiscoverer) Message() <-chan types.DiscoveryMessage {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *ofDiscoverer) SendMessage(discoveryRequest types.DiscoveryRequest, messageType types.DiscoveryMessageType, data interface{}, err error) {
|
func (d *ofDiscoverer) SendMessage(discoveryRequest types.DiscoveryRequest, messageType types.DiscoveryMessageType, data interface{}, err error) {
|
||||||
d.messageChan <- retainDiscoveryMessage(discoveryRequest, messageType, data, err)
|
d.messageChan <- types.MakeDiscoveryMessage(discoveryRequest, messageType, data, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *ofDiscoverer) Shutdown() {
|
func (d *ofDiscoverer) Shutdown() {
|
||||||
|
@ -127,27 +127,32 @@ LOOP:
|
||||||
}
|
}
|
||||||
req.(*ofDiscoveryRequest).dequeue <- true
|
req.(*ofDiscoveryRequest).dequeue <- true
|
||||||
|
|
||||||
d.SendMessage(req, types.DiscoveryMessageTypeStart, omu.Now(), nil)
|
startTime := time.Now()
|
||||||
d.discover(req)
|
|
||||||
d.SendMessage(req, types.DiscoveryMessageTypeStop, omu.Now(), nil)
|
|
||||||
|
|
||||||
|
d.SendMessage(req, types.DiscoveryMessageTypeStart, omu.Now(), nil)
|
||||||
|
s := session.RetainDiscoverySession()
|
||||||
|
d.discover(req, s)
|
||||||
|
d.SendMessage(req, types.DiscoveryMessageTypeStop, time.Since(startTime).String(), nil)
|
||||||
|
|
||||||
|
session.ReleaseDiscoverySession(s)
|
||||||
|
|
||||||
|
timer := time.NewTimer(time.Millisecond * 500)
|
||||||
|
select {
|
||||||
|
case <-timer.C:
|
||||||
|
}
|
||||||
req.(*ofDiscoveryRequest).release()
|
req.(*ofDiscoveryRequest).release()
|
||||||
|
|
||||||
case <-d.stopChan:
|
case <-d.stopChan:
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *ofDiscoverer) discover(req types.DiscoveryRequest) {
|
func (d *ofDiscoverer) discover(req types.DiscoveryRequest, s session.DiscoverySession) {
|
||||||
if types.DiscoveryRequestTypeNone == req.RequestType() {
|
if types.DiscoveryRequestTypeNone == req.RequestType() {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
s := session.RetainDiscoverySession()
|
|
||||||
defer func() {
|
|
||||||
session.ReleaseDiscoverySession(s)
|
|
||||||
}()
|
|
||||||
|
|
||||||
if err := s.InitWithRequest(req); nil != err {
|
if err := s.InitWithRequest(req); nil != err {
|
||||||
d.SendMessage(req, types.DiscoveryMessageTypeError, nil, err)
|
d.SendMessage(req, types.DiscoveryMessageTypeError, nil, err)
|
||||||
return
|
return
|
||||||
|
|
|
@ -1,70 +0,0 @@
|
||||||
package discovery
|
|
||||||
|
|
||||||
import (
|
|
||||||
"sync"
|
|
||||||
|
|
||||||
"git.loafle.net/overflow_scanner/probe/discovery/types"
|
|
||||||
)
|
|
||||||
|
|
||||||
type ofDiscoveryMessage struct {
|
|
||||||
request types.DiscoveryRequest
|
|
||||||
messageType types.DiscoveryMessageType
|
|
||||||
data interface{}
|
|
||||||
err error
|
|
||||||
}
|
|
||||||
|
|
||||||
func (dm *ofDiscoveryMessage) Request() types.DiscoveryRequest {
|
|
||||||
return dm.request
|
|
||||||
}
|
|
||||||
|
|
||||||
func (dm *ofDiscoveryMessage) Type() types.DiscoveryMessageType {
|
|
||||||
return dm.messageType
|
|
||||||
}
|
|
||||||
|
|
||||||
func (dm *ofDiscoveryMessage) Data() interface{} {
|
|
||||||
return dm.data
|
|
||||||
}
|
|
||||||
|
|
||||||
func (dm *ofDiscoveryMessage) Error() error {
|
|
||||||
return dm.err
|
|
||||||
}
|
|
||||||
|
|
||||||
func (dm *ofDiscoveryMessage) Release() {
|
|
||||||
releaseDiscoveryMessage(dm)
|
|
||||||
}
|
|
||||||
|
|
||||||
func RetainDiscoveryMessage(request types.DiscoveryRequest, messageType types.DiscoveryMessageType, data interface{}, err error) *ofDiscoveryMessage {
|
|
||||||
return retainDiscoveryMessage(request, messageType, data, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
func ReleaseDiscoveryMessage(dm *ofDiscoveryMessage) {
|
|
||||||
releaseDiscoveryMessage(dm)
|
|
||||||
}
|
|
||||||
|
|
||||||
var discoveryMessagePool sync.Pool
|
|
||||||
|
|
||||||
func retainDiscoveryMessage(request types.DiscoveryRequest, messageType types.DiscoveryMessageType, data interface{}, err error) *ofDiscoveryMessage {
|
|
||||||
v := discoveryMessagePool.Get()
|
|
||||||
var dm *ofDiscoveryMessage
|
|
||||||
if v == nil {
|
|
||||||
dm = &ofDiscoveryMessage{}
|
|
||||||
} else {
|
|
||||||
dm = v.(*ofDiscoveryMessage)
|
|
||||||
}
|
|
||||||
|
|
||||||
dm.request = request
|
|
||||||
dm.messageType = messageType
|
|
||||||
dm.data = data
|
|
||||||
dm.err = err
|
|
||||||
|
|
||||||
return dm
|
|
||||||
}
|
|
||||||
|
|
||||||
func releaseDiscoveryMessage(dm *ofDiscoveryMessage) {
|
|
||||||
dm.request = nil
|
|
||||||
dm.messageType = types.DiscoveryMessageTypeNone
|
|
||||||
dm.data = nil
|
|
||||||
dm.err = nil
|
|
||||||
|
|
||||||
discoveryMessagePool.Put(dm)
|
|
||||||
}
|
|
|
@ -568,6 +568,12 @@ func RetainDiscoverySession() *ofDiscoverySession {
|
||||||
func ReleaseDiscoverySession(ds *ofDiscoverySession) {
|
func ReleaseDiscoverySession(ds *ofDiscoverySession) {
|
||||||
close(ds.stopChan)
|
close(ds.stopChan)
|
||||||
|
|
||||||
|
// timer := time.NewTimer(time.Microsecond * 500)
|
||||||
|
|
||||||
|
// select {
|
||||||
|
// case <-timer.C:
|
||||||
|
// }
|
||||||
|
|
||||||
ds.discoveryRequest = nil
|
ds.discoveryRequest = nil
|
||||||
ds.zone = nil
|
ds.zone = nil
|
||||||
ds.host = nil
|
ds.host = nil
|
||||||
|
|
|
@ -1,40 +1,5 @@
|
||||||
package types
|
package types
|
||||||
|
|
||||||
func NewMockDiscoveryMessage(request DiscoveryRequest, messageType DiscoveryMessageType, data interface{}, err error) DiscoveryMessage {
|
|
||||||
return &MockDiscoveryMessage{
|
|
||||||
request: request,
|
|
||||||
messageType: messageType,
|
|
||||||
data: data,
|
|
||||||
err: err,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
type MockDiscoveryMessage struct {
|
|
||||||
request DiscoveryRequest
|
|
||||||
messageType DiscoveryMessageType
|
|
||||||
data interface{}
|
|
||||||
err error
|
|
||||||
}
|
|
||||||
|
|
||||||
func (dm *MockDiscoveryMessage) Request() DiscoveryRequest {
|
|
||||||
return dm.request
|
|
||||||
}
|
|
||||||
|
|
||||||
func (dm *MockDiscoveryMessage) Type() DiscoveryMessageType {
|
|
||||||
return dm.messageType
|
|
||||||
}
|
|
||||||
|
|
||||||
func (dm *MockDiscoveryMessage) Data() interface{} {
|
|
||||||
return dm.data
|
|
||||||
}
|
|
||||||
|
|
||||||
func (dm *MockDiscoveryMessage) Error() error {
|
|
||||||
return dm.err
|
|
||||||
}
|
|
||||||
|
|
||||||
func (dm *MockDiscoveryMessage) Release() {
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewMockDiscoveryRequest(requesterID string, requestType DiscoveryRequestType, params ...interface{}) DiscoveryRequest {
|
func NewMockDiscoveryRequest(requesterID string, requestType DiscoveryRequestType, params ...interface{}) DiscoveryRequest {
|
||||||
return &MockDiscoveryRequest{
|
return &MockDiscoveryRequest{
|
||||||
requesterID: requesterID,
|
requesterID: requesterID,
|
||||||
|
|
|
@ -15,14 +15,6 @@ const (
|
||||||
DiscoveryMessageTypeService
|
DiscoveryMessageTypeService
|
||||||
)
|
)
|
||||||
|
|
||||||
type DiscoveryMessage interface {
|
|
||||||
Request() DiscoveryRequest
|
|
||||||
Type() DiscoveryMessageType
|
|
||||||
Data() interface{}
|
|
||||||
Error() error
|
|
||||||
Release()
|
|
||||||
}
|
|
||||||
|
|
||||||
type DiscoveryRequestType int
|
type DiscoveryRequestType int
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
@ -32,6 +24,14 @@ const (
|
||||||
DiscoveryRequestTypeService
|
DiscoveryRequestTypeService
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type DiscoveryMessage func() (request DiscoveryRequest, messageType DiscoveryMessageType, data interface{}, err error)
|
||||||
|
|
||||||
|
func MakeDiscoveryMessage(request DiscoveryRequest, messageType DiscoveryMessageType, data interface{}, err error) DiscoveryMessage {
|
||||||
|
return func() (DiscoveryRequest, DiscoveryMessageType, interface{}, error) {
|
||||||
|
return request, messageType, data, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
type DiscoveryRequest interface {
|
type DiscoveryRequest interface {
|
||||||
RequesterID() string
|
RequesterID() string
|
||||||
RequestType() DiscoveryRequestType
|
RequestType() DiscoveryRequestType
|
||||||
|
|
223
internal/pubsub/pubsub.go
Normal file
223
internal/pubsub/pubsub.go
Normal file
|
@ -0,0 +1,223 @@
|
||||||
|
package pubsub
|
||||||
|
|
||||||
|
type operation int
|
||||||
|
|
||||||
|
const (
|
||||||
|
sub operation = iota
|
||||||
|
subOnce
|
||||||
|
pub
|
||||||
|
tryPub
|
||||||
|
unsub
|
||||||
|
unsubAll
|
||||||
|
closeTopic
|
||||||
|
shutdown
|
||||||
|
)
|
||||||
|
|
||||||
|
// PubSub is a collection of topics.
|
||||||
|
type PubSub struct {
|
||||||
|
cmdChan chan cmd
|
||||||
|
capacity int
|
||||||
|
}
|
||||||
|
|
||||||
|
type cmd struct {
|
||||||
|
op operation
|
||||||
|
topics []string
|
||||||
|
ch chan interface{}
|
||||||
|
msg interface{}
|
||||||
|
}
|
||||||
|
|
||||||
|
// New creates a new PubSub and starts a goroutine for handling operations.
|
||||||
|
// The capacity of the channels created by Sub and SubOnce will be as specified.
|
||||||
|
func New(capacity int) *PubSub {
|
||||||
|
ps := &PubSub{make(chan cmd), capacity}
|
||||||
|
go ps.start()
|
||||||
|
return ps
|
||||||
|
}
|
||||||
|
|
||||||
|
// Sub returns a channel on which messages published on any of
|
||||||
|
// the specified topics can be received.
|
||||||
|
func (ps *PubSub) Sub(topics ...string) chan interface{} {
|
||||||
|
return ps.sub(sub, topics...)
|
||||||
|
}
|
||||||
|
|
||||||
|
// SubOnce is similar to Sub, but only the first message published, after subscription,
|
||||||
|
// on any of the specified topics can be received.
|
||||||
|
func (ps *PubSub) SubOnce(topics ...string) chan interface{} {
|
||||||
|
return ps.sub(subOnce, topics...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ps *PubSub) sub(op operation, topics ...string) chan interface{} {
|
||||||
|
ch := make(chan interface{}, ps.capacity)
|
||||||
|
ps.cmdChan <- cmd{op: op, topics: topics, ch: ch}
|
||||||
|
return ch
|
||||||
|
}
|
||||||
|
|
||||||
|
// AddSub adds subscriptions to an existing channel.
|
||||||
|
func (ps *PubSub) AddSub(ch chan interface{}, topics ...string) {
|
||||||
|
ps.cmdChan <- cmd{op: sub, topics: topics, ch: ch}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Pub publishes the given message to all subscribers of
|
||||||
|
// the specified topics.
|
||||||
|
func (ps *PubSub) Pub(msg interface{}, topics ...string) {
|
||||||
|
ps.cmdChan <- cmd{op: pub, topics: topics, msg: msg}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TryPub publishes the given message to all subscribers of
|
||||||
|
// the specified topics if the topic has buffer space.
|
||||||
|
func (ps *PubSub) TryPub(msg interface{}, topics ...string) {
|
||||||
|
ps.cmdChan <- cmd{op: tryPub, topics: topics, msg: msg}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Unsub unsubscribes the given channel from the specified
|
||||||
|
// topics. If no topic is specified, it is unsubscribed
|
||||||
|
// from all topics.
|
||||||
|
func (ps *PubSub) Unsub(ch chan interface{}, topics ...string) {
|
||||||
|
if len(topics) == 0 {
|
||||||
|
ps.cmdChan <- cmd{op: unsubAll, ch: ch}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
ps.cmdChan <- cmd{op: unsub, topics: topics, ch: ch}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close closes all channels currently subscribed to the specified topics.
|
||||||
|
// If a channel is subscribed to multiple topics, some of which is
|
||||||
|
// not specified, it is not closed.
|
||||||
|
func (ps *PubSub) Close(topics ...string) {
|
||||||
|
ps.cmdChan <- cmd{op: closeTopic, topics: topics}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Shutdown closes all subscribed channels and terminates the goroutine.
|
||||||
|
func (ps *PubSub) Shutdown() {
|
||||||
|
ps.cmdChan <- cmd{op: shutdown}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ps *PubSub) start() {
|
||||||
|
reg := registry{
|
||||||
|
topics: make(map[string]map[chan interface{}]bool),
|
||||||
|
revTopics: make(map[chan interface{}]map[string]bool),
|
||||||
|
}
|
||||||
|
|
||||||
|
loop:
|
||||||
|
for cmd := range ps.cmdChan {
|
||||||
|
if cmd.topics == nil {
|
||||||
|
switch cmd.op {
|
||||||
|
case unsubAll:
|
||||||
|
reg.removeChannel(cmd.ch)
|
||||||
|
|
||||||
|
case shutdown:
|
||||||
|
break loop
|
||||||
|
}
|
||||||
|
|
||||||
|
continue loop
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, topic := range cmd.topics {
|
||||||
|
switch cmd.op {
|
||||||
|
case sub:
|
||||||
|
reg.add(topic, cmd.ch, false)
|
||||||
|
|
||||||
|
case subOnce:
|
||||||
|
reg.add(topic, cmd.ch, true)
|
||||||
|
|
||||||
|
case tryPub:
|
||||||
|
reg.sendNoWait(topic, cmd.msg)
|
||||||
|
|
||||||
|
case pub:
|
||||||
|
reg.send(topic, cmd.msg)
|
||||||
|
|
||||||
|
case unsub:
|
||||||
|
reg.remove(topic, cmd.ch)
|
||||||
|
|
||||||
|
case closeTopic:
|
||||||
|
reg.removeTopic(topic)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for topic, chans := range reg.topics {
|
||||||
|
for ch := range chans {
|
||||||
|
reg.remove(topic, ch)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// registry maintains the current subscription state. It's not
|
||||||
|
// safe to access a registry from multiple goroutines simultaneously.
|
||||||
|
type registry struct {
|
||||||
|
topics map[string]map[chan interface{}]bool
|
||||||
|
revTopics map[chan interface{}]map[string]bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func (reg *registry) add(topic string, ch chan interface{}, once bool) {
|
||||||
|
if reg.topics[topic] == nil {
|
||||||
|
reg.topics[topic] = make(map[chan interface{}]bool)
|
||||||
|
}
|
||||||
|
reg.topics[topic][ch] = once
|
||||||
|
|
||||||
|
if reg.revTopics[ch] == nil {
|
||||||
|
reg.revTopics[ch] = make(map[string]bool)
|
||||||
|
}
|
||||||
|
reg.revTopics[ch][topic] = true
|
||||||
|
}
|
||||||
|
|
||||||
|
func (reg *registry) send(topic string, msg interface{}) {
|
||||||
|
for ch, once := range reg.topics[topic] {
|
||||||
|
ch <- msg
|
||||||
|
if once {
|
||||||
|
for topic := range reg.revTopics[ch] {
|
||||||
|
reg.remove(topic, ch)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (reg *registry) sendNoWait(topic string, msg interface{}) {
|
||||||
|
for ch, once := range reg.topics[topic] {
|
||||||
|
select {
|
||||||
|
case ch <- msg:
|
||||||
|
if once {
|
||||||
|
for topic := range reg.revTopics[ch] {
|
||||||
|
reg.remove(topic, ch)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (reg *registry) removeTopic(topic string) {
|
||||||
|
for ch := range reg.topics[topic] {
|
||||||
|
reg.remove(topic, ch)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (reg *registry) removeChannel(ch chan interface{}) {
|
||||||
|
for topic := range reg.revTopics[ch] {
|
||||||
|
reg.remove(topic, ch)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (reg *registry) remove(topic string, ch chan interface{}) {
|
||||||
|
if _, ok := reg.topics[topic]; !ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, ok := reg.topics[topic][ch]; !ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
delete(reg.topics[topic], ch)
|
||||||
|
delete(reg.revTopics[ch], topic)
|
||||||
|
|
||||||
|
if len(reg.topics[topic]) == 0 {
|
||||||
|
delete(reg.topics, topic)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(reg.revTopics[ch]) == 0 {
|
||||||
|
close(ch)
|
||||||
|
delete(reg.revTopics, ch)
|
||||||
|
}
|
||||||
|
}
|
7
internal/rpc/message.go
Normal file
7
internal/rpc/message.go
Normal file
|
@ -0,0 +1,7 @@
|
||||||
|
package rpc
|
||||||
|
|
||||||
|
type RPCMessage func() ([]string, string, []interface{})
|
||||||
|
|
||||||
|
func MakeRPCMessage(targets []string, method string, params []interface{}) RPCMessage {
|
||||||
|
return func() ([]string, string, []interface{}) { return targets, method, params }
|
||||||
|
}
|
|
@ -1,12 +1,21 @@
|
||||||
package server
|
package server
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
|
"log"
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"git.loafle.net/overflow_scanner/probe/internal/pubsub"
|
||||||
|
"git.loafle.net/overflow_scanner/probe/internal/rpc"
|
||||||
|
|
||||||
olog "git.loafle.net/overflow/log-go"
|
olog "git.loafle.net/overflow/log-go"
|
||||||
orp "git.loafle.net/overflow/rpc-go/protocol"
|
orp "git.loafle.net/overflow/rpc-go/protocol"
|
||||||
orr "git.loafle.net/overflow/rpc-go/registry"
|
orr "git.loafle.net/overflow/rpc-go/registry"
|
||||||
"git.loafle.net/overflow/server-go"
|
"git.loafle.net/overflow/server-go"
|
||||||
oss "git.loafle.net/overflow/server-go/socket"
|
oss "git.loafle.net/overflow/server-go/socket"
|
||||||
ossw "git.loafle.net/overflow/server-go/socket/web"
|
ossw "git.loafle.net/overflow/server-go/socket/web"
|
||||||
|
"git.loafle.net/overflow_scanner/probe"
|
||||||
|
uuid "github.com/satori/go.uuid"
|
||||||
"github.com/valyala/fasthttp"
|
"github.com/valyala/fasthttp"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -19,6 +28,11 @@ type ScannerServlets struct {
|
||||||
|
|
||||||
RPCInvoker orr.RPCInvoker
|
RPCInvoker orr.RPCInvoker
|
||||||
RPCServerCodec orp.ServerCodec
|
RPCServerCodec orp.ServerCodec
|
||||||
|
PubSub *pubsub.PubSub
|
||||||
|
|
||||||
|
sessions sync.Map
|
||||||
|
|
||||||
|
subscribeChan chan interface{}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *ScannerServlets) Init(serverCtx server.ServerCtx) error {
|
func (s *ScannerServlets) Init(serverCtx server.ServerCtx) error {
|
||||||
|
@ -34,11 +48,16 @@ func (s *ScannerServlets) OnStart(serverCtx server.ServerCtx) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
s.subscribeChan = s.PubSub.Sub("/scanner")
|
||||||
|
go s.handleSubscribe(serverCtx)
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *ScannerServlets) OnStop(serverCtx server.ServerCtx) {
|
func (s *ScannerServlets) OnStop(serverCtx server.ServerCtx) {
|
||||||
|
|
||||||
|
s.PubSub.Unsub(s.subscribeChan, "/scanner")
|
||||||
|
|
||||||
s.Servlets.OnStop(serverCtx)
|
s.Servlets.OnStop(serverCtx)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -48,6 +67,15 @@ func (s *ScannerServlets) Destroy(serverCtx server.ServerCtx) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *ScannerServlets) Handshake(servletCtx server.ServletCtx, ctx *fasthttp.RequestCtx) (*fasthttp.ResponseHeader, error) {
|
func (s *ScannerServlets) Handshake(servletCtx server.ServletCtx, ctx *fasthttp.RequestCtx) (*fasthttp.ResponseHeader, error) {
|
||||||
|
requesterID := string(ctx.QueryArgs().Peek("requesterID"))
|
||||||
|
if "" == requesterID {
|
||||||
|
return nil, fmt.Errorf("requesterID is not valid")
|
||||||
|
}
|
||||||
|
|
||||||
|
sessionID := uuid.NewV4().String()
|
||||||
|
|
||||||
|
servletCtx.SetAttribute(probe.SessionTargetIDKey, requesterID)
|
||||||
|
servletCtx.SetAttribute(probe.SessionIDKey, sessionID)
|
||||||
|
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
@ -55,11 +83,25 @@ func (s *ScannerServlets) Handshake(servletCtx server.ServletCtx, ctx *fasthttp.
|
||||||
func (s *ScannerServlets) OnConnect(servletCtx server.ServletCtx, conn oss.Conn) {
|
func (s *ScannerServlets) OnConnect(servletCtx server.ServletCtx, conn oss.Conn) {
|
||||||
s.Servlets.OnConnect(servletCtx, conn)
|
s.Servlets.OnConnect(servletCtx, conn)
|
||||||
|
|
||||||
|
sessionID := servletCtx.GetAttribute(probe.SessionIDKey)
|
||||||
|
targetID := servletCtx.GetAttribute(probe.SessionTargetIDKey)
|
||||||
|
|
||||||
|
if nil != sessionID && nil != targetID {
|
||||||
|
s.sessions.Store(sessionID.(string), RetainSession(targetID.(string), servletCtx))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *ScannerServlets) OnDisconnect(servletCtx server.ServletCtx) {
|
func (s *ScannerServlets) OnDisconnect(servletCtx server.ServletCtx) {
|
||||||
s.Servlets.OnDisconnect(servletCtx)
|
s.Servlets.OnDisconnect(servletCtx)
|
||||||
|
|
||||||
|
sessionID := servletCtx.GetAttribute(probe.SessionIDKey)
|
||||||
|
if nil != sessionID {
|
||||||
|
session, ok := s.sessions.Load(sessionID)
|
||||||
|
if ok {
|
||||||
|
ReleaseSession(session.(*Session))
|
||||||
|
}
|
||||||
|
s.sessions.Delete(sessionID.(string))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *ScannerServlets) Handle(servletCtx server.ServletCtx,
|
func (s *ScannerServlets) Handle(servletCtx server.ServletCtx,
|
||||||
|
@ -76,6 +118,8 @@ func (s *ScannerServlets) Handle(servletCtx server.ServletCtx,
|
||||||
err error
|
err error
|
||||||
)
|
)
|
||||||
|
|
||||||
|
servletCtx.SetAttribute(probe.SessionWriteChanKey, writeChan)
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case socketMessage, ok := <-readChan:
|
case socketMessage, ok := <-readChan:
|
||||||
|
@ -118,6 +162,48 @@ func (s *ScannerServlets) Handle(servletCtx server.ServletCtx,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *ScannerServlets) handleSubscribe(serverCtx server.ServerCtx) {
|
||||||
|
var sessions []*Session
|
||||||
|
|
||||||
|
LOOP:
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case msg, ok := <-s.subscribeChan:
|
||||||
|
if !ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
_msg, ok := msg.(rpc.RPCMessage)
|
||||||
|
if !ok {
|
||||||
|
log.Print("RPCMessage is not valid")
|
||||||
|
continue LOOP
|
||||||
|
}
|
||||||
|
|
||||||
|
targets, method, params := _msg()
|
||||||
|
|
||||||
|
sessions = s.GetSessionsByTargetIDs(targets)
|
||||||
|
|
||||||
|
if nil == sessions || 0 == len(sessions) {
|
||||||
|
continue LOOP
|
||||||
|
}
|
||||||
|
|
||||||
|
messageType, message, err := s.RPCServerCodec.NewNotification(method, params)
|
||||||
|
if nil != err {
|
||||||
|
log.Print("RPCMessage is not valid ", _msg, err)
|
||||||
|
continue LOOP
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, session := range sessions {
|
||||||
|
_writeChan := session.ServletCtx.GetAttribute(probe.SessionWriteChanKey)
|
||||||
|
if nil != _writeChan {
|
||||||
|
writeChan := _writeChan.(chan<- oss.SocketMessage)
|
||||||
|
writeChan <- oss.MakeSocketMessage(messageType, message)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (s *ScannerServlets) writeError(src orp.ServerRequestCodec, writeChan chan<- oss.SocketMessage, code orp.ErrorCode, message string, data interface{}) {
|
func (s *ScannerServlets) writeError(src orp.ServerRequestCodec, writeChan chan<- oss.SocketMessage, code orp.ErrorCode, message string, data interface{}) {
|
||||||
if !src.HasResponse() {
|
if !src.HasResponse() {
|
||||||
return
|
return
|
||||||
|
@ -136,3 +222,42 @@ func (s *ScannerServlets) writeError(src orp.ServerRequestCodec, writeChan chan<
|
||||||
}
|
}
|
||||||
writeChan <- oss.MakeSocketMessage(resMessageType, resMessage)
|
writeChan <- oss.MakeSocketMessage(resMessageType, resMessage)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *ScannerServlets) GetSessions(sessionIDs []string) []*Session {
|
||||||
|
var sessions []*Session
|
||||||
|
|
||||||
|
if nil == sessionIDs || 0 == len(sessionIDs) {
|
||||||
|
return sessions
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, sessionID := range sessionIDs {
|
||||||
|
session, ok := s.sessions.Load(sessionID)
|
||||||
|
if ok {
|
||||||
|
sessions = append(sessions, session.(*Session))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return sessions
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *ScannerServlets) GetSessionsByTargetIDs(targetIDs []string) []*Session {
|
||||||
|
var sessions []*Session
|
||||||
|
if nil == targetIDs || 0 == len(targetIDs) {
|
||||||
|
return sessions
|
||||||
|
}
|
||||||
|
|
||||||
|
s.sessions.Range(func(k, v interface{}) bool {
|
||||||
|
session := v.(*Session)
|
||||||
|
|
||||||
|
for _, targetID := range targetIDs {
|
||||||
|
if session.TargetID == targetID {
|
||||||
|
sessions = append(sessions, session)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
|
||||||
|
return sessions
|
||||||
|
}
|
||||||
|
|
|
@ -10,6 +10,7 @@ import (
|
||||||
olog "git.loafle.net/overflow/log-go"
|
olog "git.loafle.net/overflow/log-go"
|
||||||
"git.loafle.net/overflow/server-go"
|
"git.loafle.net/overflow/server-go"
|
||||||
ossw "git.loafle.net/overflow/server-go/socket/web"
|
ossw "git.loafle.net/overflow/server-go/socket/web"
|
||||||
|
"git.loafle.net/overflow_scanner/probe/internal/pubsub"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -21,7 +22,8 @@ type ServerHandlers struct {
|
||||||
ossw.ServerHandlers
|
ossw.ServerHandlers
|
||||||
|
|
||||||
Services []interface{}
|
Services []interface{}
|
||||||
OrderedServices []reflect.Type
|
|
||||||
|
PubSub *pubsub.PubSub
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sh *ServerHandlers) Init(serverCtx server.ServerCtx) error {
|
func (sh *ServerHandlers) Init(serverCtx server.ServerCtx) error {
|
||||||
|
@ -51,6 +53,8 @@ func (sh *ServerHandlers) OnStop(serverCtx server.ServerCtx) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
sh.PubSub.Shutdown()
|
||||||
|
|
||||||
sh.ServerHandlers.OnStop(serverCtx)
|
sh.ServerHandlers.OnStop(serverCtx)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -82,9 +86,5 @@ func (sh *ServerHandlers) Validate() error {
|
||||||
return fmt.Errorf("Services must be specified")
|
return fmt.Errorf("Services must be specified")
|
||||||
}
|
}
|
||||||
|
|
||||||
if nil == sh.OrderedServices {
|
|
||||||
return fmt.Errorf("OrderedServices must be specified")
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -9,6 +9,7 @@ import (
|
||||||
|
|
||||||
"git.loafle.net/overflow_scanner/probe/annotation"
|
"git.loafle.net/overflow_scanner/probe/annotation"
|
||||||
"git.loafle.net/overflow_scanner/probe/discovery"
|
"git.loafle.net/overflow_scanner/probe/discovery"
|
||||||
|
"git.loafle.net/overflow_scanner/probe/internal/pubsub"
|
||||||
|
|
||||||
// For service
|
// For service
|
||||||
_ "git.loafle.net/overflow_scanner/probe/service"
|
_ "git.loafle.net/overflow_scanner/probe/service"
|
||||||
|
@ -16,8 +17,10 @@ import (
|
||||||
|
|
||||||
func New() *ossw.Server {
|
func New() *ossw.Server {
|
||||||
rpcServerCodec := orpj.NewServerCodec()
|
rpcServerCodec := orpj.NewServerCodec()
|
||||||
|
_pubsub := pubsub.New(10)
|
||||||
|
|
||||||
od.RegisterSingletonByName("Discoverer", discovery.Instance())
|
od.RegisterSingletonByName("Discoverer", discovery.Instance())
|
||||||
|
od.RegisterSingletonByName("PubSub", _pubsub)
|
||||||
|
|
||||||
services, err := od.GetInstancesByAnnotationType(annotation.ServiceAnnotationType)
|
services, err := od.GetInstancesByAnnotationType(annotation.ServiceAnnotationType)
|
||||||
if nil != err {
|
if nil != err {
|
||||||
|
@ -30,10 +33,12 @@ func New() *ossw.Server {
|
||||||
ss := &ScannerServlets{}
|
ss := &ScannerServlets{}
|
||||||
ss.RPCInvoker = rpcRegistry
|
ss.RPCInvoker = rpcRegistry
|
||||||
ss.RPCServerCodec = rpcServerCodec
|
ss.RPCServerCodec = rpcServerCodec
|
||||||
|
ss.PubSub = _pubsub
|
||||||
|
|
||||||
sh := &ServerHandlers{}
|
sh := &ServerHandlers{}
|
||||||
sh.Name = "Probe"
|
sh.Name = "Probe"
|
||||||
sh.Services = services
|
sh.Services = services
|
||||||
|
sh.PubSub = _pubsub
|
||||||
|
|
||||||
sh.RegisterServlet("/scanner", ss)
|
sh.RegisterServlet("/scanner", ss)
|
||||||
|
|
||||||
|
|
36
server/session.go
Normal file
36
server/session.go
Normal file
|
@ -0,0 +1,36 @@
|
||||||
|
package server
|
||||||
|
|
||||||
|
import (
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
server "git.loafle.net/overflow/server-go"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Session struct {
|
||||||
|
TargetID string
|
||||||
|
ServletCtx server.ServletCtx
|
||||||
|
}
|
||||||
|
|
||||||
|
var sessionPool sync.Pool
|
||||||
|
|
||||||
|
func RetainSession(targetID string, servletCtx server.ServletCtx) *Session {
|
||||||
|
v := sessionPool.Get()
|
||||||
|
var _session *Session
|
||||||
|
if v == nil {
|
||||||
|
_session = &Session{}
|
||||||
|
} else {
|
||||||
|
_session = v.(*Session)
|
||||||
|
}
|
||||||
|
|
||||||
|
_session.TargetID = targetID
|
||||||
|
_session.ServletCtx = servletCtx
|
||||||
|
|
||||||
|
return _session
|
||||||
|
}
|
||||||
|
|
||||||
|
func ReleaseSession(_session *Session) {
|
||||||
|
_session.TargetID = ""
|
||||||
|
_session.ServletCtx = nil
|
||||||
|
|
||||||
|
sessionPool.Put(_session)
|
||||||
|
}
|
|
@ -4,6 +4,9 @@ import (
|
||||||
"log"
|
"log"
|
||||||
"reflect"
|
"reflect"
|
||||||
|
|
||||||
|
"git.loafle.net/overflow_scanner/probe/internal/pubsub"
|
||||||
|
"git.loafle.net/overflow_scanner/probe/internal/rpc"
|
||||||
|
|
||||||
oa "git.loafle.net/overflow/annotation-go"
|
oa "git.loafle.net/overflow/annotation-go"
|
||||||
od "git.loafle.net/overflow/di-go"
|
od "git.loafle.net/overflow/di-go"
|
||||||
omd "git.loafle.net/overflow/model/discovery"
|
omd "git.loafle.net/overflow/model/discovery"
|
||||||
|
@ -21,6 +24,7 @@ type DiscoveryService struct {
|
||||||
oa.TypeAnnotation `annotation:"@Injectable('name': 'DiscoveryService') @Service()"`
|
oa.TypeAnnotation `annotation:"@Injectable('name': 'DiscoveryService') @Service()"`
|
||||||
|
|
||||||
Discoverer discovery.Discoverer `annotation:"@Inject('name': 'Discoverer')"`
|
Discoverer discovery.Discoverer `annotation:"@Inject('name': 'Discoverer')"`
|
||||||
|
PubSub *pubsub.PubSub `annotation:"@Inject('name': 'PubSub')"`
|
||||||
|
|
||||||
_InitService oa.MethodAnnotation `annotation:"@PostConstruct()"`
|
_InitService oa.MethodAnnotation `annotation:"@PostConstruct()"`
|
||||||
_DestroyService oa.MethodAnnotation `annotation:"@PreDestroy()"`
|
_DestroyService oa.MethodAnnotation `annotation:"@PreDestroy()"`
|
||||||
|
@ -35,26 +39,99 @@ func (s *DiscoveryService) InitService() {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
switch msg.Type() {
|
request, messageType, data, err := msg()
|
||||||
|
|
||||||
|
switch messageType {
|
||||||
case types.DiscoveryMessageTypeStart:
|
case types.DiscoveryMessageTypeStart:
|
||||||
log.Print("Start ", msg.Data())
|
s.PubSub.Pub(
|
||||||
|
rpc.MakeRPCMessage(
|
||||||
|
[]string{request.RequesterID()},
|
||||||
|
"DiscoveryService.DiscoveryStart",
|
||||||
|
[]interface{}{data},
|
||||||
|
),
|
||||||
|
"/scanner",
|
||||||
|
)
|
||||||
|
log.Print("Start ", data)
|
||||||
case types.DiscoveryMessageTypeStop:
|
case types.DiscoveryMessageTypeStop:
|
||||||
log.Print("Stop ", msg.Data())
|
s.PubSub.Pub(
|
||||||
return
|
rpc.MakeRPCMessage(
|
||||||
|
[]string{request.RequesterID()},
|
||||||
|
"DiscoveryService.DiscoveryStop",
|
||||||
|
[]interface{}{data},
|
||||||
|
),
|
||||||
|
"/scanner",
|
||||||
|
)
|
||||||
|
log.Print("Stop ", data)
|
||||||
case types.DiscoveryMessageTypeQueueing:
|
case types.DiscoveryMessageTypeQueueing:
|
||||||
log.Print("Queueing ", msg.Data())
|
s.PubSub.Pub(
|
||||||
|
rpc.MakeRPCMessage(
|
||||||
|
[]string{request.RequesterID()},
|
||||||
|
"DiscoveryService.Queueing",
|
||||||
|
[]interface{}{data},
|
||||||
|
),
|
||||||
|
"/scanner",
|
||||||
|
)
|
||||||
|
log.Print("Queueing ", data)
|
||||||
case types.DiscoveryMessageTypeFailedQueueing:
|
case types.DiscoveryMessageTypeFailedQueueing:
|
||||||
log.Print("FailedQueueing ", msg.Data())
|
s.PubSub.Pub(
|
||||||
|
rpc.MakeRPCMessage(
|
||||||
|
[]string{request.RequesterID()},
|
||||||
|
"DiscoveryService.FailedQueueing",
|
||||||
|
[]interface{}{data},
|
||||||
|
),
|
||||||
|
"/scanner",
|
||||||
|
)
|
||||||
|
log.Print("FailedQueueing ", data)
|
||||||
case types.DiscoveryMessageTypeTimeout:
|
case types.DiscoveryMessageTypeTimeout:
|
||||||
log.Print("Timeout ", msg.Data())
|
s.PubSub.Pub(
|
||||||
|
rpc.MakeRPCMessage(
|
||||||
|
[]string{request.RequesterID()},
|
||||||
|
"DiscoveryService.Timeout",
|
||||||
|
[]interface{}{data},
|
||||||
|
),
|
||||||
|
"/scanner",
|
||||||
|
)
|
||||||
|
log.Print("Timeout ", data)
|
||||||
case types.DiscoveryMessageTypeError:
|
case types.DiscoveryMessageTypeError:
|
||||||
log.Print("Error ", msg.Error())
|
s.PubSub.Pub(
|
||||||
|
rpc.MakeRPCMessage(
|
||||||
|
[]string{request.RequesterID()},
|
||||||
|
"DiscoveryService.Error",
|
||||||
|
[]interface{}{err},
|
||||||
|
),
|
||||||
|
"/scanner",
|
||||||
|
)
|
||||||
|
log.Print("Error ", err)
|
||||||
case types.DiscoveryMessageTypeHost:
|
case types.DiscoveryMessageTypeHost:
|
||||||
log.Print("Host ", msg.Data())
|
s.PubSub.Pub(
|
||||||
|
rpc.MakeRPCMessage(
|
||||||
|
[]string{request.RequesterID()},
|
||||||
|
"DiscoveryService.DiscoveredHost",
|
||||||
|
[]interface{}{data},
|
||||||
|
),
|
||||||
|
"/scanner",
|
||||||
|
)
|
||||||
|
log.Print("Host ", data)
|
||||||
case types.DiscoveryMessageTypePort:
|
case types.DiscoveryMessageTypePort:
|
||||||
log.Print("Port ", msg.Data())
|
s.PubSub.Pub(
|
||||||
|
rpc.MakeRPCMessage(
|
||||||
|
[]string{request.RequesterID()},
|
||||||
|
"DiscoveryService.DiscoveredPort",
|
||||||
|
[]interface{}{data},
|
||||||
|
),
|
||||||
|
"/scanner",
|
||||||
|
)
|
||||||
|
log.Print("Port ", data)
|
||||||
case types.DiscoveryMessageTypeService:
|
case types.DiscoveryMessageTypeService:
|
||||||
log.Print("Service ", msg.Data())
|
s.PubSub.Pub(
|
||||||
|
rpc.MakeRPCMessage(
|
||||||
|
[]string{request.RequesterID()},
|
||||||
|
"DiscoveryService.DiscoveredService",
|
||||||
|
[]interface{}{data},
|
||||||
|
),
|
||||||
|
"/scanner",
|
||||||
|
)
|
||||||
|
log.Print("Service ", data)
|
||||||
default:
|
default:
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue
Block a user