diff --git a/client.go b/client.go new file mode 100644 index 0000000..c91709d --- /dev/null +++ b/client.go @@ -0,0 +1,126 @@ +package server + +import ( + "fmt" + "io" + "sync" + "sync/atomic" + "time" + + "git.loafle.net/commons_go/logging" +) + +type Client interface { + Start() error + Stop() +} + +func NewClient(ch ClientHandler) Client { + s := &client{ + clientHandler: ch, + } + return s +} + +type client struct { + clientHandler ClientHandler + + Stats ConnStats + + stopChan chan struct{} + stopWg sync.WaitGroup +} + +func (c *client) Start() error { + if nil == c.clientHandler { + panic("Client: client handler must be specified.") + } + c.clientHandler.Validate() + + if c.stopChan != nil { + panic("Client: client is already running. Stop it before starting it again") + } + c.stopChan = make(chan struct{}) + + c.stopWg.Add(1) + go runClient(c) + return nil +} + +func (c *client) Stop() { + if c.stopChan == nil { + panic("Client: client must be started before stopping it") + } + close(c.stopChan) + c.stopWg.Wait() + c.stopChan = nil +} + +func runClient(c *client) { + defer c.stopWg.Done() + + var conn io.ReadWriteCloser + var err error + var stopping atomic.Value + + for { + dialChan := make(chan struct{}) + go func() { + if conn, err = c.clientHandler.Dial(); err != nil { + if stopping.Load() == nil { + logging.Logger.Error(fmt.Sprintf("Client: [%s].Cannot establish rpc connection: [%s]", c.clientHandler.GetAddr(), err)) + } + } + close(dialChan) + }() + + select { + case <-c.stopChan: + stopping.Store(true) + <-dialChan + return + case <-dialChan: + c.Stats.incDialCalls() + } + + if err != nil { + c.Stats.incDialErrors() + select { + case <-c.stopChan: + return + case <-time.After(time.Second): + } + continue + } + + c.stopWg.Add(1) + handleClientConnection(c, conn) + + select { + case <-c.stopChan: + return + default: + } + } +} + +func handleClientConnection(c *client, conn io.ReadWriteCloser) { + if err := c.clientHandler.OnHandshake(c.clientHandler.GetAddr(), conn); nil != err { + logging.Logger.Error(fmt.Sprintf("Client: [%s]. handshake error: [%s]", c.clientHandler.GetAddr(), err)) + conn.Close() + return + } + + clientStopChan := make(chan struct{}) + go c.clientHandler.Handle(conn, clientStopChan) + + select { + case <-c.stopChan: + close(clientStopChan) + conn.Close() + return + case <-clientStopChan: + conn.Close() + return + } +} diff --git a/client_handler.go b/client_handler.go new file mode 100644 index 0000000..9c04b8b --- /dev/null +++ b/client_handler.go @@ -0,0 +1,12 @@ +package server + +import "io" + +type ClientHandler interface { + Dial() (conn io.ReadWriteCloser, err error) + OnHandshake(remoteAddr string, rwc io.ReadWriteCloser) error + Handle(rwc io.ReadWriteCloser, stopChan chan struct{}) + + GetAddr() string + Validate() +} diff --git a/client_handlers.go b/client_handlers.go new file mode 100644 index 0000000..5b06669 --- /dev/null +++ b/client_handlers.go @@ -0,0 +1,77 @@ +package server + +import ( + "errors" + "io" + "time" +) + +type ClientHandlers struct { + // Server address to connect to. + // + // The address format depends on the underlying transport provided + // by Client.Dial. The following transports are provided out of the box: + // * TCP - see NewTCPClient() and NewTCPServer(). + // * TLS - see NewTLSClient() and NewTLSServer(). + // * Unix sockets - see NewUnixClient() and NewUnixServer(). + // + // By default TCP transport is used. + Addr string + + // The maximum number of pending requests in the queue. + // + // The number of pending requsts should exceed the expected number + // of concurrent goroutines calling client's methods. + // Otherwise a lot of ClientError.Overflow errors may appear. + // + // Default is DefaultPendingMessages. + PendingRequests int + + // Maximum request time. + // Default value is DefaultRequestTimeout. + RequestTimeout time.Duration + + // Size of send buffer per each underlying connection in bytes. + // Default value is DefaultBufferSize. + SendBufferSize int + + // Size of recv buffer per each underlying connection in bytes. + // Default value is DefaultBufferSize. + RecvBufferSize int + + KeepAlivePeriod time.Duration +} + +func (ch *ClientHandlers) Dial() (conn io.ReadWriteCloser, err error) { + return nil, errors.New("Client: Handler method[Dial] of Client is not implement") +} + +func (ch *ClientHandlers) OnHandshake(remoteAddr string, rwc io.ReadWriteCloser) error { + return nil +} + +func (ch *ClientHandlers) Handle(rwc io.ReadWriteCloser, stopChan chan struct{}) { + +} + +func (ch *ClientHandlers) GetAddr() string { + return ch.Addr +} + +func (ch *ClientHandlers) Validate() { + if ch.PendingRequests <= 0 { + ch.PendingRequests = DefaultPendingMessages + } + if ch.RequestTimeout <= 0 { + ch.RequestTimeout = DefaultRequestTimeout + } + if ch.SendBufferSize <= 0 { + ch.SendBufferSize = DefaultBufferSize + } + if ch.RecvBufferSize <= 0 { + ch.RecvBufferSize = DefaultBufferSize + } + if ch.KeepAlivePeriod <= 0 { + ch.KeepAlivePeriod = DefaultKeepAlivePeriod + } +} diff --git a/constants.go b/constants.go new file mode 100644 index 0000000..aaaf0a8 --- /dev/null +++ b/constants.go @@ -0,0 +1,26 @@ +package server + +import "time" + +const ( + // DefaultConcurrency is the default number of concurrent rpc calls + // the server can process. + DefaultConcurrency = 8 * 1024 + + // DefaultRequestTimeout is the default timeout for client request. + DefaultRequestTimeout = 20 * time.Second + + // DefaultPendingMessages is the default number of pending messages + // handled by Client and Server. + DefaultPendingMessages = 32 * 1024 + + // DefaultFlushDelay is the default delay between message flushes + // on Client and Server. + DefaultFlushDelay = -1 + + // DefaultBufferSize is the default size for Client and Server buffers. + DefaultBufferSize = 64 * 1024 + + // DefaultKeepAlivePeriod is the default time for KeepAlivePeriod of connection. + DefaultKeepAlivePeriod = 0 +) diff --git a/ipc/client_handlers.go b/ipc/client_handlers.go new file mode 100644 index 0000000..be1a026 --- /dev/null +++ b/ipc/client_handlers.go @@ -0,0 +1,11 @@ +package ipc + +import ( + "git.loafle.net/commons_go/server" +) + +type ClientHandlers struct { + server.ClientHandlers + + path string +} diff --git a/ipc/client_handlers_unix.go b/ipc/client_handlers_unix.go new file mode 100644 index 0000000..55b8657 --- /dev/null +++ b/ipc/client_handlers_unix.go @@ -0,0 +1,10 @@ +package ipc + +import ( + "io" + "net" +) + +func (ch *ClientHandlers) Dial() (conn io.ReadWriteCloser, err error) { + return net.Dial("unix", ch.Addr) +} diff --git a/ipc/client_handlers_windows.go b/ipc/client_handlers_windows.go new file mode 100644 index 0000000..d6423cf --- /dev/null +++ b/ipc/client_handlers_windows.go @@ -0,0 +1,11 @@ +package ipc + +import ( + "io" + + "gopkg.in/natefinch/npipe.v2" +) + +func (ch *ClientHandlers) Dial() (conn io.ReadWriteCloser, err error) { + return npipe.Dial(ch.Addr) +} diff --git a/ipc/ipc.go b/ipc/server_handlers.go similarity index 88% rename from ipc/ipc.go rename to ipc/server_handlers.go index 1e4c03e..a5d54c3 100644 --- a/ipc/ipc.go +++ b/ipc/server_handlers.go @@ -4,6 +4,4 @@ import "git.loafle.net/commons_go/server" type ServerHandlers struct { server.ServerHandlers - - path string } diff --git a/ipc/ipc_unix.go b/ipc/server_handlers_unix.go similarity index 85% rename from ipc/ipc_unix.go rename to ipc/server_handlers_unix.go index 2354286..0de673c 100644 --- a/ipc/ipc_unix.go +++ b/ipc/server_handlers_unix.go @@ -5,8 +5,8 @@ import ( "os" ) +// Addr is ex:/tmp/server.sock func (sh *ServerHandlers) Listen() (l net.Listener, err error) { - //sh.path = filepath.Join(os.TempDir(), sh.Addr) os.Remove(sh.Addr) l, err = net.ListenUnix("unix", &net.UnixAddr{Name: sh.Addr, Net: "unix"}) diff --git a/ipc/ipc_windows.go b/ipc/server_handlers_windows.go similarity index 85% rename from ipc/ipc_windows.go rename to ipc/server_handlers_windows.go index bacede0..0f46d16 100644 --- a/ipc/ipc_windows.go +++ b/ipc/server_handlers_windows.go @@ -6,9 +6,8 @@ import ( "gopkg.in/natefinch/npipe.v2" ) +// Addr is ex:\\.\pipe\server func (sh *ServerHandlers) Listen() (net.Listener, error) { - sh.path = `\\.\pipe\` + sh.Addr - return npipe.Listen(s.path) } diff --git a/server.go b/server.go index 784ad0e..d4cbf7c 100644 --- a/server.go +++ b/server.go @@ -113,11 +113,11 @@ func runServer(s *server) { } s.stopWg.Add(1) - go handleConnection(s, conn, clientAddr) + go handleServerConnection(s, conn, clientAddr) } } -func handleConnection(s *server, conn io.ReadWriteCloser, clientAddr string) { +func handleServerConnection(s *server, conn io.ReadWriteCloser, clientAddr string) { defer s.stopWg.Done() logging.Logger.Debug(fmt.Sprintf("Server: Client[%s] is connected.", clientAddr)) clientStopChan := make(chan struct{}) diff --git a/server_handlers.go b/server_handlers.go index 0086ca7..1d33a4a 100644 --- a/server_handlers.go +++ b/server_handlers.go @@ -7,29 +7,6 @@ import ( "time" ) -const ( - // DefaultConcurrency is the default number of concurrent rpc calls - // the server can process. - DefaultConcurrency = 8 * 1024 - - // DefaultRequestTimeout is the default timeout for client request. - DefaultRequestTimeout = 20 * time.Second - - // DefaultPendingMessages is the default number of pending messages - // handled by Client and Server. - DefaultPendingMessages = 32 * 1024 - - // DefaultFlushDelay is the default delay between message flushes - // on Client and Server. - DefaultFlushDelay = -1 - - // DefaultBufferSize is the default size for Client and Server buffers. - DefaultBufferSize = 64 * 1024 - - // DefaultKeepAlivePeriod is the default time for KeepAlivePeriod of connection. - DefaultKeepAlivePeriod = 0 -) - type ServerHandlers struct { // Address to listen to for incoming connections. // @@ -66,31 +43,6 @@ func (sh *ServerHandlers) GetAddr() string { return sh.Addr } -// func (sh *ServerHandlers) { - -// } -// func (sh *ServerHandlers) { - -// } -// func (sh *ServerHandlers) { - -// } -// func (sh *ServerHandlers) { - -// } -// func (sh *ServerHandlers) { - -// } -// func (sh *ServerHandlers) { - -// } -// func (sh *ServerHandlers) { - -// } -// func (sh *ServerHandlers) { - -// } - func (sh *ServerHandlers) accept(l net.Listener) (io.ReadWriteCloser, string, error) { conn, err := l.Accept() if nil != err { diff --git a/tcp/client_handlers.go b/tcp/client_handlers.go new file mode 100644 index 0000000..9f37b95 --- /dev/null +++ b/tcp/client_handlers.go @@ -0,0 +1,22 @@ +package tcp + +import ( + "io" + "net" + "time" + + "git.loafle.net/commons_go/server" +) + +type ClientHandlers struct { + server.ClientHandlers +} + +func (ch *ClientHandlers) Dial() (conn io.ReadWriteCloser, err error) { + dialer := &net.Dialer{ + Timeout: ch.RequestTimeout * time.Second, + KeepAlive: ch.KeepAlivePeriod * time.Second, + } + + return dialer.Dial("tcp", ch.Addr) +} diff --git a/tcp/tcp.go b/tcp/server_handlers.go similarity index 100% rename from tcp/tcp.go rename to tcp/server_handlers.go diff --git a/tcp_tls/client_handlers.go b/tcp_tls/client_handlers.go new file mode 100644 index 0000000..83f768d --- /dev/null +++ b/tcp_tls/client_handlers.go @@ -0,0 +1,25 @@ +package tcp_tls + +import ( + "crypto/tls" + "io" + "net" + "time" + + "git.loafle.net/commons_go/server" +) + +type ClientHandlers struct { + server.ClientHandlers + + tlsConfig *tls.Config +} + +func (ch *ClientHandlers) Dial() (conn io.ReadWriteCloser, err error) { + dialer := &net.Dialer{ + Timeout: ch.RequestTimeout * time.Second, + KeepAlive: ch.KeepAlivePeriod * time.Second, + } + + return tls.DialWithDialer(dialer, "tcp", ch.Addr, ch.tlsConfig) +} diff --git a/tcp_tls/tcp_tls.go b/tcp_tls/server_handlers.go similarity index 100% rename from tcp_tls/tcp_tls.go rename to tcp_tls/server_handlers.go