// Package redis implements the overflow subscriber interface on top of a
// Redis pub/sub connection.
package redis

import (
	"context"
	"log"
	"sync"

	channelUtil "git.loafle.net/commons_go/util/channel"
	ofSubscriber "git.loafle.net/overflow/overflow_subscriber"
	"github.com/garyburd/redigo/redis"
)

// subscribeChannelAction is the request sent to the listen goroutine to add
// or remove the listener of a single channel. The embedded Action's Type
// field selects the operation.
type subscribeChannelAction struct {
	channelUtil.Action
	channel string
	cb      ofSubscriber.OnSubscribeFunc
}

// Subscriber is the Redis-backed implementation of ofSubscriber.Subscriber.
type Subscriber interface {
	ofSubscriber.Subscriber
}

// subscriber owns a pub/sub connection and dispatches received messages to
// the callback registered for the message's channel.
type subscriber struct {
	ctx  context.Context
	conn redis.PubSubConn

	// mtx guards subListeners, which is written by the listen goroutine and
	// read by the receive goroutine started in listenSubscriptions.
	mtx          sync.RWMutex
	subListeners map[string]ofSubscriber.OnSubscribeFunc

	// isListenSubscriptions is only touched from the listen goroutine.
	isListenSubscriptions bool
	subCh                 chan subscribeChannelAction
}

// New wraps conn in a pub/sub connection and starts the goroutine that
// serves Subscribe and Unsubscribe requests. When ctx is done the connection
// is closed and the goroutine exits.
func New(ctx context.Context, conn redis.Conn) Subscriber {
	n := &subscriber{
		ctx:                   ctx,
		subListeners:          make(map[string]ofSubscriber.OnSubscribeFunc),
		isListenSubscriptions: false,
		subCh:                 make(chan subscribeChannelAction),
	}
	n.conn = redis.PubSubConn{Conn: conn}

	go n.listen()

	return n
}

// listen serializes subscribe and unsubscribe requests until the context is
// done, at which point it closes the connection and returns.
func (n *subscriber) listen() {
	for {
		select {
		case sa := <-n.subCh:
			switch sa.Type {
			case channelUtil.ActionTypeCreate:
				n.handleSubscribe(sa)
			case channelUtil.ActionTypeDelete:
				n.handleUnsubscribe(sa)
			}
		case <-n.ctx.Done():
			log.Println("redis subscriber: Context Done")
			if err := n.conn.Close(); err != nil {
				log.Printf("redis subscriber: closing connection: %v", err)
			}
			return
		}
	}
}

// handleSubscribe registers the callback for sa.channel and subscribes the
// connection to it. A duplicate registration is logged and ignored rather
// than terminating the process.
func (n *subscriber) handleSubscribe(sa subscribeChannelAction) {
	n.mtx.Lock()
	_, exists := n.subListeners[sa.channel]
	if !exists {
		// Register before subscribing so that a message delivered right
		// after the subscription is established already finds its callback.
		n.subListeners[sa.channel] = sa.cb
	}
	n.mtx.Unlock()

	if exists {
		log.Printf("Subscriber: Subscription of channel[%s] is already exist", sa.channel)
		return
	}

	if err := n.conn.Subscribe(sa.channel); err != nil {
		log.Printf("Subscriber: Subscription of channel[%s] failed: %v", sa.channel, err)
		// Roll back so that a later Subscribe for the same channel can retry.
		n.mtx.Lock()
		delete(n.subListeners, sa.channel)
		n.mtx.Unlock()
		return
	}

	n.listenSubscriptions()
}

// handleUnsubscribe unsubscribes the connection from sa.channel and removes
// its callback. A request for an unknown channel is logged and ignored rather
// than terminating the process.
func (n *subscriber) handleUnsubscribe(sa subscribeChannelAction) {
	n.mtx.RLock()
	_, exists := n.subListeners[sa.channel]
	n.mtx.RUnlock()

	if !exists {
		log.Printf("Subscriber: Subscription of channel[%s] is not exist", sa.channel)
		return
	}

	if err := n.conn.Unsubscribe(sa.channel); err != nil {
		log.Printf("Subscriber: Unsubscription of channel[%s] failed: %v", sa.channel, err)
	}

	// The listener is removed even if the unsubscribe failed, so the callback
	// is never invoked after the caller asked to stop.
	n.mtx.Lock()
	delete(n.subListeners, sa.channel)
	n.mtx.Unlock()
}

// listenSubscriptions starts, at most once, the goroutine that receives from
// the pub/sub connection and dispatches messages to registered callbacks.
// That goroutine exits when Receive yields an error.
func (n *subscriber) listenSubscriptions() {
	if n.isListenSubscriptions {
		return
	}

	go func() {
		for {
			switch v := n.conn.Receive().(type) {
			case redis.Message:
				// Copy the callback out under the read lock and invoke it
				// without the lock held, so a slow callback does not block
				// the listen goroutine.
				n.mtx.RLock()
				cb, ok := n.subListeners[v.Channel]
				n.mtx.RUnlock()
				if ok && cb != nil {
					cb(v.Channel, string(v.Data))
				}
			case redis.Subscription:
				log.Printf("subscription message: %s: %s %d\n", v.Channel, v.Kind, v.Count)
			case error:
				log.Printf("error pub/sub, delivery has stopped: %v", v)
				return
			default:
			}
		}
	}()

	n.isListenSubscriptions = true
}

// enqueue hands ca to the listen goroutine. If the context is already done
// there may be no goroutine left to receive it, so the request is logged and
// dropped instead of blocking the caller forever.
func (n *subscriber) enqueue(ca subscribeChannelAction) {
	select {
	case n.subCh <- ca:
	case <-n.ctx.Done():
		log.Printf("redis subscriber: context done, request for channel[%s] dropped", ca.channel)
	}
}

// Subscribe requests that cb be invoked with the channel name and payload of
// every message received on channel.
func (n *subscriber) Subscribe(channel string, cb ofSubscriber.OnSubscribeFunc) {
	ca := subscribeChannelAction{
		channel: channel,
		cb:      cb,
	}
	ca.Type = channelUtil.ActionTypeCreate

	n.enqueue(ca)
}

// Unsubscribe requests that the listener registered for channel be removed.
func (n *subscriber) Unsubscribe(channel string) {
	ca := subscribeChannelAction{
		channel: channel,
	}
	ca.Type = channelUtil.ActionTypeDelete

	n.enqueue(ca)
}