This commit is contained in:
crusader 2017-11-10 22:24:10 +09:00
commit 766cf34e71
29 changed files with 750 additions and 0 deletions

68
.gitignore vendored Normal file
View File

@ -0,0 +1,68 @@
# Created by .ignore support plugin (hsz.mobi)
### JetBrains template
# Covers JetBrains IDEs: IntelliJ, RubyMine, PhpStorm, AppCode, PyCharm, CLion, Android Studio and Webstorm
# Reference: https://intellij-support.jetbrains.com/hc/en-us/articles/206544839
# User-specific stuff:
.idea/**/workspace.xml
.idea/**/tasks.xml
.idea/dictionaries
# Sensitive or high-churn files:
.idea/**/dataSources/
.idea/**/dataSources.ids
.idea/**/dataSources.xml
.idea/**/dataSources.local.xml
.idea/**/sqlDataSources.xml
.idea/**/dynamic.xml
.idea/**/uiDesigner.xml
# Gradle:
.idea/**/gradle.xml
.idea/**/libraries
# Mongo Explorer plugin:
.idea/**/mongoSettings.xml
## File-based project format:
*.iws
## Plugin-specific files:
# IntelliJ
/out/
# mpeltonen/sbt-idea plugin
.idea_modules/
# JIRA plugin
atlassian-ide-plugin.xml
# Crashlytics plugin (for Android Studio and IntelliJ)
com_crashlytics_export_strings.xml
crashlytics.properties
crashlytics-build.properties
fabric.properties
### Go template
# Binaries for programs and plugins
*.exe
*.dll
*.so
*.dylib
# Test binary, build with `go test -c`
*.test
# Output of the go coverage tool, specifically when used with LiteIDE
*.out
# Project-local glide cache, RE: https://github.com/Masterminds/glide/issues/736
.glide/
.idea/
*.iml
vendor/
glide.lock
.DS_Store
dist/
debug

32
.vscode/launch.json vendored Normal file
View File

@ -0,0 +1,32 @@
{
"version": "0.2.0",
"configurations": [
{
"name": "Debug",
"type": "go",
"request": "launch",
"mode": "debug",
"remotePath": "",
"port": 2345,
"host": "127.0.0.1",
"program": "${workspaceRoot}/main.go",
"env": {},
"args": [],
"showLog": true
},
{
"name": "File Debug",
"type": "go",
"request": "launch",
"mode": "debug",
"remotePath": "",
"port": 2345,
"host": "127.0.0.1",
"program": "${fileDirname}",
"env": {},
"args": [],
"showLog": true
}
]
}

3
.vscode/settings.json vendored Normal file
View File

@ -0,0 +1,3 @@
// Place your settings in this file to overwrite default and user settings.
{
}

5
config/auth.go Normal file
View File

@ -0,0 +1,5 @@
package config
type Auth struct {
SigningKey string `json:"signingKey" yaml:"signingKey" toml:"signingKey"`
}

12
config/config.go Normal file
View File

@ -0,0 +1,12 @@
package config
var Config GatewayConfig
type GatewayConfig struct {
Server Server `json:"server" yaml:"server" toml:"server"`
Auth Auth `json:"auth" yaml:"auth" toml:"auth"`
Websocket Websocket `json:"websocket" yaml:"websocket" toml:"websocket"`
GRPC GRPC `json:"gRPC" yaml:"gRPC" toml:"gRPC"`
Redis Redis `json:"redis" yaml:"redis" toml:"redis"`
Handlers map[string]Handler `json:"handlers" yaml:"handlers" toml:"handlers"`
}

6
config/grpc.go Normal file
View File

@ -0,0 +1,6 @@
package config
type GRPC struct {
Server
Pool Pool `json:"pool" yaml:"pool" toml:"pool"`
}

6
config/handler.go Normal file
View File

@ -0,0 +1,6 @@
package config
type Handler struct {
Entry string `json:"entry" yaml:"entry" toml:"entry"`
Socket Socket `json:"socket" yaml:"socket" toml:"socket"`
}

10
config/pool.go Normal file
View File

@ -0,0 +1,10 @@
package config
import "time"
type Pool struct {
MaxCapacity int `json:"maxCapacity" yaml:"maxCapacity" toml:"maxCapacity"`
MaxIdle int `json:"maxIdle" yaml:"maxIdle" toml:"maxIdle"`
IdleTimeout time.Duration `json:"idleTimeout" yaml:"idleTimeout" toml:"idleTimeout"`
Wait bool `json:"wait" yaml:"wait" toml:"wait"`
}

6
config/redis.go Normal file
View File

@ -0,0 +1,6 @@
package config
type Redis struct {
Server
Pool Pool `json:"pool" yaml:"pool" toml:"pool"`
}

12
config/server.go Normal file
View File

@ -0,0 +1,12 @@
package config
import "time"
type Server struct {
Name string `json:"name" yaml:"name" toml:"name"`
Network string `json:"network" yaml:"network" toml:"network"`
Addr string `json:"addr" yaml:"addr" toml:"addr"`
TLS bool `json:"tls" yaml:"tls" toml:"tls"`
Concurrency int `json:"concurrency" yaml:"concurrency" toml:"concurrency"`
MaxStopWaitTime time.Duration `json:"maxStopWaitTime" yaml:"maxStopWaitTime" toml:"maxStopWaitTime"`
}

13
config/socket.go Normal file
View File

@ -0,0 +1,13 @@
package config
import "time"
type Socket struct {
MaxMessageSize int64 `json:"maxMessageSize" yaml:"maxMessageSize" toml:"maxMessageSize"`
WriteTimeout time.Duration `json:"writeTimeout" yaml:"writeTimeout" toml:"writeTimeout"`
ReadTimeout time.Duration `json:"readTimeout" yaml:"readTimeout" toml:"readTimeout"`
PongTimeout time.Duration `json:"pongTimeout" yaml:"pongTimeout" toml:"pongTimeout"`
PingTimeout time.Duration `json:"pingTimeout" yaml:"pingTimeout" toml:"pingTimeout"`
PingPeriod time.Duration `json:"pingPeriod" yaml:"pingPeriod" toml:"pingPeriod"`
BinaryMessage bool `json:"binaryMessage" yaml:"binaryMessage" toml:"binaryMessage"`
}

10
config/websocket.go Normal file
View File

@ -0,0 +1,10 @@
package config
import "time"
type Websocket struct {
HandshakeTimeout time.Duration `json:"handshakeTimeout" yaml:"handshakeTimeout" toml:"handshakeTimeout"`
ReadBufferSize int `json:"readBufferSize" yaml:"readBufferSize" toml:"readBufferSize"`
WriteBufferSize int `json:"writeBufferSize" yaml:"writeBufferSize" toml:"writeBufferSize"`
EnableCompression bool `json:"enableCompression" yaml:"enableCompression" toml:"enableCompression"`
}

15
constants.go Normal file
View File

@ -0,0 +1,15 @@
package overflow_gateway_websocket
import (
"git.loafle.net/commons_go/util"
)
var (
ServletSocketKey = util.ContextKey("ServletSocket")
ServletContentTypeKey = util.ContextKey("ServletContentType")
)
const (
ConfigPathFlagName = "config-dir"
ConfigFileName = "config.json"
)

16
external/external.go vendored Normal file
View File

@ -0,0 +1,16 @@
package external
import (
"git.loafle.net/overflow/overflow_gateway_app/external/grpc"
"git.loafle.net/overflow/overflow_gateway_app/external/redis"
)
func ExternalInit() {
grpc.ExternalInit()
redis.ExternalInit()
}
func ExternalDestroy() {
grpc.ExternalDestroy()
redis.ExternalDestroy()
}

35
external/grpc/client.go vendored Normal file
View File

@ -0,0 +1,35 @@
package grpc
import (
"context"
"fmt"
"strings"
ooas "git.loafle.net/overflow/overflow_api_server/golang"
)
func Exec(ctx context.Context, method string, params []string) (string, error) {
if nil == grpcPool {
return "", fmt.Errorf("App: GRPC Pool is not initialized")
}
var client interface{}
var err error
if client, err = grpcPool.Get(); nil != err {
return "", err
}
defer grpcPool.Put(client)
sm := strings.Split(method, ".")
si := &ooas.ServerInput{
Target: sm[0],
Method: sm[1],
Params: params,
}
var so *ooas.ServerOutput
if so, err = client.(ooas.OverflowApiServerClient).Exec(ctx, si); nil != err {
return "", err
}
return so.Result, nil
}

49
external/grpc/pool.go vendored Normal file
View File

@ -0,0 +1,49 @@
package grpc
import (
"fmt"
cgp "git.loafle.net/commons_go/grpc_pool"
"git.loafle.net/commons_go/logging"
ooas "git.loafle.net/overflow/overflow_api_server/golang"
"git.loafle.net/overflow/overflow_gateway_app/conf"
"google.golang.org/grpc"
)
var grpcPool cgp.Pool
func ExternalInit() {
ph := &grpcPoolHandlers{}
ph.MaxCapacity = conf.Config.GRPC.Pool.MaxCapacity
ph.MaxIdle = conf.Config.GRPC.Pool.MaxIdle
ph.IdleTimeout = conf.Config.GRPC.Pool.IdleTimeout
ph.Wait = conf.Config.GRPC.Pool.Wait
grpcPool = cgp.New(ph)
if err := grpcPool.Start(); nil != err {
logging.Logger().Panic(fmt.Sprintf("App: %v", err))
return
}
}
func ExternalDestroy() {
if nil != grpcPool {
grpcPool.Stop()
}
}
type grpcPoolHandlers struct {
cgp.PoolHandlers
}
func (h *grpcPoolHandlers) Dial() (*grpc.ClientConn, interface{}, error) {
var err error
conn, err := grpc.Dial(conf.Config.GRPC.Addr, grpc.WithInsecure())
if nil != err {
return nil, nil, err
}
c := ooas.NewOverflowApiServerClient(conn)
return conn, c, nil
}

42
external/redis/pool.go vendored Normal file
View File

@ -0,0 +1,42 @@
package redis
import (
"fmt"
"git.loafle.net/overflow/overflow_gateway_websocket/config"
"git.loafle.net/commons_go/logging"
crp "git.loafle.net/commons_go/redis_pool"
"github.com/garyburd/redigo/redis"
)
var RedisPool crp.Pool
func ExternalInit() {
ph := &redisPoolHandlers{}
ph.MaxCapacity = config.Config.Redis.Pool.MaxCapacity
ph.MaxIdle = config.Config.Redis.Pool.MaxIdle
ph.IdleTimeout = config.Config.Redis.Pool.IdleTimeout
ph.Wait = config.Config.Redis.Pool.Wait
RedisPool = crp.New(ph)
if err := RedisPool.Start(); nil != err {
logging.Logger().Panic(fmt.Sprintf("App: %v", err))
return
}
}
func ExternalDestroy() {
if nil != RedisPool {
RedisPool.Stop()
}
}
type redisPoolHandlers struct {
crp.PoolHandlers
}
func (ph *redisPoolHandlers) Dial() (redis.Conn, error) {
return redis.Dial(config.Config.Redis.Network, config.Config.Redis.Addr)
}

19
gateway_websocket.go Normal file
View File

@ -0,0 +1,19 @@
package overflow_gateway_websocket
import (
"flag"
cc "git.loafle.net/commons_go/config"
"git.loafle.net/overflow/overflow_gateway_websocket/config"
)
func init() {
configPath := flag.String(ConfigPathFlagName, "./", "The path of config file")
flag.Parse()
cc.SetConfigPath(*configPath)
if err := cc.Load(&config.Config, ConfigFileName); nil != err {
panic(err)
}
}

19
glide.yaml Normal file
View File

@ -0,0 +1,19 @@
package: git.loafle.net/overflow/overflow_gateway_websocket
import:
- package: git.loafle.net/commons_go/grpc_pool
- package: git.loafle.net/commons_go/logging
- package: git.loafle.net/commons_go/redis_pool
- package: git.loafle.net/overflow/overflow_api_server
subpackages:
- golang
- package: git.loafle.net/overflow/overflow_gateway_app
subpackages:
- conf
- external/grpc
- external/redis
- package: github.com/garyburd/redigo
subpackages:
- redis
- package: google.golang.org/grpc
- package: github.com/valyala/fasthttp
version: v20160617

View File

@ -0,0 +1,101 @@
package server
import (
"context"
"log"
"github.com/valyala/fasthttp"
"git.loafle.net/commons_go/rpc/gateway"
"git.loafle.net/commons_go/server"
cwf "git.loafle.net/commons_go/websocket_fasthttp"
oogw "git.loafle.net/overflow/overflow_gateway_websocket"
)
type RPCGatewaySocketHandlers struct {
cwf.SocketHandlers
}
func (sh *RPCGatewaySocketHandlers) Init() error {
if err := sh.SocketHandlers.Init(); nil != err {
return err
}
return nil
}
func (sh *RPCGatewaySocketHandlers) Handshake(ctx *fasthttp.RequestCtx) (id string, extensionsHeader *fasthttp.ResponseHeader) {
return "", nil
}
// OnConnect invoked when client is connected
// If you override ths method, must call
func (sh *RPCGatewaySocketHandlers) OnConnect(soc *cwf.Socket) {
sh.SocketHandlers.OnConnect(soc)
}
func (sh *RPCGatewaySocketHandlers) Handle(soc *cwf.Socket, stopChan <-chan struct{}, doneChan chan<- struct{}) {
codec, err := sh.rpcServerHandler.GetCodec(sh.ContentType)
if nil != err {
log.Printf("RPC Handle: %v", err)
doneChan <- struct{}{}
return
}
var socConn *cwf.SocketConn
ctx := context.WithValue(context.Background(), oogw.ServletSocketKey, soc)
// conn.SetPongHandler(func(string) error { c.conn.SetReadDeadline(time.Now().Add(pongWait)); return nil })
for {
if socConn, err = soc.WaitRequest(); nil != err {
doneChan <- struct{}{}
return
}
// // "git.loafle.net/commons_go/websocket_fasthttp/websocket"
// switch socConn.MessageType {
// case websocket.TextMessage:
// case websocket.BinaryMessage:
// }
if err = gateway.Handle(ctx, sh.rpcServerHandler, codec, socConn, socConn); nil != err {
if server.IsClientDisconnect(err) {
doneChan <- struct{}{}
return
}
log.Printf("RPC: %v", err)
}
if err = socConn.Close(); nil != err {
doneChan <- struct{}{}
return
}
select {
case <-stopChan:
return
default:
}
}
}
// OnDisconnect invoked when client is disconnected
// If you override ths method, must call
func (sh *RPCGatewaySocketHandlers) OnDisconnect(soc *cwf.Socket) {
sh.SocketHandlers.OnDisconnect(soc)
}
// Destroy invoked when server is stopped
// If you override ths method, must call
func (sh *RPCGatewaySocketHandlers) Destroy() {
sh.SocketHandlers.Destroy()
}
func (sh *RPCGatewaySocketHandlers) Validate() {
sh.SocketHandlers.Validate()
}

View File

@ -0,0 +1 @@
package server

View File

@ -0,0 +1 @@
package rpc

13
server/server.go Normal file
View File

@ -0,0 +1,13 @@
package server
import (
"flag"
cc "git.loafle.net/commons_go/config"
"git.loafle.net/overflow/overflow_gateway_websocket/config"
)
func New() {
}

14
server/server_handler.go Normal file
View File

@ -0,0 +1,14 @@
package server
import (
cwf "git.loafle.net/commons_go/websocket_fasthttp"
"git.loafle.net/overflow/overflow_gateway_websocket/servlet"
oos "git.loafle.net/overflow/overflow_subscriber"
)
type ServerHandler interface {
cwf.ServerHandler
RegisterServletHandler(servletHandler servlet.ServletHandler)
RegisterSubscriberHandler(subscriberHandler oos.SubscriberHandler)
}

80
server/server_handlers.go Normal file
View File

@ -0,0 +1,80 @@
package server
import (
"fmt"
"net"
"git.loafle.net/commons_go/logging"
cwf "git.loafle.net/commons_go/websocket_fasthttp"
"git.loafle.net/overflow/overflow_gateway_websocket/config"
"git.loafle.net/overflow/overflow_gateway_websocket/external"
"git.loafle.net/overflow/overflow_gateway_websocket/external/redis"
"git.loafle.net/overflow/overflow_gateway_websocket/servlet"
oos "git.loafle.net/overflow/overflow_subscriber"
oosr "git.loafle.net/overflow/overflow_subscriber/redis"
"github.com/valyala/fasthttp"
)
type ServerHandlers struct {
cwf.ServerHandlers
redisSubscriber oos.Subscriber
subscribers []oos.SubscriberHandler
}
// Init invoked before the server is started
// If you override ths method, must call
func (sh *ServerHandlers) Init() error {
if err := sh.ServerHandlers.Init(); nil != err {
return err
}
external.ExternalInit()
return nil
}
func (sh *ServerHandlers) Listen() (net.Listener, error) {
return net.Listen(config.Config.Server.Network, config.Config.Server.Addr)
}
func (sh *ServerHandlers) OnStart() {
sh.ServerHandlers.OnStart()
sh.redisSubscriber = oosr.New(redis.RedisPool.Get())
if err := sh.redisSubscriber.Start(); nil != err {
logging.Logger().Error(fmt.Sprintf("App: Redist Subscriber did not start %v", err))
return
}
for _, subscriber := range sh.subscribers {
sh.redisSubscriber.Subscribe(subscriber)
}
}
func (sh *ServerHandlers) OnError(ctx *fasthttp.RequestCtx, status int, reason error) {
sh.OnError(ctx, status, reason)
}
func (sh *ServerHandlers) OnStop() {
sh.redisSubscriber.Stop()
external.ExternalDestroy()
sh.ServerHandlers.OnStop()
}
func (sh *ServerHandlers) RegisterServletHandler(servletHandler servlet.ServletHandler) {
sh.RegisterSocketHandler(servletHandler)
}
func (sh *ServerHandlers) RegisterSubscriberHandler(subscriberHandler oos.SubscriberHandler) {
if nil == sh.subscribers {
sh.subscribers = make([]oos.SubscriberHandler, 0)
}
sh.subscribers = append(sh.subscribers, subscriberHandler)
}
func (sh *ServerHandlers) Validate() {
sh.ServerHandlers.Validate()
}

View File

@ -0,0 +1,5 @@
package servlet
type RPCServletHandler interface {
ServletHandler
}

View File

@ -0,0 +1,98 @@
package servlet
import (
"context"
"log"
"git.loafle.net/commons_go/rpc/gateway"
"git.loafle.net/commons_go/server"
cwf "git.loafle.net/commons_go/websocket_fasthttp"
oogw "git.loafle.net/overflow/overflow_gateway_websocket"
)
type RPCServletHandlers struct {
ServletHandlers
RPCHandler
}
func (sh *RPCServletHandlers) Init() error {
if err := sh.ServletHandlers.Init(); nil != err {
return err
}
return nil
}
// OnConnect invoked when client is connected
// If you override ths method, must call
func (sh *RPCServletHandlers) OnConnect(soc *cwf.Socket) {
sh.ServletHandlers.OnConnect(soc)
}
func (sh *RPCServletHandlers) Handle(soc *cwf.Socket, stopChan <-chan struct{}, doneChan chan<- struct{}) {
codec, err := sh.rpcServerHandler.GetCodec(sh.ContentType)
if nil != err {
log.Printf("RPC Handle: %v", err)
doneChan <- struct{}{}
return
}
var socConn *cwf.SocketConn
ctx := context.WithValue(context.Background(), oogw.ServletSocketKey, soc)
// conn.SetPongHandler(func(string) error { c.conn.SetReadDeadline(time.Now().Add(pongWait)); return nil })
for {
if socConn, err = soc.WaitRequest(); nil != err {
doneChan <- struct{}{}
return
}
// // "git.loafle.net/commons_go/websocket_fasthttp/websocket"
// switch socConn.MessageType {
// case websocket.TextMessage:
// case websocket.BinaryMessage:
// }
if err = gateway.Handle(ctx, sh.rpcServerHandler, codec, socConn, socConn); nil != err {
if server.IsClientDisconnect(err) {
doneChan <- struct{}{}
return
}
log.Printf("RPC: %v", err)
}
if err = socConn.Close(); nil != err {
doneChan <- struct{}{}
return
}
select {
case <-stopChan:
return
default:
}
}
}
// OnDisconnect invoked when client is disconnected
// If you override ths method, must call
func (sh *RPCServletHandlers) OnDisconnect(soc *cwf.Socket) {
sh.ServletHandlers.OnDisconnect(soc)
}
// Destroy invoked when server is stopped
// If you override ths method, must call
func (sh *ServletHandlers) Destroy() {
sh.SocketHandlers.Destroy()
}
func (sh *RPCServletHandlers) Validate() {
sh.ServletHandlers.Validate()
}

View File

@ -0,0 +1,9 @@
package servlet
import (
cwf "git.loafle.net/commons_go/websocket_fasthttp"
)
type ServletHandler interface {
cwf.SocketHandler
}

View File

@ -0,0 +1,50 @@
package servlet
import (
"git.loafle.net/commons_go/logging"
cwf "git.loafle.net/commons_go/websocket_fasthttp"
)
type ServletHandlers struct {
cwf.SocketHandlers
// EntryPath is path of url (ex: /web)
EntryPath string
}
func (sh *ServletHandlers) Init() error {
if err := sh.SocketHandlers.Init(); nil != err {
return err
}
return nil
}
// OnConnect invoked when client is connected
// If you override ths method, must call
func (sh *ServletHandlers) OnConnect(soc *cwf.Socket) {
sh.SocketHandlers.OnConnect(soc)
}
// OnDisconnect invoked when client is disconnected
// If you override ths method, must call
func (sh *ServletHandlers) OnDisconnect(soc *cwf.Socket) {
sh.SocketHandlers.OnDisconnect(soc)
}
// Destroy invoked when server is stopped
// If you override ths method, must call
func (sh *ServletHandlers) Destroy() {
sh.SocketHandlers.Destroy()
}
func (sh *ServletHandlers) Validate() {
sh.SocketHandlers.Validate()
if "" == sh.EntryPath {
logging.Logger().Panic("Geteway Server: The path of entry must be specified")
}
}