From efdb4f869984565c5cba24aad35ec29be63ba1b1 Mon Sep 17 00:00:00 2001 From: crusader Date: Fri, 1 Sep 2017 15:13:18 +0900 Subject: [PATCH] ing --- redis/subscriber.go | 122 ----------------------------------------- redis/subscribers.go | 122 +++++++++++++++++++++++++++++++++++++++++ subscriber.go | 26 +++++++-- subscriber_handler.go | 6 ++ subscriber_handlers.go | 11 ++++ 5 files changed, 160 insertions(+), 127 deletions(-) delete mode 100644 redis/subscriber.go create mode 100644 redis/subscribers.go create mode 100644 subscriber_handler.go create mode 100644 subscriber_handlers.go diff --git a/redis/subscriber.go b/redis/subscriber.go deleted file mode 100644 index ceff03d..0000000 --- a/redis/subscriber.go +++ /dev/null @@ -1,122 +0,0 @@ -package redis - -import ( - "context" - - channelUtil "git.loafle.net/commons_go/util/channel" - ofSubscriber "git.loafle.net/overflow/overflow_subscriber" - "github.com/garyburd/redigo/redis" -) - -type subscribeChannelAction struct { - channelUtil.Action - channel string - cb ofSubscriber.OnSubscribeFunc -} - -type Subscriber interface { - ofSubscriber.Subscriber -} - -type subscriber struct { - ctx context.Context - conn redis.PubSubConn - subListeners map[string]ofSubscriber.OnSubscribeFunc - isListenSubscriptions bool - subCh chan subscribeChannelAction - isRunning bool -} - -func New(ctx context.Context, conn redis.Conn) Subscriber { - n := &subscriber{ - ctx: ctx, - subListeners: make(map[string]ofSubscriber.OnSubscribeFunc), - isListenSubscriptions: false, - subCh: make(chan subscribeChannelAction), - } - n.conn = redis.PubSubConn{Conn: conn} - - go n.listen() - - n.isRunning = true - - return n -} - -func (n *subscriber) listen() { - for { - select { - case sa := <-n.subCh: - switch sa.Type { - case channelUtil.ActionTypeCreate: - _, ok := n.subListeners[sa.channel] - if !ok { - n.subListeners[sa.channel] = sa.cb - n.conn.Subscribe(sa.channel) - n.listenSubscriptions() - } - break - case channelUtil.ActionTypeDelete: - _, ok := n.subListeners[sa.channel] - if ok { - n.conn.Unsubscribe(sa.channel) - delete(n.subListeners, sa.channel) - } - break - } - case <-n.ctx.Done(): - n.destroy() - return - } - } -} - -func (n *subscriber) destroy() { - n.isRunning = false - n.conn.Close() -} - -func (n *subscriber) listenSubscriptions() { - if n.isListenSubscriptions { - return - } - - go func() { - for { - switch v := n.conn.Receive().(type) { - case redis.Message: - if cb, ok := n.subListeners[v.Channel]; ok { - cb(v.Channel, string(v.Data)) - } - break - case redis.Subscription: - break - case error: - n.destroy() - return - default: - } - } - }() - - n.isListenSubscriptions = true -} - -func (n *subscriber) Subscribe(channel string, cb ofSubscriber.OnSubscribeFunc) { - ca := subscribeChannelAction{ - channel: channel, - cb: cb, - } - ca.Type = channelUtil.ActionTypeCreate - - n.subCh <- ca -} - -func (n *subscriber) Unsubscribe(channel string) { - ca := subscribeChannelAction{ - channel: channel, - } - ca.Type = channelUtil.ActionTypeDelete - - n.subCh <- ca -} diff --git a/redis/subscribers.go b/redis/subscribers.go new file mode 100644 index 0000000..799b3c8 --- /dev/null +++ b/redis/subscribers.go @@ -0,0 +1,122 @@ +package redis + +import ( + "context" + + uch "git.loafle.net/commons_go/util/channel" + ofs "git.loafle.net/overflow/overflow_subscriber" + "github.com/garyburd/redigo/redis" +) + +type channelAction struct { + uch.Action + h ofs.SubscriberHandler +} + +type subscribers struct { + ctx context.Context + conn redis.PubSubConn + subHandlers map[string]ofs.SubscriberHandler + isListen bool + subCh chan channelAction + isRunning bool +} + +func New(ctx context.Context, conn redis.Conn) ofs.Subscriber { + s := &subscribers{ + ctx: ctx, + subHandlers: make(map[string]ofs.SubscriberHandler), + isListen: false, + subCh: make(chan channelAction), + } + s.conn = redis.PubSubConn{Conn: conn} + + go s.listen() + + s.isRunning = true + + return s +} + +func (s *subscribers) listen() { + for { + select { + case ca := <-s.subCh: + switch ca.Type { + case uch.ActionTypeCreate: + s.subHandlers[ca.h.GetChannel()] = ca.h + s.conn.Subscribe(ca.h.GetChannel()) + s.listenSubscriptions() + break + case uch.ActionTypeDelete: + s.conn.Unsubscribe(ca.h.GetChannel()) + delete(s.subHandlers, ca.h.GetChannel()) + break + } + case <-s.ctx.Done(): + s.destroy() + return + } + } +} + +func (s *subscribers) destroy() { + s.isRunning = false + s.conn.Close() +} + +func (s *subscribers) listenSubscriptions() { + if s.isListen { + return + } + + go func() { + for { + switch v := s.conn.Receive().(type) { + case redis.Message: + if h, ok := s.subHandlers[v.Channel]; ok { + h.OnSubscribe(string(v.Data)) + } + break + case redis.Subscription: + break + case error: + s.destroy() + return + default: + } + } + }() + + s.isListen = true +} + +func (s *subscribers) Subscribe(h ofs.SubscriberHandler) error { + if _, ok := s.subHandlers[h.GetChannel()]; ok { + return ofs.ChannelExistError{Channel: h.GetChannel()} + } + + ca := channelAction{ + h: h, + } + ca.Type = uch.ActionTypeCreate + + s.subCh <- ca + + return nil +} + +func (s *subscribers) Unsubscribe(h ofs.SubscriberHandler) error { + if _, ok := s.subHandlers[h.GetChannel()]; !ok { + return ofs.ChannelIsNotExistError{Channel: h.GetChannel()} + } + + ca := channelAction{ + h: h, + } + ca.Type = uch.ActionTypeDelete + + s.subCh <- ca + + return nil +} diff --git a/subscriber.go b/subscriber.go index e610ec9..a62ceab 100644 --- a/subscriber.go +++ b/subscriber.go @@ -1,10 +1,26 @@ package overflow_subscriber -type ( - OnSubscribeFunc func(channel string, payload string) -) +import "fmt" + +type ChannelExistError struct { + Channel string +} + +// Error returns the formatted configuration error. +func (cee ChannelExistError) Error() string { + return fmt.Sprintf("Subscriber: Channel[%q] is already subscribed.", cee.Channel) +} + +type ChannelIsNotExistError struct { + Channel string +} + +// Error returns the formatted configuration error. +func (cinee ChannelIsNotExistError) Error() string { + return fmt.Sprintf("Subscriber: Channel[%q] is not subscribed.", cinee.Channel) +} type Subscriber interface { - Subscribe(channel string, cb OnSubscribeFunc) - Unsubscribe(channel string) + Subscribe(h SubscriberHandler) error + Unsubscribe(h SubscriberHandler) error } diff --git a/subscriber_handler.go b/subscriber_handler.go new file mode 100644 index 0000000..d1b32b5 --- /dev/null +++ b/subscriber_handler.go @@ -0,0 +1,6 @@ +package overflow_subscriber + +type SubscriberHandler interface { + GetChannel() string + OnSubscribe(payload string) +} diff --git a/subscriber_handlers.go b/subscriber_handlers.go new file mode 100644 index 0000000..1ea7366 --- /dev/null +++ b/subscriber_handlers.go @@ -0,0 +1,11 @@ +package overflow_subscriber + +type SubscriberHandlers struct { + Channel string +} + +func (h *SubscriberHandlers) GetChannel() string { + return h.Channel +} +func (h *SubscriberHandlers) OnSubscribe(payload string) { +}