diff --git a/redis/subscriber.go b/redis/subscriber.go index 75a8ab3..ceff03d 100644 --- a/redis/subscriber.go +++ b/redis/subscriber.go @@ -2,7 +2,6 @@ package redis import ( "context" - "log" channelUtil "git.loafle.net/commons_go/util/channel" ofSubscriber "git.loafle.net/overflow/overflow_subscriber" @@ -25,6 +24,7 @@ type subscriber struct { subListeners map[string]ofSubscriber.OnSubscribeFunc isListenSubscriptions bool subCh chan subscribeChannelAction + isRunning bool } func New(ctx context.Context, conn redis.Conn) Subscriber { @@ -38,6 +38,8 @@ func New(ctx context.Context, conn redis.Conn) Subscriber { go n.listen() + n.isRunning = true + return n } @@ -48,9 +50,7 @@ func (n *subscriber) listen() { switch sa.Type { case channelUtil.ActionTypeCreate: _, ok := n.subListeners[sa.channel] - if ok { - log.Fatalf("Subscriber: Subscription of channel[%s] is already exist", sa.channel) - } else { + if !ok { n.subListeners[sa.channel] = sa.cb n.conn.Subscribe(sa.channel) n.listenSubscriptions() @@ -61,19 +61,21 @@ func (n *subscriber) listen() { if ok { n.conn.Unsubscribe(sa.channel) delete(n.subListeners, sa.channel) - } else { - log.Fatalf("Subscriber: Subscription of channel[%s] is not exist", sa.channel) } break } case <-n.ctx.Done(): - log.Println("redis subscriber: Context Done") - n.conn.Close() + n.destroy() return } } } +func (n *subscriber) destroy() { + n.isRunning = false + n.conn.Close() +} + func (n *subscriber) listenSubscriptions() { if n.isListenSubscriptions { return @@ -86,10 +88,11 @@ func (n *subscriber) listenSubscriptions() { if cb, ok := n.subListeners[v.Channel]; ok { cb(v.Channel, string(v.Data)) } + break case redis.Subscription: - log.Printf("subscription message: %s: %s %d\n", v.Channel, v.Kind, v.Count) + break case error: - log.Println("error pub/sub, delivery has stopped") + n.destroy() return default: }