package redis

import (
	"encoding/json"
	"errors"
	"sync"

	"git.loafle.net/commons_go/logging"
	cuc "git.loafle.net/commons_go/util/channel"
	oos "git.loafle.net/overflow/overflow_subscriber"
	"github.com/garyburd/redigo/redis"
)

// errSubscriberNotRunning is returned by Subscribe and Unsubscribe when the
// control loop is not running (never started, stopped, or shut down after a
// receive error), so the request could never be processed.
var errSubscriberNotRunning = errors.New("redis subscriber: subscriber is not running")

// channelAction is a request sent to the control loop: the embedded action's
// Type says whether handler h is to be registered or removed.
type channelAction struct {
	cuc.Action
	h oos.SubscriberHandler
}

// subscribers is a Redis pub/sub backed implementation of oos.Subscriber.
//
// One goroutine (handleSubscriber) owns the subscribe/unsubscribe commands and
// a second one (started by listenSubscriptions) receives messages and hands
// them to the registered handlers.
type subscribers struct {
	conn redis.PubSubConn

	// mu guards subHandlers, subCh, stopChan and stopOnce, which are read
	// from caller goroutines as well as from the two worker goroutines.
	mu          sync.RWMutex
	subHandlers map[string]oos.SubscriberHandler
	subCh       chan channelAction
	stopChan    chan struct{}
	stopOnce    *sync.Once

	// isListen records whether the receive goroutine has been launched for
	// the current run. It is reset by Start and otherwise only touched by
	// the control loop goroutine.
	isListen bool

	stopWg sync.WaitGroup
}

// New wraps conn in a pub/sub connection and returns a subscriber that is not
// yet running; call Start before subscribing.
func New(conn redis.Conn) oos.Subscriber {
	return &subscribers{
		conn: redis.PubSubConn{Conn: conn},
	}
}

// Start allocates the per-run state and launches the control loop. It always
// returns nil and panics if the subscriber is already running.
func (s *subscribers) Start() error {
	s.mu.Lock()
	defer s.mu.Unlock()

	if s.stopChan != nil {
		panic("Redis Subscriber: subscriber is already running. Stop it before starting it again")
	}

	s.stopChan = make(chan struct{})
	s.stopOnce = new(sync.Once)
	s.subHandlers = make(map[string]oos.SubscriberHandler)
	s.subCh = make(chan channelAction)
	// Without this reset a restarted subscriber would never launch a new
	// receive goroutine.
	s.isListen = false

	s.stopWg.Add(1)
	go handleSubscriber(s)

	return nil
}

// Stop signals both goroutines to exit, waits for them, and marks the
// subscriber as stopped. It panics if the subscriber was never started.
func (s *subscribers) Stop() {
	s.mu.RLock()
	running := s.stopChan != nil
	s.mu.RUnlock()

	if !running {
		panic("Redis Subscriber: subscriber must be started before stopping it")
	}

	s.signalStop()
	s.stopWg.Wait()

	s.mu.Lock()
	s.stopChan = nil
	s.mu.Unlock()
}

// signalStop closes the stop channel of the current run at most once and
// reports whether this call was the one that closed it. It never waits, so it
// is safe to call from the worker goroutines themselves.
func (s *subscribers) signalStop() bool {
	s.mu.RLock()
	stop, once := s.stopChan, s.stopOnce
	s.mu.RUnlock()

	if stop == nil {
		return false
	}

	initiated := false
	once.Do(func() {
		close(stop)
		initiated = true
	})
	return initiated
}

// dispatch hands ca to the control loop. It returns errSubscriberNotRunning
// instead of blocking forever when no control loop is there to receive it.
func (s *subscribers) dispatch(ca channelAction) error {
	s.mu.RLock()
	actions, stop := s.subCh, s.stopChan
	s.mu.RUnlock()

	if stop == nil {
		return errSubscriberNotRunning
	}

	select {
	case actions <- ca:
		return nil
	case <-stop:
		return errSubscriberNotRunning
	}
}

// Subscribe asks the control loop to register h for h.GetChannel(). It returns
// oos.ChannelExistError when a handler is already registered for that channel
// and errSubscriberNotRunning when the subscriber is not running.
func (s *subscribers) Subscribe(h oos.SubscriberHandler) error {
	channel := h.GetChannel()

	s.mu.RLock()
	_, exists := s.subHandlers[channel]
	s.mu.RUnlock()

	if exists {
		return oos.ChannelExistError{Channel: channel}
	}

	ca := channelAction{h: h}
	ca.Type = cuc.ActionTypeCreate
	return s.dispatch(ca)
}

// Unsubscribe asks the control loop to remove the handler registered for
// h.GetChannel(). It returns oos.ChannelIsNotExistError when no handler is
// registered for that channel and errSubscriberNotRunning when the subscriber
// is not running.
func (s *subscribers) Unsubscribe(h oos.SubscriberHandler) error {
	channel := h.GetChannel()

	s.mu.RLock()
	_, exists := s.subHandlers[channel]
	s.mu.RUnlock()

	if !exists {
		return oos.ChannelIsNotExistError{Channel: channel}
	}

	ca := channelAction{h: h}
	ca.Type = cuc.ActionTypeDelete
	return s.dispatch(ca)
}

// handleSubscriber is the control loop: it applies subscribe/unsubscribe
// requests until the stop channel is closed, then closes the connection.
func handleSubscriber(s *subscribers) {
	defer s.stopWg.Done()

	s.mu.RLock()
	actions, stop := s.subCh, s.stopChan
	s.mu.RUnlock()

	for {
		select {
		case ca := <-actions:
			channel := ca.h.GetChannel()

			switch ca.Type {
			case cuc.ActionTypeCreate:
				// Register before subscribing so a message that arrives
				// right after the subscription already finds its handler.
				s.mu.Lock()
				s.subHandlers[channel] = ca.h
				s.mu.Unlock()

				if err := s.conn.Subscribe(channel); err != nil {
					logging.Logger().Errorf("Subscriber Subscribe error:%v", err)
					// Roll back so the caller can retry the channel.
					s.mu.Lock()
					delete(s.subHandlers, channel)
					s.mu.Unlock()
				}
				listenSubscriptions(s)

			case cuc.ActionTypeDelete:
				if err := s.conn.Unsubscribe(channel); err != nil {
					logging.Logger().Errorf("Subscriber Unsubscribe error:%v", err)
				}
				s.mu.Lock()
				delete(s.subHandlers, channel)
				s.mu.Unlock()
			}

		case <-stop:
			// Closing the connection is also what makes the receive
			// goroutine's Receive call return. The close error is ignored:
			// nothing can be done about it during shutdown.
			_ = s.conn.Close()
			return
		}
	}
}

// listenSubscriptions launches the receive goroutine the first time it is
// called in a run; later calls are no-ops. It must only be called from the
// control loop goroutine.
func listenSubscriptions(s *subscribers) {
	if s.isListen {
		return
	}
	s.isListen = true

	s.stopWg.Add(1)
	go func() {
		defer s.stopWg.Done()

		for {
			switch v := s.conn.Receive().(type) {
			case redis.Message:
				s.mu.RLock()
				h, ok := s.subHandlers[v.Channel]
				s.mu.RUnlock()
				if !ok {
					continue
				}

				message, err := unmarshalMessage(v.Data)
				if err != nil {
					logging.Logger().Errorf("Subscriber Unmarshal error:%v", err)
					continue
				}
				h.OnSubscribe(v.Channel, message)

			case redis.Subscription:
				// Subscription confirmations need no action.

			case error:
				// Only signal shutdown here. Calling Stop would wait on
				// stopWg, which this goroutine is itself part of. Log only
				// when the error caused the shutdown, not when it is the
				// result of Stop closing the connection.
				if s.signalStop() {
					logging.Logger().Errorf("Subscriber Receive error:%v", v)
				}
				return
			}
		}
	}()
}

// unmarshalMessage decodes data as JSON into an oos.SubscribeMessage and then
// fills its Message field from MessageRaw.MarshalJSON().
func unmarshalMessage(data []byte) (oos.SubscribeMessage, error) {
	var message oos.SubscribeMessage

	if err := json.Unmarshal(data, &message); err != nil {
		return message, err
	}

	raw, err := message.MessageRaw.MarshalJSON()
	message.Message = raw
	if err != nil {
		return message, err
	}

	return message, nil
}