This commit is contained in:
geek 2018-04-12 21:07:02 +09:00
commit e561dd5584
11 changed files with 756 additions and 0 deletions

68
.gitignore vendored Normal file
View File

@ -0,0 +1,68 @@
# Created by .ignore support plugin (hsz.mobi)
### JetBrains template
# Covers JetBrains IDEs: IntelliJ, RubyMine, PhpStorm, AppCode, PyCharm, CLion, Android Studio and Webstorm
# Reference: https://intellij-support.jetbrains.com/hc/en-us/articles/206544839
# User-specific stuff:
.idea/**/workspace.xml
.idea/**/tasks.xml
.idea/dictionaries
# Sensitive or high-churn files:
.idea/**/dataSources/
.idea/**/dataSources.ids
.idea/**/dataSources.xml
.idea/**/dataSources.local.xml
.idea/**/sqlDataSources.xml
.idea/**/dynamic.xml
.idea/**/uiDesigner.xml
# Gradle:
.idea/**/gradle.xml
.idea/**/libraries
# Mongo Explorer plugin:
.idea/**/mongoSettings.xml
## File-based project format:
*.iws
## Plugin-specific files:
# IntelliJ
/out/
# mpeltonen/sbt-idea plugin
.idea_modules/
# JIRA plugin
atlassian-ide-plugin.xml
# Crashlytics plugin (for Android Studio and IntelliJ)
com_crashlytics_export_strings.xml
crashlytics.properties
crashlytics-build.properties
fabric.properties
### Go template
# Binaries for programs and plugins
*.exe
*.dll
*.so
*.dylib
# Test binary, build with `go test -c`
*.test
# Output of the go coverage tool, specifically when used with LiteIDE
*.out
# Project-local glide cache, RE: https://github.com/Masterminds/glide/issues/736
.glide/
.idea/
*.iml
vendor/
glide.lock
.DS_Store
dist/
debug

32
.vscode/launch.json vendored Normal file
View File

@ -0,0 +1,32 @@
{
"version": "0.2.0",
"configurations": [
{
"name": "Debug",
"type": "go",
"request": "launch",
"mode": "debug",
"remotePath": "",
"port": 2345,
"host": "127.0.0.1",
"program": "${workspaceRoot}/main.go",
"env": {},
"args": [],
"showLog": true
},
{
"name": "File Debug",
"type": "go",
"request": "launch",
"mode": "debug",
"remotePath": "",
"port": 2345,
"host": "127.0.0.1",
"program": "${fileDirname}",
"env": {},
"args": [],
"showLog": true
}
]
}

3
.vscode/settings.json vendored Normal file
View File

@ -0,0 +1,3 @@
// Place your settings in this file to overwrite default and user settings.
{
}

29
config.json Normal file
View File

@ -0,0 +1,29 @@
{
"serverHandler": {
"name": "Gateway Probe",
"network": "tcp4",
"address": ":19091",
"concurrency": 262144,
"keepAlive": 60,
"handshakeTimeout": 60,
"maxMessageSize": 8192,
"readBufferSize": 4096,
"writeBufferSize": 4096,
"readTimeout": 0,
"writeTimeout": 0,
"pongTimeout": 60,
"pingTimeout": 10,
"pingPeriod": 10,
"enableCompression": false
},
"external": {
"grpc": {
"network": "tcp4",
"address": "192.168.1.50:50006"
},
"redis": {
"network": "tcp4",
"address": "192.168.1.50:6379"
}
}
}

4
glide.yaml Normal file
View File

@ -0,0 +1,4 @@
package: git.loafle.net/overflow/probe_gateway_rpc
import:
- package: git.loafle.net/commons/server-go
- package: git.loafle.net/commons/configuration-go

60
main.go Normal file
View File

@ -0,0 +1,60 @@
package main
import (
"git.loafle.net/commons/logging-go"
"log"
"os"
"os/signal"
"syscall"
"time"
"git.loafle.net/overflow/probe_gateway_rpc/server"
"context"
"flag"
"git.loafle.net/commons/configuration-go"
"git.loafle.net/overflow/member_gateway_rpc/config"
)
var (
configDir *string
)
func init() {
configDir = flag.String("config-dir", "./", "Config directory")
logConfigPath := flag.String("log-config", "", "logging config path")
flag.Parse()
logging.InitializeLogger(*logConfigPath)
}
func main() {
_config := &config.Config{}
configuration.SetConfigPath(*configDir)
if err := configuration.Load(_config, "config.json"); nil != err {
logging.Logger().Panic(err)
}
s := server.New(_config)
go func() {
err := s.ListenAndServe()
if nil != err {
log.Printf("err: %v", err)
}
}()
interrupt := make(chan os.Signal, 1)
signal.Notify(interrupt,
syscall.SIGKILL,
syscall.SIGSTOP,
syscall.SIGHUP,
syscall.SIGINT,
syscall.SIGTERM,
syscall.SIGQUIT)
<-interrupt
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := s.Shutdown(ctx); err != nil {
logging.Logger().Errorf("error: %v", err)
}
}

56
server/server-handler.go Normal file
View File

@ -0,0 +1,56 @@
package server
import (
cs "git.loafle.net/commons/server-go"
oge "git.loafle.net/overflow/gateway/external"
ogrs "git.loafle.net/overflow/gateway_rpc/server"
"git.loafle.net/overflow/member_gateway_rpc/config"
"git.loafle.net/overflow/member_gateway_rpc/subscribe"
)
type ServerHandler interface {
ogrs.ServerHandler
}
type ServerHandlers struct {
ogrs.ServerHandlers
Config *config.Config
}
func (sh *ServerHandlers) Init(serverCtx cs.ServerCtx) error {
if err := sh.ServerHandlers.Init(serverCtx); nil != err {
return err
}
oge.InitPackage(sh.Config.External)
subscribe.InitPackage()
return nil
}
func (sh *ServerHandlers) OnStart(serverCtx cs.ServerCtx) error {
if err := sh.ServerHandlers.OnStart(serverCtx); nil != err {
return err
}
oge.StartPackage(sh.Config.External)
subscribe.StartPackage()
return nil
}
func (sh *ServerHandlers) OnStop(serverCtx cs.ServerCtx) {
subscribe.StopPackage()
oge.StopPackage(sh.Config.External)
sh.ServerHandlers.OnStop(serverCtx)
}
func (sh *ServerHandlers) Destroy(serverCtx cs.ServerCtx) {
subscribe.DestroyPackage()
oge.DestroyPackage(sh.Config.External)
sh.ServerHandlers.Destroy(serverCtx)
}

27
server/server.go Normal file
View File

@ -0,0 +1,27 @@
package server
import (
cssw "git.loafle.net/commons/server-go/socket/web"
"git.loafle.net/overflow/member_gateway_rpc/config"
"git.loafle.net/overflow/probe_gateway_rpc/servlet"
)
func New(_config *config.Config) *cssw.Server {
as := &servlet.AuthServlets{}
ps := &servlet.ProbeServlets{}
sh := &ServerHandlers{
ServerHandlers: *_config.ServerHandler,
Config: _config,
}
sh.RegisterServlet("/auth", as)
sh.RegisterServlet("/probe", ps)
s := &cssw.Server{
ServerHandler: sh,
}
return s
}

218
servlet/auth-servlet.go Normal file
View File

@ -0,0 +1,218 @@
package servlet
import (
"sync"
"fmt"
"encoding/base64"
"context"
"encoding/json"
"github.com/valyala/fasthttp"
"git.loafle.net/commons/server-go"
og "git.loafle.net/overflow/gateway"
ocnc "git.loafle.net/overflow/commons-go/noauthprobe/constants"
ocnm "git.loafle.net/overflow/commons-go/noauthprobe/model"
"git.loafle.net/overflow/member_gateway_rpc/subscribe"
ogs "git.loafle.net/overflow/gateway/subscribe"
ogrs "git.loafle.net/overflow/gateway_rpc/servlet"
"git.loafle.net/commons/logging-go"
"git.loafle.net/overflow/gateway/external/grpc"
"git.loafle.net/commons/server-go/socket"
)
type AuthServlet interface {
ogrs.RPCServlet
}
type AuthServlets struct {
ogrs.RPCServlets
connections sync.Map
}
func (s *AuthServlets) Init(serverCtx server.ServerCtx) error {
if err := s.RPCServlets.Init(serverCtx); nil != err {
return err
}
return nil
}
func (s *AuthServlets) OnStart(serverCtx server.ServerCtx) error {
if err := s.RPCServlets.OnStart(serverCtx); nil != err {
return err
}
subscribeChan, err := subscribe.Subscriber.Subscribe(ocnc.HTTPEntry_Auth)
if nil != err {
return err
}
go s.handleSubscribe(serverCtx, subscribeChan)
return nil
}
func (s *AuthServlets) OnStop(serverCtx server.ServerCtx) {
if err := subscribe.Subscriber.Unsubscribe(ocnc.HTTPEntry_Auth); nil != err {
logging.Logger().Warn(err)
}
s.RPCServlets.OnStop(serverCtx)
}
func (s *AuthServlets) Destroy(serverCtx server.ServerCtx) {
s.RPCServlets.Destroy(serverCtx)
}
func (s *AuthServlets) Handshake(servletCtx server.ServletCtx, ctx *fasthttp.RequestCtx) (*fasthttp.ResponseHeader, error) {
bMethod := ctx.Request.Header.Peek(ocnc.HTTPRequestHeaderKey_NoAuthProbe_Method)
if nil == bMethod {
return nil, fmt.Errorf("Unexpected noauth probe method: %v", bMethod)
}
method := string(bMethod)
switch method {
case ocnc.HTTPRequestHeaderValue_NoAuthProbe_Method_Regist:
bInfo := ctx.Request.Header.Peek(ocnc.HTTPRequestHeaderKey_NoAuthProbe_Info)
if nil == bInfo {
return nil, fmt.Errorf("Unexpected HTTPRequestHeaderKey NoAuthProbe Info")
}
rJSON := make([]byte, base64.StdEncoding.DecodedLen(len(bInfo)))
_, err := base64.StdEncoding.Decode(rJSON, bInfo)
if nil != err {
return nil, fmt.Errorf("Base64 Encoding Error: %s", err.Error())
}
grpcCTX := context.Background()
r, err := grpc.Exec(grpcCTX, "NoAuthProbeService.regist", string(rJSON))
if nil != err {
return nil, fmt.Errorf("grpc call Error: %s", err.Error())
}
nap := &ocnm.NoAuthProbe{}
err = json.Unmarshal([]byte(r), nap)
if nil != err {
return nil, fmt.Errorf("grpc result unMarshal Error: %s", err.Error())
}
extHeader := &fasthttp.ResponseHeader{}
extHeader.Add(ocnc.HTTPResponseHeaderKey_NoAuthProbe_SetTempProbeKey, nap.TempProbeKey)
return extHeader, nil
case ocnc.HTTPRequestHeaderValue_NoAuthProbe_Method_Connect:
bTempProbeKey := ctx.Request.Header.Peek(ocnc.HTTPRequestHeaderKey_NoAuthProbe_TempProbeKey)
if nil == bTempProbeKey {
return nil, fmt.Errorf("Noauth probe temp key is not an existing key", )
}
grpcCTX := context.Background()
r, err := grpc.Exec(grpcCTX, "NoAuthProbeService.readByTempKey", string(bTempProbeKey))
if nil != err {
return nil, fmt.Errorf("grpc result error: %s", err.Error() )
}
nap := &ocnm.NoAuthProbe{}
err = json.Unmarshal([]byte(r), nap)
if nil != err {
return nil, fmt.Errorf("grpc result unMarshal Error: %s", err.Error())
}
servletCtx.SetAttribute(og.SessionIDKey, bTempProbeKey)
servletCtx.SetAttribute(og.SessionClientTypeKey, og.PROBE)
servletCtx.SetAttribute(og.SessionTargetIDKey, bTempProbeKey)
return nil, nil
default:
return nil, fmt.Errorf("Unexpected noauth probe httpRequestHeaderValue: %v", method)
}
return nil, nil
}
func (s *AuthServlets) OnConnect(servletCtx server.ServletCtx, conn socket.Conn) {
s.RPCServlets.OnConnect(servletCtx, conn)
sessionID := servletCtx.GetAttribute(og.SessionIDKey)
targetID := servletCtx.GetAttribute(og.SessionTargetIDKey)
if nil != sessionID && nil != targetID {
s.connections.Store(sessionID.(string), retainConnection(targetID.(string), servletCtx))
}
}
func (s *AuthServlets) OnDisconnect(servletCtx server.ServletCtx) {
s.RPCServlets.OnDisconnect(servletCtx)
sessionID := servletCtx.GetAttribute(og.SessionIDKey)
if nil != sessionID {
s.connections.Delete(sessionID.(string))
}
}
func (s *AuthServlets) handleSubscribe(serverCtx server.ServerCtx, subscribeChan <-chan *ogs.Message) {
for {
select {
case msg, ok := <- subscribeChan:
if !ok {
return
}
switch msg.TargetType {
case ogs.PROBE:
for _, targetID := range msg.Targets {
_connections := s.getProbeConnections(targetID)
if nil == _connections || 0 == len(_connections) {
break
}
for _, _connection := range _connections {
_writeChan := _connection.servletCtx.GetAttribute(og.SessionWriteChanKey)
if nil != _writeChan {
writeChan := _writeChan.(chan<- []byte)
writeChan <- msg.Message
}
}
}
}
}
}
}
func (s *AuthServlets) getProbeConnections(targetID string) []*connection {
var connections []*connection
s.connections.Range(func(k, v interface{}) bool {
_connection := v.(*connection)
if _connection.targetID == targetID {
connections = append(connections, _connection)
}
return true
})
return connections
}
type connection struct {
targetID string
servletCtx server.ServletCtx
}
var connectionPool sync.Pool
func retainConnection(targetID string, servletCtx server.ServletCtx) *connection {
return nil
}
func releaseConnection(_connection *connection) {
_connection.targetID = ""
_connection.servletCtx = nil
connectionPool.Put(_connection)
}

66
servlet/data-servlet.go Normal file
View File

@ -0,0 +1,66 @@
package servlet
import (
//crpj "git.loafle.net/commons/rpc-go/protocol/json"
//"git.loafle.net/commons/server-go"
//
//"log"
//"github.com/valyala/fasthttp"
//"crypto/rsa"
//"sync"
)
//type WebappServlet interface {
// ogrs.RPCServlet
//}
//
//type WebappServlets struct {
// ogrs.RPCServlets
//
// VerifyKey *rsa.PublicKey
// SignKey *rsa.PrivateKey
//
// connections sync.Map
//}
//
//func init() {
// // member RSA file read
//}
//
//func (s *RPCServlet) Handshake(servletCtx server.ServletCtx, ctx *fasthttp.RequestCtx) (*fasthttp.ResponseHeader, error) {
// // probe key extraction
//
// return nil, nil
//}
//
//func (s *RPCServlet) Handle(
// servletCtx server.ServletCtx,
// stopChan <-chan struct{},
// doneChan chan<- struct{},
// readChan <-chan []byte,
// writeChan chan<- []byte ) {
// defer func() {
// doneChan <- struct{}{}
// }()
//
// sc := crpj.NewServerCodec()
//
// for {
// select {
// case msg, ok := <-readChan:
// if !ok {
// return
// }
// // grpc exec method call
// src, _ := sc.NewRequest(msg)
// m := src.Method()
// p,_ := src.Params()
// log.Println("METHOD : %s", m)
// log.Println("Params : %s", p)
// case <-stopChan:
// return
// }
//
// }
//
//}

193
servlet/probe-servlet.go Normal file
View File

@ -0,0 +1,193 @@
package servlet
import (
"sync"
"fmt"
"context"
"encoding/json"
"github.com/valyala/fasthttp"
"git.loafle.net/commons/server-go"
og "git.loafle.net/overflow/gateway"
ocpc "git.loafle.net/overflow/commons-go/probe/constants"
ocpm "git.loafle.net/overflow/commons-go/probe/model"
"git.loafle.net/overflow/member_gateway_rpc/subscribe"
ogs "git.loafle.net/overflow/gateway/subscribe"
ogrs "git.loafle.net/overflow/gateway_rpc/servlet"
"git.loafle.net/commons/logging-go"
"git.loafle.net/overflow/gateway/external/grpc"
"git.loafle.net/commons/server-go/socket"
)
type ProbeServlet interface {
ogrs.RPCServlet
}
type ProbeServlets struct {
ogrs.RPCServlets
connections sync.Map
}
func (s *ProbeServlets) Init(serverCtx server.ServerCtx) error {
if err := s.RPCServlets.Init(serverCtx); nil != err {
return err
}
return nil
}
func (s *ProbeServlets) OnStart(serverCtx server.ServerCtx) error {
if err := s.RPCServlets.OnStart(serverCtx); nil != err {
return err
}
subscribeChan, err := subscribe.Subscriber.Subscribe(ocpc.HTTPEntry_Probe)
if nil != err {
return err
}
go s.handleSubscribe(serverCtx, subscribeChan)
return nil
}
func (s *ProbeServlets) OnStop(serverCtx server.ServerCtx) {
if err := subscribe.Subscriber.Unsubscribe(ocpc.HTTPEntry_Probe); nil != err {
logging.Logger().Warn(err)
}
s.RPCServlets.OnStop(serverCtx)
}
func (s *ProbeServlets) Destroy(serverCtx server.ServerCtx) {
s.RPCServlets.Destroy(serverCtx)
}
func (s *ProbeServlets) Handshake(servletCtx server.ServletCtx, ctx *fasthttp.RequestCtx) (*fasthttp.ResponseHeader, error) {
bMethod := ctx.Request.Header.Peek(ocpc.HTTPRequestHeaderKey_Probe_Method)
if nil == bMethod {
return nil, fmt.Errorf("Unexpected probe method: %v", bMethod)
}
method := string(bMethod)
switch method {
case ocpc.HTTPRequestHeaderValue_Probe_Method_Connect:
default:
return nil, fmt.Errorf("Unexpected noauth probe httpRequestHeaderValue: %v", method)
}
bProbeKey := ctx.Request.Header.Peek(ocpc.HTTPRequestHeaderKey_Probe_ProbeKey)
if nil == bProbeKey {
return nil, fmt.Errorf("Unexpected probe key : %v", bProbeKey)
}
probeKey := string(bProbeKey)
grpcCTX := context.Background()
r, err := grpc.Exec(grpcCTX, "ProbeService.readByProbeKey", probeKey)
if nil != err {
return nil, fmt.Errorf("grpc call Error: %s", err.Error())
}
probe := ocpm.Probe{}
err = json.Unmarshal([]byte(r), probe)
if nil != err {
return nil, fmt.Errorf("grpc result unMarshal Error: %s", err.Error())
}
extHeader := &fasthttp.ResponseHeader{}
extHeader.Add(ocpc.HTTPResponseHeaderKey_Probe_SetEncryptionKey, probe.EncryptionKey)
servletCtx.SetAttribute(og.SessionIDKey, probe.ProbeKey)
servletCtx.SetAttribute(og.SessionClientTypeKey, og.PROBE)
servletCtx.SetAttribute(og.SessionTargetIDKey, probe.ProbeKey)
return extHeader, nil
}
func (s *ProbeServlets) OnConnect(servletCtx server.ServletCtx, conn socket.Conn) {
s.RPCServlets.OnConnect(servletCtx, conn)
sessionID := servletCtx.GetAttribute(og.SessionIDKey)
targetID := servletCtx.GetAttribute(og.SessionTargetIDKey)
if nil != sessionID && nil != targetID {
s.connections.Store(sessionID.(string), retainConnection(targetID.(string), servletCtx))
}
}
func (s *ProbeServlets) OnDisconnect(servletCtx server.ServletCtx) {
s.RPCServlets.OnDisconnect(servletCtx)
sessionID := servletCtx.GetAttribute(og.SessionIDKey)
if nil != sessionID {
s.connections.Delete(sessionID.(string))
}
}
func (s *ProbeServlets) handleSubscribe(serverCtx server.ServerCtx, subscribeChan <-chan *ogs.Message) {
for {
select {
case msg, ok := <- subscribeChan:
if !ok {
return
}
switch msg.TargetType {
case ogs.PROBE:
for _, targetID := range msg.Targets {
_connections := s.getProbeConnections(targetID)
if nil == _connections || 0 == len(_connections) {
break
}
for _, _connection := range _connections {
_writeChan := _connection.servletCtx.GetAttribute(og.SessionWriteChanKey)
if nil != _writeChan {
writeChan := _writeChan.(chan<- []byte)
writeChan <- msg.Message
}
}
}
}
}
}
}
func (s *ProbeServlets) getProbeConnections(targetID string) []*connection {
var connections []*connection
s.connections.Range(func(k, v interface{}) bool {
_connection := v.(*connection)
if _connection.targetID == targetID {
connections = append(connections, _connection)
}
return true
})
return connections
}
//type connection struct {
// targetID string
// servletCtx server.ServletCtx
//}
//
//var connectionPool sync.Pool
//
//func retainConnection(targetID string, servletCtx server.ServletCtx) *connection {
// return nil
//}
//
//func releaseConnection(_connection *connection) {
// _connection.targetID = ""
// _connection.servletCtx = nil
//
// connectionPool.Put(_connection)
//}