commit 87b8c60cd95ddde5eb48128ab54d1d9fe95e6079 Author: crusader Date: Tue Dec 5 14:20:42 2017 +0900 ing diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..3733e36 --- /dev/null +++ b/.gitignore @@ -0,0 +1,68 @@ +# Created by .ignore support plugin (hsz.mobi) +### JetBrains template +# Covers JetBrains IDEs: IntelliJ, RubyMine, PhpStorm, AppCode, PyCharm, CLion, Android Studio and Webstorm +# Reference: https://intellij-support.jetbrains.com/hc/en-us/articles/206544839 + +# User-specific stuff: +.idea/**/workspace.xml +.idea/**/tasks.xml +.idea/dictionaries + +# Sensitive or high-churn files: +.idea/**/dataSources/ +.idea/**/dataSources.ids +.idea/**/dataSources.xml +.idea/**/dataSources.local.xml +.idea/**/sqlDataSources.xml +.idea/**/dynamic.xml +.idea/**/uiDesigner.xml + +# Gradle: +.idea/**/gradle.xml +.idea/**/libraries + +# Mongo Explorer plugin: +.idea/**/mongoSettings.xml + +## File-based project format: +*.iws + +## Plugin-specific files: + +# IntelliJ +/out/ + +# mpeltonen/sbt-idea plugin +.idea_modules/ + +# JIRA plugin +atlassian-ide-plugin.xml + +# Crashlytics plugin (for Android Studio and IntelliJ) +com_crashlytics_export_strings.xml +crashlytics.properties +crashlytics-build.properties +fabric.properties +### Go template +# Binaries for programs and plugins +*.exe +*.dll +*.so +*.dylib + +# Test binary, build with `go test -c` +*.test + +# Output of the go coverage tool, specifically when used with LiteIDE +*.out + +# Project-local glide cache, RE: https://github.com/Masterminds/glide/issues/736 +.glide/ +.idea/ +*.iml + +vendor/ +glide.lock +.DS_Store +dist/ +debug diff --git a/.vscode/launch.json b/.vscode/launch.json new file mode 100644 index 0000000..2ca2b1d --- /dev/null +++ b/.vscode/launch.json @@ -0,0 +1,32 @@ +{ + "version": "0.2.0", + "configurations": [ + { + "name": "Debug", + "type": "go", + "request": "launch", + "mode": "debug", + "remotePath": "", + "port": 2345, + "host": "127.0.0.1", + "program": "${workspaceRoot}/main.go", + "env": {}, + "args": [], + "showLog": true + }, + { + "name": "File Debug", + "type": "go", + "request": "launch", + "mode": "debug", + "remotePath": "", + "port": 2345, + "host": "127.0.0.1", + "program": "${fileDirname}", + "env": {}, + "args": [], + "showLog": true + } + + ] +} \ No newline at end of file diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000..60835be --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,4 @@ +// Place your settings in this file to overwrite default and user settings. +{ + "java.configuration.updateBuildConfiguration": "automatic" +} \ No newline at end of file diff --git a/client/client.go b/client/client.go new file mode 100644 index 0000000..c598817 --- /dev/null +++ b/client/client.go @@ -0,0 +1,15 @@ +package client + +import ( + crc "git.loafle.net/commons_go/rpc/client" + crcrs "git.loafle.net/commons_go/rpc/client/rwc/socket" + csc "git.loafle.net/commons_go/server/client" +) + +func New(clientHandler ClientHandler, socketBuilder csc.SocketBuilder) crc.Client { + cRWCHandler := crcrs.New(socketBuilder) + + c := crc.New(clientHandler, cRWCHandler) + + return c +} diff --git a/client/client_handler.go b/client/client_handler.go new file mode 100644 index 0000000..7e0ce21 --- /dev/null +++ b/client/client_handler.go @@ -0,0 +1,7 @@ +package client + +import "git.loafle.net/commons_go/rpc/client" + +type ClientHandler interface { + client.ClientHandler +} diff --git a/client/client_handlers.go b/client/client_handlers.go new file mode 100644 index 0000000..769379d --- /dev/null +++ b/client/client_handlers.go @@ -0,0 +1,19 @@ +package client + +import ( + crc "git.loafle.net/commons_go/rpc/client" + "git.loafle.net/commons_go/rpc/protocol/json" + crr "git.loafle.net/commons_go/rpc/registry" +) + +func NewClientHandler(rpcInvoker crr.RPCInvoker) ClientHandler { + ch := &ClientHandlers{} + ch.RPCInvoker = rpcInvoker + ch.Codec = json.NewClientCodec() + + return ch +} + +type ClientHandlers struct { + crc.ClientHandlers +} diff --git a/client/socket_builders.go b/client/socket_builders.go new file mode 100644 index 0000000..d6292e8 --- /dev/null +++ b/client/socket_builders.go @@ -0,0 +1,17 @@ +package client + +import ( + csc "git.loafle.net/commons_go/server/client" +) + +func NewSocketBuilder(address string) csc.SocketBuilder { + return newSocketBuilder(address) +} + +type SocketBuilders struct { + csc.SocketBuilders +} + +func (sb *SocketBuilders) SocketHandler() csc.SocketHandler { + return NewSocketHandler() +} diff --git a/client/socket_builders_unix.go b/client/socket_builders_unix.go new file mode 100644 index 0000000..0923953 --- /dev/null +++ b/client/socket_builders_unix.go @@ -0,0 +1,13 @@ +package client + +import ( + csc "git.loafle.net/commons_go/server/client" +) + +func newSocketBuilder(address string) csc.SocketBuilder { + sb := &SocketBuilders{} + sb.Network = "unix" + sb.Address = address + + return sb +} diff --git a/client/socket_builders_windows.go b/client/socket_builders_windows.go new file mode 100644 index 0000000..c8c8dd5 --- /dev/null +++ b/client/socket_builders_windows.go @@ -0,0 +1,23 @@ +package client + +import ( + "net" + + csc "git.loafle.net/commons_go/server/client" + "gopkg.in/natefinch/npipe.v2" +) + +func newSocketBuilder(address string) csc.SocketBuilder { + sb := &SocketBuilders{} + sb.Network = "pipe" + sb.Address = address + + return sb +} + +func (sb *SocketBuilders) Dial(network, address string) (net.Conn, error) { + if 0 == sb.HandshakeTimeout { + return npipe.Dial(`\\.\pipe\` + address) + } + return npipe.DialTimeout(`\\.\pipe\`+address, sb.HandshakeTimeout) +} diff --git a/client/socket_handlers.go b/client/socket_handlers.go new file mode 100644 index 0000000..c287ace --- /dev/null +++ b/client/socket_handlers.go @@ -0,0 +1,28 @@ +package client + +import ( + "log" + "net" + + csc "git.loafle.net/commons_go/server/client" +) + +type SocketHandlers struct { + csc.SocketHandlers +} + +func (sh *SocketHandlers) OnConnect(socketContext csc.SocketContext, conn net.Conn) { + log.Printf("OnConnect res: %v \n", conn) +} + +func (sh *SocketHandlers) OnDisconnect(soc csc.Socket) { + log.Printf("OnDisconnect \n") +} + +func (sh *SocketHandlers) Validate() { + sh.SocketHandlers.Validate() +} + +func NewSocketHandler() csc.SocketHandler { + return &SocketHandlers{} +} diff --git a/glide.yaml b/glide.yaml new file mode 100644 index 0000000..316a343 --- /dev/null +++ b/glide.yaml @@ -0,0 +1,15 @@ +package: git.loafle.net/overflow/overflow_probe_container +import: +- package: git.loafle.net/commons_go/logging +- package: git.loafle.net/commons_go/rpc + subpackages: + - client + - client/rwc/socket + - protocol/json + - registry + - server + - server/rwc/socket +- package: git.loafle.net/commons_go/server + subpackages: + - client +- package: gopkg.in/natefinch/npipe.v2 diff --git a/server/rpc_servlet_handler.go b/server/rpc_servlet_handler.go new file mode 100644 index 0000000..5b4944a --- /dev/null +++ b/server/rpc_servlet_handler.go @@ -0,0 +1,9 @@ +package server + +import ( + "git.loafle.net/commons_go/rpc/server" +) + +type RPCServletHandler interface { + server.ServletHandler +} diff --git a/server/rpc_servlet_handlers.go b/server/rpc_servlet_handlers.go new file mode 100644 index 0000000..429abaf --- /dev/null +++ b/server/rpc_servlet_handlers.go @@ -0,0 +1,22 @@ +package server + +import ( + crr "git.loafle.net/commons_go/rpc/registry" + "git.loafle.net/commons_go/rpc/server" +) + +func newRPCServletHandler(rpcInvoker crr.RPCInvoker) server.ServletHandler { + sh := &RPCServletHandlers{} + sh.RPCInvoker = rpcInvoker + + return sh +} + +type RPCServletHandlers struct { + server.ServletHandlers +} + +func (sh *RPCServletHandlers) Validate() { + sh.ServletHandlers.Validate() + +} diff --git a/server/server.go b/server/server.go new file mode 100644 index 0000000..754e213 --- /dev/null +++ b/server/server.go @@ -0,0 +1,19 @@ +package server + +import ( + crpj "git.loafle.net/commons_go/rpc/protocol/json" + crr "git.loafle.net/commons_go/rpc/registry" + "git.loafle.net/commons_go/server" +) + +func New(addr string, rpcInvoker crr.RPCInvoker) server.Server { + rpcSH := newRPCServletHandler(rpcInvoker) + rpcSH.RegisterCodec(crpj.Name, crpj.NewServerCodec()) + + socketHandler := newSocketHandler(rpcSH) + + sh := newServerHandler(addr, socketHandler) + s := server.New(sh) + + return s +} diff --git a/server/server_handler.go b/server/server_handler.go new file mode 100644 index 0000000..cf02713 --- /dev/null +++ b/server/server_handler.go @@ -0,0 +1,7 @@ +package server + +import "git.loafle.net/commons_go/server" + +type ServerHandler interface { + server.ServerHandler +} diff --git a/server/server_handlers.go b/server/server_handlers.go new file mode 100644 index 0000000..e0860d3 --- /dev/null +++ b/server/server_handlers.go @@ -0,0 +1,52 @@ +package server + +import ( + "fmt" + + "git.loafle.net/commons_go/logging" + + "git.loafle.net/commons_go/server" +) + +func newServerHandler(addr string, socketHandler SocketHandler) ServerHandler { + sh := &ServerHandlers{ + addr: addr, + } + + sh.Name = "Network Container" + sh.SocketHandler = socketHandler + return sh +} + +type ServerHandlers struct { + server.ServerHandlers + + addr string +} + +func (sh *ServerHandlers) Init(serverCTX server.ServerContext) error { + if err := sh.ServerHandlers.Init(serverCTX); nil != err { + return err + } + + return nil +} + +func (sh *ServerHandlers) OnStart(serverCTX server.ServerContext) { + sh.ServerHandlers.OnStart(serverCTX) + +} + +func (sh *ServerHandlers) OnStop(serverCTX server.ServerContext) { + + sh.ServerHandlers.OnStop(serverCTX) +} + +func (sh *ServerHandlers) Validate() { + sh.ServerHandlers.Validate() + + if "" == sh.addr { + logging.Logger().Panic(fmt.Sprintf("Server: Address of server must be specified")) + } + +} diff --git a/server/server_handlers_unix.go b/server/server_handlers_unix.go new file mode 100644 index 0000000..4aad065 --- /dev/null +++ b/server/server_handlers_unix.go @@ -0,0 +1,17 @@ +package server + +import ( + "net" + "os" + + "git.loafle.net/commons_go/server" +) + +func (sh *ServerHandlers) Listen(serverCTX server.ServerContext) (net.Listener, error) { + os.Remove(sh.addr) + l, err := net.ListenUnix("unix", &net.UnixAddr{Name: sh.addr, Net: "unix"}) + if nil == err { + os.Chmod(sh.addr, 0777) + } + return l, err +} diff --git a/server/server_handlers_windows.go b/server/server_handlers_windows.go new file mode 100644 index 0000000..a37870e --- /dev/null +++ b/server/server_handlers_windows.go @@ -0,0 +1,16 @@ +package server + +import ( + "net" + + "git.loafle.net/commons_go/server" + npipe "gopkg.in/natefinch/npipe.v2" +) + +func (sh *ServerHandlers) Listen(serverCTX server.ServerContext) (net.Listener, error) { + ln, err := npipe.Listen(`\\.\pipe\` + sh.addr) + if err != nil { + // handle error + } + return ln, err +} diff --git a/server/socket_handler.go b/server/socket_handler.go new file mode 100644 index 0000000..ed3b15e --- /dev/null +++ b/server/socket_handler.go @@ -0,0 +1,9 @@ +package server + +import ( + "git.loafle.net/commons_go/server" +) + +type SocketHandler interface { + server.SocketHandler +} diff --git a/server/socket_handlers.go b/server/socket_handlers.go new file mode 100644 index 0000000..0b134f9 --- /dev/null +++ b/server/socket_handlers.go @@ -0,0 +1,101 @@ +package server + +import ( + "net" + "sync" + + cRPC "git.loafle.net/commons_go/rpc" + crsrs "git.loafle.net/commons_go/rpc/server/rwc/socket" + + "git.loafle.net/commons_go/rpc/protocol/json" + "git.loafle.net/commons_go/server" +) + +func newSocketHandler(rpcSH RPCServletHandler) SocketHandler { + rpcRWCSH := crsrs.New() + + sh := &SocketHandlers{ + rpcSH: rpcSH, + rpcRWCSH: rpcRWCSH, + } + + return sh +} + +type SocketHandlers struct { + server.SocketHandlers + + rpcRWCSH cRPC.ServletReadWriteCloseHandler + rpcSH RPCServletHandler +} + +func (sh *SocketHandlers) Init(serverCTX server.ServerContext) error { + if err := sh.SocketHandlers.Init(serverCTX); nil != err { + return err + } + + return nil +} + +func (sh *SocketHandlers) Handshake(socketCTX server.SocketContext, conn net.Conn) (id string) { + return "discovery" +} + +func (sh *SocketHandlers) OnConnect(soc server.Socket) { + sh.SocketHandlers.OnConnect(soc) + + soc.Context().SetAttribute(cRPC.ContentTypeKey, json.Name) + +} + +func (sh *SocketHandlers) Handle(soc server.Socket, stopChan <-chan struct{}, doneChan chan<- error) { + var err error + rpcServlet := retainRPCServlet(sh.rpcSH, sh.rpcRWCSH) + + defer func() { + releaseRPCServlet(rpcServlet) + doneChan <- err + }() + + rpcDoneChan := make(chan error, 1) + + if err = rpcServlet.Start(soc.Context(), soc, rpcDoneChan); nil != err { + return + } + + select { + case err = <-rpcDoneChan: + case <-stopChan: + rpcServlet.Stop() + <-rpcDoneChan + } +} + +func (sh *SocketHandlers) OnDisconnect(soc server.Socket) { + + sh.SocketHandlers.OnDisconnect(soc) +} + +func (sh *SocketHandlers) Destroy() { + + sh.SocketHandlers.Destroy() +} + +func (sh *SocketHandlers) Validate() { + +} + +var rpcServletPool sync.Pool + +func retainRPCServlet(sh RPCServletHandler, rpcRWCSH cRPC.ServletReadWriteCloseHandler) cRPC.Servlet { + v := rpcServletPool.Get() + if v == nil { + return cRPC.NewServlet(sh, rpcRWCSH) + } + return v.(cRPC.Servlet) +} + +func releaseRPCServlet(s cRPC.Servlet) { + + rpcServletPool.Put(s) +}