diff --git a/client/client.go b/client/client.go index 6751b83..c598817 100644 --- a/client/client.go +++ b/client/client.go @@ -1,15 +1,15 @@ package client import ( - "git.loafle.net/commons_go/rpc" - "git.loafle.net/commons_go/rpc/client" + crc "git.loafle.net/commons_go/rpc/client" + crcrs "git.loafle.net/commons_go/rpc/client/rwc/socket" + csc "git.loafle.net/commons_go/server/client" ) -func New(addr string, registry rpc.Registry) client.Client { +func New(clientHandler ClientHandler, socketBuilder csc.SocketBuilder) crc.Client { + cRWCHandler := crcrs.New(socketBuilder) - ch := NewClientHandler(addr, registry) - - c := client.New(ch) + c := crc.New(clientHandler, cRWCHandler) return c } diff --git a/client/client_handlers.go b/client/client_handlers.go index 3a23a0e..62f7785 100644 --- a/client/client_handlers.go +++ b/client/client_handlers.go @@ -2,13 +2,12 @@ package client import ( "git.loafle.net/commons_go/rpc" - "git.loafle.net/commons_go/rpc/client" + crc "git.loafle.net/commons_go/rpc/client" "git.loafle.net/commons_go/rpc/protocol/json" ) -func NewClientHandler(addr string, registry rpc.Registry) ClientHandler { +func NewClientHandler(registry rpc.Registry) ClientHandler { ch := &ClientHandlers{} - ch.addr = addr ch.RPCRegistry = registry ch.Codec = json.NewClientCodec() @@ -16,7 +15,5 @@ func NewClientHandler(addr string, registry rpc.Registry) ClientHandler { } type ClientHandlers struct { - client.ClientHandlers - - addr string + crc.ClientHandlers } diff --git a/client/client_handlers_unix.go b/client/client_handlers_unix.go deleted file mode 100644 index 569da85..0000000 --- a/client/client_handlers_unix.go +++ /dev/null @@ -1,9 +0,0 @@ -package client - -import ( - "net" -) - -func (ch *ClientHandlers) Connect() (net.Conn, error) { - return net.Dial("unix", ch.addr) -} diff --git a/client/client_handlers_windows.go b/client/client_handlers_windows.go deleted file mode 100644 index 8c7ba50..0000000 --- a/client/client_handlers_windows.go +++ /dev/null @@ -1,11 +0,0 @@ -package client - -import ( - "net" - - "gopkg.in/natefinch/npipe.v2" -) - -func (ch *ClientHandlers) Connect() (net.Conn, error) { - return npipe.Dial(ch.addr) -} diff --git a/client/socket_builders.go b/client/socket_builders.go new file mode 100644 index 0000000..d6292e8 --- /dev/null +++ b/client/socket_builders.go @@ -0,0 +1,17 @@ +package client + +import ( + csc "git.loafle.net/commons_go/server/client" +) + +func NewSocketBuilder(address string) csc.SocketBuilder { + return newSocketBuilder(address) +} + +type SocketBuilders struct { + csc.SocketBuilders +} + +func (sb *SocketBuilders) SocketHandler() csc.SocketHandler { + return NewSocketHandler() +} diff --git a/client/socket_builders_unix.go b/client/socket_builders_unix.go new file mode 100644 index 0000000..0923953 --- /dev/null +++ b/client/socket_builders_unix.go @@ -0,0 +1,13 @@ +package client + +import ( + csc "git.loafle.net/commons_go/server/client" +) + +func newSocketBuilder(address string) csc.SocketBuilder { + sb := &SocketBuilders{} + sb.Network = "unix" + sb.Address = address + + return sb +} diff --git a/client/socket_builders_windows.go b/client/socket_builders_windows.go new file mode 100644 index 0000000..33e07aa --- /dev/null +++ b/client/socket_builders_windows.go @@ -0,0 +1,21 @@ +package client + +import ( + "net" + + csc "git.loafle.net/commons_go/server/client" + "gopkg.in/natefinch/npipe.v2" +) + +func newSocketBuilder(address string) csc.SocketBuilder { + sb := &SocketBuilders{} + sb.Network = "pipe" + sb.Address = address + + return sb +} + +func (sb *SocketBuilders) Dial(dialer *net.Dialer, network, address string) (net.Conn, error) { + + return npipe.DialTimeout(`\\.\pipe\`+address, dialer.Timeout) +} diff --git a/client/socket_handlers.go b/client/socket_handlers.go new file mode 100644 index 0000000..a59492d --- /dev/null +++ b/client/socket_handlers.go @@ -0,0 +1,28 @@ +package client + +import ( + "log" + "net" + + csc "git.loafle.net/commons_go/server/client" +) + +type SocketHandlers struct { + csc.SocketHandlers +} + +func (sh *SocketHandlers) OnConnect(socketContext csc.SocketContext, conn net.Conn) { + log.Printf("OnConnect res: %v \n", conn) +} + +func (sh *SocketHandlers) OnDisconnect(soc csc.Socket) { + log.Printf("OnDisconnect \n") +} + +func (sh *SocketHandlers) Validate() { + sh.SocketHandlers.Validate() +} + +func NewSocketHandler() SocketHandler { + return &SocketHandlers{} +} diff --git a/server/rpc_servlet_handlers.go b/server/rpc_servlet_handlers.go index 728bf24..f02d08f 100644 --- a/server/rpc_servlet_handlers.go +++ b/server/rpc_servlet_handlers.go @@ -2,8 +2,6 @@ package server import ( "git.loafle.net/commons_go/rpc" - crcs "git.loafle.net/commons_go/rpc/connection/socket" - "git.loafle.net/commons_go/rpc/protocol" "git.loafle.net/commons_go/rpc/server" ) @@ -16,19 +14,6 @@ func newRPCServletHandler(rpcRegistry rpc.Registry) server.ServletHandler { type RPCServletHandlers struct { server.ServletHandlers - rpcIO crcs.ServletHandlers -} - -func (sh *RPCServletHandlers) ReadRequest(servletCTX rpc.ServletContext, codec protocol.ServerCodec, conn interface{}) (protocol.ServerRequestCodec, error) { - return sh.rpcIO.ReadRequest(servletCTX, codec, conn) -} - -func (sh *RPCServletHandlers) WriteResponse(servletCTX rpc.ServletContext, conn interface{}, requestCodec protocol.ServerRequestCodec, result interface{}, err error) error { - return sh.rpcIO.WriteResponse(servletCTX, conn, requestCodec, result, err) -} - -func (sh *RPCServletHandlers) WriteNotification(servletCTX rpc.ServletContext, conn interface{}, codec protocol.ServerCodec, method string, args []interface{}) error { - return sh.rpcIO.WriteNotification(servletCTX, conn, codec, method, args) } func (sh *RPCServletHandlers) Validate() { diff --git a/server/socket_handlers.go b/server/socket_handlers.go index cf3d4c3..37ab27c 100644 --- a/server/socket_handlers.go +++ b/server/socket_handlers.go @@ -5,14 +5,19 @@ import ( "sync" cRPC "git.loafle.net/commons_go/rpc" + crsrs "git.loafle.net/commons_go/rpc/server/rwc/socket" + "git.loafle.net/commons_go/rpc/protocol/json" "git.loafle.net/commons_go/server" "git.loafle.net/overflow/overflow_discovery/discovery" ) func newSocketHandler(rpcSH RPCServletHandler) SocketHandler { + rpcRWCSH := crsrs.New() + sh := &SocketHandlers{ - rpcSH: rpcSH, + rpcSH: rpcSH, + rpcRWCSH: rpcRWCSH, } return sh @@ -21,7 +26,8 @@ func newSocketHandler(rpcSH RPCServletHandler) SocketHandler { type SocketHandlers struct { server.SocketHandlers - rpcSH RPCServletHandler + rpcRWCSH cRPC.ServletReadWriteCloseHandler + rpcSH RPCServletHandler } func (sh *SocketHandlers) Init(serverCTX server.ServerContext) error { @@ -45,7 +51,7 @@ func (sh *SocketHandlers) OnConnect(soc server.Socket) { func (sh *SocketHandlers) Handle(soc server.Socket, stopChan <-chan struct{}, doneChan chan<- error) { var err error - rpcServlet := retainRPCServlet(sh.rpcSH) + rpcServlet := retainRPCServlet(sh.rpcSH, sh.rpcRWCSH) discovery.RPCServlet = rpcServlet defer func() { @@ -63,8 +69,8 @@ func (sh *SocketHandlers) Handle(soc server.Socket, stopChan <-chan struct{}, do case err = <-rpcDoneChan: case <-stopChan: rpcServlet.Stop() + <-rpcDoneChan } - } func (sh *SocketHandlers) OnDisconnect(soc server.Socket) { @@ -83,10 +89,10 @@ func (sh *SocketHandlers) Validate() { var rpcServletPool sync.Pool -func retainRPCServlet(sh RPCServletHandler) cRPC.Servlet { +func retainRPCServlet(sh RPCServletHandler, rpcRWCSH cRPC.ServletReadWriteCloseHandler) cRPC.Servlet { v := rpcServletPool.Get() if v == nil { - return cRPC.NewServlet(sh) + return cRPC.NewServlet(sh, rpcRWCSH) } return v.(cRPC.Servlet) }