From 1e1e2129543800586fbe03b1a02157120efa65b4 Mon Sep 17 00:00:00 2001 From: crusader Date: Fri, 1 Sep 2017 15:42:23 +0900 Subject: [PATCH] ing --- glide.yaml | 1 + grpc/pool.go | 27 ++++++ grpc/pool_handlers.go | 30 ++++++ handler/file/file.go | 67 -------------- handler/handler.go | 9 -- handler/web/web.go | 106 --------------------- handler/web_json_handlers.go | 72 +++++++++++++++ handler/web_socket_handlers.go | 40 ++++++++ main.go | 133 ++++++--------------------- redis/pool.go | 24 +++++ redis/pool_handlers.go | 27 ++++++ server/server.go | 26 ++++++ server/server_handlers.go | 45 +++++++++ subscribe/redis.go | 15 +++ subscribe/web_subscriber_handlers.go | 31 +++++++ 15 files changed, 365 insertions(+), 288 deletions(-) create mode 100644 grpc/pool.go create mode 100644 grpc/pool_handlers.go delete mode 100644 handler/file/file.go delete mode 100644 handler/handler.go delete mode 100644 handler/web/web.go create mode 100644 handler/web_json_handlers.go create mode 100644 handler/web_socket_handlers.go create mode 100644 redis/pool.go create mode 100644 redis/pool_handlers.go create mode 100644 server/server.go create mode 100644 server/server_handlers.go create mode 100644 subscribe/redis.go create mode 100644 subscribe/web_subscriber_handlers.go diff --git a/glide.yaml b/glide.yaml index 6886a25..b787f30 100644 --- a/glide.yaml +++ b/glide.yaml @@ -20,3 +20,4 @@ import: - metadata - package: git.loafle.net/overflow/overflow_gateway_websocket - package: git.loafle.net/commons_go/config +- package: git.loafle.net/commons_go/redis_pool diff --git a/grpc/pool.go b/grpc/pool.go new file mode 100644 index 0000000..4617d5a --- /dev/null +++ b/grpc/pool.go @@ -0,0 +1,27 @@ +package grpc + +import ( + "context" + "fmt" + + "git.loafle.net/commons_go/config" + "git.loafle.net/commons_go/logging" + ogp "git.loafle.net/overflow/overflow_grpc_pool" +) + +func NewPool(ctx context.Context) ogp.Pool { + h := &poolHandlers{ + ctx: ctx, + logger: logging.WithContext(ctx), + } + h.cfg = config.Sub("grpc") + h.MaxIdle = h.cfg.GetInt("pool.MaxIdle") + h.MaxCapacity = h.cfg.GetInt("pool.MaxCapacity") + + p, err := ogp.New(ctx, h) + if nil != err { + h.logger.Fatal(fmt.Sprintf("GRpc Pool: %v", err)) + } + + return p +} diff --git a/grpc/pool_handlers.go b/grpc/pool_handlers.go new file mode 100644 index 0000000..741bb94 --- /dev/null +++ b/grpc/pool_handlers.go @@ -0,0 +1,30 @@ +package grpc + +import ( + "context" + + "google.golang.org/grpc" + + "git.loafle.net/commons_go/config" + oas "git.loafle.net/overflow/overflow_api_server/golang" + ogp "git.loafle.net/overflow/overflow_grpc_pool" + "go.uber.org/zap" +) + +type poolHandlers struct { + ogp.PoolHandlers + ctx context.Context + logger *zap.Logger + cfg config.Configurator +} + +func (h *poolHandlers) OnCreate() (*grpc.ClientConn, interface{}, error) { + var err error + conn, err := grpc.Dial(config.GetString("grpc.addr"), grpc.WithInsecure()) + if nil != err { + return nil, nil, err + } + c := oas.NewOverflowApiServerClient(conn) + return conn, c, nil + +} diff --git a/handler/file/file.go b/handler/file/file.go deleted file mode 100644 index a999073..0000000 --- a/handler/file/file.go +++ /dev/null @@ -1,67 +0,0 @@ -package file - -import ( - "context" - "log" - "time" - - "go.uber.org/zap" - - "git.loafle.net/commons_go/config" - "git.loafle.net/commons_go/logging" - "git.loafle.net/overflow/overflow_gateway_web/handler" - gws "git.loafle.net/overflow/overflow_gateway_websocket" - "git.loafle.net/overflow/overflow_gateway_websocket/protocol/jsonrpc" -) - -type FileHandler interface { - handler.Handler -} - -type fileHandler struct { - ctx context.Context - logger *zap.Logger - co *gws.SocketOptions - ho *jsonrpc.Options - handler gws.MessageHandler -} - -func New(ctx context.Context) FileHandler { - h := &fileHandler{ - ctx: ctx, - logger: logging.WithContext(ctx), - } - - h.ho = &jsonrpc.Options{ - OnRequest: h.onRequest, - OnNotify: h.onNotify, - } - - h.handler = jsonrpc.NewHandler(ctx, h.ho) - - h.co = &gws.SocketOptions{ - Handler: h.handler, - MaxMessageSize: config.GetInt64("handlers.file.socket.MaxMessageSize"), - WriteTimeout: time.Duration(config.GetInt("handlers.file.socket.WriteTimeout")) * time.Second, - ReadTimeout: time.Duration(config.GetInt("handlers.file.socket.ReadTimeout")) * time.Second, - PongTimeout: time.Duration(config.GetInt("handlers.file.socket.PongTimeout")) * time.Second, - PingTimeout: time.Duration(config.GetInt("handlers.file.socket.PingTimeout")) * time.Second, - PingPeriod: time.Duration(config.GetInt("handlers.file.socket.PingPeriod")) * time.Second, - BinaryMessage: config.GetBool("handlers.file.socket.BinaryMessage"), - } - - return h -} - -func (h *fileHandler) GetSocketOption() *gws.SocketOptions { - return h.co -} - -func (h *fileHandler) onRequest(soc gws.Socket, method string, params []string) (interface{}, error) { - log.Printf("path: %s, m: %s, params: %v", soc.Path(), method, params) - return nil, nil -} - -func (h *fileHandler) onNotify(soc gws.Socket, method string, params []string) error { - return nil -} diff --git a/handler/handler.go b/handler/handler.go deleted file mode 100644 index fc2c727..0000000 --- a/handler/handler.go +++ /dev/null @@ -1,9 +0,0 @@ -package handler - -import ( - gws "git.loafle.net/overflow/overflow_gateway_websocket" -) - -type Handler interface { - GetSocketOption() *gws.SocketOptions -} diff --git a/handler/web/web.go b/handler/web/web.go deleted file mode 100644 index eb75d9f..0000000 --- a/handler/web/web.go +++ /dev/null @@ -1,106 +0,0 @@ -package web - -import ( - "context" - "strings" - "time" - - "go.uber.org/zap" - - "google.golang.org/grpc/metadata" - - "git.loafle.net/commons_go/config" - "git.loafle.net/commons_go/logging" - backGRpc "git.loafle.net/overflow/overflow_api_server/golang" - "git.loafle.net/overflow/overflow_gateway_web/handler" - gws "git.loafle.net/overflow/overflow_gateway_websocket" - "git.loafle.net/overflow/overflow_gateway_websocket/protocol/jsonrpc" - grpcPool "git.loafle.net/overflow/overflow_grpc_pool" -) - -type WebHandler interface { - handler.Handler -} - -type webHandler struct { - ctx context.Context - logger *zap.Logger - so *gws.SocketOptions - ro *jsonrpc.Options - handler gws.MessageHandler - pool grpcPool.Pool -} - -func New(ctx context.Context, pool grpcPool.Pool) WebHandler { - h := &webHandler{ - ctx: ctx, - logger: logging.WithContext(ctx), - pool: pool, - } - - h.ro = &jsonrpc.Options{ - OnRequest: h.onRequest, - OnNotify: h.onNotify, - } - - h.handler = jsonrpc.NewHandler(ctx, h.ro) - - h.so = &gws.SocketOptions{ - Handler: h.handler, - MaxMessageSize: config.GetInt64("handlers.web.socket.MaxMessageSize"), - WriteTimeout: time.Duration(config.GetInt("handlers.web.socket.WriteTimeout")) * time.Second, - ReadTimeout: time.Duration(config.GetInt("handlers.web.socket.ReadTimeout")) * time.Second, - PongTimeout: time.Duration(config.GetInt("handlers.web.socket.PongTimeout")) * time.Second, - PingTimeout: time.Duration(config.GetInt("handlers.web.socket.PingTimeout")) * time.Second, - PingPeriod: time.Duration(config.GetInt("handlers.web.socket.PingPeriod")) * time.Second, - BinaryMessage: config.GetBool("handlers.web.socket.BinaryMessage"), - } - - return h -} - -func (h *webHandler) GetSocketOption() *gws.SocketOptions { - return h.so -} - -func (h *webHandler) onRequest(soc gws.Socket, method string, params []string) (interface{}, error) { - h.logger.Info("OnRequest", - zap.String("path", soc.Path()), - zap.String("method", method), - zap.Any("params", params), - ) - - c, err := h.pool.Get() - if err != nil { - h.logger.Error("cannot retrive GRPC Client") - return nil, err - } - defer h.pool.Put(c) - - sm := strings.Split(method, ".") - - si := &backGRpc.ServerInput{ - Target: sm[0], - Method: sm[1], - Params: params, - } - - md := metadata.Pairs("email", "overflow@loafle.com") - ctx := metadata.NewOutgoingContext(context.Background(), md) - - so, err := c.(backGRpc.OverflowApiServerClient).Exec(ctx, si) - if err != nil { - return nil, err - } - - return so.Result, nil -} - -func (h *webHandler) onNotify(soc gws.Socket, method string, params []string) error { - h.logger.Info("OnRequest", - zap.String("path", soc.Path()), - zap.String("method", method), - zap.Any("params", params), - ) - return nil -} diff --git a/handler/web_json_handlers.go b/handler/web_json_handlers.go new file mode 100644 index 0000000..427b895 --- /dev/null +++ b/handler/web_json_handlers.go @@ -0,0 +1,72 @@ +package handler + +import ( + "context" + "strings" + + "google.golang.org/grpc/metadata" + + "git.loafle.net/commons_go/logging" + oas "git.loafle.net/overflow/overflow_api_server/golang" + ogw "git.loafle.net/overflow/overflow_gateway_websocket" + "git.loafle.net/overflow/overflow_gateway_websocket/protocol/jsonrpc" + ogp "git.loafle.net/overflow/overflow_grpc_pool" + "go.uber.org/zap" +) + +func newWebJSONRpcHandler(ctx context.Context, gPool ogp.Pool) ogw.ProtocolHandler { + h := &webJSONRpcHandlers{ + ctx: ctx, + logger: logging.WithContext(ctx), + gPool: gPool, + } + + p := jsonrpc.New(ctx, h) + + return p +} + +type webJSONRpcHandlers struct { + jsonrpc.JSONRpcHandlers + ctx context.Context + logger *zap.Logger + gPool ogp.Pool +} + +func (h *webJSONRpcHandlers) OnRequest(soc ogw.Socket, method string, params []string) (interface{}, error) { + h.logger.Info("OnRequest", + zap.String("path", soc.Path()), + zap.String("method", method), + zap.Any("params", params), + ) + + c, err := h.gPool.Get() + if err != nil { + h.logger.Error("cannot retrive GRPC Client") + return nil, err + } + defer h.gPool.Put(c) + + sm := strings.Split(method, ".") + + si := &oas.ServerInput{ + Target: sm[0], + Method: sm[1], + Params: params, + } + + md := metadata.Pairs("email", "overflow@loafle.com") + ctx := metadata.NewOutgoingContext(context.Background(), md) + + so, err := c.(oas.OverflowApiServerClient).Exec(ctx, si) + if err != nil { + return nil, err + } + + return so.Result, nil +} + +func (wjh *webJSONRpcHandlers) OnNotify(soc ogw.Socket, method string, params []string) error { + + return nil +} diff --git a/handler/web_socket_handlers.go b/handler/web_socket_handlers.go new file mode 100644 index 0000000..2bc0629 --- /dev/null +++ b/handler/web_socket_handlers.go @@ -0,0 +1,40 @@ +package handler + +import ( + "context" + "time" + + "git.loafle.net/commons_go/config" + "git.loafle.net/commons_go/logging" + ogw "git.loafle.net/overflow/overflow_gateway_websocket" + ogp "git.loafle.net/overflow/overflow_grpc_pool" + "go.uber.org/zap" +) + +type webSocketHandlers struct { + ogw.SocketHandlers + ctx context.Context + logger *zap.Logger + cfg config.Configurator +} + +func NewWebSocketHandler(ctx context.Context, gPool ogp.Pool) ogw.SocketHandler { + h := &webSocketHandlers{ + ctx: ctx, + logger: logging.WithContext(ctx), + } + h.cfg = config.Sub("handlers.web") + h.MaxMessageSize = h.cfg.GetInt64("socket.MaxMessageSize") + h.WriteTimeout = h.cfg.GetDuration("socket.WriteTimeout") * time.Second + h.ReadTimeout = h.cfg.GetDuration("socket.ReadTimeout") * time.Second + h.PongTimeout = h.cfg.GetDuration("socket.PongTimeout") * time.Second + h.PingTimeout = h.cfg.GetDuration("socket.PingTimeout") * time.Second + h.PingPeriod = h.cfg.GetDuration("socket.PingPeriod") * time.Second + h.BinaryMessage = h.cfg.GetBool("socket.BinaryMessage") + + ph := newWebJSONRpcHandler(ctx, gPool) + + h.Protocol = ph + + return h +} diff --git a/main.go b/main.go index 21190aa..97e648e 100644 --- a/main.go +++ b/main.go @@ -4,101 +4,49 @@ import ( "context" "encoding/json" "log" - "time" "go.uber.org/zap" - "google.golang.org/grpc" - - "github.com/garyburd/redigo/redis" - "github.com/valyala/fasthttp" - "git.loafle.net/commons_go/config" "git.loafle.net/commons_go/logging" - backGRpc "git.loafle.net/overflow/overflow_api_server/golang" - "git.loafle.net/overflow/overflow_gateway_web/handler/file" - "git.loafle.net/overflow/overflow_gateway_web/handler/web" - gws "git.loafle.net/overflow/overflow_gateway_websocket" - grpcPool "git.loafle.net/overflow/overflow_grpc_pool" - subscriberRedis "git.loafle.net/overflow/overflow_subscriber/redis" + "git.loafle.net/overflow/overflow_gateway_web/grpc" + "git.loafle.net/overflow/overflow_gateway_web/handler" + "git.loafle.net/overflow/overflow_gateway_web/redis" + "git.loafle.net/overflow/overflow_gateway_web/server" + "git.loafle.net/overflow/overflow_gateway_web/subscribe" ) var logger *zap.Logger func main() { + loadConfig() + ctx := newContext() + + defer logger.Sync() + + s := server.NewServer(ctx) + rp := redis.NewPool(ctx) + gp := grpc.NewPool(ctx) + + wsh := handler.NewWebSocketHandler(ctx, gp) + + subscribe.Subscribe(ctx, rp.Get()) + + s.HandleSocket(config.GetString("handlers.web.entry"), wsh) + + s.ListenAndServe(config.GetString("server.addr")) +} + +func loadConfig() { config.SetConfigName("config") config.AddConfigPath(".") err := config.ReadInConfig() if nil != err { log.Fatalf("config error: %v", err) } - - ctx, cancel := NewContext() - defer cancel() - - logger = logging.WithContext(ctx) - defer logger.Sync() - - o := &gws.ServerOptions{ - OnConnection: onConnection, - OnDisconnected: onDisconnected, - OnCheckOrigin: onCheckOrigin, - HandshakeTimeout: time.Duration(config.GetInt("websocket.HandshakeTimeout")) * time.Second, - ReadBufferSize: config.GetInt("websocket.ReadBufferSize"), - WriteBufferSize: config.GetInt("websocket.WriteBufferSize"), - EnableCompression: config.GetBool("websocket.EnableCompression"), - } - s := gws.NewServer(ctx, o) - - rPool := &redis.Pool{ - MaxIdle: config.GetInt("redis.pool.MaxIdle"), - IdleTimeout: time.Duration(config.GetInt("redis.pool.IdleTimeout")) * time.Second, - Dial: func() (redis.Conn, error) { - return redis.Dial(config.GetString("redis.network"), config.GetString("redis.addr")) - }, - } - defer func() { - rPool.Close() - }() - - noti := subscriberRedis.New(ctx, rPool.Get()) - - noti.Subscribe("web", func(channel string, payload string) { - logger.Info("Subscribe", - zap.String("channel", channel), - zap.String("payload", payload), - ) - }) - - bo := &grpcPool.Options{ - MaxIdle: config.GetInt("grpc.pool.MaxIdle"), - MaxCapacity: config.GetInt("grpc.pool.MaxCapacity"), - - Creators: func() (*grpc.ClientConn, interface{}, error) { - var err error - conn, err := grpc.Dial(config.GetString("grpc.addr"), grpc.WithInsecure()) - if nil != err { - return nil, nil, err - } - c := backGRpc.NewOverflowApiServerClient(conn) - return conn, c, nil - }, - } - bPool, err := grpcPool.New(ctx, bo) - if nil != err { - logger.Panic("Cannot create Pool of GRPC") - } - - wh := web.New(ctx, bPool) - fh := file.New(ctx) - - s.HandleSocket(config.GetString("handlers.web.entry"), wh.GetSocketOption()) - s.HandleSocket(config.GetString("handlers.file.entry"), fh.GetSocketOption()) - - s.ListenAndServe(config.GetString("server.addr")) } -func NewContext() (context.Context, context.CancelFunc) { +func newContext() context.Context { var err error ctx := context.Background() logConfig := config.Sub("logging") @@ -112,39 +60,12 @@ func NewContext() (context.Context, context.CancelFunc) { panic(err) } - logger, err := cfg.Build() + logger, err = cfg.Build() if err != nil { panic(err) } ctx = logging.NewContext(ctx, logger) - ctx, cancel := context.WithCancel(ctx) - - return ctx, cancel -} - -func onCheckOrigin(ctx *fasthttp.RequestCtx) bool { - if origin := string(ctx.Request.Header.Peek("Origin")); origin != "" { - ctx.Response.Header.Set("Access-Control-Allow-Origin", origin) - if string(ctx.Method()) == "OPTIONS" && string(ctx.Request.Header.Peek("Access-Control-Request-Method")) != "" { - ctx.Response.Header.Set("Access-Control-Allow-Headers", "Content-Type, Accept") - ctx.Response.Header.Set("Access-Control-Allow-Methods", "GET, HEAD, POST, PUT, DELETE") - } - } - return true -} - -func onConnection(soc gws.Socket) { - logger.Info("connect", - zap.String("path", soc.Path()), - zap.String("id", soc.ID()), - ) -} - -func onDisconnected(soc gws.Socket) { - logger.Info("disconnect", - zap.String("path", soc.Path()), - zap.String("id", soc.ID()), - ) + return ctx } diff --git a/redis/pool.go b/redis/pool.go new file mode 100644 index 0000000..e08397f --- /dev/null +++ b/redis/pool.go @@ -0,0 +1,24 @@ +package redis + +import ( + "context" + "time" + + "git.loafle.net/commons_go/config" + "git.loafle.net/commons_go/logging" + rp "git.loafle.net/commons_go/redis_pool" +) + +func NewPool(ctx context.Context) rp.Pool { + h := &poolHandlers{ + ctx: ctx, + logger: logging.WithContext(ctx), + } + h.cfg = config.Sub("redis") + h.MaxIdle = h.cfg.GetInt("pool.MaxIdle") + h.IdleTimeout = h.cfg.GetDuration("pool.IdleTimeout") * time.Second + + p := rp.NewPool(ctx, h) + + return p +} diff --git a/redis/pool_handlers.go b/redis/pool_handlers.go new file mode 100644 index 0000000..c82aa44 --- /dev/null +++ b/redis/pool_handlers.go @@ -0,0 +1,27 @@ +package redis + +import ( + "context" + "time" + + "github.com/garyburd/redigo/redis" + + "git.loafle.net/commons_go/config" + rp "git.loafle.net/commons_go/redis_pool" + "go.uber.org/zap" +) + +type poolHandlers struct { + rp.PoolHandlers + ctx context.Context + logger *zap.Logger + cfg config.Configurator +} + +func (h *poolHandlers) Dial() (redis.Conn, error) { + return redis.Dial(h.cfg.GetString("network"), h.cfg.GetString("addr")) +} + +func (h *poolHandlers) TestOnBorrow(c redis.Conn, t time.Time) error { + return nil +} diff --git a/server/server.go b/server/server.go new file mode 100644 index 0000000..c50d5b3 --- /dev/null +++ b/server/server.go @@ -0,0 +1,26 @@ +package server + +import ( + "context" + "time" + + "git.loafle.net/commons_go/config" + "git.loafle.net/commons_go/logging" + ogw "git.loafle.net/overflow/overflow_gateway_websocket" +) + +func NewServer(ctx context.Context) ogw.Server { + h := &serverHandlers{ + ctx: ctx, + logger: logging.WithContext(ctx), + } + h.cfg = config.Sub("websocket") + h.HandshakeTimeout = h.cfg.GetDuration("HandshakeTimeout") * time.Second + h.ReadBufferSize = h.cfg.GetInt("ReadBufferSize") + h.WriteBufferSize = h.cfg.GetInt("WriteBufferSize") + h.EnableCompression = h.cfg.GetBool("EnableCompression") + + s := ogw.NewServer(ctx, h) + + return s +} diff --git a/server/server_handlers.go b/server/server_handlers.go new file mode 100644 index 0000000..8cd834c --- /dev/null +++ b/server/server_handlers.go @@ -0,0 +1,45 @@ +package server + +import ( + "context" + + "git.loafle.net/commons_go/config" + "github.com/valyala/fasthttp" + "go.uber.org/zap" + + ogw "git.loafle.net/overflow/overflow_gateway_websocket" +) + +type serverHandlers struct { + ogw.ServerHandlers + ctx context.Context + logger *zap.Logger + cfg config.Configurator +} + +func (sh *serverHandlers) OnConnection(soc ogw.Socket) { + sh.logger.Info("connect", + zap.String("path", soc.Path()), + zap.String("id", soc.ID()), + ) +} + +func (sh *serverHandlers) OnDisconnected(soc ogw.Socket) { + sh.logger.Info("disconnect", + zap.String("path", soc.Path()), + zap.String("id", soc.ID()), + ) +} +func (sh *serverHandlers) OnCheckOrigin(ctx *fasthttp.RequestCtx) bool { + if origin := string(ctx.Request.Header.Peek("Origin")); origin != "" { + ctx.Response.Header.Set("Access-Control-Allow-Origin", origin) + if string(ctx.Method()) == "OPTIONS" && string(ctx.Request.Header.Peek("Access-Control-Request-Method")) != "" { + ctx.Response.Header.Set("Access-Control-Allow-Headers", "Content-Type, Accept") + ctx.Response.Header.Set("Access-Control-Allow-Methods", "GET, HEAD, POST, PUT, DELETE") + } + } + return true +} +func (sh *serverHandlers) OnError(ctx *fasthttp.RequestCtx, status int, reason error) { + +} diff --git a/subscribe/redis.go b/subscribe/redis.go new file mode 100644 index 0000000..814f630 --- /dev/null +++ b/subscribe/redis.go @@ -0,0 +1,15 @@ +package subscribe + +import ( + "context" + + ofs_redis "git.loafle.net/overflow/overflow_subscriber/redis" + "github.com/garyburd/redigo/redis" +) + +func Subscribe(ctx context.Context, redisConn redis.Conn) { + s := ofs_redis.New(ctx, redisConn) + webS := newWebSubscriberHandler(ctx, "web") + + s.Subscribe(webS) +} diff --git a/subscribe/web_subscriber_handlers.go b/subscribe/web_subscriber_handlers.go new file mode 100644 index 0000000..c7bf52f --- /dev/null +++ b/subscribe/web_subscriber_handlers.go @@ -0,0 +1,31 @@ +package subscribe + +import ( + "context" + + "git.loafle.net/commons_go/logging" + ofs "git.loafle.net/overflow/overflow_subscriber" + "go.uber.org/zap" +) + +func newWebSubscriberHandler(ctx context.Context, channel string) ofs.SubscriberHandler { + h := &webSubscriberHandlers{ + ctx: ctx, + logger: logging.WithContext(ctx), + } + h.Channel = channel + + return h +} + +type webSubscriberHandlers struct { + ofs.SubscriberHandlers + ctx context.Context + logger *zap.Logger +} + +func (h *webSubscriberHandlers) OnSubscribe(payload string) { + h.logger.Info("Subscriber:Web", + zap.String("payload", payload), + ) +}