diff --git a/redis/subscribers.go b/redis/subscribers.go index 88e5a12..953bb01 100644 --- a/redis/subscribers.go +++ b/redis/subscribers.go @@ -78,12 +78,12 @@ func (s *subscribers) listenSubscriptions() { switch v := s.conn.Receive().(type) { case redis.Message: if h, ok := s.subHandlers[v.Channel]; ok { - var message ofs.SubscribeMessage - if err := json.Unmarshal(v.Data, &message); nil != err { - logging.Logger.Error(fmt.Sprintf("Subscriber: Unmarshal error:%v", err)) + if message, err := s.unmarshalMessage(v.Data); nil != err { + logging.Logger.Error(fmt.Sprintf("Subscriber Unmarshal error:%v", err)) break + } else { + h.OnSubscribe(v.Channel, message) } - h.OnSubscribe(v.Channel, message) } break case redis.Subscription: @@ -99,6 +99,20 @@ func (s *subscribers) listenSubscriptions() { s.isListen = true } +func (s *subscribers) unmarshalMessage(data []byte) (ofs.SubscribeMessage, error) { + var err error + var message ofs.SubscribeMessage + if err = json.Unmarshal(data, &message); nil != err { + return message, err + } + + if message.Message, err = message.MessageRaw.MarshalJSON(); nil != err { + return message, err + } + + return message, nil +} + func (s *subscribers) Subscribe(h ofs.SubscriberHandler) error { if _, ok := s.subHandlers[h.GetChannel()]; ok { return ofs.ChannelExistError{Channel: h.GetChannel()} diff --git a/subscribe_message.go b/subscribe_message.go index b16b43a..26e435d 100644 --- a/subscribe_message.go +++ b/subscribe_message.go @@ -3,8 +3,9 @@ package overflow_subscriber import "encoding/json" type SubscribeMessage struct { - Targets []string `json:"targets"` - Message *json.RawMessage `json:"message"` + Targets []string `json:"targets"` + MessageRaw *json.RawMessage `json:"message"` + Message []byte } type SubscribeMessageBody struct {