socket handler has been changed.

This commit is contained in:
crusader 2017-09-11 11:22:11 +09:00
parent 00d02cfba7
commit df15f27bca
4 changed files with 85 additions and 50 deletions

View File

@ -37,7 +37,7 @@ func (h *webJSONRpcHandlers) OnRequest(soc ogw.Socket, method string, params []s
zap.Any("params", params),
)
uid := server.GetUID(soc)
uid := server.GetUID(soc.Path(), soc)
md := metadata.Pairs("email", uid)
ctx := metadata.NewOutgoingContext(context.Background(), md)

View File

@ -50,6 +50,7 @@ func (h *serverHandlers) OnConnection(soc ogw.Socket) {
return ofSigningKey, nil
})
path := soc.Path()
var uid string
if claims, ok := token.Claims.(jwt.MapClaims); ok && token.Valid {
uid = claims["sub"].(string)
@ -58,13 +59,14 @@ func (h *serverHandlers) OnConnection(soc ogw.Socket) {
return
}
AddSocket(uid, soc)
AddSocket(path, uid, soc)
}
func (h *serverHandlers) OnDisconnected(soc ogw.Socket) {
path := soc.Path()
RemoveSocket(soc)
RemoveSocket(path, soc)
}
func (h *serverHandlers) OnCheckOrigin(ctx *fasthttp.RequestCtx) bool {

View File

@ -7,21 +7,50 @@ import (
type socketsChannelAction struct {
uch.Action
uid string
soc ogw.Socket
path string
uid string
soc ogw.Socket
}
type sockets struct {
s map[string]ogw.Socket
sr map[ogw.Socket]string
}
func (s *sockets) addSocket(uid string, soc ogw.Socket) {
s.s[uid] = soc
s.sr[soc] = uid
}
func (s *sockets) removeSocket(soc ogw.Socket) {
var uid string
var ok bool
if uid, ok = s.sr[soc]; !ok {
return
}
delete(s.s, uid)
delete(s.sr, soc)
}
func (s *sockets) getSocket(uid string) ogw.Socket {
return s.s[uid]
}
func (s *sockets) getUID(soc ogw.Socket) string {
return s.sr[soc]
}
type SocketManager interface {
AddSocket(uid string, soc ogw.Socket)
RemoveSocket(soc ogw.Socket)
GetSocket(uid string) ogw.Socket
GetUID(soc ogw.Socket) string
AddSocket(path string, uid string, soc ogw.Socket)
RemoveSocket(path string, soc ogw.Socket)
GetSocket(path string, uid string) ogw.Socket
GetUID(path string, soc ogw.Socket) string
}
type socketManager struct {
sockets map[string]ogw.Socket
socketsR map[ogw.Socket]string
socketsCh chan socketsChannelAction
sMap map[string]*sockets
sCh chan socketsChannelAction
}
var _m SocketManager
@ -32,9 +61,8 @@ func init() {
func New() SocketManager {
m := &socketManager{
sockets: make(map[string]ogw.Socket),
socketsR: make(map[ogw.Socket]string),
socketsCh: make(chan socketsChannelAction),
sMap: make(map[string]*sockets),
sCh: make(chan socketsChannelAction),
}
go m.listenChannel()
@ -42,68 +70,72 @@ func New() SocketManager {
return m
}
func AddSocket(uid string, soc ogw.Socket) { _m.AddSocket(uid, soc) }
func (m *socketManager) AddSocket(uid string, soc ogw.Socket) {
func AddSocket(path string, uid string, soc ogw.Socket) { _m.AddSocket(path, uid, soc) }
func (m *socketManager) AddSocket(path string, uid string, soc ogw.Socket) {
ca := socketsChannelAction{
uid: uid,
soc: soc,
path: path,
uid: uid,
soc: soc,
}
ca.Type = uch.ActionTypeCreate
m.socketsCh <- ca
m.sCh <- ca
}
func RemoveSocket(soc ogw.Socket) { _m.RemoveSocket(soc) }
func (m *socketManager) RemoveSocket(soc ogw.Socket) {
func RemoveSocket(path string, soc ogw.Socket) { _m.RemoveSocket(path, soc) }
func (m *socketManager) RemoveSocket(path string, soc ogw.Socket) {
ca := socketsChannelAction{
soc: soc,
path: path,
soc: soc,
}
ca.Type = uch.ActionTypeDelete
m.socketsCh <- ca
m.sCh <- ca
}
func GetSocket(uid string) ogw.Socket { return _m.GetSocket(uid) }
func (m *socketManager) GetSocket(uid string) ogw.Socket {
func GetSocket(path string, uid string) ogw.Socket { return _m.GetSocket(path, uid) }
func (m *socketManager) GetSocket(path string, uid string) ogw.Socket {
var s *sockets
var ok bool
var soc ogw.Socket
if soc, ok = m.sockets[uid]; !ok {
if s, ok = m.sMap[path]; !ok {
return nil
}
return soc
return s.getSocket(uid)
}
func GetUID(soc ogw.Socket) string { return _m.GetUID(soc) }
func (m *socketManager) GetUID(soc ogw.Socket) string {
func GetUID(path string, soc ogw.Socket) string { return _m.GetUID(path, soc) }
func (m *socketManager) GetUID(path string, soc ogw.Socket) string {
var s *sockets
var ok bool
var uid string
if uid, ok = m.socketsR[soc]; !ok {
if s, ok = m.sMap[path]; !ok {
return ""
}
return uid
}
func (m *socketManager) _removeSocket(ca *socketsChannelAction) {
var uid string
var ok bool
soc := ca.soc
if uid, ok = m.socketsR[soc]; !ok {
return
}
delete(m.sockets, uid)
delete(m.socketsR, soc)
return s.getUID(soc)
}
func (m *socketManager) listenChannel() {
for {
select {
case ca := <-m.socketsCh:
case ca := <-m.sCh:
switch ca.Type {
case uch.ActionTypeCreate:
m.sockets[ca.uid] = ca.soc
m.socketsR[ca.soc] = ca.uid
var s *sockets
var ok bool
if s, ok = m.sMap[ca.path]; !ok {
s = &sockets{
s: make(map[string]ogw.Socket),
sr: make(map[ogw.Socket]string),
}
m.sMap[ca.path] = s
}
s.addSocket(ca.uid, ca.soc)
break
case uch.ActionTypeDelete:
m._removeSocket(&ca)
var s *sockets
var ok bool
if s, ok = m.sMap[ca.path]; !ok {
return
}
s.removeSocket(ca.soc)
break
}
}

View File

@ -27,7 +27,8 @@ type webSubscriberHandlers struct {
func (h *webSubscriberHandlers) OnSubscribe(payload string) {
uid := "kdkdkd"
soc := server.GetSocket(uid)
channel := "web"
soc := server.GetSocket(channel, uid)
soc.Write([]byte(payload))
}