This commit is contained in:
crusader 2018-04-13 21:17:58 +09:00
parent cd50707b16
commit c33a743a25
3 changed files with 4 additions and 62 deletions

View File

@ -9,7 +9,9 @@ import (
func New(_config *config.Config) *cssw.Server { func New(_config *config.Config) *cssw.Server {
as := &servlet.AuthServlets{} as := &servlet.AuthServlets{}
as.UseSession = true
ps := &servlet.ProbeServlets{} ps := &servlet.ProbeServlets{}
ps.UseSession = true
ds := &servlet.DataServlets{} ds := &servlet.DataServlets{}
sh := &ServerHandlers{ sh := &ServerHandlers{

View File

@ -5,7 +5,6 @@ import (
"encoding/base64" "encoding/base64"
"encoding/json" "encoding/json"
"fmt" "fmt"
"sync"
"github.com/valyala/fasthttp" "github.com/valyala/fasthttp"
@ -27,7 +26,6 @@ type AuthServlet interface {
type AuthServlets struct { type AuthServlets struct {
ogrs.RPCServlets ogrs.RPCServlets
sessions sync.Map
} }
func (s *AuthServlets) Init(serverCtx server.ServerCtx) error { func (s *AuthServlets) Init(serverCtx server.ServerCtx) error {
@ -132,21 +130,10 @@ func (s *AuthServlets) Handshake(servletCtx server.ServletCtx, ctx *fasthttp.Req
func (s *AuthServlets) OnConnect(servletCtx server.ServletCtx, conn socket.Conn) { func (s *AuthServlets) OnConnect(servletCtx server.ServletCtx, conn socket.Conn) {
s.RPCServlets.OnConnect(servletCtx, conn) s.RPCServlets.OnConnect(servletCtx, conn)
sessionID := servletCtx.GetAttribute(og.SessionIDKey)
targetID := servletCtx.GetAttribute(og.SessionTargetIDKey)
if nil != sessionID && nil != targetID {
s.sessions.Store(sessionID.(string), ogrs.RetainSession(targetID.(string), servletCtx))
}
} }
func (s *AuthServlets) OnDisconnect(servletCtx server.ServletCtx) { func (s *AuthServlets) OnDisconnect(servletCtx server.ServletCtx) {
s.RPCServlets.OnDisconnect(servletCtx) s.RPCServlets.OnDisconnect(servletCtx)
sessionID := servletCtx.GetAttribute(og.SessionIDKey)
if nil != sessionID {
s.sessions.Delete(sessionID.(string))
}
} }
func (s *AuthServlets) handleSubscribe(serverCtx server.ServerCtx, subscribeChan <-chan *ogs.Message) { func (s *AuthServlets) handleSubscribe(serverCtx server.ServerCtx, subscribeChan <-chan *ogs.Message) {
@ -162,7 +149,7 @@ LOOP:
switch msg.TargetType { switch msg.TargetType {
case ogs.PROBE: case ogs.PROBE:
sessions = s.getAuthSessions(msg.Targets) sessions = s.GetSessions(msg.Targets)
default: default:
logging.Logger().Warnf("Subscriber: Unknown TargetType %s", msg.TargetType) logging.Logger().Warnf("Subscriber: Unknown TargetType %s", msg.TargetType)
continue LOOP continue LOOP
@ -181,20 +168,3 @@ LOOP:
} }
} }
} }
func (s *AuthServlets) getAuthSessions(sessionIDs []string) []*ogrs.Session {
var sessions []*ogrs.Session
if nil == sessionIDs || 0 == len(sessionIDs) {
return sessions
}
for _, sessionID := range sessionIDs {
session, ok := s.sessions.Load(sessionID)
if ok {
sessions = append(sessions, session.(*ogrs.Session))
}
}
return sessions
}

View File

@ -4,7 +4,6 @@ import (
"context" "context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"sync"
"github.com/valyala/fasthttp" "github.com/valyala/fasthttp"
@ -26,7 +25,6 @@ type ProbeServlet interface {
type ProbeServlets struct { type ProbeServlets struct {
ogrs.RPCServlets ogrs.RPCServlets
sessions sync.Map
} }
func (s *ProbeServlets) Init(serverCtx server.ServerCtx) error { func (s *ProbeServlets) Init(serverCtx server.ServerCtx) error {
@ -109,21 +107,10 @@ func (s *ProbeServlets) Handshake(servletCtx server.ServletCtx, ctx *fasthttp.Re
func (s *ProbeServlets) OnConnect(servletCtx server.ServletCtx, conn socket.Conn) { func (s *ProbeServlets) OnConnect(servletCtx server.ServletCtx, conn socket.Conn) {
s.RPCServlets.OnConnect(servletCtx, conn) s.RPCServlets.OnConnect(servletCtx, conn)
sessionID := servletCtx.GetAttribute(og.SessionIDKey)
targetID := servletCtx.GetAttribute(og.SessionTargetIDKey)
if nil != sessionID && nil != targetID {
s.sessions.Store(sessionID.(string), ogrs.RetainSession(targetID.(string), servletCtx))
}
} }
func (s *ProbeServlets) OnDisconnect(servletCtx server.ServletCtx) { func (s *ProbeServlets) OnDisconnect(servletCtx server.ServletCtx) {
s.RPCServlets.OnDisconnect(servletCtx) s.RPCServlets.OnDisconnect(servletCtx)
sessionID := servletCtx.GetAttribute(og.SessionIDKey)
if nil != sessionID {
s.sessions.Delete(sessionID.(string))
}
} }
func (s *ProbeServlets) handleSubscribe(serverCtx server.ServerCtx, subscribeChan <-chan *ogs.Message) { func (s *ProbeServlets) handleSubscribe(serverCtx server.ServerCtx, subscribeChan <-chan *ogs.Message) {
@ -139,7 +126,7 @@ LOOP:
switch msg.TargetType { switch msg.TargetType {
case ogs.PROBE: case ogs.PROBE:
sessions = s.getProbeSessions(msg.Targets) sessions = s.GetSessions(msg.Targets)
default: default:
logging.Logger().Warnf("Subscriber: Unknown TargetType %s", msg.TargetType) logging.Logger().Warnf("Subscriber: Unknown TargetType %s", msg.TargetType)
continue LOOP continue LOOP
@ -160,20 +147,3 @@ LOOP:
} }
} }
} }
func (s *ProbeServlets) getProbeSessions(sessionIDs []string) []*ogrs.Session {
var sessions []*ogrs.Session
if nil == sessionIDs || 0 == len(sessionIDs) {
return sessions
}
for _, sessionID := range sessionIDs {
session, ok := s.sessions.Load(sessionID)
if ok {
sessions = append(sessions, session.(*ogrs.Session))
}
}
return sessions
}