This commit is contained in:
crusader 2017-11-09 18:39:21 +09:00
parent 90b8d44773
commit 16c58bc833
4 changed files with 93 additions and 67 deletions

View File

@ -1,85 +1,133 @@
package redis package redis
import ( import (
"context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"sync"
"git.loafle.net/commons_go/logging" "git.loafle.net/commons_go/logging"
uch "git.loafle.net/commons_go/util/channel" cuc "git.loafle.net/commons_go/util/channel"
ofs "git.loafle.net/overflow/overflow_subscriber" oos "git.loafle.net/overflow/overflow_subscriber"
"github.com/garyburd/redigo/redis" "github.com/garyburd/redigo/redis"
) )
type channelAction struct { type channelAction struct {
uch.Action cuc.Action
h ofs.SubscriberHandler h oos.SubscriberHandler
} }
type subscribers struct { type subscribers struct {
ctx context.Context
conn redis.PubSubConn conn redis.PubSubConn
subHandlers map[string]ofs.SubscriberHandler subHandlers map[string]oos.SubscriberHandler
isListen bool isListen bool
subCh chan channelAction subCh chan channelAction
isRunning bool
stopChan chan struct{}
stopWg sync.WaitGroup
} }
func New(ctx context.Context, conn redis.Conn) ofs.Subscriber { func New(conn redis.Conn) oos.Subscriber {
s := &subscribers{ s := &subscribers{
ctx: ctx,
subHandlers: make(map[string]ofs.SubscriberHandler),
isListen: false, isListen: false,
subCh: make(chan channelAction),
} }
s.conn = redis.PubSubConn{Conn: conn} s.conn = redis.PubSubConn{Conn: conn}
go s.listen()
s.isRunning = true
return s return s
} }
func (s *subscribers) listen() { func (s *subscribers) Start() error {
if s.stopChan != nil {
panic("Redis Subscriber: subscriber is already running. Stop it before starting it again")
}
s.stopChan = make(chan struct{})
s.subHandlers = make(map[string]oos.SubscriberHandler)
s.subCh = make(chan channelAction)
s.stopWg.Add(1)
go handleSubscriber(s)
return nil
}
func (s *subscribers) Stop() {
if s.stopChan == nil {
panic("Redis Subscriber: subscriber must be started before stopping it")
}
close(s.stopChan)
s.stopWg.Wait()
s.stopChan = nil
}
func (s *subscribers) Subscribe(h oos.SubscriberHandler) error {
if _, ok := s.subHandlers[h.GetChannel()]; ok {
return oos.ChannelExistError{Channel: h.GetChannel()}
}
ca := channelAction{
h: h,
}
ca.Type = cuc.ActionTypeCreate
s.subCh <- ca
return nil
}
func (s *subscribers) Unsubscribe(h oos.SubscriberHandler) error {
if _, ok := s.subHandlers[h.GetChannel()]; !ok {
return oos.ChannelIsNotExistError{Channel: h.GetChannel()}
}
ca := channelAction{
h: h,
}
ca.Type = cuc.ActionTypeDelete
s.subCh <- ca
return nil
}
func handleSubscriber(s *subscribers) {
defer s.stopWg.Done()
for { for {
select { select {
case ca := <-s.subCh: case ca := <-s.subCh:
switch ca.Type { switch ca.Type {
case uch.ActionTypeCreate: case cuc.ActionTypeCreate:
s.subHandlers[ca.h.GetChannel()] = ca.h s.subHandlers[ca.h.GetChannel()] = ca.h
s.conn.Subscribe(ca.h.GetChannel()) s.conn.Subscribe(ca.h.GetChannel())
s.listenSubscriptions() listenSubscriptions(s)
break break
case uch.ActionTypeDelete: case cuc.ActionTypeDelete:
s.conn.Unsubscribe(ca.h.GetChannel()) s.conn.Unsubscribe(ca.h.GetChannel())
delete(s.subHandlers, ca.h.GetChannel()) delete(s.subHandlers, ca.h.GetChannel())
break break
} }
case <-s.ctx.Done(): case <-s.stopChan:
s.destroy() s.conn.Close()
return return
} }
} }
} }
func (s *subscribers) destroy() { func listenSubscriptions(s *subscribers) {
s.isRunning = false
s.conn.Close()
}
func (s *subscribers) listenSubscriptions() {
if s.isListen { if s.isListen {
return return
} }
s.stopWg.Add(1)
go func() { go func() {
defer s.stopWg.Done()
for { for {
switch v := s.conn.Receive().(type) { switch v := s.conn.Receive().(type) {
case redis.Message: case redis.Message:
if h, ok := s.subHandlers[v.Channel]; ok { if h, ok := s.subHandlers[v.Channel]; ok {
if message, err := s.unmarshalMessage(v.Data); nil != err { if message, err := unmarshalMessage(v.Data); nil != err {
logging.Logger.Error(fmt.Sprintf("Subscriber Unmarshal error:%v", err)) logging.Logger().Error(fmt.Sprintf("Subscriber Unmarshal error:%v", err))
break break
} else { } else {
h.OnSubscribe(v.Channel, message) h.OnSubscribe(v.Channel, message)
@ -89,7 +137,7 @@ func (s *subscribers) listenSubscriptions() {
case redis.Subscription: case redis.Subscription:
break break
case error: case error:
s.destroy() s.Stop()
return return
default: default:
} }
@ -99,9 +147,9 @@ func (s *subscribers) listenSubscriptions() {
s.isListen = true s.isListen = true
} }
func (s *subscribers) unmarshalMessage(data []byte) (ofs.SubscribeMessage, error) { func unmarshalMessage(data []byte) (oos.SubscribeMessage, error) {
var err error var err error
var message ofs.SubscribeMessage var message oos.SubscribeMessage
if err = json.Unmarshal(data, &message); nil != err { if err = json.Unmarshal(data, &message); nil != err {
return message, err return message, err
} }
@ -112,33 +160,3 @@ func (s *subscribers) unmarshalMessage(data []byte) (ofs.SubscribeMessage, error
return message, nil return message, nil
} }
func (s *subscribers) Subscribe(h ofs.SubscriberHandler) error {
if _, ok := s.subHandlers[h.GetChannel()]; ok {
return ofs.ChannelExistError{Channel: h.GetChannel()}
}
ca := channelAction{
h: h,
}
ca.Type = uch.ActionTypeCreate
s.subCh <- ca
return nil
}
func (s *subscribers) Unsubscribe(h ofs.SubscriberHandler) error {
if _, ok := s.subHandlers[h.GetChannel()]; !ok {
return ofs.ChannelIsNotExistError{Channel: h.GetChannel()}
}
ca := channelAction{
h: h,
}
ca.Type = uch.ActionTypeDelete
s.subCh <- ca
return nil
}

View File

@ -21,6 +21,9 @@ func (cinee ChannelIsNotExistError) Error() string {
} }
type Subscriber interface { type Subscriber interface {
Start() error
Stop()
Subscribe(h SubscriberHandler) error Subscribe(h SubscriberHandler) error
Unsubscribe(h SubscriberHandler) error Unsubscribe(h SubscriberHandler) error
} }

View File

@ -3,4 +3,6 @@ package overflow_subscriber
type SubscriberHandler interface { type SubscriberHandler interface {
GetChannel() string GetChannel() string
OnSubscribe(channel string, message SubscribeMessage) OnSubscribe(channel string, message SubscribeMessage)
Validate()
} }

View File

@ -4,8 +4,11 @@ type SubscriberHandlers struct {
Channel string Channel string
} }
func (h *SubscriberHandlers) GetChannel() string { func (sh *SubscriberHandlers) GetChannel() string {
return h.Channel return sh.Channel
} }
func (h *SubscriberHandlers) OnSubscribe(channel string, message SubscribeMessage) { func (sh *SubscriberHandlers) OnSubscribe(channel string, message SubscribeMessage) {
}
func (sh *SubscriberHandlers) Validate() {
} }