// Package redis provides a subscriber that receives Redis Pub/Sub messages
// through a redigo connection and decodes their JSON payload into
// subscribe.Message values.
package redis

import (
	"encoding/json"
	"fmt"
	"sync"

	logging "git.loafle.net/commons/logging-go"
	"git.loafle.net/overflow/gateway/subscribe"
	"github.com/gomodule/redigo/redis"
)

// sendGuard coordinates the dispatcher goroutine with Unsubscribe for a single
// subscription, so that a message channel is never closed while the dispatcher
// is sending on it.
type sendGuard struct {
	// mu is held by the dispatcher for the duration of a send and by
	// Unsubscribe while it closes the message channel.
	mu sync.Mutex
	// cancel is closed by Unsubscribe to abort a send that is in progress.
	cancel chan struct{}
}

// Subscribers dispatches messages received on a Redis Pub/Sub connection to
// one Go channel per subscribed Redis channel.
//
// Conn must be set before Start is called. Subscribe and Unsubscribe may be
// called from multiple goroutines once Start has returned. Start and Stop are
// not safe to call concurrently with each other.
type Subscribers struct {
	Conn redis.Conn

	pubSubConn    *redis.PubSubConn
	subscriptions map[string]chan *subscribe.Message
	stopChan      chan struct{}
	stopWg        sync.WaitGroup

	// mtx guards pubSubConn, subscriptions and guards. It is also held
	// around every command written to pubSubConn, so those writes never
	// overlap.
	mtx sync.Mutex
	// guards holds one sendGuard per entry of subscriptions.
	guards map[string]*sendGuard
}

// Start wraps Conn in a Pub/Sub connection and launches the background
// goroutine that receives and dispatches messages.
//
// It returns an error when the subscriber is already running or when Conn is
// nil.
func (s *Subscribers) Start() error {
	if s.stopChan != nil {
		return fmt.Errorf("Subscriber: already running. Stop it before starting it again")
	}
	if nil == s.Conn {
		return fmt.Errorf("Subscriber: Conn is nil")
	}

	s.mtx.Lock()
	if nil == s.subscriptions {
		s.subscriptions = make(map[string]chan *subscribe.Message)
	}
	if nil == s.guards {
		s.guards = make(map[string]*sendGuard)
	}
	s.pubSubConn = &redis.PubSubConn{Conn: s.Conn}
	s.mtx.Unlock()

	s.stopChan = make(chan struct{})
	s.stopWg.Add(1)
	go s.handleSubscriber()

	return nil
}

// Stop signals the background goroutine to terminate and waits until it has
// finished. On its way out that goroutine closes every message channel handed
// out by Subscribe and closes the Pub/Sub connection.
//
// It returns an error when the subscriber has not been started.
func (s *Subscribers) Stop() error {
	if s.stopChan == nil {
		return fmt.Errorf("Subscriber: must be started before stopping it")
	}

	close(s.stopChan)
	s.stopWg.Wait()
	s.stopChan = nil

	return nil
}

// Subscribe subscribes to the given Redis channel and returns the Go channel on
// which decoded messages for it are delivered. The returned channel is closed
// by Unsubscribe or when the background goroutine terminates.
//
// It returns subscribe.ChannelExistError when the channel is already
// subscribed, an error when Start has not been called yet, or the error
// reported by the SUBSCRIBE command.
func (s *Subscribers) Subscribe(channel string) (<-chan *subscribe.Message, error) {
	// The lock is held across the SUBSCRIBE command: the dispatcher cannot
	// look the channel up before it is registered, and concurrent callers
	// cannot write to the connection at the same time.
	s.mtx.Lock()
	defer s.mtx.Unlock()

	if _, ok := s.subscriptions[channel]; ok {
		return nil, subscribe.ChannelExistError{Channel: channel}
	}
	if nil == s.pubSubConn || nil == s.subscriptions {
		return nil, fmt.Errorf("Subscriber: must be started before subscribing")
	}

	if err := s.pubSubConn.Subscribe(channel); nil != err {
		return nil, err
	}

	if nil == s.guards {
		s.guards = make(map[string]*sendGuard)
	}
	msgChan := make(chan *subscribe.Message)
	s.subscriptions[channel] = msgChan
	s.guards[channel] = &sendGuard{cancel: make(chan struct{})}

	return msgChan, nil
}

// Unsubscribe removes the subscription for the given Redis channel, closes its
// message channel and sends the UNSUBSCRIBE command.
//
// It returns subscribe.ChannelIsNotExistError when the channel is not
// subscribed, or the error reported by the UNSUBSCRIBE command.
func (s *Subscribers) Unsubscribe(channel string) error {
	s.mtx.Lock()
	defer s.mtx.Unlock()

	msgChan, ok := s.subscriptions[channel]
	if !ok {
		return subscribe.ChannelIsNotExistError{Channel: channel}
	}
	guard := s.guards[channel]
	delete(s.subscriptions, channel)
	delete(s.guards, channel)

	if nil != guard {
		// Abort a send the dispatcher may be blocked in, then wait for it to
		// release the guard before closing; closing a channel with a pending
		// send would panic the dispatcher.
		close(guard.cancel)
		guard.mu.Lock()
		close(msgChan)
		guard.mu.Unlock()
	} else {
		close(msgChan)
	}

	return s.pubSubConn.Unsubscribe(channel)
}

// handleSubscriber is the body of the background goroutine started by Start.
// It forwards every received Pub/Sub message to the matching subscription
// until the stop channel is closed or the connection becomes unusable, then
// closes all message channels and the Pub/Sub connection.
func (s *Subscribers) handleSubscriber() {
	defer s.stopWg.Done()

	// Both fields were written by Start before this goroutine was launched.
	// Local copies keep this run independent of later field updates.
	psc := s.pubSubConn
	stop := s.stopChan

	done := make(chan struct{})
	defer s.shutdown(psc, done)

	// A single reader goroutine feeds the loop below. It terminates once done
	// is closed and its pending Receive call has returned (closing the
	// connection in shutdown makes Receive return).
	received := make(chan interface{})
	go func() {
		for {
			reply := psc.Receive()
			select {
			case received <- reply:
			case <-done:
				return
			}
		}
	}()

	for {
		select {
		case reply := <-received:
			switch v := reply.(type) {
			case redis.Message:
				if !s.deliver(v, stop) {
					return
				}
			case redis.Subscription:
				// Subscribe/unsubscribe acknowledgement: nothing to do.
			case error:
				logging.Logger().Errorf("Subscriber: Cannot receive message %v", v)
				// Once the connection is unusable every further Receive
				// fails immediately; leave instead of spinning.
				if nil != psc.Conn.Err() {
					return
				}
			}
		case <-stop:
			return
		}
	}
}

// deliver decodes a Pub/Sub message and sends it to the subscription of its
// channel. It reports false when the stop channel was closed while the send
// was pending, and true in every other case (delivered, dropped or skipped).
func (s *Subscribers) deliver(m redis.Message, stop <-chan struct{}) bool {
	s.mtx.Lock()
	msgChan, ok := s.subscriptions[m.Channel]
	guard := s.guards[m.Channel]
	s.mtx.Unlock()

	if !ok {
		logging.Logger().Warnf("Subscriber: Channel[%s] is not exist", m.Channel)
		return true
	}

	message := &subscribe.Message{}
	if err := json.Unmarshal(m.Data, message); nil != err {
		logging.Logger().Errorf("Subscriber: Cannot unmarshal data[%s] of Channel[%s] %v", string(m.Data), m.Channel, err)
		return true
	}
	if nil == message.Message {
		logging.Logger().Errorf("Subscriber: Message is not valid data[%s] of Channel[%s]", string(m.Data), m.Channel)
		return true
	}

	if nil == guard {
		// Entry without a guard; a nil cancel channel simply never fires.
		guard = &sendGuard{}
	}

	guard.mu.Lock()
	defer guard.mu.Unlock()

	// The subscription may have been removed between the lookup above and the
	// guard being acquired; in that case msgChan is already closed.
	select {
	case <-guard.cancel:
		return true
	default:
	}

	// While the guard is held msgChan cannot be closed, so the send is safe.
	select {
	case msgChan <- message:
		return true
	case <-guard.cancel:
		return true
	case <-stop:
		return false
	}
}

// shutdown releases the reader goroutine, closes and forgets every remaining
// subscription, and closes the Pub/Sub connection.
func (s *Subscribers) shutdown(psc *redis.PubSubConn, done chan struct{}) {
	close(done)

	s.mtx.Lock()
	defer s.mtx.Unlock()

	// Entries are removed as they are closed so that a later Unsubscribe or a
	// restart cannot close the same channel twice.
	for name, msgChan := range s.subscriptions {
		close(msgChan)
		delete(s.subscriptions, name)
		delete(s.guards, name)
	}

	if err := psc.Close(); nil != err {
		logging.Logger().Warnf("Subscriber: Cannot close connection %v", err)
	}
}