From 192344a03499a271830dece3f813a301e84585d2 Mon Sep 17 00:00:00 2001 From: crusader Date: Tue, 29 Aug 2017 17:58:37 +0900 Subject: [PATCH] ing --- glide.yaml | 7 ++ main.go | 32 +++++++++- notification/notify.go | 10 +++ notification/redis/notify.go | 120 +++++++++++++++++++++++++++++++++++ 4 files changed, 168 insertions(+), 1 deletion(-) create mode 100644 notification/notify.go create mode 100644 notification/redis/notify.go diff --git a/glide.yaml b/glide.yaml index 3be4433..3594980 100644 --- a/glide.yaml +++ b/glide.yaml @@ -7,3 +7,10 @@ import: subpackages: - golang - package: git.loafle.net/overflow/overflow_grpc_pool +- package: github.com/garyburd/redigo + version: v1.1.0 + subpackages: + - redis +- package: git.loafle.net/commons_go/util + subpackages: + - channel diff --git a/main.go b/main.go index bc6853d..21b5a57 100644 --- a/main.go +++ b/main.go @@ -1,26 +1,48 @@ package main import ( + "context" "log" + "time" "google.golang.org/grpc" + "github.com/garyburd/redigo/redis" "github.com/valyala/fasthttp" backGRpc "git.loafle.net/overflow/overflow_api_server/golang" "git.loafle.net/overflow/overflow_gateway_web/handler/file" "git.loafle.net/overflow/overflow_gateway_web/handler/web" + notiRedis "git.loafle.net/overflow/overflow_gateway_web/notification/redis" gws "git.loafle.net/overflow/overflow_gateway_websocket" grpcPool "git.loafle.net/overflow/overflow_grpc_pool" ) func main() { + ctx, cancel := NewContext() + defer cancel() + o := &gws.ServerOptions{ OnConnection: onConnection, OnDisconnected: onDisconnected, OnCheckOrigin: onCheckOrigin, } - s := gws.NewServer(o) + s := gws.NewServer(ctx, o) + + rPool := &redis.Pool{ + MaxIdle: 3, + IdleTimeout: 240 * time.Second, + Dial: func() (redis.Conn, error) { + return redis.Dial("tcp", "127.0.0.1:6379") + }, + } + defer rPool.Close() + + noti := notiRedis.New(ctx, rPool.Get()) + + noti.Subscribe("web", func(channel string, payload string) { + log.Printf("c:%s, p:%s", channel, payload) + }) bo := &grpcPool.Options{ MaxIdle: 1, @@ -50,6 +72,14 @@ func main() { s.ListenAndServe(":19090") } +func NewContext() (context.Context, context.CancelFunc) { + ctx := context.Background() + ctx, cancel := context.WithCancel(ctx) + ctx = context.WithValue(ctx, "key1", "val1") + + return ctx, cancel +} + func onCheckOrigin(ctx *fasthttp.RequestCtx) bool { if origin := string(ctx.Request.Header.Peek("Origin")); origin != "" { ctx.Response.Header.Set("Access-Control-Allow-Origin", origin) diff --git a/notification/notify.go b/notification/notify.go new file mode 100644 index 0000000..2fae724 --- /dev/null +++ b/notification/notify.go @@ -0,0 +1,10 @@ +package notification + +type ( + OnSubscribeFunc func(channel string, payload string) +) + +type Notifier interface { + Subscribe(channel string, cb OnSubscribeFunc) + Unsubscribe(channel string, cb OnSubscribeFunc) +} diff --git a/notification/redis/notify.go b/notification/redis/notify.go new file mode 100644 index 0000000..a28a8c5 --- /dev/null +++ b/notification/redis/notify.go @@ -0,0 +1,120 @@ +package redis + +import ( + "context" + "log" + + channelUtil "git.loafle.net/commons_go/util/channel" + "git.loafle.net/overflow/overflow_gateway_web/notification" + "github.com/garyburd/redigo/redis" +) + +type subscribeChannelAction struct { + channelUtil.Action + channel string + cb notification.OnSubscribeFunc +} + +type Notifier interface { + notification.Notifier +} + +type notifier struct { + ctx context.Context + conn redis.PubSubConn + subListeners map[string]notification.OnSubscribeFunc + isListenSubscriptions bool + subCh chan subscribeChannelAction +} + +func New(ctx context.Context, conn redis.Conn) Notifier { + n := ¬ifier{ + ctx: ctx, + subListeners: make(map[string]notification.OnSubscribeFunc), + isListenSubscriptions: false, + subCh: make(chan subscribeChannelAction), + } + n.conn = redis.PubSubConn{Conn: conn} + + go n.listen() + + return n +} + +func (n *notifier) listen() { + for { + select { + case sa := <-n.subCh: + switch sa.Type { + case channelUtil.ActionTypeCreate: + _, ok := n.subListeners[sa.channel] + if ok { + log.Fatalf("notification: Subscription of channel[%s] is already exist", sa.channel) + } else { + n.subListeners[sa.channel] = sa.cb + n.conn.Subscribe(sa.channel) + n.listenSubscriptions() + } + break + case channelUtil.ActionTypeDelete: + _, ok := n.subListeners[sa.channel] + if ok { + n.conn.Unsubscribe(sa.channel) + delete(n.subListeners, sa.channel) + } else { + log.Fatalf("notification: Subscription of channel[%s] is not exist", sa.channel) + } + break + } + case <-n.ctx.Done(): + log.Println("redis noti: Context Done") + n.conn.Close() + return + } + } +} + +func (n *notifier) listenSubscriptions() { + if n.isListenSubscriptions { + return + } + + go func() { + for { + switch v := n.conn.Receive().(type) { + case redis.Message: + if cb, ok := n.subListeners[v.Channel]; ok { + cb(v.Channel, string(v.Data)) + } + case redis.Subscription: + log.Printf("subscription message: %s: %s %d\n", v.Channel, v.Kind, v.Count) + case error: + log.Println("error pub/sub, delivery has stopped") + return + default: + } + } + }() + + n.isListenSubscriptions = true +} + +func (n *notifier) Subscribe(channel string, cb notification.OnSubscribeFunc) { + ca := subscribeChannelAction{ + channel: channel, + cb: cb, + } + ca.Type = channelUtil.ActionTypeCreate + + n.subCh <- ca +} + +func (n *notifier) Unsubscribe(channel string, cb notification.OnSubscribeFunc) { + ca := subscribeChannelAction{ + channel: channel, + cb: cb, + } + ca.Type = channelUtil.ActionTypeDelete + + n.subCh <- ca +}