diff --git a/config.json b/config.json index 9b37592..348c466 100644 --- a/config.json +++ b/config.json @@ -3,6 +3,9 @@ "addr": ":19090", "tls": false }, + "auth": { + "signingKey": "tWB0lUXiCwX4U3qsJZcZ10mKvEH793RHkTJDbDuZVshQTk4uNB6ck59UQ96lhsRi4XNUiEnlIbP8XYQMPabeNtERX3iyHeDcwocgUVAor1nkAajYeq1gNyJszGpMhEOT" + }, "websocket": { "HandshakeTimeout": 0, "ReadBufferSize": 8192, @@ -80,8 +83,7 @@ "nameEncoder": "full" }, "outputPaths": ["stdout", "/tmp/logs"], - "errorOutputPaths": ["stderr"], - "initialFields": {"foo": "bar"} + "errorOutputPaths": ["stderr"] } } diff --git a/glide.yaml b/glide.yaml index b787f30..3ca970a 100644 --- a/glide.yaml +++ b/glide.yaml @@ -4,7 +4,6 @@ import: - package: git.loafle.net/overflow/overflow_api_server subpackages: - golang -- package: git.loafle.net/overflow/overflow_grpc_pool - package: git.loafle.net/overflow/overflow_subscriber subpackages: - redis @@ -21,3 +20,6 @@ import: - package: git.loafle.net/overflow/overflow_gateway_websocket - package: git.loafle.net/commons_go/config - package: git.loafle.net/commons_go/redis_pool +- package: github.com/dgrijalva/jwt-go + version: v3.0.0 +- package: git.loafle.net/commons_go/grpc_pool diff --git a/grpc/client.go b/grpc/client.go new file mode 100644 index 0000000..17362f6 --- /dev/null +++ b/grpc/client.go @@ -0,0 +1,30 @@ +package grpc + +import ( + "context" + "strings" + + oas "git.loafle.net/overflow/overflow_api_server/golang" +) + +func Exec(ctx context.Context, method string, params []string) (string, error) { + c, err := _pool.Get() + if nil != err { + + } + defer _pool.Put(c) + sm := strings.Split(method, ".") + + si := &oas.ServerInput{ + Target: sm[0], + Method: sm[1], + Params: params, + } + + so, err := c.(oas.OverflowApiServerClient).Exec(ctx, si) + if err != nil { + return "", err + } + + return so.Result, nil +} diff --git a/grpc/pool.go b/grpc/pool.go index 4617d5a..0eb539c 100644 --- a/grpc/pool.go +++ b/grpc/pool.go @@ -5,11 +5,14 @@ import ( "fmt" "git.loafle.net/commons_go/config" + cgp "git.loafle.net/commons_go/grpc_pool" "git.loafle.net/commons_go/logging" - ogp "git.loafle.net/overflow/overflow_grpc_pool" ) -func NewPool(ctx context.Context) ogp.Pool { +var _pool cgp.Pool + +func InitializePool(ctx context.Context) { + var err error h := &poolHandlers{ ctx: ctx, logger: logging.WithContext(ctx), @@ -18,10 +21,8 @@ func NewPool(ctx context.Context) ogp.Pool { h.MaxIdle = h.cfg.GetInt("pool.MaxIdle") h.MaxCapacity = h.cfg.GetInt("pool.MaxCapacity") - p, err := ogp.New(ctx, h) + _pool, err = cgp.New(ctx, h) if nil != err { h.logger.Fatal(fmt.Sprintf("GRpc Pool: %v", err)) } - - return p } diff --git a/grpc/pool_handlers.go b/grpc/pool_handlers.go index 741bb94..7d39c48 100644 --- a/grpc/pool_handlers.go +++ b/grpc/pool_handlers.go @@ -6,13 +6,13 @@ import ( "google.golang.org/grpc" "git.loafle.net/commons_go/config" + cgp "git.loafle.net/commons_go/grpc_pool" oas "git.loafle.net/overflow/overflow_api_server/golang" - ogp "git.loafle.net/overflow/overflow_grpc_pool" "go.uber.org/zap" ) type poolHandlers struct { - ogp.PoolHandlers + cgp.PoolHandlers ctx context.Context logger *zap.Logger cfg config.Configurator diff --git a/handler/web_json_handlers.go b/handler/web_json_handlers.go index d217db9..03d9b17 100644 --- a/handler/web_json_handlers.go +++ b/handler/web_json_handlers.go @@ -2,23 +2,21 @@ package handler import ( "context" - "strings" "google.golang.org/grpc/metadata" "git.loafle.net/commons_go/logging" - oas "git.loafle.net/overflow/overflow_api_server/golang" + "git.loafle.net/overflow/overflow_gateway_web/grpc" + "git.loafle.net/overflow/overflow_gateway_web/server" ogw "git.loafle.net/overflow/overflow_gateway_websocket" "git.loafle.net/overflow/overflow_gateway_websocket/protocol/jsonrpc" - ogp "git.loafle.net/overflow/overflow_grpc_pool" "go.uber.org/zap" ) -func newWebJSONRpcHandler(ctx context.Context, gPool ogp.Pool) ogw.ProtocolHandler { +func newWebJSONRpcHandler(ctx context.Context) ogw.ProtocolHandler { h := &webJSONRpcHandlers{ ctx: ctx, logger: logging.WithContext(ctx), - gPool: gPool, } p := jsonrpc.New(ctx, h) @@ -30,7 +28,6 @@ type webJSONRpcHandlers struct { jsonrpc.JSONRpcHandlers ctx context.Context logger *zap.Logger - gPool ogp.Pool } func (h *webJSONRpcHandlers) OnRequest(soc ogw.Socket, method string, params []string) (interface{}, error) { @@ -40,30 +37,17 @@ func (h *webJSONRpcHandlers) OnRequest(soc ogw.Socket, method string, params []s zap.Any("params", params), ) - c, err := h.gPool.Get() - if err != nil { - h.logger.Error("cannot retrive GRPC Client") - return nil, err - } - defer h.gPool.Put(c) + uid := server.GetUID(soc) - sm := strings.Split(method, ".") - - si := &oas.ServerInput{ - Target: sm[0], - Method: sm[1], - Params: params, - } - - md := metadata.Pairs("email", "overflow@loafle.com") + md := metadata.Pairs("email", uid) ctx := metadata.NewOutgoingContext(context.Background(), md) - so, err := c.(oas.OverflowApiServerClient).Exec(ctx, si) + r, err := grpc.Exec(ctx, method, params) if err != nil { return nil, err } - return so.Result, nil + return r, nil } func (h *webJSONRpcHandlers) OnNotify(soc ogw.Socket, method string, params []string) error { diff --git a/handler/web_socket_handlers.go b/handler/web_socket_handlers.go index 2bc0629..00d08a8 100644 --- a/handler/web_socket_handlers.go +++ b/handler/web_socket_handlers.go @@ -7,7 +7,6 @@ import ( "git.loafle.net/commons_go/config" "git.loafle.net/commons_go/logging" ogw "git.loafle.net/overflow/overflow_gateway_websocket" - ogp "git.loafle.net/overflow/overflow_grpc_pool" "go.uber.org/zap" ) @@ -18,7 +17,7 @@ type webSocketHandlers struct { cfg config.Configurator } -func NewWebSocketHandler(ctx context.Context, gPool ogp.Pool) ogw.SocketHandler { +func NewWebSocketHandler(ctx context.Context) ogw.SocketHandler { h := &webSocketHandlers{ ctx: ctx, logger: logging.WithContext(ctx), @@ -32,7 +31,7 @@ func NewWebSocketHandler(ctx context.Context, gPool ogp.Pool) ogw.SocketHandler h.PingPeriod = h.cfg.GetDuration("socket.PingPeriod") * time.Second h.BinaryMessage = h.cfg.GetBool("socket.BinaryMessage") - ph := newWebJSONRpcHandler(ctx, gPool) + ph := newWebJSONRpcHandler(ctx) h.Protocol = ph diff --git a/main.go b/main.go index 97e648e..d49027d 100644 --- a/main.go +++ b/main.go @@ -26,9 +26,9 @@ func main() { s := server.NewServer(ctx) rp := redis.NewPool(ctx) - gp := grpc.NewPool(ctx) + grpc.InitializePool(ctx) - wsh := handler.NewWebSocketHandler(ctx, gp) + wsh := handler.NewWebSocketHandler(ctx) subscribe.Subscribe(ctx, rp.Get()) diff --git a/server/server.go b/server/server.go index c50d5b3..c2d7f52 100644 --- a/server/server.go +++ b/server/server.go @@ -2,25 +2,18 @@ package server import ( "context" - "time" "git.loafle.net/commons_go/config" - "git.loafle.net/commons_go/logging" ogw "git.loafle.net/overflow/overflow_gateway_websocket" ) +var _s ogw.Server + func NewServer(ctx context.Context) ogw.Server { - h := &serverHandlers{ - ctx: ctx, - logger: logging.WithContext(ctx), - } - h.cfg = config.Sub("websocket") - h.HandshakeTimeout = h.cfg.GetDuration("HandshakeTimeout") * time.Second - h.ReadBufferSize = h.cfg.GetInt("ReadBufferSize") - h.WriteBufferSize = h.cfg.GetInt("WriteBufferSize") - h.EnableCompression = h.cfg.GetBool("EnableCompression") + h := newServerHandler(ctx) + _s := ogw.NewServer(ctx, h) - s := ogw.NewServer(ctx, h) + ofSigningKey = []byte(config.GetString("auth.signingKey")) - return s + return _s } diff --git a/server/server_handlers.go b/server/server_handlers.go index 8cd834c..544bdee 100644 --- a/server/server_handlers.go +++ b/server/server_handlers.go @@ -2,14 +2,34 @@ package server import ( "context" + "fmt" + "time" "git.loafle.net/commons_go/config" + "git.loafle.net/commons_go/logging" + jwt "github.com/dgrijalva/jwt-go" "github.com/valyala/fasthttp" "go.uber.org/zap" ogw "git.loafle.net/overflow/overflow_gateway_websocket" ) +var ofSigningKey []byte + +func newServerHandler(ctx context.Context) ogw.ServerHandler { + h := &serverHandlers{ + ctx: ctx, + logger: logging.WithContext(ctx), + } + h.cfg = config.Sub("websocket") + h.HandshakeTimeout = h.cfg.GetDuration("HandshakeTimeout") * time.Second + h.ReadBufferSize = h.cfg.GetInt("ReadBufferSize") + h.WriteBufferSize = h.cfg.GetInt("WriteBufferSize") + h.EnableCompression = h.cfg.GetBool("EnableCompression") + + return h +} + type serverHandlers struct { ogw.ServerHandlers ctx context.Context @@ -17,20 +37,37 @@ type serverHandlers struct { cfg config.Configurator } -func (sh *serverHandlers) OnConnection(soc ogw.Socket) { - sh.logger.Info("connect", - zap.String("path", soc.Path()), - zap.String("id", soc.ID()), - ) +func (h *serverHandlers) OnConnection(soc ogw.Socket) { + // tokenString := string(soc.Conn().Headers().Cookie("AuthToken")) + tokenString := "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzUxMiJ9.eyJpc3MiOiJvdmVyRmxvdyIsImlhdCI6MTUwNDU5NTExOSwiZXhwIjoxNTM2MTMxMTE5LCJhdWQiOiJ3d3cub3ZlcmZsb3cuY2xvdWQiLCJzdWIiOiJvdmVyZmxvd0Bsb2FmbGUuY29tIn0.-WQi3OykPlJ9x8RcZGhWXEtGw4GhU6wmyJ_AWh2rMeUatQylfPzvmum2Xfp6pwKLMmcP76XoDPNyq06i7RKWNQ" + token, err := jwt.Parse(tokenString, func(token *jwt.Token) (interface{}, error) { + // Don't forget to validate the alg is what you expect: + if _, ok := token.Method.(*jwt.SigningMethodHMAC); !ok { + return nil, fmt.Errorf("Unexpected signing method: %v", token.Header["alg"]) + } + + // hmacSampleSecret is a []byte containing your secret, e.g. []byte("my_secret_key") + return ofSigningKey, nil + }) + + var uid string + if claims, ok := token.Claims.(jwt.MapClaims); ok && token.Valid { + uid = claims["sub"].(string) + } else { + fmt.Println(err) + return + } + + AddSocket(uid, soc) + } -func (sh *serverHandlers) OnDisconnected(soc ogw.Socket) { - sh.logger.Info("disconnect", - zap.String("path", soc.Path()), - zap.String("id", soc.ID()), - ) +func (h *serverHandlers) OnDisconnected(soc ogw.Socket) { + + RemoveSocket(soc) } -func (sh *serverHandlers) OnCheckOrigin(ctx *fasthttp.RequestCtx) bool { + +func (h *serverHandlers) OnCheckOrigin(ctx *fasthttp.RequestCtx) bool { if origin := string(ctx.Request.Header.Peek("Origin")); origin != "" { ctx.Response.Header.Set("Access-Control-Allow-Origin", origin) if string(ctx.Method()) == "OPTIONS" && string(ctx.Request.Header.Peek("Access-Control-Request-Method")) != "" { @@ -40,6 +77,7 @@ func (sh *serverHandlers) OnCheckOrigin(ctx *fasthttp.RequestCtx) bool { } return true } -func (sh *serverHandlers) OnError(ctx *fasthttp.RequestCtx, status int, reason error) { + +func (h *serverHandlers) OnError(ctx *fasthttp.RequestCtx, status int, reason error) { } diff --git a/server/socket_manager.go b/server/socket_manager.go new file mode 100644 index 0000000..760516e --- /dev/null +++ b/server/socket_manager.go @@ -0,0 +1,111 @@ +package server + +import ( + uch "git.loafle.net/commons_go/util/channel" + ogw "git.loafle.net/overflow/overflow_gateway_websocket" +) + +type socketsChannelAction struct { + uch.Action + uid string + soc ogw.Socket +} + +type SocketManager interface { + AddSocket(uid string, soc ogw.Socket) + RemoveSocket(soc ogw.Socket) + GetSocket(uid string) ogw.Socket + GetUID(soc ogw.Socket) string +} + +type socketManager struct { + sockets map[string]ogw.Socket + socketsR map[ogw.Socket]string + socketsCh chan socketsChannelAction +} + +var _m SocketManager + +func init() { + _m = New() +} + +func New() SocketManager { + m := &socketManager{ + sockets: make(map[string]ogw.Socket), + socketsR: make(map[ogw.Socket]string), + socketsCh: make(chan socketsChannelAction), + } + + go m.listenChannel() + + return m +} + +func AddSocket(uid string, soc ogw.Socket) { _m.AddSocket(uid, soc) } +func (m *socketManager) AddSocket(uid string, soc ogw.Socket) { + ca := socketsChannelAction{ + uid: uid, + soc: soc, + } + ca.Type = uch.ActionTypeCreate + m.socketsCh <- ca +} + +func RemoveSocket(soc ogw.Socket) { _m.RemoveSocket(soc) } +func (m *socketManager) RemoveSocket(soc ogw.Socket) { + ca := socketsChannelAction{ + soc: soc, + } + ca.Type = uch.ActionTypeDelete + m.socketsCh <- ca +} + +func GetSocket(uid string) ogw.Socket { return _m.GetSocket(uid) } +func (m *socketManager) GetSocket(uid string) ogw.Socket { + var ok bool + var soc ogw.Socket + if soc, ok = m.sockets[uid]; !ok { + return nil + } + return soc +} + +func GetUID(soc ogw.Socket) string { return _m.GetUID(soc) } +func (m *socketManager) GetUID(soc ogw.Socket) string { + var ok bool + var uid string + if uid, ok = m.socketsR[soc]; !ok { + return "" + } + return uid +} + +func (m *socketManager) _removeSocket(ca *socketsChannelAction) { + var uid string + var ok bool + soc := ca.soc + if uid, ok = m.socketsR[soc]; !ok { + return + } + delete(m.sockets, uid) + delete(m.socketsR, soc) +} + +func (m *socketManager) listenChannel() { + for { + select { + case ca := <-m.socketsCh: + switch ca.Type { + case uch.ActionTypeCreate: + m.sockets[ca.uid] = ca.soc + m.socketsR[ca.soc] = ca.uid + + break + case uch.ActionTypeDelete: + m._removeSocket(&ca) + break + } + } + } +} diff --git a/subscribe/web_subscriber_handlers.go b/subscribe/web_subscriber_handlers.go index c7bf52f..3320c72 100644 --- a/subscribe/web_subscriber_handlers.go +++ b/subscribe/web_subscriber_handlers.go @@ -4,6 +4,7 @@ import ( "context" "git.loafle.net/commons_go/logging" + "git.loafle.net/overflow/overflow_gateway_web/server" ofs "git.loafle.net/overflow/overflow_subscriber" "go.uber.org/zap" ) @@ -25,7 +26,8 @@ type webSubscriberHandlers struct { } func (h *webSubscriberHandlers) OnSubscribe(payload string) { - h.logger.Info("Subscriber:Web", - zap.String("payload", payload), - ) + uid := "kdkdkd" + soc := server.GetSocket(uid) + + soc.Write([]byte(payload)) }