diff --git a/servlet/webapp-servlet.go b/servlet/webapp-servlet.go index 959d5ae..409913f 100644 --- a/servlet/webapp-servlet.go +++ b/servlet/webapp-servlet.go @@ -13,7 +13,6 @@ import ( ogrs "git.loafle.net/overflow/gateway_rpc/servlet" "git.loafle.net/overflow/member_gateway_rpc/subscribe" - "github.com/dgrijalva/jwt-go" "github.com/satori/go.uuid" "github.com/valyala/fasthttp" @@ -119,58 +118,74 @@ func (s *WebappServlets) OnDisconnect(servletCtx server.ServletCtx) { } func (s *WebappServlets) handleSubscribe(serverCtx server.ServerCtx, subscribeChan <-chan *ogs.Message) { + var sessions []*ogrs.Session + +LOOP: for { select { case msg, ok := <-subscribeChan: if !ok { return } + switch msg.TargetType { case ogs.MEMBER: - for _, targetID := range msg.Targets { - _sessions := s.getMemberSessions(targetID) - if nil == _sessions || 0 == len(_sessions) { - break - } - - for _, _session := range _sessions { - _writeChan := _session.ServletCtx.GetAttribute(og.SessionWriteChanKey) - if nil != _writeChan { - writeChan := _writeChan.(chan<- []byte) - writeChan <- msg.Message - } - } - } - + sessions = s.getMemberSessionsByTargetIDs(msg.Targets) case ogs.MEMBER_SESSION: - for _, sessionID := range msg.Targets { - _session, ok := s.sessions.Load(sessionID) - if !ok { - logging.Logger().Debugf("Client[%s] is not exist", sessionID) - break - } - session := _session.(*ogrs.Session) - _writeChan := session.ServletCtx.GetAttribute(og.SessionWriteChanKey) - if nil != _writeChan { - writeChan := _writeChan.(chan<- []byte) - writeChan <- msg.Message - } - } + sessions = s.getMemberSessions(msg.Targets) default: logging.Logger().Warnf("Subscriber: Unknown TargetType %s", msg.TargetType) + continue LOOP + } + + if nil == sessions || 0 == len(sessions) { + continue LOOP + } + + for _, session := range sessions { + _writeChan := session.ServletCtx.GetAttribute(og.SessionWriteChanKey) + if nil != _writeChan { + writeChan := _writeChan.(chan<- []byte) + writeChan <- *msg.Message + } } } } } -func (s *WebappServlets) getMemberSessions(targetID string) []*ogrs.Session { +func (s *WebappServlets) getMemberSessions(sessionIDs []string) []*ogrs.Session { var sessions []*ogrs.Session + if nil == sessionIDs || 0 == len(sessionIDs) { + return sessions + } + + for _, sessionID := range sessionIDs { + session, ok := s.sessions.Load(sessionID) + if ok { + sessions = append(sessions, session.(*ogrs.Session)) + } + } + + return sessions +} + +func (s *WebappServlets) getMemberSessionsByTargetIDs(targetIDs []string) []*ogrs.Session { + var sessions []*ogrs.Session + if nil == targetIDs || 0 == len(targetIDs) { + return sessions + } + s.sessions.Range(func(k, v interface{}) bool { session := v.(*ogrs.Session) - if session.TargetID == targetID { - sessions = append(sessions, session) + + for _, targetID := range targetIDs { + if session.TargetID == targetID { + sessions = append(sessions, session) + break + } } + return true })