This commit is contained in:
crusader 2017-09-01 15:13:18 +09:00
parent d5249ae7e8
commit efdb4f8699
5 changed files with 160 additions and 127 deletions

View File

@ -1,122 +0,0 @@
package redis
import (
"context"
channelUtil "git.loafle.net/commons_go/util/channel"
ofSubscriber "git.loafle.net/overflow/overflow_subscriber"
"github.com/garyburd/redigo/redis"
)
type subscribeChannelAction struct {
channelUtil.Action
channel string
cb ofSubscriber.OnSubscribeFunc
}
type Subscriber interface {
ofSubscriber.Subscriber
}
type subscriber struct {
ctx context.Context
conn redis.PubSubConn
subListeners map[string]ofSubscriber.OnSubscribeFunc
isListenSubscriptions bool
subCh chan subscribeChannelAction
isRunning bool
}
func New(ctx context.Context, conn redis.Conn) Subscriber {
n := &subscriber{
ctx: ctx,
subListeners: make(map[string]ofSubscriber.OnSubscribeFunc),
isListenSubscriptions: false,
subCh: make(chan subscribeChannelAction),
}
n.conn = redis.PubSubConn{Conn: conn}
go n.listen()
n.isRunning = true
return n
}
func (n *subscriber) listen() {
for {
select {
case sa := <-n.subCh:
switch sa.Type {
case channelUtil.ActionTypeCreate:
_, ok := n.subListeners[sa.channel]
if !ok {
n.subListeners[sa.channel] = sa.cb
n.conn.Subscribe(sa.channel)
n.listenSubscriptions()
}
break
case channelUtil.ActionTypeDelete:
_, ok := n.subListeners[sa.channel]
if ok {
n.conn.Unsubscribe(sa.channel)
delete(n.subListeners, sa.channel)
}
break
}
case <-n.ctx.Done():
n.destroy()
return
}
}
}
func (n *subscriber) destroy() {
n.isRunning = false
n.conn.Close()
}
func (n *subscriber) listenSubscriptions() {
if n.isListenSubscriptions {
return
}
go func() {
for {
switch v := n.conn.Receive().(type) {
case redis.Message:
if cb, ok := n.subListeners[v.Channel]; ok {
cb(v.Channel, string(v.Data))
}
break
case redis.Subscription:
break
case error:
n.destroy()
return
default:
}
}
}()
n.isListenSubscriptions = true
}
func (n *subscriber) Subscribe(channel string, cb ofSubscriber.OnSubscribeFunc) {
ca := subscribeChannelAction{
channel: channel,
cb: cb,
}
ca.Type = channelUtil.ActionTypeCreate
n.subCh <- ca
}
func (n *subscriber) Unsubscribe(channel string) {
ca := subscribeChannelAction{
channel: channel,
}
ca.Type = channelUtil.ActionTypeDelete
n.subCh <- ca
}

122
redis/subscribers.go Normal file
View File

@ -0,0 +1,122 @@
package redis
import (
"context"
uch "git.loafle.net/commons_go/util/channel"
ofs "git.loafle.net/overflow/overflow_subscriber"
"github.com/garyburd/redigo/redis"
)
type channelAction struct {
uch.Action
h ofs.SubscriberHandler
}
type subscribers struct {
ctx context.Context
conn redis.PubSubConn
subHandlers map[string]ofs.SubscriberHandler
isListen bool
subCh chan channelAction
isRunning bool
}
func New(ctx context.Context, conn redis.Conn) ofs.Subscriber {
s := &subscribers{
ctx: ctx,
subHandlers: make(map[string]ofs.SubscriberHandler),
isListen: false,
subCh: make(chan channelAction),
}
s.conn = redis.PubSubConn{Conn: conn}
go s.listen()
s.isRunning = true
return s
}
func (s *subscribers) listen() {
for {
select {
case ca := <-s.subCh:
switch ca.Type {
case uch.ActionTypeCreate:
s.subHandlers[ca.h.GetChannel()] = ca.h
s.conn.Subscribe(ca.h.GetChannel())
s.listenSubscriptions()
break
case uch.ActionTypeDelete:
s.conn.Unsubscribe(ca.h.GetChannel())
delete(s.subHandlers, ca.h.GetChannel())
break
}
case <-s.ctx.Done():
s.destroy()
return
}
}
}
func (s *subscribers) destroy() {
s.isRunning = false
s.conn.Close()
}
func (s *subscribers) listenSubscriptions() {
if s.isListen {
return
}
go func() {
for {
switch v := s.conn.Receive().(type) {
case redis.Message:
if h, ok := s.subHandlers[v.Channel]; ok {
h.OnSubscribe(string(v.Data))
}
break
case redis.Subscription:
break
case error:
s.destroy()
return
default:
}
}
}()
s.isListen = true
}
func (s *subscribers) Subscribe(h ofs.SubscriberHandler) error {
if _, ok := s.subHandlers[h.GetChannel()]; ok {
return ofs.ChannelExistError{Channel: h.GetChannel()}
}
ca := channelAction{
h: h,
}
ca.Type = uch.ActionTypeCreate
s.subCh <- ca
return nil
}
func (s *subscribers) Unsubscribe(h ofs.SubscriberHandler) error {
if _, ok := s.subHandlers[h.GetChannel()]; !ok {
return ofs.ChannelIsNotExistError{Channel: h.GetChannel()}
}
ca := channelAction{
h: h,
}
ca.Type = uch.ActionTypeDelete
s.subCh <- ca
return nil
}

View File

@ -1,10 +1,26 @@
package overflow_subscriber package overflow_subscriber
type ( import "fmt"
OnSubscribeFunc func(channel string, payload string)
) type ChannelExistError struct {
Channel string
}
// Error returns the formatted configuration error.
func (cee ChannelExistError) Error() string {
return fmt.Sprintf("Subscriber: Channel[%q] is already subscribed.", cee.Channel)
}
type ChannelIsNotExistError struct {
Channel string
}
// Error returns the formatted configuration error.
func (cinee ChannelIsNotExistError) Error() string {
return fmt.Sprintf("Subscriber: Channel[%q] is not subscribed.", cinee.Channel)
}
type Subscriber interface { type Subscriber interface {
Subscribe(channel string, cb OnSubscribeFunc) Subscribe(h SubscriberHandler) error
Unsubscribe(channel string) Unsubscribe(h SubscriberHandler) error
} }

6
subscriber_handler.go Normal file
View File

@ -0,0 +1,6 @@
package overflow_subscriber
type SubscriberHandler interface {
GetChannel() string
OnSubscribe(payload string)
}

11
subscriber_handlers.go Normal file
View File

@ -0,0 +1,11 @@
package overflow_subscriber
type SubscriberHandlers struct {
Channel string
}
func (h *SubscriberHandlers) GetChannel() string {
return h.Channel
}
func (h *SubscriberHandlers) OnSubscribe(payload string) {
}