From cf875b8b320333d7a8abfa3ba29f8507e36d71de Mon Sep 17 00:00:00 2001 From: crusader Date: Mon, 9 Apr 2018 20:37:54 +0900 Subject: [PATCH] ing --- external/external.go | 6 +- external/grpc/grpc.go | 12 ++-- external/kafka/kafka.go | 46 +++++++++++++ external/redis/redis.go | 26 ++++++++ glide.yaml | 3 + subscribe/message.go | 69 ++++++++++++++++++++ subscribe/redis/redis.go | 135 +++++++++++++++++++++++++++++++++++++++ subscribe/subscriber.go | 29 +++++++++ 8 files changed, 317 insertions(+), 9 deletions(-) create mode 100644 external/kafka/kafka.go create mode 100644 external/redis/redis.go create mode 100644 subscribe/message.go create mode 100644 subscribe/redis/redis.go create mode 100644 subscribe/subscriber.go diff --git a/external/external.go b/external/external.go index 4547e9e..4b4b31e 100644 --- a/external/external.go +++ b/external/external.go @@ -1,11 +1,9 @@ package external -import "git.loafle.net/overflow/gateway/external/grpc" - func InitPackage() { - grpc.InitPackage() + } func DestroyPackage() { - grpc.DestroyPackage() + } diff --git a/external/grpc/grpc.go b/external/grpc/grpc.go index 502514e..1451c39 100644 --- a/external/grpc/grpc.go +++ b/external/grpc/grpc.go @@ -14,11 +14,13 @@ import ( var grpcClient oci.OverflowApiServerClient func InitPackage() { - conn, err := grpc.Dial("192.168.1.50:50006", grpc.WithInsecure()) - if nil != err { - logging.Logger().Panic(err) - } - grpcClient = oci.NewOverflowApiServerClient(conn) + go func() { + conn, err := grpc.Dial("192.168.1.50:50006", grpc.WithInsecure()) + if nil != err { + logging.Logger().Panic(err) + } + grpcClient = oci.NewOverflowApiServerClient(conn) + }() } diff --git a/external/kafka/kafka.go b/external/kafka/kafka.go new file mode 100644 index 0000000..15149b9 --- /dev/null +++ b/external/kafka/kafka.go @@ -0,0 +1,46 @@ +package kafka + +import ( + "context" + "fmt" + + "git.loafle.net/commons/logging-go" + "github.com/segmentio/kafka-go" +) + +var ( + kafkaWriter *kafka.Writer +) + +func InitPackage() { + go func() { + kafkaWriter = kafka.NewWriter(kafka.WriterConfig{ + Brokers: []string{"192.168.1.50:9092"}, + Topic: "overflow-metric-topic", + Balancer: &kafka.LeastBytes{}, + }) + }() +} + +func DestroyPackage() { + go func() { + if err := kafkaWriter.Close(); nil != err { + logging.Logger().Errorf("%v", err) + } + }() +} + +func Write(key []byte, value []byte) error { + if nil == kafkaWriter { + return fmt.Errorf("Kafka client is not valid") + } + + err := kafkaWriter.WriteMessages(context.Background(), + kafka.Message{ + Key: key, + Value: value, + }, + ) + + return err +} diff --git a/external/redis/redis.go b/external/redis/redis.go new file mode 100644 index 0000000..afa7c3c --- /dev/null +++ b/external/redis/redis.go @@ -0,0 +1,26 @@ +package redis + +import ( + "github.com/gomodule/redigo/redis" +) + +var Pool *redis.Pool + +func InitPackage() { + go func() { + Pool = &redis.Pool{ + MaxIdle: 1, + MaxActive: 3, + IdleTimeout: 240, + Wait: true, + MaxConnLifetime: 1, + Dial: func() (redis.Conn, error) { + return redis.Dial("tcp", "192.168.1.50:6379") + }, + } + }() +} + +func DestroyPackage() { + Pool.Close() +} diff --git a/glide.yaml b/glide.yaml index 8a41f14..95a9f61 100644 --- a/glide.yaml +++ b/glide.yaml @@ -4,3 +4,6 @@ import: - package: git.loafle.net/commons/logging-go - package: google.golang.org/grpc version: ^1.11.2 +- package: github.com/gomodule/redigo + version: ^2.0.0 +- package: github.com/segmentio/kafka-go diff --git a/subscribe/message.go b/subscribe/message.go new file mode 100644 index 0000000..86f1d10 --- /dev/null +++ b/subscribe/message.go @@ -0,0 +1,69 @@ +package subscribe + +import ( + "encoding/json" + "fmt" +) + +type Message struct { + TargetType TargetType `json:"targetType"` + Targets []string `json:"targets"` + MessageRaw *json.RawMessage `json:"message"` + Message []byte +} + +type MessageBody struct { + Version string `json:"jsonrpc"` + Method string `json:"method"` + Params interface{} `json:"params,omitempty"` +} + +type TargetType int + +const ( + MEMBER_SESSION TargetType = iota + MEMBER + PROBE +) + +var ( + targetTypeID = map[TargetType]string{ + MEMBER_SESSION: "MEMBER_SESSION", + MEMBER: "MEMBER", + PROBE: "PROBE", + } + + targetTypeName = map[string]TargetType{ + "MEMBER_SESSION": MEMBER_SESSION, + "MEMBER": MEMBER, + "PROBE": PROBE, + } +) + +func (st TargetType) String() string { + return targetTypeID[st] +} + +func (st TargetType) MarshalJSON() ([]byte, error) { + value, ok := targetTypeID[st] + if !ok { + return nil, fmt.Errorf("Invalid EnumType[%s] value", st) + } + return json.Marshal(value) +} + +func (st TargetType) UnmarshalJSON(b []byte) error { + // unmarshal as string + var s string + err := json.Unmarshal(b, &s) + if err != nil { + return err + } + + value, ok := targetTypeName[s] + if !ok { + return fmt.Errorf("Invalid EnumType[%s] value", s) + } + st = value + return nil +} diff --git a/subscribe/redis/redis.go b/subscribe/redis/redis.go new file mode 100644 index 0000000..bcca07b --- /dev/null +++ b/subscribe/redis/redis.go @@ -0,0 +1,135 @@ +package redis + +import ( + "encoding/json" + "fmt" + "sync" + + logging "git.loafle.net/commons/logging-go" + "git.loafle.net/overflow/gateway/subscribe" + "github.com/gomodule/redigo/redis" +) + +func New(conn redis.Conn) subscribe.Subscriber { + return &Subscribers{ + Conn: conn, + } +} + +type Subscribers struct { + Conn redis.Conn + pubSubConn *redis.PubSubConn + subscriptions map[string]chan *subscribe.Message + + stopChan chan struct{} + stopWg sync.WaitGroup +} + +func (s *Subscribers) Start() error { + if s.stopChan != nil { + return fmt.Errorf("Subscriber: already running. Stop it before starting it again") + } + + if nil == s.Conn { + return fmt.Errorf("Subscriber: Conn is nil") + } + + if nil == s.subscriptions { + s.subscriptions = make(map[string]chan *subscribe.Message) + } + + s.pubSubConn = &redis.PubSubConn{Conn: s.Conn} + + s.stopChan = make(chan struct{}) + + s.stopWg.Add(1) + go s.handleSubscriber() + + return nil +} + +func (s *Subscribers) Stop() error { + if s.stopChan == nil { + return fmt.Errorf("Subscriber: must be started before stopping it") + } + close(s.stopChan) + s.stopWg.Wait() + s.stopChan = nil + return nil +} + +func (s *Subscribers) Subscribe(channel string) (chan<- *subscribe.Message, error) { + if _, ok := s.subscriptions[channel]; ok { + return nil, subscribe.ChannelExistError{Channel: channel} + } + + if err := s.pubSubConn.Subscribe(channel); nil != err { + return nil, err + } + + msgChan := make(chan *subscribe.Message) + s.subscriptions[channel] = msgChan + + return msgChan, nil +} + +func (s *Subscribers) Unsubscribe(channel string) error { + msgChan, ok := s.subscriptions[channel] + if !ok { + return subscribe.ChannelIsNotExistError{Channel: channel} + } + delete(s.subscriptions, channel) + close(msgChan) + + return s.pubSubConn.Unsubscribe(channel) +} + +func (s *Subscribers) handleSubscriber() { + var ( + receive interface{} + ) + defer func() { + if nil != s.subscriptions { + for _, msgChan := range s.subscriptions { + close(msgChan) + } + } + + if nil != s.pubSubConn { + s.pubSubConn.Close() + } + + s.stopWg.Done() + }() + + receiveChan := make(chan interface{}) + + for { + go func() { + receive = s.pubSubConn.Receive() + receiveChan <- receive + }() + + select { + case msg := <-receiveChan: + switch v := msg.(type) { + case redis.Message: + msgChan, ok := s.subscriptions[v.Channel] + if !ok { + logging.Logger().Warnf("Subscriber: Channel[%s] is not exist", v.Channel) + break + } + message := &subscribe.Message{} + if err := json.Unmarshal(v.Data, message); nil != err { + logging.Logger().Errorf("Subscriber: Cannot unmarshal data[%s] of Channel[%s] %v", string(v.Data), v.Channel, err) + break + } + msgChan <- message + case redis.Subscription: + case error: + } + case <-s.stopChan: + return + } + } +} diff --git a/subscribe/subscriber.go b/subscribe/subscriber.go new file mode 100644 index 0000000..f7456dc --- /dev/null +++ b/subscribe/subscriber.go @@ -0,0 +1,29 @@ +package subscribe + +import "fmt" + +type ChannelExistError struct { + Channel string +} + +// Error returns the formatted configuration error. +func (cee ChannelExistError) Error() string { + return fmt.Sprintf("Subscriber: Channel[%q] is already subscribed.", cee.Channel) +} + +type ChannelIsNotExistError struct { + Channel string +} + +// Error returns the formatted configuration error. +func (cinee ChannelIsNotExistError) Error() string { + return fmt.Sprintf("Subscriber: Channel[%q] is not subscribed.", cinee.Channel) +} + +type Subscriber interface { + Start() error + Stop() error + + Subscribe(channel string) (chan<- *Message, error) + Unsubscribe(channel string) error +}