diff --git a/.vscode/settings.json b/.vscode/settings.json index 46c249c..20af2f6 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -1,11 +1,3 @@ // Place your settings in this file to overwrite default and user settings. { - // Specifies Lint tool name. - "go.lintTool": "gometalinter", - - // Flags to pass to Lint tool (e.g. ["-min_confidence=.8"]) - "go.lintFlags": [ - "--config=${workspaceRoot}/golint.json" - ] - } \ No newline at end of file diff --git a/glide.yaml b/glide.yaml index 3594980..29ab4ef 100644 --- a/glide.yaml +++ b/glide.yaml @@ -1,16 +1,21 @@ package: git.loafle.net/overflow/overflow_gateway_web import: -- package: git.loafle.net/overflow/overflow_gateway_websocket -- package: google.golang.org/grpc - version: v1.5.2 +- package: git.loafle.net/commons_go/logging - package: git.loafle.net/overflow/overflow_api_server subpackages: - golang - package: git.loafle.net/overflow/overflow_grpc_pool +- package: git.loafle.net/overflow/overflow_subscriber + subpackages: + - redis - package: github.com/garyburd/redigo version: v1.1.0 subpackages: - redis -- package: git.loafle.net/commons_go/util +- package: github.com/valyala/fasthttp + version: v20160617 +- package: google.golang.org/grpc + version: v1.5.2 subpackages: - - channel + - metadata +- package: git.loafle.net/overflow/overflow_gateway_websocket diff --git a/handler/file/file.go b/handler/file/file.go index 7027e25..6453a90 100644 --- a/handler/file/file.go +++ b/handler/file/file.go @@ -1,8 +1,12 @@ package file import ( + "context" "log" + "go.uber.org/zap" + + "git.loafle.net/commons_go/logging" "git.loafle.net/overflow/overflow_gateway_web/handler" gws "git.loafle.net/overflow/overflow_gateway_websocket" "git.loafle.net/overflow/overflow_gateway_websocket/protocol/jsonrpc" @@ -13,20 +17,25 @@ type FileHandler interface { } type fileHandler struct { + ctx context.Context + logger *zap.Logger co *gws.SocketOptions ho *jsonrpc.Options handler gws.MessageHandler } -func New() FileHandler { - h := &fileHandler{} +func New(ctx context.Context) FileHandler { + h := &fileHandler{ + ctx: ctx, + logger: logging.WithContext(ctx), + } h.ho = &jsonrpc.Options{ OnRequest: h.onRequest, OnNotify: h.onNotify, } - h.handler = jsonrpc.NewHandler(h.ho) + h.handler = jsonrpc.NewHandler(ctx, h.ho) h.co = &gws.SocketOptions{ Handler: h.handler, diff --git a/handler/web/web.go b/handler/web/web.go index 5095833..90042a4 100644 --- a/handler/web/web.go +++ b/handler/web/web.go @@ -2,11 +2,13 @@ package web import ( "context" - "log" "strings" + "go.uber.org/zap" + "google.golang.org/grpc/metadata" + "git.loafle.net/commons_go/logging" backGRpc "git.loafle.net/overflow/overflow_api_server/golang" "git.loafle.net/overflow/overflow_gateway_web/handler" gws "git.loafle.net/overflow/overflow_gateway_websocket" @@ -19,15 +21,19 @@ type WebHandler interface { } type webHandler struct { + ctx context.Context + logger *zap.Logger so *gws.SocketOptions ro *jsonrpc.Options handler gws.MessageHandler pool grpcPool.Pool } -func New(pool grpcPool.Pool) WebHandler { +func New(ctx context.Context, pool grpcPool.Pool) WebHandler { h := &webHandler{ - pool: pool, + ctx: ctx, + logger: logging.WithContext(ctx), + pool: pool, } h.ro = &jsonrpc.Options{ @@ -35,7 +41,7 @@ func New(pool grpcPool.Pool) WebHandler { OnNotify: h.onNotify, } - h.handler = jsonrpc.NewHandler(h.ro) + h.handler = jsonrpc.NewHandler(ctx, h.ro) h.so = &gws.SocketOptions{ Handler: h.handler, @@ -49,11 +55,15 @@ func (h *webHandler) GetSocketOption() *gws.SocketOptions { } func (h *webHandler) onRequest(soc gws.Socket, method string, params []string) (interface{}, error) { - log.Printf("path: %s, m: %s, params: %v", soc.Path(), method, params) + h.logger.Info("OnRequest", + zap.String("path", soc.Path()), + zap.String("method", method), + zap.Any("params", params), + ) c, err := h.pool.Get() if err != nil { - log.Println("cannot retrive GRPC Client") + h.logger.Error("cannot retrive GRPC Client") return nil, err } defer h.pool.Put(c) @@ -78,6 +88,10 @@ func (h *webHandler) onRequest(soc gws.Socket, method string, params []string) ( } func (h *webHandler) onNotify(soc gws.Socket, method string, params []string) error { - log.Printf("path: %s, m: %s, params: %v", soc.Path(), method, params) + h.logger.Info("OnRequest", + zap.String("path", soc.Path()), + zap.String("method", method), + zap.Any("params", params), + ) return nil } diff --git a/main.go b/main.go index 21b5a57..15a669a 100644 --- a/main.go +++ b/main.go @@ -2,26 +2,32 @@ package main import ( "context" - "log" "time" + "go.uber.org/zap" + "google.golang.org/grpc" "github.com/garyburd/redigo/redis" "github.com/valyala/fasthttp" + "git.loafle.net/commons_go/logging" backGRpc "git.loafle.net/overflow/overflow_api_server/golang" "git.loafle.net/overflow/overflow_gateway_web/handler/file" "git.loafle.net/overflow/overflow_gateway_web/handler/web" - notiRedis "git.loafle.net/overflow/overflow_gateway_web/notification/redis" gws "git.loafle.net/overflow/overflow_gateway_websocket" grpcPool "git.loafle.net/overflow/overflow_grpc_pool" + subscriberRedis "git.loafle.net/overflow/overflow_subscriber/redis" ) +var logger *zap.Logger + func main() { ctx, cancel := NewContext() defer cancel() + logger = logging.WithContext(ctx) + o := &gws.ServerOptions{ OnConnection: onConnection, OnDisconnected: onDisconnected, @@ -36,12 +42,17 @@ func main() { return redis.Dial("tcp", "127.0.0.1:6379") }, } - defer rPool.Close() + defer func() { + rPool.Close() + }() - noti := notiRedis.New(ctx, rPool.Get()) + noti := subscriberRedis.New(ctx, rPool.Get()) noti.Subscribe("web", func(channel string, payload string) { - log.Printf("c:%s, p:%s", channel, payload) + logger.Info("Subscribe", + zap.String("channel", channel), + zap.String("payload", payload), + ) }) bo := &grpcPool.Options{ @@ -58,13 +69,13 @@ func main() { return conn, c, nil }, } - bPool, err := grpcPool.New(bo) + bPool, err := grpcPool.New(ctx, bo) if nil != err { - log.Panic("Cannot create Pool of GRPC") + logger.Panic("Cannot create Pool of GRPC") } - wh := web.New(bPool) - fh := file.New() + wh := web.New(ctx, bPool) + fh := file.New(ctx) s.HandleSocket("/web", wh.GetSocketOption()) s.HandleSocket("/file", fh.GetSocketOption()) @@ -74,6 +85,8 @@ func main() { func NewContext() (context.Context, context.CancelFunc) { ctx := context.Background() + ctx = logging.NewContext(ctx, nil) + ctx, cancel := context.WithCancel(ctx) ctx = context.WithValue(ctx, "key1", "val1") @@ -92,9 +105,15 @@ func onCheckOrigin(ctx *fasthttp.RequestCtx) bool { } func onConnection(soc gws.Socket) { - log.Printf("connect: path: %s, id:%s\n", soc.Path(), soc.ID()) + logger.Info("connect", + zap.String("path", soc.Path()), + zap.String("id", soc.ID()), + ) } func onDisconnected(soc gws.Socket) { - log.Printf("disconnect: path: %s, id:%s\n", soc.Path(), soc.ID()) + logger.Info("connect", + zap.String("path", soc.Path()), + zap.String("id", soc.ID()), + ) } diff --git a/notification/notify.go b/notification/notify.go deleted file mode 100644 index 2fae724..0000000 --- a/notification/notify.go +++ /dev/null @@ -1,10 +0,0 @@ -package notification - -type ( - OnSubscribeFunc func(channel string, payload string) -) - -type Notifier interface { - Subscribe(channel string, cb OnSubscribeFunc) - Unsubscribe(channel string, cb OnSubscribeFunc) -} diff --git a/notification/redis/notify.go b/notification/redis/notify.go deleted file mode 100644 index a28a8c5..0000000 --- a/notification/redis/notify.go +++ /dev/null @@ -1,120 +0,0 @@ -package redis - -import ( - "context" - "log" - - channelUtil "git.loafle.net/commons_go/util/channel" - "git.loafle.net/overflow/overflow_gateway_web/notification" - "github.com/garyburd/redigo/redis" -) - -type subscribeChannelAction struct { - channelUtil.Action - channel string - cb notification.OnSubscribeFunc -} - -type Notifier interface { - notification.Notifier -} - -type notifier struct { - ctx context.Context - conn redis.PubSubConn - subListeners map[string]notification.OnSubscribeFunc - isListenSubscriptions bool - subCh chan subscribeChannelAction -} - -func New(ctx context.Context, conn redis.Conn) Notifier { - n := ¬ifier{ - ctx: ctx, - subListeners: make(map[string]notification.OnSubscribeFunc), - isListenSubscriptions: false, - subCh: make(chan subscribeChannelAction), - } - n.conn = redis.PubSubConn{Conn: conn} - - go n.listen() - - return n -} - -func (n *notifier) listen() { - for { - select { - case sa := <-n.subCh: - switch sa.Type { - case channelUtil.ActionTypeCreate: - _, ok := n.subListeners[sa.channel] - if ok { - log.Fatalf("notification: Subscription of channel[%s] is already exist", sa.channel) - } else { - n.subListeners[sa.channel] = sa.cb - n.conn.Subscribe(sa.channel) - n.listenSubscriptions() - } - break - case channelUtil.ActionTypeDelete: - _, ok := n.subListeners[sa.channel] - if ok { - n.conn.Unsubscribe(sa.channel) - delete(n.subListeners, sa.channel) - } else { - log.Fatalf("notification: Subscription of channel[%s] is not exist", sa.channel) - } - break - } - case <-n.ctx.Done(): - log.Println("redis noti: Context Done") - n.conn.Close() - return - } - } -} - -func (n *notifier) listenSubscriptions() { - if n.isListenSubscriptions { - return - } - - go func() { - for { - switch v := n.conn.Receive().(type) { - case redis.Message: - if cb, ok := n.subListeners[v.Channel]; ok { - cb(v.Channel, string(v.Data)) - } - case redis.Subscription: - log.Printf("subscription message: %s: %s %d\n", v.Channel, v.Kind, v.Count) - case error: - log.Println("error pub/sub, delivery has stopped") - return - default: - } - } - }() - - n.isListenSubscriptions = true -} - -func (n *notifier) Subscribe(channel string, cb notification.OnSubscribeFunc) { - ca := subscribeChannelAction{ - channel: channel, - cb: cb, - } - ca.Type = channelUtil.ActionTypeCreate - - n.subCh <- ca -} - -func (n *notifier) Unsubscribe(channel string, cb notification.OnSubscribeFunc) { - ca := subscribeChannelAction{ - channel: channel, - cb: cb, - } - ca.Type = channelUtil.ActionTypeDelete - - n.subCh <- ca -}