package redis import ( "context" "encoding/json" "fmt" "git.loafle.net/commons_go/logging" uch "git.loafle.net/commons_go/util/channel" ofs "git.loafle.net/overflow/overflow_subscriber" "github.com/garyburd/redigo/redis" ) type channelAction struct { uch.Action h ofs.SubscriberHandler } type subscribers struct { ctx context.Context conn redis.PubSubConn subHandlers map[string]ofs.SubscriberHandler isListen bool subCh chan channelAction isRunning bool } func New(ctx context.Context, conn redis.Conn) ofs.Subscriber { s := &subscribers{ ctx: ctx, subHandlers: make(map[string]ofs.SubscriberHandler), isListen: false, subCh: make(chan channelAction), } s.conn = redis.PubSubConn{Conn: conn} go s.listen() s.isRunning = true return s } func (s *subscribers) listen() { for { select { case ca := <-s.subCh: switch ca.Type { case uch.ActionTypeCreate: s.subHandlers[ca.h.GetChannel()] = ca.h s.conn.Subscribe(ca.h.GetChannel()) s.listenSubscriptions() break case uch.ActionTypeDelete: s.conn.Unsubscribe(ca.h.GetChannel()) delete(s.subHandlers, ca.h.GetChannel()) break } case <-s.ctx.Done(): s.destroy() return } } } func (s *subscribers) destroy() { s.isRunning = false s.conn.Close() } func (s *subscribers) listenSubscriptions() { if s.isListen { return } go func() { for { switch v := s.conn.Receive().(type) { case redis.Message: if h, ok := s.subHandlers[v.Channel]; ok { var message ofs.SubscribeMessage if err := json.Unmarshal(v.Data, &message); nil != err { logging.Logger.Error(fmt.Sprintf("Subscriber: Unmarshal error:%v", err)) break } h.OnSubscribe(v.Channel, message) } break case redis.Subscription: break case error: s.destroy() return default: } } }() s.isListen = true } func (s *subscribers) Subscribe(h ofs.SubscriberHandler) error { if _, ok := s.subHandlers[h.GetChannel()]; ok { return ofs.ChannelExistError{Channel: h.GetChannel()} } ca := channelAction{ h: h, } ca.Type = uch.ActionTypeCreate s.subCh <- ca return nil } func (s *subscribers) Unsubscribe(h ofs.SubscriberHandler) error { if _, ok := s.subHandlers[h.GetChannel()]; !ok { return ofs.ChannelIsNotExistError{Channel: h.GetChannel()} } ca := channelAction{ h: h, } ca.Type = uch.ActionTypeDelete s.subCh <- ca return nil }