gateway/subscribe/redis/redis.go
crusader cf875b8b32 ing
2018-04-09 20:37:54 +09:00

136 lines
2.7 KiB
Go

package redis
import (
"encoding/json"
"fmt"
"sync"
logging "git.loafle.net/commons/logging-go"
"git.loafle.net/overflow/gateway/subscribe"
"github.com/gomodule/redigo/redis"
)
func New(conn redis.Conn) subscribe.Subscriber {
return &Subscribers{
Conn: conn,
}
}
type Subscribers struct {
Conn redis.Conn
pubSubConn *redis.PubSubConn
subscriptions map[string]chan *subscribe.Message
stopChan chan struct{}
stopWg sync.WaitGroup
}
func (s *Subscribers) Start() error {
if s.stopChan != nil {
return fmt.Errorf("Subscriber: already running. Stop it before starting it again")
}
if nil == s.Conn {
return fmt.Errorf("Subscriber: Conn is nil")
}
if nil == s.subscriptions {
s.subscriptions = make(map[string]chan *subscribe.Message)
}
s.pubSubConn = &redis.PubSubConn{Conn: s.Conn}
s.stopChan = make(chan struct{})
s.stopWg.Add(1)
go s.handleSubscriber()
return nil
}
func (s *Subscribers) Stop() error {
if s.stopChan == nil {
return fmt.Errorf("Subscriber: must be started before stopping it")
}
close(s.stopChan)
s.stopWg.Wait()
s.stopChan = nil
return nil
}
func (s *Subscribers) Subscribe(channel string) (chan<- *subscribe.Message, error) {
if _, ok := s.subscriptions[channel]; ok {
return nil, subscribe.ChannelExistError{Channel: channel}
}
if err := s.pubSubConn.Subscribe(channel); nil != err {
return nil, err
}
msgChan := make(chan *subscribe.Message)
s.subscriptions[channel] = msgChan
return msgChan, nil
}
func (s *Subscribers) Unsubscribe(channel string) error {
msgChan, ok := s.subscriptions[channel]
if !ok {
return subscribe.ChannelIsNotExistError{Channel: channel}
}
delete(s.subscriptions, channel)
close(msgChan)
return s.pubSubConn.Unsubscribe(channel)
}
func (s *Subscribers) handleSubscriber() {
var (
receive interface{}
)
defer func() {
if nil != s.subscriptions {
for _, msgChan := range s.subscriptions {
close(msgChan)
}
}
if nil != s.pubSubConn {
s.pubSubConn.Close()
}
s.stopWg.Done()
}()
receiveChan := make(chan interface{})
for {
go func() {
receive = s.pubSubConn.Receive()
receiveChan <- receive
}()
select {
case msg := <-receiveChan:
switch v := msg.(type) {
case redis.Message:
msgChan, ok := s.subscriptions[v.Channel]
if !ok {
logging.Logger().Warnf("Subscriber: Channel[%s] is not exist", v.Channel)
break
}
message := &subscribe.Message{}
if err := json.Unmarshal(v.Data, message); nil != err {
logging.Logger().Errorf("Subscriber: Cannot unmarshal data[%s] of Channel[%s] %v", string(v.Data), v.Channel, err)
break
}
msgChan <- message
case redis.Subscription:
case error:
}
case <-s.stopChan:
return
}
}
}