This commit is contained in:
crusader 2017-09-27 17:07:34 +09:00
parent b480f5853e
commit d7ed180234
3 changed files with 11 additions and 3 deletions

View File

@ -2,7 +2,10 @@ package redis
import ( import (
"context" "context"
"encoding/json"
"fmt"
"git.loafle.net/commons_go/logging"
uch "git.loafle.net/commons_go/util/channel" uch "git.loafle.net/commons_go/util/channel"
ofs "git.loafle.net/overflow/overflow_subscriber" ofs "git.loafle.net/overflow/overflow_subscriber"
"github.com/garyburd/redigo/redis" "github.com/garyburd/redigo/redis"
@ -75,7 +78,12 @@ func (s *subscribers) listenSubscriptions() {
switch v := s.conn.Receive().(type) { switch v := s.conn.Receive().(type) {
case redis.Message: case redis.Message:
if h, ok := s.subHandlers[v.Channel]; ok { if h, ok := s.subHandlers[v.Channel]; ok {
h.OnSubscribe(v.Channel, v.Data) var message ofs.SubscribeMessage
if err := json.Unmarshal(v.Data, &message); nil != err {
logging.Logger.Error(fmt.Sprintf("Subscriber: Unmarshal error:%v", err))
break
}
h.OnSubscribe(v.Channel, message)
} }
break break
case redis.Subscription: case redis.Subscription:

View File

@ -2,5 +2,5 @@ package overflow_subscriber
type SubscriberHandler interface { type SubscriberHandler interface {
GetChannel() string GetChannel() string
OnSubscribe(channel string, payload []byte) OnSubscribe(channel string, message SubscribeMessage)
} }

View File

@ -7,5 +7,5 @@ type SubscriberHandlers struct {
func (h *SubscriberHandlers) GetChannel() string { func (h *SubscriberHandlers) GetChannel() string {
return h.Channel return h.Channel
} }
func (h *SubscriberHandlers) OnSubscribe(channel string, payload []byte) { func (h *SubscriberHandlers) OnSubscribe(channel string, message SubscribeMessage) {
} }