refactoring

This commit is contained in:
crusader 2017-09-05 16:48:19 +09:00
parent 7b05682732
commit 0cf3116314
12 changed files with 228 additions and 66 deletions

View File

@ -3,6 +3,9 @@
"addr": ":19090",
"tls": false
},
"auth": {
"signingKey": "tWB0lUXiCwX4U3qsJZcZ10mKvEH793RHkTJDbDuZVshQTk4uNB6ck59UQ96lhsRi4XNUiEnlIbP8XYQMPabeNtERX3iyHeDcwocgUVAor1nkAajYeq1gNyJszGpMhEOT"
},
"websocket": {
"HandshakeTimeout": 0,
"ReadBufferSize": 8192,
@ -80,8 +83,7 @@
"nameEncoder": "full"
},
"outputPaths": ["stdout", "/tmp/logs"],
"errorOutputPaths": ["stderr"],
"initialFields": {"foo": "bar"}
"errorOutputPaths": ["stderr"]
}
}

View File

@ -4,7 +4,6 @@ import:
- package: git.loafle.net/overflow/overflow_api_server
subpackages:
- golang
- package: git.loafle.net/overflow/overflow_grpc_pool
- package: git.loafle.net/overflow/overflow_subscriber
subpackages:
- redis
@ -21,3 +20,6 @@ import:
- package: git.loafle.net/overflow/overflow_gateway_websocket
- package: git.loafle.net/commons_go/config
- package: git.loafle.net/commons_go/redis_pool
- package: github.com/dgrijalva/jwt-go
version: v3.0.0
- package: git.loafle.net/commons_go/grpc_pool

30
grpc/client.go Normal file
View File

@ -0,0 +1,30 @@
package grpc
import (
"context"
"strings"
oas "git.loafle.net/overflow/overflow_api_server/golang"
)
func Exec(ctx context.Context, method string, params []string) (string, error) {
c, err := _pool.Get()
if nil != err {
}
defer _pool.Put(c)
sm := strings.Split(method, ".")
si := &oas.ServerInput{
Target: sm[0],
Method: sm[1],
Params: params,
}
so, err := c.(oas.OverflowApiServerClient).Exec(ctx, si)
if err != nil {
return "", err
}
return so.Result, nil
}

View File

@ -5,11 +5,14 @@ import (
"fmt"
"git.loafle.net/commons_go/config"
cgp "git.loafle.net/commons_go/grpc_pool"
"git.loafle.net/commons_go/logging"
ogp "git.loafle.net/overflow/overflow_grpc_pool"
)
func NewPool(ctx context.Context) ogp.Pool {
var _pool cgp.Pool
func InitializePool(ctx context.Context) {
var err error
h := &poolHandlers{
ctx: ctx,
logger: logging.WithContext(ctx),
@ -18,10 +21,8 @@ func NewPool(ctx context.Context) ogp.Pool {
h.MaxIdle = h.cfg.GetInt("pool.MaxIdle")
h.MaxCapacity = h.cfg.GetInt("pool.MaxCapacity")
p, err := ogp.New(ctx, h)
_pool, err = cgp.New(ctx, h)
if nil != err {
h.logger.Fatal(fmt.Sprintf("GRpc Pool: %v", err))
}
return p
}

View File

@ -6,13 +6,13 @@ import (
"google.golang.org/grpc"
"git.loafle.net/commons_go/config"
cgp "git.loafle.net/commons_go/grpc_pool"
oas "git.loafle.net/overflow/overflow_api_server/golang"
ogp "git.loafle.net/overflow/overflow_grpc_pool"
"go.uber.org/zap"
)
type poolHandlers struct {
ogp.PoolHandlers
cgp.PoolHandlers
ctx context.Context
logger *zap.Logger
cfg config.Configurator

View File

@ -2,23 +2,21 @@ package handler
import (
"context"
"strings"
"google.golang.org/grpc/metadata"
"git.loafle.net/commons_go/logging"
oas "git.loafle.net/overflow/overflow_api_server/golang"
"git.loafle.net/overflow/overflow_gateway_web/grpc"
"git.loafle.net/overflow/overflow_gateway_web/server"
ogw "git.loafle.net/overflow/overflow_gateway_websocket"
"git.loafle.net/overflow/overflow_gateway_websocket/protocol/jsonrpc"
ogp "git.loafle.net/overflow/overflow_grpc_pool"
"go.uber.org/zap"
)
func newWebJSONRpcHandler(ctx context.Context, gPool ogp.Pool) ogw.ProtocolHandler {
func newWebJSONRpcHandler(ctx context.Context) ogw.ProtocolHandler {
h := &webJSONRpcHandlers{
ctx: ctx,
logger: logging.WithContext(ctx),
gPool: gPool,
}
p := jsonrpc.New(ctx, h)
@ -30,7 +28,6 @@ type webJSONRpcHandlers struct {
jsonrpc.JSONRpcHandlers
ctx context.Context
logger *zap.Logger
gPool ogp.Pool
}
func (h *webJSONRpcHandlers) OnRequest(soc ogw.Socket, method string, params []string) (interface{}, error) {
@ -40,30 +37,17 @@ func (h *webJSONRpcHandlers) OnRequest(soc ogw.Socket, method string, params []s
zap.Any("params", params),
)
c, err := h.gPool.Get()
if err != nil {
h.logger.Error("cannot retrive GRPC Client")
return nil, err
}
defer h.gPool.Put(c)
uid := server.GetUID(soc)
sm := strings.Split(method, ".")
si := &oas.ServerInput{
Target: sm[0],
Method: sm[1],
Params: params,
}
md := metadata.Pairs("email", "overflow@loafle.com")
md := metadata.Pairs("email", uid)
ctx := metadata.NewOutgoingContext(context.Background(), md)
so, err := c.(oas.OverflowApiServerClient).Exec(ctx, si)
r, err := grpc.Exec(ctx, method, params)
if err != nil {
return nil, err
}
return so.Result, nil
return r, nil
}
func (h *webJSONRpcHandlers) OnNotify(soc ogw.Socket, method string, params []string) error {

View File

@ -7,7 +7,6 @@ import (
"git.loafle.net/commons_go/config"
"git.loafle.net/commons_go/logging"
ogw "git.loafle.net/overflow/overflow_gateway_websocket"
ogp "git.loafle.net/overflow/overflow_grpc_pool"
"go.uber.org/zap"
)
@ -18,7 +17,7 @@ type webSocketHandlers struct {
cfg config.Configurator
}
func NewWebSocketHandler(ctx context.Context, gPool ogp.Pool) ogw.SocketHandler {
func NewWebSocketHandler(ctx context.Context) ogw.SocketHandler {
h := &webSocketHandlers{
ctx: ctx,
logger: logging.WithContext(ctx),
@ -32,7 +31,7 @@ func NewWebSocketHandler(ctx context.Context, gPool ogp.Pool) ogw.SocketHandler
h.PingPeriod = h.cfg.GetDuration("socket.PingPeriod") * time.Second
h.BinaryMessage = h.cfg.GetBool("socket.BinaryMessage")
ph := newWebJSONRpcHandler(ctx, gPool)
ph := newWebJSONRpcHandler(ctx)
h.Protocol = ph

View File

@ -26,9 +26,9 @@ func main() {
s := server.NewServer(ctx)
rp := redis.NewPool(ctx)
gp := grpc.NewPool(ctx)
grpc.InitializePool(ctx)
wsh := handler.NewWebSocketHandler(ctx, gp)
wsh := handler.NewWebSocketHandler(ctx)
subscribe.Subscribe(ctx, rp.Get())

View File

@ -2,25 +2,18 @@ package server
import (
"context"
"time"
"git.loafle.net/commons_go/config"
"git.loafle.net/commons_go/logging"
ogw "git.loafle.net/overflow/overflow_gateway_websocket"
)
var _s ogw.Server
func NewServer(ctx context.Context) ogw.Server {
h := &serverHandlers{
ctx: ctx,
logger: logging.WithContext(ctx),
}
h.cfg = config.Sub("websocket")
h.HandshakeTimeout = h.cfg.GetDuration("HandshakeTimeout") * time.Second
h.ReadBufferSize = h.cfg.GetInt("ReadBufferSize")
h.WriteBufferSize = h.cfg.GetInt("WriteBufferSize")
h.EnableCompression = h.cfg.GetBool("EnableCompression")
h := newServerHandler(ctx)
_s := ogw.NewServer(ctx, h)
s := ogw.NewServer(ctx, h)
ofSigningKey = []byte(config.GetString("auth.signingKey"))
return s
return _s
}

View File

@ -2,14 +2,34 @@ package server
import (
"context"
"fmt"
"time"
"git.loafle.net/commons_go/config"
"git.loafle.net/commons_go/logging"
jwt "github.com/dgrijalva/jwt-go"
"github.com/valyala/fasthttp"
"go.uber.org/zap"
ogw "git.loafle.net/overflow/overflow_gateway_websocket"
)
var ofSigningKey []byte
func newServerHandler(ctx context.Context) ogw.ServerHandler {
h := &serverHandlers{
ctx: ctx,
logger: logging.WithContext(ctx),
}
h.cfg = config.Sub("websocket")
h.HandshakeTimeout = h.cfg.GetDuration("HandshakeTimeout") * time.Second
h.ReadBufferSize = h.cfg.GetInt("ReadBufferSize")
h.WriteBufferSize = h.cfg.GetInt("WriteBufferSize")
h.EnableCompression = h.cfg.GetBool("EnableCompression")
return h
}
type serverHandlers struct {
ogw.ServerHandlers
ctx context.Context
@ -17,20 +37,37 @@ type serverHandlers struct {
cfg config.Configurator
}
func (sh *serverHandlers) OnConnection(soc ogw.Socket) {
sh.logger.Info("connect",
zap.String("path", soc.Path()),
zap.String("id", soc.ID()),
)
func (h *serverHandlers) OnConnection(soc ogw.Socket) {
// tokenString := string(soc.Conn().Headers().Cookie("AuthToken"))
tokenString := "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzUxMiJ9.eyJpc3MiOiJvdmVyRmxvdyIsImlhdCI6MTUwNDU5NTExOSwiZXhwIjoxNTM2MTMxMTE5LCJhdWQiOiJ3d3cub3ZlcmZsb3cuY2xvdWQiLCJzdWIiOiJvdmVyZmxvd0Bsb2FmbGUuY29tIn0.-WQi3OykPlJ9x8RcZGhWXEtGw4GhU6wmyJ_AWh2rMeUatQylfPzvmum2Xfp6pwKLMmcP76XoDPNyq06i7RKWNQ"
token, err := jwt.Parse(tokenString, func(token *jwt.Token) (interface{}, error) {
// Don't forget to validate the alg is what you expect:
if _, ok := token.Method.(*jwt.SigningMethodHMAC); !ok {
return nil, fmt.Errorf("Unexpected signing method: %v", token.Header["alg"])
}
// hmacSampleSecret is a []byte containing your secret, e.g. []byte("my_secret_key")
return ofSigningKey, nil
})
var uid string
if claims, ok := token.Claims.(jwt.MapClaims); ok && token.Valid {
uid = claims["sub"].(string)
} else {
fmt.Println(err)
return
}
AddSocket(uid, soc)
}
func (sh *serverHandlers) OnDisconnected(soc ogw.Socket) {
sh.logger.Info("disconnect",
zap.String("path", soc.Path()),
zap.String("id", soc.ID()),
)
func (h *serverHandlers) OnDisconnected(soc ogw.Socket) {
RemoveSocket(soc)
}
func (sh *serverHandlers) OnCheckOrigin(ctx *fasthttp.RequestCtx) bool {
func (h *serverHandlers) OnCheckOrigin(ctx *fasthttp.RequestCtx) bool {
if origin := string(ctx.Request.Header.Peek("Origin")); origin != "" {
ctx.Response.Header.Set("Access-Control-Allow-Origin", origin)
if string(ctx.Method()) == "OPTIONS" && string(ctx.Request.Header.Peek("Access-Control-Request-Method")) != "" {
@ -40,6 +77,7 @@ func (sh *serverHandlers) OnCheckOrigin(ctx *fasthttp.RequestCtx) bool {
}
return true
}
func (sh *serverHandlers) OnError(ctx *fasthttp.RequestCtx, status int, reason error) {
func (h *serverHandlers) OnError(ctx *fasthttp.RequestCtx, status int, reason error) {
}

111
server/socket_manager.go Normal file
View File

@ -0,0 +1,111 @@
package server
import (
uch "git.loafle.net/commons_go/util/channel"
ogw "git.loafle.net/overflow/overflow_gateway_websocket"
)
type socketsChannelAction struct {
uch.Action
uid string
soc ogw.Socket
}
type SocketManager interface {
AddSocket(uid string, soc ogw.Socket)
RemoveSocket(soc ogw.Socket)
GetSocket(uid string) ogw.Socket
GetUID(soc ogw.Socket) string
}
type socketManager struct {
sockets map[string]ogw.Socket
socketsR map[ogw.Socket]string
socketsCh chan socketsChannelAction
}
var _m SocketManager
func init() {
_m = New()
}
func New() SocketManager {
m := &socketManager{
sockets: make(map[string]ogw.Socket),
socketsR: make(map[ogw.Socket]string),
socketsCh: make(chan socketsChannelAction),
}
go m.listenChannel()
return m
}
func AddSocket(uid string, soc ogw.Socket) { _m.AddSocket(uid, soc) }
func (m *socketManager) AddSocket(uid string, soc ogw.Socket) {
ca := socketsChannelAction{
uid: uid,
soc: soc,
}
ca.Type = uch.ActionTypeCreate
m.socketsCh <- ca
}
func RemoveSocket(soc ogw.Socket) { _m.RemoveSocket(soc) }
func (m *socketManager) RemoveSocket(soc ogw.Socket) {
ca := socketsChannelAction{
soc: soc,
}
ca.Type = uch.ActionTypeDelete
m.socketsCh <- ca
}
func GetSocket(uid string) ogw.Socket { return _m.GetSocket(uid) }
func (m *socketManager) GetSocket(uid string) ogw.Socket {
var ok bool
var soc ogw.Socket
if soc, ok = m.sockets[uid]; !ok {
return nil
}
return soc
}
func GetUID(soc ogw.Socket) string { return _m.GetUID(soc) }
func (m *socketManager) GetUID(soc ogw.Socket) string {
var ok bool
var uid string
if uid, ok = m.socketsR[soc]; !ok {
return ""
}
return uid
}
func (m *socketManager) _removeSocket(ca *socketsChannelAction) {
var uid string
var ok bool
soc := ca.soc
if uid, ok = m.socketsR[soc]; !ok {
return
}
delete(m.sockets, uid)
delete(m.socketsR, soc)
}
func (m *socketManager) listenChannel() {
for {
select {
case ca := <-m.socketsCh:
switch ca.Type {
case uch.ActionTypeCreate:
m.sockets[ca.uid] = ca.soc
m.socketsR[ca.soc] = ca.uid
break
case uch.ActionTypeDelete:
m._removeSocket(&ca)
break
}
}
}
}

View File

@ -4,6 +4,7 @@ import (
"context"
"git.loafle.net/commons_go/logging"
"git.loafle.net/overflow/overflow_gateway_web/server"
ofs "git.loafle.net/overflow/overflow_subscriber"
"go.uber.org/zap"
)
@ -25,7 +26,8 @@ type webSubscriberHandlers struct {
}
func (h *webSubscriberHandlers) OnSubscribe(payload string) {
h.logger.Info("Subscriber:Web",
zap.String("payload", payload),
)
uid := "kdkdkd"
soc := server.GetSocket(uid)
soc.Write([]byte(payload))
}