From e561dd55849dfb1aeb0480b2243201967fd084a1 Mon Sep 17 00:00:00 2001 From: geek Date: Thu, 12 Apr 2018 21:07:02 +0900 Subject: [PATCH] ing --- .gitignore | 68 ++++++++++++ .vscode/launch.json | 32 ++++++ .vscode/settings.json | 3 + config.json | 29 ++++++ glide.yaml | 4 + main.go | 60 +++++++++++ server/server-handler.go | 56 ++++++++++ server/server.go | 27 +++++ servlet/auth-servlet.go | 218 +++++++++++++++++++++++++++++++++++++++ servlet/data-servlet.go | 66 ++++++++++++ servlet/probe-servlet.go | 193 ++++++++++++++++++++++++++++++++++ 11 files changed, 756 insertions(+) create mode 100644 .gitignore create mode 100644 .vscode/launch.json create mode 100644 .vscode/settings.json create mode 100644 config.json create mode 100644 glide.yaml create mode 100644 main.go create mode 100644 server/server-handler.go create mode 100644 server/server.go create mode 100644 servlet/auth-servlet.go create mode 100644 servlet/data-servlet.go create mode 100644 servlet/probe-servlet.go diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..3733e36 --- /dev/null +++ b/.gitignore @@ -0,0 +1,68 @@ +# Created by .ignore support plugin (hsz.mobi) +### JetBrains template +# Covers JetBrains IDEs: IntelliJ, RubyMine, PhpStorm, AppCode, PyCharm, CLion, Android Studio and Webstorm +# Reference: https://intellij-support.jetbrains.com/hc/en-us/articles/206544839 + +# User-specific stuff: +.idea/**/workspace.xml +.idea/**/tasks.xml +.idea/dictionaries + +# Sensitive or high-churn files: +.idea/**/dataSources/ +.idea/**/dataSources.ids +.idea/**/dataSources.xml +.idea/**/dataSources.local.xml +.idea/**/sqlDataSources.xml +.idea/**/dynamic.xml +.idea/**/uiDesigner.xml + +# Gradle: +.idea/**/gradle.xml +.idea/**/libraries + +# Mongo Explorer plugin: +.idea/**/mongoSettings.xml + +## File-based project format: +*.iws + +## Plugin-specific files: + +# IntelliJ +/out/ + +# mpeltonen/sbt-idea plugin +.idea_modules/ + +# JIRA plugin +atlassian-ide-plugin.xml + +# Crashlytics plugin (for Android Studio and IntelliJ) +com_crashlytics_export_strings.xml +crashlytics.properties +crashlytics-build.properties +fabric.properties +### Go template +# Binaries for programs and plugins +*.exe +*.dll +*.so +*.dylib + +# Test binary, build with `go test -c` +*.test + +# Output of the go coverage tool, specifically when used with LiteIDE +*.out + +# Project-local glide cache, RE: https://github.com/Masterminds/glide/issues/736 +.glide/ +.idea/ +*.iml + +vendor/ +glide.lock +.DS_Store +dist/ +debug diff --git a/.vscode/launch.json b/.vscode/launch.json new file mode 100644 index 0000000..2ca2b1d --- /dev/null +++ b/.vscode/launch.json @@ -0,0 +1,32 @@ +{ + "version": "0.2.0", + "configurations": [ + { + "name": "Debug", + "type": "go", + "request": "launch", + "mode": "debug", + "remotePath": "", + "port": 2345, + "host": "127.0.0.1", + "program": "${workspaceRoot}/main.go", + "env": {}, + "args": [], + "showLog": true + }, + { + "name": "File Debug", + "type": "go", + "request": "launch", + "mode": "debug", + "remotePath": "", + "port": 2345, + "host": "127.0.0.1", + "program": "${fileDirname}", + "env": {}, + "args": [], + "showLog": true + } + + ] +} \ No newline at end of file diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000..20af2f6 --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,3 @@ +// Place your settings in this file to overwrite default and user settings. +{ +} \ No newline at end of file diff --git a/config.json b/config.json new file mode 100644 index 0000000..f9619f8 --- /dev/null +++ b/config.json @@ -0,0 +1,29 @@ +{ + "serverHandler": { + "name": "Gateway Probe", + "network": "tcp4", + "address": ":19091", + "concurrency": 262144, + "keepAlive": 60, + "handshakeTimeout": 60, + "maxMessageSize": 8192, + "readBufferSize": 4096, + "writeBufferSize": 4096, + "readTimeout": 0, + "writeTimeout": 0, + "pongTimeout": 60, + "pingTimeout": 10, + "pingPeriod": 10, + "enableCompression": false + }, + "external": { + "grpc": { + "network": "tcp4", + "address": "192.168.1.50:50006" + }, + "redis": { + "network": "tcp4", + "address": "192.168.1.50:6379" + } + } +} diff --git a/glide.yaml b/glide.yaml new file mode 100644 index 0000000..bea7d17 --- /dev/null +++ b/glide.yaml @@ -0,0 +1,4 @@ +package: git.loafle.net/overflow/probe_gateway_rpc +import: +- package: git.loafle.net/commons/server-go +- package: git.loafle.net/commons/configuration-go diff --git a/main.go b/main.go new file mode 100644 index 0000000..40425ed --- /dev/null +++ b/main.go @@ -0,0 +1,60 @@ +package main + +import ( + "git.loafle.net/commons/logging-go" + "log" + "os" + "os/signal" + "syscall" + "time" + "git.loafle.net/overflow/probe_gateway_rpc/server" + "context" + "flag" + "git.loafle.net/commons/configuration-go" + "git.loafle.net/overflow/member_gateway_rpc/config" +) + +var ( + configDir *string +) + +func init() { + configDir = flag.String("config-dir", "./", "Config directory") + logConfigPath := flag.String("log-config", "", "logging config path") + flag.Parse() + + logging.InitializeLogger(*logConfigPath) +} + +func main() { + _config := &config.Config{} + configuration.SetConfigPath(*configDir) + if err := configuration.Load(_config, "config.json"); nil != err { + logging.Logger().Panic(err) + } + + s := server.New(_config) + + go func() { + err := s.ListenAndServe() + if nil != err { + log.Printf("err: %v", err) + } + }() + + interrupt := make(chan os.Signal, 1) + signal.Notify(interrupt, + syscall.SIGKILL, + syscall.SIGSTOP, + syscall.SIGHUP, + syscall.SIGINT, + syscall.SIGTERM, + syscall.SIGQUIT) + + <-interrupt + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + if err := s.Shutdown(ctx); err != nil { + logging.Logger().Errorf("error: %v", err) + } +} diff --git a/server/server-handler.go b/server/server-handler.go new file mode 100644 index 0000000..d686f74 --- /dev/null +++ b/server/server-handler.go @@ -0,0 +1,56 @@ +package server + + +import ( + cs "git.loafle.net/commons/server-go" + oge "git.loafle.net/overflow/gateway/external" + ogrs "git.loafle.net/overflow/gateway_rpc/server" + "git.loafle.net/overflow/member_gateway_rpc/config" + "git.loafle.net/overflow/member_gateway_rpc/subscribe" +) + +type ServerHandler interface { + ogrs.ServerHandler +} + +type ServerHandlers struct { + ogrs.ServerHandlers + + Config *config.Config +} + + +func (sh *ServerHandlers) Init(serverCtx cs.ServerCtx) error { + if err := sh.ServerHandlers.Init(serverCtx); nil != err { + return err + } + oge.InitPackage(sh.Config.External) + subscribe.InitPackage() + + return nil +} + +func (sh *ServerHandlers) OnStart(serverCtx cs.ServerCtx) error { + if err := sh.ServerHandlers.OnStart(serverCtx); nil != err { + return err + } + + oge.StartPackage(sh.Config.External) + subscribe.StartPackage() + + return nil +} + +func (sh *ServerHandlers) OnStop(serverCtx cs.ServerCtx) { + subscribe.StopPackage() + oge.StopPackage(sh.Config.External) + + sh.ServerHandlers.OnStop(serverCtx) +} + +func (sh *ServerHandlers) Destroy(serverCtx cs.ServerCtx) { + subscribe.DestroyPackage() + oge.DestroyPackage(sh.Config.External) + + sh.ServerHandlers.Destroy(serverCtx) +} diff --git a/server/server.go b/server/server.go new file mode 100644 index 0000000..6e9552c --- /dev/null +++ b/server/server.go @@ -0,0 +1,27 @@ +package server + +import ( + cssw "git.loafle.net/commons/server-go/socket/web" + "git.loafle.net/overflow/member_gateway_rpc/config" + "git.loafle.net/overflow/probe_gateway_rpc/servlet" +) + +func New(_config *config.Config) *cssw.Server { + + as := &servlet.AuthServlets{} + ps := &servlet.ProbeServlets{} + + sh := &ServerHandlers{ + ServerHandlers: *_config.ServerHandler, + Config: _config, + } + + sh.RegisterServlet("/auth", as) + sh.RegisterServlet("/probe", ps) + + s := &cssw.Server{ + ServerHandler: sh, + } + + return s +} diff --git a/servlet/auth-servlet.go b/servlet/auth-servlet.go new file mode 100644 index 0000000..6fbed72 --- /dev/null +++ b/servlet/auth-servlet.go @@ -0,0 +1,218 @@ +package servlet + +import ( + "sync" + "fmt" + "encoding/base64" + "context" + "encoding/json" + + "github.com/valyala/fasthttp" + + "git.loafle.net/commons/server-go" + og "git.loafle.net/overflow/gateway" + ocnc "git.loafle.net/overflow/commons-go/noauthprobe/constants" + ocnm "git.loafle.net/overflow/commons-go/noauthprobe/model" + + "git.loafle.net/overflow/member_gateway_rpc/subscribe" + ogs "git.loafle.net/overflow/gateway/subscribe" + ogrs "git.loafle.net/overflow/gateway_rpc/servlet" + "git.loafle.net/commons/logging-go" + "git.loafle.net/overflow/gateway/external/grpc" + "git.loafle.net/commons/server-go/socket" + +) + +type AuthServlet interface { + ogrs.RPCServlet +} + +type AuthServlets struct { + ogrs.RPCServlets + connections sync.Map +} + +func (s *AuthServlets) Init(serverCtx server.ServerCtx) error { + if err := s.RPCServlets.Init(serverCtx); nil != err { + return err + } + + return nil +} + +func (s *AuthServlets) OnStart(serverCtx server.ServerCtx) error { + if err := s.RPCServlets.OnStart(serverCtx); nil != err { + return err + } + + subscribeChan, err := subscribe.Subscriber.Subscribe(ocnc.HTTPEntry_Auth) + if nil != err { + return err + } + go s.handleSubscribe(serverCtx, subscribeChan) + + return nil +} + +func (s *AuthServlets) OnStop(serverCtx server.ServerCtx) { + if err := subscribe.Subscriber.Unsubscribe(ocnc.HTTPEntry_Auth); nil != err { + logging.Logger().Warn(err) + } + + s.RPCServlets.OnStop(serverCtx) +} + +func (s *AuthServlets) Destroy(serverCtx server.ServerCtx) { + + s.RPCServlets.Destroy(serverCtx) +} + +func (s *AuthServlets) Handshake(servletCtx server.ServletCtx, ctx *fasthttp.RequestCtx) (*fasthttp.ResponseHeader, error) { + bMethod := ctx.Request.Header.Peek(ocnc.HTTPRequestHeaderKey_NoAuthProbe_Method) + + if nil == bMethod { + return nil, fmt.Errorf("Unexpected noauth probe method: %v", bMethod) + } + + method := string(bMethod) + + switch method { + case ocnc.HTTPRequestHeaderValue_NoAuthProbe_Method_Regist: + bInfo := ctx.Request.Header.Peek(ocnc.HTTPRequestHeaderKey_NoAuthProbe_Info) + if nil == bInfo { + return nil, fmt.Errorf("Unexpected HTTPRequestHeaderKey NoAuthProbe Info") + } + rJSON := make([]byte, base64.StdEncoding.DecodedLen(len(bInfo))) + _, err := base64.StdEncoding.Decode(rJSON, bInfo) + if nil != err { + return nil, fmt.Errorf("Base64 Encoding Error: %s", err.Error()) + } + grpcCTX := context.Background() + r, err := grpc.Exec(grpcCTX, "NoAuthProbeService.regist", string(rJSON)) + + if nil != err { + return nil, fmt.Errorf("grpc call Error: %s", err.Error()) + } + nap := &ocnm.NoAuthProbe{} + err = json.Unmarshal([]byte(r), nap) + if nil != err { + return nil, fmt.Errorf("grpc result unMarshal Error: %s", err.Error()) + } + + extHeader := &fasthttp.ResponseHeader{} + extHeader.Add(ocnc.HTTPResponseHeaderKey_NoAuthProbe_SetTempProbeKey, nap.TempProbeKey) + + return extHeader, nil + case ocnc.HTTPRequestHeaderValue_NoAuthProbe_Method_Connect: + bTempProbeKey := ctx.Request.Header.Peek(ocnc.HTTPRequestHeaderKey_NoAuthProbe_TempProbeKey) + + if nil == bTempProbeKey { + return nil, fmt.Errorf("Noauth probe temp key is not an existing key", ) + } + + grpcCTX := context.Background() + r, err := grpc.Exec(grpcCTX, "NoAuthProbeService.readByTempKey", string(bTempProbeKey)) + + if nil != err { + return nil, fmt.Errorf("grpc result error: %s", err.Error() ) + } + + nap := &ocnm.NoAuthProbe{} + err = json.Unmarshal([]byte(r), nap) + if nil != err { + return nil, fmt.Errorf("grpc result unMarshal Error: %s", err.Error()) + } + + servletCtx.SetAttribute(og.SessionIDKey, bTempProbeKey) + servletCtx.SetAttribute(og.SessionClientTypeKey, og.PROBE) + servletCtx.SetAttribute(og.SessionTargetIDKey, bTempProbeKey) + + return nil, nil + + default: + return nil, fmt.Errorf("Unexpected noauth probe httpRequestHeaderValue: %v", method) + } + + return nil, nil +} + +func (s *AuthServlets) OnConnect(servletCtx server.ServletCtx, conn socket.Conn) { + s.RPCServlets.OnConnect(servletCtx, conn) + + sessionID := servletCtx.GetAttribute(og.SessionIDKey) + targetID := servletCtx.GetAttribute(og.SessionTargetIDKey) + if nil != sessionID && nil != targetID { + s.connections.Store(sessionID.(string), retainConnection(targetID.(string), servletCtx)) + } +} + +func (s *AuthServlets) OnDisconnect(servletCtx server.ServletCtx) { + s.RPCServlets.OnDisconnect(servletCtx) + + sessionID := servletCtx.GetAttribute(og.SessionIDKey) + if nil != sessionID { + s.connections.Delete(sessionID.(string)) + } +} + +func (s *AuthServlets) handleSubscribe(serverCtx server.ServerCtx, subscribeChan <-chan *ogs.Message) { + for { + select { + case msg, ok := <- subscribeChan: + if !ok { + return + } + + switch msg.TargetType { + case ogs.PROBE: + for _, targetID := range msg.Targets { + _connections := s.getProbeConnections(targetID) + if nil == _connections || 0 == len(_connections) { + break + } + + for _, _connection := range _connections { + _writeChan := _connection.servletCtx.GetAttribute(og.SessionWriteChanKey) + if nil != _writeChan { + writeChan := _writeChan.(chan<- []byte) + writeChan <- msg.Message + } + } + } + + } + } + } +} + +func (s *AuthServlets) getProbeConnections(targetID string) []*connection { + var connections []*connection + + s.connections.Range(func(k, v interface{}) bool { + _connection := v.(*connection) + if _connection.targetID == targetID { + connections = append(connections, _connection) + } + return true + }) + + return connections +} + +type connection struct { + targetID string + servletCtx server.ServletCtx +} + +var connectionPool sync.Pool + +func retainConnection(targetID string, servletCtx server.ServletCtx) *connection { + return nil +} + +func releaseConnection(_connection *connection) { + _connection.targetID = "" + _connection.servletCtx = nil + + connectionPool.Put(_connection) +} \ No newline at end of file diff --git a/servlet/data-servlet.go b/servlet/data-servlet.go new file mode 100644 index 0000000..75e9304 --- /dev/null +++ b/servlet/data-servlet.go @@ -0,0 +1,66 @@ +package servlet + +import ( + //crpj "git.loafle.net/commons/rpc-go/protocol/json" + //"git.loafle.net/commons/server-go" + // + //"log" + //"github.com/valyala/fasthttp" + //"crypto/rsa" + //"sync" +) + +//type WebappServlet interface { +// ogrs.RPCServlet +//} +// +//type WebappServlets struct { +// ogrs.RPCServlets +// +// VerifyKey *rsa.PublicKey +// SignKey *rsa.PrivateKey +// +// connections sync.Map +//} +// +//func init() { +// // member RSA file read +//} +// +//func (s *RPCServlet) Handshake(servletCtx server.ServletCtx, ctx *fasthttp.RequestCtx) (*fasthttp.ResponseHeader, error) { +// // probe key extraction +// +// return nil, nil +//} +// +//func (s *RPCServlet) Handle( +// servletCtx server.ServletCtx, +// stopChan <-chan struct{}, +// doneChan chan<- struct{}, +// readChan <-chan []byte, +// writeChan chan<- []byte ) { +// defer func() { +// doneChan <- struct{}{} +// }() +// +// sc := crpj.NewServerCodec() +// +// for { +// select { +// case msg, ok := <-readChan: +// if !ok { +// return +// } +// // grpc exec method call +// src, _ := sc.NewRequest(msg) +// m := src.Method() +// p,_ := src.Params() +// log.Println("METHOD : %s", m) +// log.Println("Params : %s", p) +// case <-stopChan: +// return +// } +// +// } +// +//} \ No newline at end of file diff --git a/servlet/probe-servlet.go b/servlet/probe-servlet.go new file mode 100644 index 0000000..3126ddc --- /dev/null +++ b/servlet/probe-servlet.go @@ -0,0 +1,193 @@ +package servlet + +import ( + "sync" + "fmt" + "context" + "encoding/json" + + "github.com/valyala/fasthttp" + + "git.loafle.net/commons/server-go" + og "git.loafle.net/overflow/gateway" + ocpc "git.loafle.net/overflow/commons-go/probe/constants" + ocpm "git.loafle.net/overflow/commons-go/probe/model" + + "git.loafle.net/overflow/member_gateway_rpc/subscribe" + ogs "git.loafle.net/overflow/gateway/subscribe" + ogrs "git.loafle.net/overflow/gateway_rpc/servlet" + "git.loafle.net/commons/logging-go" + "git.loafle.net/overflow/gateway/external/grpc" + "git.loafle.net/commons/server-go/socket" + +) + +type ProbeServlet interface { + ogrs.RPCServlet +} + +type ProbeServlets struct { + ogrs.RPCServlets + connections sync.Map +} + +func (s *ProbeServlets) Init(serverCtx server.ServerCtx) error { + if err := s.RPCServlets.Init(serverCtx); nil != err { + return err + } + + return nil +} + +func (s *ProbeServlets) OnStart(serverCtx server.ServerCtx) error { + if err := s.RPCServlets.OnStart(serverCtx); nil != err { + return err + } + + subscribeChan, err := subscribe.Subscriber.Subscribe(ocpc.HTTPEntry_Probe) + if nil != err { + return err + } + go s.handleSubscribe(serverCtx, subscribeChan) + + return nil +} + +func (s *ProbeServlets) OnStop(serverCtx server.ServerCtx) { + if err := subscribe.Subscriber.Unsubscribe(ocpc.HTTPEntry_Probe); nil != err { + logging.Logger().Warn(err) + } + + s.RPCServlets.OnStop(serverCtx) +} + +func (s *ProbeServlets) Destroy(serverCtx server.ServerCtx) { + + s.RPCServlets.Destroy(serverCtx) +} + +func (s *ProbeServlets) Handshake(servletCtx server.ServletCtx, ctx *fasthttp.RequestCtx) (*fasthttp.ResponseHeader, error) { + bMethod := ctx.Request.Header.Peek(ocpc.HTTPRequestHeaderKey_Probe_Method) + + if nil == bMethod { + return nil, fmt.Errorf("Unexpected probe method: %v", bMethod) + } + + method := string(bMethod) + + switch method { + case ocpc.HTTPRequestHeaderValue_Probe_Method_Connect: + default: + return nil, fmt.Errorf("Unexpected noauth probe httpRequestHeaderValue: %v", method) + } + + bProbeKey := ctx.Request.Header.Peek(ocpc.HTTPRequestHeaderKey_Probe_ProbeKey) + if nil == bProbeKey { + return nil, fmt.Errorf("Unexpected probe key : %v", bProbeKey) + } + + probeKey := string(bProbeKey) + + grpcCTX := context.Background() + r, err := grpc.Exec(grpcCTX, "ProbeService.readByProbeKey", probeKey) + + if nil != err { + return nil, fmt.Errorf("grpc call Error: %s", err.Error()) + } + + probe := ocpm.Probe{} + err = json.Unmarshal([]byte(r), probe) + if nil != err { + return nil, fmt.Errorf("grpc result unMarshal Error: %s", err.Error()) + } + + extHeader := &fasthttp.ResponseHeader{} + extHeader.Add(ocpc.HTTPResponseHeaderKey_Probe_SetEncryptionKey, probe.EncryptionKey) + + servletCtx.SetAttribute(og.SessionIDKey, probe.ProbeKey) + servletCtx.SetAttribute(og.SessionClientTypeKey, og.PROBE) + servletCtx.SetAttribute(og.SessionTargetIDKey, probe.ProbeKey) + + return extHeader, nil +} + +func (s *ProbeServlets) OnConnect(servletCtx server.ServletCtx, conn socket.Conn) { + s.RPCServlets.OnConnect(servletCtx, conn) + + sessionID := servletCtx.GetAttribute(og.SessionIDKey) + targetID := servletCtx.GetAttribute(og.SessionTargetIDKey) + if nil != sessionID && nil != targetID { + s.connections.Store(sessionID.(string), retainConnection(targetID.(string), servletCtx)) + } +} + +func (s *ProbeServlets) OnDisconnect(servletCtx server.ServletCtx) { + s.RPCServlets.OnDisconnect(servletCtx) + + sessionID := servletCtx.GetAttribute(og.SessionIDKey) + if nil != sessionID { + s.connections.Delete(sessionID.(string)) + } +} + +func (s *ProbeServlets) handleSubscribe(serverCtx server.ServerCtx, subscribeChan <-chan *ogs.Message) { + for { + select { + case msg, ok := <- subscribeChan: + if !ok { + return + } + + switch msg.TargetType { + case ogs.PROBE: + for _, targetID := range msg.Targets { + _connections := s.getProbeConnections(targetID) + if nil == _connections || 0 == len(_connections) { + break + } + + for _, _connection := range _connections { + _writeChan := _connection.servletCtx.GetAttribute(og.SessionWriteChanKey) + if nil != _writeChan { + writeChan := _writeChan.(chan<- []byte) + writeChan <- msg.Message + } + } + } + + } + } + } +} + +func (s *ProbeServlets) getProbeConnections(targetID string) []*connection { + var connections []*connection + + s.connections.Range(func(k, v interface{}) bool { + _connection := v.(*connection) + if _connection.targetID == targetID { + connections = append(connections, _connection) + } + return true + }) + + return connections +} + +//type connection struct { +// targetID string +// servletCtx server.ServletCtx +//} +// +//var connectionPool sync.Pool +// +//func retainConnection(targetID string, servletCtx server.ServletCtx) *connection { +// return nil +//} +// +//func releaseConnection(_connection *connection) { +// _connection.targetID = "" +// _connection.servletCtx = nil +// +// connectionPool.Put(_connection) +//} \ No newline at end of file