Project has been created.

This commit is contained in:
crusader 2017-09-11 11:24:18 +09:00
commit b3f8909c67
26 changed files with 1127 additions and 0 deletions

68
.gitignore vendored Normal file
View File

@ -0,0 +1,68 @@
# Created by .ignore support plugin (hsz.mobi)
### JetBrains template
# Covers JetBrains IDEs: IntelliJ, RubyMine, PhpStorm, AppCode, PyCharm, CLion, Android Studio and Webstorm
# Reference: https://intellij-support.jetbrains.com/hc/en-us/articles/206544839
# User-specific stuff:
.idea/**/workspace.xml
.idea/**/tasks.xml
.idea/dictionaries
# Sensitive or high-churn files:
.idea/**/dataSources/
.idea/**/dataSources.ids
.idea/**/dataSources.xml
.idea/**/dataSources.local.xml
.idea/**/sqlDataSources.xml
.idea/**/dynamic.xml
.idea/**/uiDesigner.xml
# Gradle:
.idea/**/gradle.xml
.idea/**/libraries
# Mongo Explorer plugin:
.idea/**/mongoSettings.xml
## File-based project format:
*.iws
## Plugin-specific files:
# IntelliJ
/out/
# mpeltonen/sbt-idea plugin
.idea_modules/
# JIRA plugin
atlassian-ide-plugin.xml
# Crashlytics plugin (for Android Studio and IntelliJ)
com_crashlytics_export_strings.xml
crashlytics.properties
crashlytics-build.properties
fabric.properties
### Go template
# Binaries for programs and plugins
*.exe
*.dll
*.so
*.dylib
# Test binary, build with `go test -c`
*.test
# Output of the go coverage tool, specifically when used with LiteIDE
*.out
# Project-local glide cache, RE: https://github.com/Masterminds/glide/issues/736
.glide/
.idea/
*.iml
vendor/
glide.lock
.DS_Store
dist/
debug

32
.vscode/launch.json vendored Normal file
View File

@ -0,0 +1,32 @@
{
"version": "0.2.0",
"configurations": [
{
"name": "Debug",
"type": "go",
"request": "launch",
"mode": "debug",
"remotePath": "",
"port": 2345,
"host": "127.0.0.1",
"program": "${workspaceRoot}/main.go",
"env": {},
"args": [],
"showLog": true
},
{
"name": "File Debug",
"type": "go",
"request": "launch",
"mode": "debug",
"remotePath": "",
"port": 2345,
"host": "127.0.0.1",
"program": "${fileDirname}",
"env": {},
"args": [],
"showLog": true
}
]
}

3
.vscode/settings.json vendored Normal file
View File

@ -0,0 +1,3 @@
// Place your settings in this file to overwrite default and user settings.
{
}

36
Dockerfile Normal file
View File

@ -0,0 +1,36 @@
FROM alpine:latest
ENV APP_VERSION=1.0.0 \
APP_HOME=/opt \
APP_NAME=overflow_gateway_probe
ENV APP_CONFIG_PATH="${APP_HOME}/config" \
APP_LOGS_PATH="${APP_HOME}/logs" \
PATH="$PATH:${APP_HOME}/bin"
ADD dist/${APP_NAME} ${APP_HOME}/bin/
ADD _docker/bin/*.sh ${APP_HOME}/bin/
RUN apk add --no-cache curl \
&& chmod +x ${APP_HOME}/bin/*.sh \
&& mkdir -p ${APP_LOGS_PATH}
ENV TINI_VERSION='0.15.0' \
TINI_SHA='4007655082f573603c02bc1d2137443c8e153af047ffd088d02ccc01e6f06170'
# Use tini as subreaper in Docker container to adopt zombie processes
RUN curl -fsSL https://github.com/krallin/tini/releases/download/v${TINI_VERSION}/tini-static-amd64 -o /bin/tini \
&& chmod +x /bin/tini \
&& echo "$TINI_SHA /bin/tini" | sha256sum -c -
VOLUME ${APP_CONFIG_PATH}
VOLUME ${APP_LOGS_PATH}
EXPOSE 80 443
ENTRYPOINT ["/bin/tini", "--"]
CMD ["docker-entrypoint.sh"]
# docker build -f Dockerfile -t docker.loafle.net/overflow/overflow_service_websocket:1.0.0-alpine .
# docker run -d -p 18081:80 -t docker.loafle.net/overflow/overflow_service_websocket:1.0.0-alpine

View File

@ -0,0 +1,7 @@
#!/bin/sh
set -e
/opt/bin/overflow_gateway_probe -config=/opt/config/
exec "$@"

View File

@ -0,0 +1,89 @@
{
"server": {
"addr": ":80",
"tls": false
},
"auth": {
"signingKey": "tWB0lUXiCwX4U3qsJZcZ10mKvEH793RHkTJDbDuZVshQTk4uNB6ck59UQ96lhsRi4XNUiEnlIbP8XYQMPabeNtERX3iyHeDcwocgUVAor1nkAajYeq1gNyJszGpMhEOT"
},
"websocket": {
"HandshakeTimeout": 0,
"ReadBufferSize": 8192,
"WriteBufferSize": 8192,
"EnableCompression": false
},
"grpc": {
"addr": "192.168.1.50:50006",
"tls": false,
"pool": {
"MaxIdle": 1,
"MaxCapacity": 3,
"increaseCapacity": 10
}
},
"redis": {
"network": "tcp",
"addr": "192.168.1.50:6379",
"tls": false,
"pool": {
"MaxIdle": 3,
"IdleTimeout": 240,
"increaseCapacity": 10
}
},
"handlers": {
"probe": {
"entry": "/probe",
"socket": {
"MaxMessageSize": 8192,
"WriteTimeout": 0,
"ReadTimeout": 0,
"PongTimeout": 60,
"PingTimeout": 10,
"PingPeriod": 10,
"BinaryMessage": false
}
},
"auth": {
"entry": "/auth",
"socket": {
"MaxMessageSize": 8192,
"WriteTimeout": 0,
"ReadTimeout": 0,
"PongTimeout": 60,
"PingTimeout": 10,
"PingPeriod": 10,
"BinaryMessage": false
}
}
},
"logging": {
"level": "debug",
"development": true,
"disableCaller": true,
"disableStacktrace": true,
"sampling": {
"initial": 100,
"thereafter": 100
},
"encoding": "console",
"encoderConfig": {
"messageKey": "message",
"levelKey": "level",
"timeKey": "time",
"nameKey": "name",
"callerKey": "caller",
"stacktraceKey": "stacktrace",
"lineEnding": "\n",
"levelEncoder": "color",
"timeEncoder": "ISO8601",
"durationEncoder": "string",
"callerEncoder": "full",
"nameEncoder": "full"
},
"outputPaths": ["/opt/logs/log"],
"errorOutputPaths": ["stderr"]
}
}

2
build.sh Executable file
View File

@ -0,0 +1,2 @@
rm ./dist/*
CGO_ENABLED=0 go build -a --installsuffix cgo --ldflags="-s" -o ./dist/overflow_gateway_probe

89
config.json Normal file
View File

@ -0,0 +1,89 @@
{
"server": {
"addr": ":19190",
"tls": false
},
"auth": {
"signingKey": "tWB0lUXiCwX4U3qsJZcZ10mKvEH793RHkTJDbDuZVshQTk4uNB6ck59UQ96lhsRi4XNUiEnlIbP8XYQMPabeNtERX3iyHeDcwocgUVAor1nkAajYeq1gNyJszGpMhEOT"
},
"websocket": {
"HandshakeTimeout": 0,
"ReadBufferSize": 8192,
"WriteBufferSize": 8192,
"EnableCompression": false
},
"grpc": {
"addr": "192.168.1.50:50006",
"tls": false,
"pool": {
"MaxIdle": 1,
"MaxCapacity": 3,
"increaseCapacity": 10
}
},
"redis": {
"network": "tcp",
"addr": "192.168.1.50:6379",
"tls": false,
"pool": {
"MaxIdle": 3,
"IdleTimeout": 240,
"increaseCapacity": 10
}
},
"handlers": {
"probe": {
"entry": "/probe",
"socket": {
"MaxMessageSize": 8192,
"WriteTimeout": 0,
"ReadTimeout": 0,
"PongTimeout": 60,
"PingTimeout": 10,
"PingPeriod": 10,
"BinaryMessage": false
}
},
"auth": {
"entry": "/auth",
"socket": {
"MaxMessageSize": 8192,
"WriteTimeout": 0,
"ReadTimeout": 0,
"PongTimeout": 60,
"PingTimeout": 10,
"PingPeriod": 10,
"BinaryMessage": false
}
}
},
"logging": {
"level": "debug",
"development": true,
"disableCaller": true,
"disableStacktrace": true,
"sampling": {
"initial": 100,
"thereafter": 100
},
"encoding": "console",
"encoderConfig": {
"messageKey": "message",
"levelKey": "level",
"timeKey": "time",
"nameKey": "name",
"callerKey": "caller",
"stacktraceKey": "stacktrace",
"lineEnding": "\n",
"levelEncoder": "color",
"timeEncoder": "ISO8601",
"durationEncoder": "string",
"callerEncoder": "full",
"nameEncoder": "full"
},
"outputPaths": ["stdout", "/tmp/logs"],
"errorOutputPaths": ["stderr"]
}
}

33
docker-compose.yml Normal file
View File

@ -0,0 +1,33 @@
version: "3"
services:
overflow_gateway_probe:
restart: always
image: docker.loafle.net/overflow/overflow_gateway_probe:1.0.0-SNAPSHOT
container_name: overflow_gateway_probe
volumes:
- /service/overflow_gateway_probe/data/opt/config:/opt/config
- /service/overflow_gateway_probe/data/opt/logs:/opt/logs
# - /home/crusader/Temp/docker/overflow_gateway_probe/config:/opt/config
# - /home/crusader/Temp/docker/overflow_gateway_probe/logs:/opt/logs
ports:
- "19090:80"
- "19443:443"
# postgresql:
# restart: always
# image: postgres:9.6-alpine
# container_name: overFlow-dao-postgres
# environment:
# - POSTGRES_DB=overflow
# - POSTGRES_USER=overflow
# - POSTGRES_PASSWORD=qwer5795
# # - POSTGRES_INITDB_ARGS="--data-checksums"
# ports:
# - "5432:5432"
# docker-compose up -d
# docker-compose stop
# docker-compose rm
# or
# docker-compose -f ./docker-compose.yml up -d

30
glide.yaml Normal file
View File

@ -0,0 +1,30 @@
package: git.loafle.net/overflow/overflow_gateway_probe
import:
- package: git.loafle.net/commons_go/config
- package: git.loafle.net/commons_go/grpc_pool
- package: git.loafle.net/commons_go/logging
- package: git.loafle.net/commons_go/redis_pool
- package: git.loafle.net/commons_go/util
subpackages:
- channel
- package: git.loafle.net/overflow/overflow_api_server
subpackages:
- golang
- package: git.loafle.net/overflow/overflow_gateway_web
subpackages:
- server
- package: git.loafle.net/overflow/overflow_gateway_websocket
subpackages:
- protocol/jsonrpc
- package: git.loafle.net/overflow/overflow_subscriber
subpackages:
- redis
- package: github.com/dgrijalva/jwt-go
- package: github.com/garyburd/redigo
subpackages:
- redis
- package: github.com/valyala/fasthttp
- package: go.uber.org/zap
- package: google.golang.org/grpc
subpackages:
- metadata

30
grpc/client.go Normal file
View File

@ -0,0 +1,30 @@
package grpc
import (
"context"
"strings"
oas "git.loafle.net/overflow/overflow_api_server/golang"
)
func Exec(ctx context.Context, method string, params []string) (string, error) {
c, err := _pool.Get()
if nil != err {
}
defer _pool.Put(c)
sm := strings.Split(method, ".")
si := &oas.ServerInput{
Target: sm[0],
Method: sm[1],
Params: params,
}
so, err := c.(oas.OverflowApiServerClient).Exec(ctx, si)
if err != nil {
return "", err
}
return so.Result, nil
}

28
grpc/pool.go Normal file
View File

@ -0,0 +1,28 @@
package grpc
import (
"context"
"fmt"
"git.loafle.net/commons_go/config"
cgp "git.loafle.net/commons_go/grpc_pool"
"git.loafle.net/commons_go/logging"
)
var _pool cgp.Pool
func InitializePool(ctx context.Context) {
var err error
h := &poolHandlers{
ctx: ctx,
logger: logging.WithContext(ctx),
}
h.cfg = config.Sub("grpc")
h.MaxIdle = h.cfg.GetInt("pool.MaxIdle")
h.MaxCapacity = h.cfg.GetInt("pool.MaxCapacity")
_pool, err = cgp.New(ctx, h)
if nil != err {
h.logger.Fatal(fmt.Sprintf("GRpc Pool: %v", err))
}
}

30
grpc/pool_handlers.go Normal file
View File

@ -0,0 +1,30 @@
package grpc
import (
"context"
"google.golang.org/grpc"
"git.loafle.net/commons_go/config"
cgp "git.loafle.net/commons_go/grpc_pool"
oas "git.loafle.net/overflow/overflow_api_server/golang"
"go.uber.org/zap"
)
type poolHandlers struct {
cgp.PoolHandlers
ctx context.Context
logger *zap.Logger
cfg config.Configurator
}
func (h *poolHandlers) OnCreate() (*grpc.ClientConn, interface{}, error) {
var err error
conn, err := grpc.Dial(config.GetString("grpc.addr"), grpc.WithInsecure())
if nil != err {
return nil, nil, err
}
c := oas.NewOverflowApiServerClient(conn)
return conn, c, nil
}

View File

@ -0,0 +1,56 @@
package handler
import (
"context"
"google.golang.org/grpc/metadata"
"git.loafle.net/commons_go/logging"
"git.loafle.net/overflow/overflow_gateway_probe/grpc"
"git.loafle.net/overflow/overflow_gateway_probe/server"
ogw "git.loafle.net/overflow/overflow_gateway_websocket"
"git.loafle.net/overflow/overflow_gateway_websocket/protocol/jsonrpc"
"go.uber.org/zap"
)
func newAuthJSONRpcHandler(ctx context.Context) ogw.ProtocolHandler {
h := &authJSONRpcHandlers{
ctx: ctx,
logger: logging.WithContext(ctx),
}
p := jsonrpc.New(ctx, h)
return p
}
type authJSONRpcHandlers struct {
jsonrpc.JSONRpcHandlers
ctx context.Context
logger *zap.Logger
}
func (h *authJSONRpcHandlers) OnRequest(soc ogw.Socket, method string, params []string) (interface{}, error) {
h.logger.Info("OnRequest",
zap.String("path", soc.Path()),
zap.String("method", method),
zap.Any("params", params),
)
uid := server.GetUID(soc.Path(), soc)
md := metadata.Pairs("email", uid)
ctx := metadata.NewOutgoingContext(context.Background(), md)
r, err := grpc.Exec(ctx, method, params)
if err != nil {
return nil, err
}
return r, nil
}
func (h *authJSONRpcHandlers) OnNotify(soc ogw.Socket, method string, params []string) error {
return nil
}

View File

@ -0,0 +1,39 @@
package handler
import (
"context"
"time"
"git.loafle.net/commons_go/config"
"git.loafle.net/commons_go/logging"
ogw "git.loafle.net/overflow/overflow_gateway_websocket"
"go.uber.org/zap"
)
type authSocketHandlers struct {
ogw.SocketHandlers
ctx context.Context
logger *zap.Logger
cfg config.Configurator
}
func NewAuthSocketHandler(ctx context.Context) ogw.SocketHandler {
h := &authSocketHandlers{
ctx: ctx,
logger: logging.WithContext(ctx),
}
h.cfg = config.Sub("handlers.auth")
h.MaxMessageSize = h.cfg.GetInt64("socket.MaxMessageSize")
h.WriteTimeout = h.cfg.GetDuration("socket.WriteTimeout") * time.Second
h.ReadTimeout = h.cfg.GetDuration("socket.ReadTimeout") * time.Second
h.PongTimeout = h.cfg.GetDuration("socket.PongTimeout") * time.Second
h.PingTimeout = h.cfg.GetDuration("socket.PingTimeout") * time.Second
h.PingPeriod = h.cfg.GetDuration("socket.PingPeriod") * time.Second
h.BinaryMessage = h.cfg.GetBool("socket.BinaryMessage")
ph := newAuthJSONRpcHandler(ctx)
h.Protocol = ph
return h
}

View File

@ -0,0 +1,56 @@
package handler
import (
"context"
"google.golang.org/grpc/metadata"
"git.loafle.net/commons_go/logging"
"git.loafle.net/overflow/overflow_gateway_probe/grpc"
"git.loafle.net/overflow/overflow_gateway_probe/server"
ogw "git.loafle.net/overflow/overflow_gateway_websocket"
"git.loafle.net/overflow/overflow_gateway_websocket/protocol/jsonrpc"
"go.uber.org/zap"
)
func newProbeJSONRpcHandler(ctx context.Context) ogw.ProtocolHandler {
h := &probeJSONRpcHandlers{
ctx: ctx,
logger: logging.WithContext(ctx),
}
p := jsonrpc.New(ctx, h)
return p
}
type probeJSONRpcHandlers struct {
jsonrpc.JSONRpcHandlers
ctx context.Context
logger *zap.Logger
}
func (h *probeJSONRpcHandlers) OnRequest(soc ogw.Socket, method string, params []string) (interface{}, error) {
h.logger.Info("OnRequest",
zap.String("path", soc.Path()),
zap.String("method", method),
zap.Any("params", params),
)
uid := server.GetUID(soc.Path(), soc)
md := metadata.Pairs("email", uid)
ctx := metadata.NewOutgoingContext(context.Background(), md)
r, err := grpc.Exec(ctx, method, params)
if err != nil {
return nil, err
}
return r, nil
}
func (h *probeJSONRpcHandlers) OnNotify(soc ogw.Socket, method string, params []string) error {
return nil
}

View File

@ -0,0 +1,39 @@
package handler
import (
"context"
"time"
"git.loafle.net/commons_go/config"
"git.loafle.net/commons_go/logging"
ogw "git.loafle.net/overflow/overflow_gateway_websocket"
"go.uber.org/zap"
)
type probeSocketHandlers struct {
ogw.SocketHandlers
ctx context.Context
logger *zap.Logger
cfg config.Configurator
}
func NewProbeSocketHandler(ctx context.Context) ogw.SocketHandler {
h := &probeSocketHandlers{
ctx: ctx,
logger: logging.WithContext(ctx),
}
h.cfg = config.Sub("handlers.probe")
h.MaxMessageSize = h.cfg.GetInt64("socket.MaxMessageSize")
h.WriteTimeout = h.cfg.GetDuration("socket.WriteTimeout") * time.Second
h.ReadTimeout = h.cfg.GetDuration("socket.ReadTimeout") * time.Second
h.PongTimeout = h.cfg.GetDuration("socket.PongTimeout") * time.Second
h.PingTimeout = h.cfg.GetDuration("socket.PingTimeout") * time.Second
h.PingPeriod = h.cfg.GetDuration("socket.PingPeriod") * time.Second
h.BinaryMessage = h.cfg.GetBool("socket.BinaryMessage")
ph := newProbeJSONRpcHandler(ctx)
h.Protocol = ph
return h
}

78
main.go Normal file
View File

@ -0,0 +1,78 @@
package main
import (
"context"
"encoding/json"
"flag"
"log"
"go.uber.org/zap"
"git.loafle.net/commons_go/config"
"git.loafle.net/commons_go/logging"
"git.loafle.net/overflow/overflow_gateway_probe/grpc"
"git.loafle.net/overflow/overflow_gateway_probe/handler"
"git.loafle.net/overflow/overflow_gateway_probe/redis"
"git.loafle.net/overflow/overflow_gateway_probe/server"
"git.loafle.net/overflow/overflow_gateway_probe/subscribe"
)
var logger *zap.Logger
func main() {
configPath := flag.String("config", ".", "The path of config file")
flag.Parse()
loadConfig(*configPath)
ctx := newContext()
defer logger.Sync()
s := server.NewServer(ctx)
rp := redis.NewPool(ctx)
grpc.InitializePool(ctx)
psh := handler.NewProbeSocketHandler(ctx)
ash := handler.NewAuthSocketHandler(ctx)
subscribe.Subscribe(ctx, rp.Get())
s.HandleSocket(config.GetString("handlers.probe.entry"), psh)
s.HandleSocket(config.GetString("handlers.auth.entry"), ash)
s.ListenAndServe(config.GetString("server.addr"))
}
func loadConfig(path string) {
config.SetConfigName("config")
config.AddConfigPath(path)
err := config.ReadInConfig()
if nil != err {
log.Fatalf("config error: %v", err)
}
}
func newContext() context.Context {
var err error
ctx := context.Background()
logConfig := config.Sub("logging")
buf, err := logConfig.Marshal("json")
if err != nil {
panic(err)
}
var cfg zap.Config
if err = json.Unmarshal(buf, &cfg); err != nil {
panic(err)
}
logger, err = cfg.Build()
if err != nil {
panic(err)
}
ctx = logging.NewContext(ctx, logger)
return ctx
}

24
redis/pool.go Normal file
View File

@ -0,0 +1,24 @@
package redis
import (
"context"
"time"
"git.loafle.net/commons_go/config"
"git.loafle.net/commons_go/logging"
rp "git.loafle.net/commons_go/redis_pool"
)
func NewPool(ctx context.Context) rp.Pool {
h := &poolHandlers{
ctx: ctx,
logger: logging.WithContext(ctx),
}
h.cfg = config.Sub("redis")
h.MaxIdle = h.cfg.GetInt("pool.MaxIdle")
h.IdleTimeout = h.cfg.GetDuration("pool.IdleTimeout") * time.Second
p := rp.NewPool(ctx, h)
return p
}

27
redis/pool_handlers.go Normal file
View File

@ -0,0 +1,27 @@
package redis
import (
"context"
"time"
"github.com/garyburd/redigo/redis"
"git.loafle.net/commons_go/config"
rp "git.loafle.net/commons_go/redis_pool"
"go.uber.org/zap"
)
type poolHandlers struct {
rp.PoolHandlers
ctx context.Context
logger *zap.Logger
cfg config.Configurator
}
func (h *poolHandlers) Dial() (redis.Conn, error) {
return redis.Dial(h.cfg.GetString("network"), h.cfg.GetString("addr"))
}
func (h *poolHandlers) TestOnBorrow(c redis.Conn, t time.Time) error {
return nil
}

19
server/server.go Normal file
View File

@ -0,0 +1,19 @@
package server
import (
"context"
"git.loafle.net/commons_go/config"
ogw "git.loafle.net/overflow/overflow_gateway_websocket"
)
var _s ogw.Server
func NewServer(ctx context.Context) ogw.Server {
h := newServerHandler(ctx)
_s := ogw.NewServer(ctx, h)
ofSigningKey = []byte(config.GetString("auth.signingKey"))
return _s
}

84
server/server_handlers.go Normal file
View File

@ -0,0 +1,84 @@
package server
import (
"context"
"fmt"
"time"
"git.loafle.net/commons_go/config"
"git.loafle.net/commons_go/logging"
jwt "github.com/dgrijalva/jwt-go"
"github.com/valyala/fasthttp"
"go.uber.org/zap"
ogw "git.loafle.net/overflow/overflow_gateway_websocket"
)
var ofSigningKey []byte
func newServerHandler(ctx context.Context) ogw.ServerHandler {
h := &serverHandlers{
ctx: ctx,
logger: logging.WithContext(ctx),
}
h.cfg = config.Sub("websocket")
h.HandshakeTimeout = h.cfg.GetDuration("HandshakeTimeout") * time.Second
h.ReadBufferSize = h.cfg.GetInt("ReadBufferSize")
h.WriteBufferSize = h.cfg.GetInt("WriteBufferSize")
h.EnableCompression = h.cfg.GetBool("EnableCompression")
return h
}
type serverHandlers struct {
ogw.ServerHandlers
ctx context.Context
logger *zap.Logger
cfg config.Configurator
}
func (h *serverHandlers) OnConnection(soc ogw.Socket) {
// tokenString := string(soc.Conn().Headers().Cookie("AuthToken"))
tokenString := "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzUxMiJ9.eyJpc3MiOiJvdmVyRmxvdyIsImlhdCI6MTUwNDU5NTExOSwiZXhwIjoxNTM2MTMxMTE5LCJhdWQiOiJ3d3cub3ZlcmZsb3cuY2xvdWQiLCJzdWIiOiJvdmVyZmxvd0Bsb2FmbGUuY29tIn0.-WQi3OykPlJ9x8RcZGhWXEtGw4GhU6wmyJ_AWh2rMeUatQylfPzvmum2Xfp6pwKLMmcP76XoDPNyq06i7RKWNQ"
token, err := jwt.Parse(tokenString, func(token *jwt.Token) (interface{}, error) {
// Don't forget to validate the alg is what you expect:
if _, ok := token.Method.(*jwt.SigningMethodHMAC); !ok {
return nil, fmt.Errorf("Unexpected signing method: %v", token.Header["alg"])
}
// hmacSampleSecret is a []byte containing your secret, e.g. []byte("my_secret_key")
return ofSigningKey, nil
})
path := soc.Path()
var uid string
if claims, ok := token.Claims.(jwt.MapClaims); ok && token.Valid {
uid = claims["sub"].(string)
} else {
fmt.Println(err)
return
}
AddSocket(path, uid, soc)
}
func (h *serverHandlers) OnDisconnected(soc ogw.Socket) {
path := soc.Path()
RemoveSocket(path, soc)
}
func (h *serverHandlers) OnCheckOrigin(ctx *fasthttp.RequestCtx) bool {
if origin := string(ctx.Request.Header.Peek("Origin")); origin != "" {
ctx.Response.Header.Set("Access-Control-Allow-Origin", origin)
if string(ctx.Method()) == "OPTIONS" && string(ctx.Request.Header.Peek("Access-Control-Request-Method")) != "" {
ctx.Response.Header.Set("Access-Control-Allow-Headers", "Content-Type, Accept")
ctx.Response.Header.Set("Access-Control-Allow-Methods", "GET, HEAD, POST, PUT, DELETE")
}
}
return true
}
func (h *serverHandlers) OnError(ctx *fasthttp.RequestCtx, status int, reason error) {
}

143
server/socket_manager.go Normal file
View File

@ -0,0 +1,143 @@
package server
import (
uch "git.loafle.net/commons_go/util/channel"
ogw "git.loafle.net/overflow/overflow_gateway_websocket"
)
type socketsChannelAction struct {
uch.Action
path string
uid string
soc ogw.Socket
}
type sockets struct {
s map[string]ogw.Socket
sr map[ogw.Socket]string
}
func (s *sockets) addSocket(uid string, soc ogw.Socket) {
s.s[uid] = soc
s.sr[soc] = uid
}
func (s *sockets) removeSocket(soc ogw.Socket) {
var uid string
var ok bool
if uid, ok = s.sr[soc]; !ok {
return
}
delete(s.s, uid)
delete(s.sr, soc)
}
func (s *sockets) getSocket(uid string) ogw.Socket {
return s.s[uid]
}
func (s *sockets) getUID(soc ogw.Socket) string {
return s.sr[soc]
}
type SocketManager interface {
AddSocket(path string, uid string, soc ogw.Socket)
RemoveSocket(path string, soc ogw.Socket)
GetSocket(path string, uid string) ogw.Socket
GetUID(path string, soc ogw.Socket) string
}
type socketManager struct {
sMap map[string]*sockets
sCh chan socketsChannelAction
}
var _m SocketManager
func init() {
_m = New()
}
func New() SocketManager {
m := &socketManager{
sMap: make(map[string]*sockets),
sCh: make(chan socketsChannelAction),
}
go m.listenChannel()
return m
}
func AddSocket(path string, uid string, soc ogw.Socket) { _m.AddSocket(path, uid, soc) }
func (m *socketManager) AddSocket(path string, uid string, soc ogw.Socket) {
ca := socketsChannelAction{
path: path,
uid: uid,
soc: soc,
}
ca.Type = uch.ActionTypeCreate
m.sCh <- ca
}
func RemoveSocket(path string, soc ogw.Socket) { _m.RemoveSocket(path, soc) }
func (m *socketManager) RemoveSocket(path string, soc ogw.Socket) {
ca := socketsChannelAction{
path: path,
soc: soc,
}
ca.Type = uch.ActionTypeDelete
m.sCh <- ca
}
func GetSocket(path string, uid string) ogw.Socket { return _m.GetSocket(path, uid) }
func (m *socketManager) GetSocket(path string, uid string) ogw.Socket {
var s *sockets
var ok bool
if s, ok = m.sMap[path]; !ok {
return nil
}
return s.getSocket(uid)
}
func GetUID(path string, soc ogw.Socket) string { return _m.GetUID(path, soc) }
func (m *socketManager) GetUID(path string, soc ogw.Socket) string {
var s *sockets
var ok bool
if s, ok = m.sMap[path]; !ok {
return ""
}
return s.getUID(soc)
}
func (m *socketManager) listenChannel() {
for {
select {
case ca := <-m.sCh:
switch ca.Type {
case uch.ActionTypeCreate:
var s *sockets
var ok bool
if s, ok = m.sMap[ca.path]; !ok {
s = &sockets{
s: make(map[string]ogw.Socket),
sr: make(map[ogw.Socket]string),
}
m.sMap[ca.path] = s
}
s.addSocket(ca.uid, ca.soc)
break
case uch.ActionTypeDelete:
var s *sockets
var ok bool
if s, ok = m.sMap[ca.path]; !ok {
return
}
s.removeSocket(ca.soc)
break
}
}
}
}

View File

@ -0,0 +1,34 @@
package subscribe
import (
"context"
"git.loafle.net/commons_go/logging"
"git.loafle.net/overflow/overflow_gateway_probe/server"
ofs "git.loafle.net/overflow/overflow_subscriber"
"go.uber.org/zap"
)
func newAuthSubscriberHandler(ctx context.Context, channel string) ofs.SubscriberHandler {
h := &authSubscriberHandlers{
ctx: ctx,
logger: logging.WithContext(ctx),
}
h.Channel = channel
return h
}
type authSubscriberHandlers struct {
ofs.SubscriberHandlers
ctx context.Context
logger *zap.Logger
}
func (h *authSubscriberHandlers) OnSubscribe(payload string) {
uid := "kdkdkd"
channel := "auth"
soc := server.GetSocket(channel, uid)
soc.Write([]byte(payload))
}

View File

@ -0,0 +1,34 @@
package subscribe
import (
"context"
"git.loafle.net/commons_go/logging"
"git.loafle.net/overflow/overflow_gateway_probe/server"
ofs "git.loafle.net/overflow/overflow_subscriber"
"go.uber.org/zap"
)
func newProbeSubscriberHandler(ctx context.Context, channel string) ofs.SubscriberHandler {
h := &probeSubscriberHandlers{
ctx: ctx,
logger: logging.WithContext(ctx),
}
h.Channel = channel
return h
}
type probeSubscriberHandlers struct {
ofs.SubscriberHandlers
ctx context.Context
logger *zap.Logger
}
func (h *probeSubscriberHandlers) OnSubscribe(payload string) {
uid := "kdkdkd"
channel := "auth"
soc := server.GetSocket(channel, uid)
soc.Write([]byte(payload))
}

17
subscribe/redis.go Normal file
View File

@ -0,0 +1,17 @@
package subscribe
import (
"context"
ofs_redis "git.loafle.net/overflow/overflow_subscriber/redis"
"github.com/garyburd/redigo/redis"
)
func Subscribe(ctx context.Context, redisConn redis.Conn) {
s := ofs_redis.New(ctx, redisConn)
probeS := newProbeSubscriberHandler(ctx, "probe")
authS := newAuthSubscriberHandler(ctx, "auth")
s.Subscribe(probeS)
s.Subscribe(authS)
}