diff --git a/servlet/webapp-servlet.go b/servlet/webapp-servlet.go index 1a05f6b..3533af8 100644 --- a/servlet/webapp-servlet.go +++ b/servlet/webapp-servlet.go @@ -4,10 +4,12 @@ import ( "crypto/rsa" "fmt" "io/ioutil" + "sync" logging "git.loafle.net/commons/logging-go" "git.loafle.net/commons/server-go" "git.loafle.net/commons/server-go/socket" + og "git.loafle.net/overflow/gateway" ogs "git.loafle.net/overflow/gateway/subscribe" ogrs "git.loafle.net/overflow/gateway_rpc/servlet" "git.loafle.net/overflow/member_gateway_rpc/subscribe" @@ -49,6 +51,8 @@ type WebappServlet interface { type WebappServlets struct { ogrs.RPCServlets + + connections sync.Map } func (s *WebappServlets) Init(serverCtx server.ServerCtx) error { @@ -111,53 +115,112 @@ func (s *WebappServlets) Handshake(servletCtx server.ServletCtx, ctx *fasthttp.R userEmail := claims["sub"].(string) sessionID := uuid.NewV4().String() - logging.Logger().Debugf("%s, %s", userEmail, sessionID) - - // servletCtx.SetAttribute(oogw.ClientTypeKey, ogs.MEMBER) - // servletCtx.SetAttribute(oogw.TargetIDKey, userEmail) + servletCtx.SetAttribute(og.SessionIDKey, sessionID) + servletCtx.SetAttribute(og.SessionClientTypeKey, ogs.MEMBER) + servletCtx.SetAttribute(og.SessionTargetIDKey, userEmail) return nil, nil } func (s *WebappServlets) OnConnect(servletCtx server.ServletCtx, conn socket.Conn) { - // + s.RPCServlets.OnConnect(servletCtx, conn) + + sessionID := servletCtx.GetAttribute(og.SessionIDKey) + targetID := servletCtx.GetAttribute(og.SessionTargetIDKey) + if nil != sessionID && nil != targetID { + s.connections.Store(sessionID.(string), retainConnection(targetID.(string), servletCtx)) + } } func (s *WebappServlets) OnDisconnect(servletCtx server.ServletCtx) { - // + s.RPCServlets.OnDisconnect(servletCtx) + + sessionID := servletCtx.GetAttribute(og.SessionIDKey) + if nil != sessionID { + s.connections.Delete(sessionID.(string)) + } } func (s *WebappServlets) handleSubscribe(serverCtx server.ServerCtx, subscribeChan <-chan *ogs.Message) { - // for { - // select { - // case msg, ok := <-subscribeChan: - // switch msg.TargetType { - // case ogs.MEMBER: - // for _, uid := range msg.Targets { - // sockets := sh.getMemberSockets(uid) - // if nil == sockets { - // continue - // } + for { + select { + case msg, ok := <-subscribeChan: + switch msg.TargetType { + case ogs.MEMBER: + for _, targetID := range msg.Targets { + _connections := s.getMemberConnections(targetID) + if nil == _connections || 0 == len(_connections) { + break + } - // for _, soc := range sockets { - // if err := soc.WriteMessageUseWriter(websocket.TextMessage, message.Message); nil != err { - // logging.Logger().Errorf("OnSubscribe: write error %v", err) - // } - // } - // } - - // case oos.MEMBER_SESSION: - // for _, sid := range message.Targets { - // if soc := sh.GetSocket(sid); nil != soc { - // if err := soc.WriteMessageUseWriter(websocket.TextMessage, message.Message); nil != err { - // logging.Logger().Errorf("OnSubscribe: write error %v", err) - // } - // } - // } - // default: - // logging.Logger().Warnf("[WEBAPP] SubscriberHandler: Unknown TargetType %s", message.TargetType) - // } - // } - // } + for _, _connection := range _connections { + _writeChan := _connection.servletCtx.GetAttribute(og.SessionWriterChanKey) + if nil != _writeChan { + writeChan := _writeChan.(chan<- []byte) + writeChan <- msg.Message + } + } + } + case oos.MEMBER_SESSION: + for _, sessionID := range message.Targets { + _connection, ok := s.connections.Load(sessionID) + if !ok { + logging.Logger().Debugf("Client[%s] is not exist", sessionID) + break + } + _writeChan := _connection.servletCtx.GetAttribute(og.SessionWriterChanKey) + if nil != _writeChan { + writeChan := _writeChan.(chan<- []byte) + writeChan <- msg.Message + } + } + default: + logging.Logger().Warnf("Subscriber: Unknown TargetType %s", message.TargetType) + } + } + } +} + +func (s *WebappServlets) getMemberConnections(targetID string) []*connection { + var connections []*connection + + s.connections.Range(func(k, v interface{}) bool { + _connection := v.(*connection) + if _connection.targetID == targetID { + connections = append(connections, _connection) + } + return true + }) + + return connections +} + +type connection struct { + targetID string + servletCtx server.ServletCtx +} + +var connectionPool sync.Pool + +func retainConnection(targetID string, servletCtx server.ServletCtx) *connection { + v := connectionPool.Get() + var _connection *connection + if v == nil { + _connection = &connection{} + } else { + _connection = v.(*connection) + } + + _connection.targetID = targetID + _connection.servletCtx = servletCtx + + return _connection +} + +func releaseConnection(_connection *connection) { + _connection.targetID = "" + _connection.servletCtx = nil + + connectionPool.Put(_connection) }