diff --git a/client.go b/client.go deleted file mode 100644 index 274b0f8..0000000 --- a/client.go +++ /dev/null @@ -1,127 +0,0 @@ -package server - -import ( - "fmt" - "io" - "log" - "sync" - "sync/atomic" - "time" - - "git.loafle.net/commons_go/logging" -) - -type Client interface { - Start() - Stop() -} - -func NewClient(ch ClientHandler) Client { - s := &client{ - ch: ch, - } - return s -} - -type client struct { - ch ClientHandler - - Stats ConnStats - - stopChan chan struct{} - stopWg sync.WaitGroup -} - -func (c *client) Start() { - if nil == c.ch { - panic("Client: client handler must be specified.") - } - c.ch.Validate() - - if c.stopChan != nil { - panic("Client: client is already running. Stop it before starting it again") - } - c.stopChan = make(chan struct{}) - - c.ch.OnStart() - c.stopWg.Add(1) - runClient(c) -} - -func (c *client) Stop() { - if c.stopChan == nil { - panic("Client: client must be started before stopping it") - } - close(c.stopChan) - c.stopWg.Wait() - c.stopChan = nil - c.ch.OnStop() -} - -func runClient(c *client) { - defer c.stopWg.Done() - - var conn io.ReadWriteCloser - var err error - var stopping atomic.Value - - dialChan := make(chan struct{}) - go func() { - if conn, err = c.ch.Dial(); err != nil { - if stopping.Load() == nil { - logging.Logger.Error(fmt.Sprintf("Client: [%s].Cannot establish rpc connection: [%s]", c.ch.GetAddr(), err)) - } - } - close(dialChan) - }() - - select { - case <-c.stopChan: - stopping.Store(true) - <-dialChan - return - case <-dialChan: - c.Stats.incDialCalls() - } - - if err != nil { - c.Stats.incDialErrors() - select { - case <-c.stopChan: - return - case <-time.After(time.Second): - } - return - } - - go handleClientConnection(c, conn) - - select { - case <-c.stopChan: - return - default: - } - -} - -func handleClientConnection(c *client, conn io.ReadWriteCloser) { - if err := c.ch.OnHandshake(c.ch.GetAddr(), conn); nil != err { - logging.Logger.Error(fmt.Sprintf("Client: [%s]. handshake error: [%s]", c.ch.GetAddr(), err)) - conn.Close() - return - } - - log.Printf("handleClientConnection") - clientStopChan := make(chan struct{}) - go c.ch.Handle(conn, clientStopChan) - - select { - case <-c.stopChan: - close(clientStopChan) - conn.Close() - return - case <-clientStopChan: - conn.Close() - return - } -} diff --git a/client_handler.go b/client_handler.go deleted file mode 100644 index 4ae2525..0000000 --- a/client_handler.go +++ /dev/null @@ -1,24 +0,0 @@ -package server - -import ( - "io" - "time" -) - -type ClientHandler interface { - OnStart() - OnStop() - - Dial() (conn io.ReadWriteCloser, err error) - OnHandshake(remoteAddr string, rwc io.ReadWriteCloser) error - Handle(rwc io.ReadWriteCloser, stopChan chan struct{}) - - GetAddr() string - GetPendingRequests() int - GetRequestTimeout() time.Duration - GetSendBufferSize() int - GetRecvBufferSize() int - GetKeepAlivePeriod() time.Duration - - Validate() -} diff --git a/client_handlers.go b/client_handlers.go deleted file mode 100644 index b66a502..0000000 --- a/client_handlers.go +++ /dev/null @@ -1,108 +0,0 @@ -package server - -import ( - "errors" - "io" - "log" - "time" -) - -type ClientHandlers struct { - // Server address to connect to. - // - // The address format depends on the underlying transport provided - // by Client.Dial. The following transports are provided out of the box: - // * TCP - see NewTCPClient() and NewTCPServer(). - // * TLS - see NewTLSClient() and NewTLSServer(). - // * Unix sockets - see NewUnixClient() and NewUnixServer(). - // - // By default TCP transport is used. - Addr string - - // The maximum number of pending requests in the queue. - // - // The number of pending requsts should exceed the expected number - // of concurrent goroutines calling client's methods. - // Otherwise a lot of ClientError.Overflow errors may appear. - // - // Default is DefaultPendingMessages. - PendingRequests int - - // Maximum request time. - // Default value is DefaultRequestTimeout. - RequestTimeout time.Duration - - // Size of send buffer per each underlying connection in bytes. - // Default value is DefaultBufferSize. - SendBufferSize int - - // Size of recv buffer per each underlying connection in bytes. - // Default value is DefaultBufferSize. - RecvBufferSize int - - KeepAlivePeriod time.Duration -} - -func (ch *ClientHandlers) OnStart() { - // no op -} - -func (ch *ClientHandlers) OnStop() { - // no op -} - -func (ch *ClientHandlers) Dial() (conn io.ReadWriteCloser, err error) { - return nil, errors.New("Client: Handler method[Dial] of Client is not implement") -} - -func (ch *ClientHandlers) OnHandshake(remoteAddr string, rwc io.ReadWriteCloser) error { - log.Printf("OnHandshake") - return nil -} - -func (ch *ClientHandlers) Handle(rwc io.ReadWriteCloser, stopChan chan struct{}) { - // no op - log.Printf("Handle") -} - -func (ch *ClientHandlers) GetAddr() string { - return ch.Addr -} - -func (ch *ClientHandlers) GetPendingRequests() int { - return ch.PendingRequests -} - -func (ch *ClientHandlers) GetRequestTimeout() time.Duration { - return ch.RequestTimeout -} - -func (ch *ClientHandlers) GetSendBufferSize() int { - return ch.SendBufferSize -} - -func (ch *ClientHandlers) GetRecvBufferSize() int { - return ch.RecvBufferSize -} - -func (ch *ClientHandlers) GetKeepAlivePeriod() time.Duration { - return ch.KeepAlivePeriod -} - -func (ch *ClientHandlers) Validate() { - if ch.PendingRequests <= 0 { - ch.PendingRequests = DefaultPendingMessages - } - if ch.RequestTimeout <= 0 { - ch.RequestTimeout = DefaultRequestTimeout - } - if ch.SendBufferSize <= 0 { - ch.SendBufferSize = DefaultBufferSize - } - if ch.RecvBufferSize <= 0 { - ch.RecvBufferSize = DefaultBufferSize - } - if ch.KeepAlivePeriod <= 0 { - ch.KeepAlivePeriod = DefaultKeepAlivePeriod - } -} diff --git a/ipc/client_handlers.go b/ipc/client_handlers.go deleted file mode 100644 index be1a026..0000000 --- a/ipc/client_handlers.go +++ /dev/null @@ -1,11 +0,0 @@ -package ipc - -import ( - "git.loafle.net/commons_go/server" -) - -type ClientHandlers struct { - server.ClientHandlers - - path string -} diff --git a/ipc/client_handlers_unix.go b/ipc/client_handlers_unix.go deleted file mode 100644 index 55b8657..0000000 --- a/ipc/client_handlers_unix.go +++ /dev/null @@ -1,10 +0,0 @@ -package ipc - -import ( - "io" - "net" -) - -func (ch *ClientHandlers) Dial() (conn io.ReadWriteCloser, err error) { - return net.Dial("unix", ch.Addr) -} diff --git a/ipc/client_handlers_windows.go b/ipc/client_handlers_windows.go deleted file mode 100644 index d6423cf..0000000 --- a/ipc/client_handlers_windows.go +++ /dev/null @@ -1,11 +0,0 @@ -package ipc - -import ( - "io" - - "gopkg.in/natefinch/npipe.v2" -) - -func (ch *ClientHandlers) Dial() (conn io.ReadWriteCloser, err error) { - return npipe.Dial(ch.Addr) -} diff --git a/ipc/server_handlers.go b/ipc/server_handlers.go deleted file mode 100644 index a5d54c3..0000000 --- a/ipc/server_handlers.go +++ /dev/null @@ -1,7 +0,0 @@ -package ipc - -import "git.loafle.net/commons_go/server" - -type ServerHandlers struct { - server.ServerHandlers -} diff --git a/ipc/server_handlers_unix.go b/ipc/server_handlers_unix.go deleted file mode 100644 index 5d14406..0000000 --- a/ipc/server_handlers_unix.go +++ /dev/null @@ -1,20 +0,0 @@ -package ipc - -import ( - "net" - "os" -) - -// Addr is ex:/tmp/server.sock -func (sh *ServerHandlers) Listen() (l net.Listener, err error) { - os.Remove(sh.Addr) - l, err = net.ListenUnix("unix", &net.UnixAddr{Name: sh.Addr, Net: "unix"}) - - os.Chmod(sh.Addr, 0777) - - return -} - -func (sh *ServerHandlers) OnStop() { - os.Remove(sh.Addr) -} diff --git a/ipc/server_handlers_windows.go b/ipc/server_handlers_windows.go deleted file mode 100644 index 769472c..0000000 --- a/ipc/server_handlers_windows.go +++ /dev/null @@ -1,16 +0,0 @@ -package ipc - -import ( - "net" - - "gopkg.in/natefinch/npipe.v2" -) - -// Addr is ex:\\.\pipe\server -func (sh *ServerHandlers) Listen() (net.Listener, error) { - return npipe.Listen(s.path) -} - -func (sh *ServerHandlers) OnStop() { - -} diff --git a/server.go b/server.go index c49fd77..a99ece4 100644 --- a/server.go +++ b/server.go @@ -2,7 +2,6 @@ package server import ( "fmt" - "io" "net" "sync" "sync/atomic" @@ -29,7 +28,7 @@ type server struct { listener net.Listener - Stats ConnStats + Stats ServerStats stopChan chan struct{} stopWg sync.WaitGroup @@ -44,16 +43,17 @@ func (s *server) Start() error { if s.stopChan != nil { panic("Server: server is already running. Stop it before starting it again") } - s.stopChan = make(chan struct{}) var err error if s.listener, err = s.sh.Listen(); nil != err { return err } + s.stopChan = make(chan struct{}) + s.sh.OnStart() s.stopWg.Add(1) - go runServer(s) + go handleServer(s) return nil } @@ -76,21 +76,22 @@ func (s *server) Serve() error { return nil } -func runServer(s *server) { +func handleServer(s *server) { defer s.stopWg.Done() - var conn io.ReadWriteCloser - var clientAddr string + var conn net.Conn var err error var stopping atomic.Value for { acceptChan := make(chan struct{}) go func() { - if conn, clientAddr, err = s.sh.accept(s.listener); err != nil { + if conn, err = s.listener.Accept(); err != nil { if stopping.Load() == nil { - logging.Logger.Error(fmt.Sprintf("Server: [%s]. Cannot accept new connection: [%s]", s.sh.GetAddr(), err)) + logging.Logger.Error(fmt.Sprintf("Server: Cannot accept new connection: [%s]", err)) } + } else { + conn, err = s.sh.OnAccept(conn) } close(acceptChan) }() @@ -116,31 +117,26 @@ func runServer(s *server) { } s.stopWg.Add(1) - go handleServerConnection(s, conn, clientAddr) + go handleConnection(s, conn) } } -func handleServerConnection(s *server, conn io.ReadWriteCloser, clientAddr string) { +func handleConnection(s *server, conn net.Conn) { defer s.stopWg.Done() - if err := s.sh.OnHandshake(clientAddr, conn); nil != err { - logging.Logger.Error(fmt.Sprintf("Server: [%s]. handshake error: [%s]", clientAddr, err)) - conn.Close() - return - } - - logging.Logger.Debug(fmt.Sprintf("Server: Client[%s] is connected.", clientAddr)) + logging.Logger.Debug(fmt.Sprintf("Server: Client[%s] is connected.", conn.RemoteAddr())) clientStopChan := make(chan struct{}) - go s.sh.Handle(clientAddr, conn, clientStopChan) + handleDoneCnah := make(chan struct{}) + go s.sh.Handle(conn, clientStopChan, handleDoneCnah) select { case <-s.stopChan: close(clientStopChan) conn.Close() - return - case <-clientStopChan: + <-handleDoneCnah + case <-handleDoneCnah: + close(clientStopChan) conn.Close() - logging.Logger.Debug(fmt.Sprintf("Server: Client[%s] is disconnected.", clientAddr)) - return + logging.Logger.Debug(fmt.Sprintf("Server: Client[%s] is disconnected.", conn.RemoteAddr())) } } diff --git a/server_handler.go b/server_handler.go index a5ee011..3ed7127 100644 --- a/server_handler.go +++ b/server_handler.go @@ -1,23 +1,19 @@ package server import ( - "io" "net" ) type ServerHandler interface { + Listen() (net.Listener, error) + OnAccept(conn net.Conn) (net.Conn, error) + OnStart() OnStop() - Listen() (net.Listener, error) - OnHandshake(remoteAddr string, rwc io.ReadWriteCloser) error - Handle(remoteAddr string, rwc io.ReadWriteCloser, stopChan chan struct{}) + Handle(conn net.Conn, stopChan <-chan struct{}, doneChan chan<- struct{}) IsClientDisconnect(err error) bool - GetAddr() string - Validate() - - accept(l net.Listener) (io.ReadWriteCloser, string, error) } diff --git a/server_handlers.go b/server_handlers.go index 3fb2610..51bb11f 100644 --- a/server_handlers.go +++ b/server_handlers.go @@ -4,23 +4,9 @@ import ( "errors" "io" "net" - "time" ) type ServerHandlers struct { - // Address to listen to for incoming connections. - // - // The address format depends on the underlying transport provided - // by Server.Listener. The following transports are provided - // out of the box: - // * TCP - see NewTCPServer() and NewTCPClient(). - // * TLS (aka SSL) - see NewTLSServer() and NewTLSClient(). - // * Unix sockets - see NewUnixServer() and NewUnixClient(). - // - // By default TCP transport is used. - Addr string - - KeepAlivePeriod time.Duration } func (sh *ServerHandlers) OnStart() { @@ -35,11 +21,11 @@ func (sh *ServerHandlers) Listen() (net.Listener, error) { return nil, errors.New("Server: Handler method[Listen] of Server is not implement") } -func (sh *ServerHandlers) OnHandshake(remoteAddr string, rwc io.ReadWriteCloser) error { - return nil +func (sh *ServerHandlers) OnAccept(conn net.Conn) (net.Conn, error) { + return conn, nil } -func (sh *ServerHandlers) Handle(remoteAddr string, rwc io.ReadWriteCloser, stopChan chan struct{}) { +func (sh *ServerHandlers) Handle(conn net.Conn, stopChan <-chan struct{}, doneChan chan<- struct{}) { } @@ -47,44 +33,6 @@ func (sh *ServerHandlers) IsClientDisconnect(err error) bool { return err == io.ErrUnexpectedEOF || err == io.EOF } -func (sh *ServerHandlers) GetAddr() string { - return sh.Addr -} - -func (sh *ServerHandlers) accept(l net.Listener) (io.ReadWriteCloser, string, error) { - conn, err := l.Accept() - if nil != err { - return nil, "", err - } - - if 0 < sh.KeepAlivePeriod { - if err = setupKeepalive(conn, sh.KeepAlivePeriod); nil != err { - conn.Close() - return nil, "", err - } - } - - return conn, conn.RemoteAddr().String(), nil -} - func (sh *ServerHandlers) Validate() { - if sh.KeepAlivePeriod <= 0 { - sh.KeepAlivePeriod = DefaultKeepAlivePeriod - } } - -func setupKeepalive(conn net.Conn, keepAlivePeriod time.Duration) error { - tcpConn, ok := conn.(*net.TCPConn) - if !ok { - return errors.New("Server: Keepalive is valid when connection is net.TCPConn") - } - - if err := tcpConn.SetKeepAlive(true); err != nil { - return err - } - if err := tcpConn.SetKeepAlivePeriod(keepAlivePeriod * time.Second); err != nil { - return err - } - return nil -} diff --git a/stats.go b/stats.go index 78d231f..c0ce5a4 100644 --- a/stats.go +++ b/stats.go @@ -6,7 +6,7 @@ import ( "time" ) -type ConnStats struct { +type ServerStats struct { // The number of request performed. RequestCount uint64 @@ -53,68 +53,68 @@ type ConnStats struct { // AvgRequestTime returns the average Request execution time. // -// Use stats returned from ConnStats.Snapshot() on live Client and / or Server, +// Use stats returned from ServerStats.Snapshot() on live Client and / or Server, // since the original stats can be updated by concurrently running goroutines. -func (cs *ConnStats) AvgRequestTime() time.Duration { - return time.Duration(float64(cs.RequestTime)/float64(cs.RequestCount)) * time.Millisecond +func (ss *ServerStats) AvgRequestTime() time.Duration { + return time.Duration(float64(ss.RequestTime)/float64(ss.RequestCount)) * time.Millisecond } // AvgRequestBytes returns the average bytes sent / received per Request. // -// Use stats returned from ConnStats.Snapshot() on live Client and / or Server, +// Use stats returned from ServerStats.Snapshot() on live Client and / or Server, // since the original stats can be updated by concurrently running goroutines. -func (cs *ConnStats) AvgRequestBytes() (send float64, recv float64) { - return float64(cs.BytesWritten) / float64(cs.RequestCount), float64(cs.BytesRead) / float64(cs.RequestCount) +func (ss *ServerStats) AvgRequestBytes() (send float64, recv float64) { + return float64(ss.BytesWritten) / float64(ss.RequestCount), float64(ss.BytesRead) / float64(ss.RequestCount) } // AvgRequestCalls returns the average number of write() / read() syscalls per Request. // -// Use stats returned from ConnStats.Snapshot() on live Client and / or Server, +// Use stats returned from ServerStats.Snapshot() on live Client and / or Server, // since the original stats can be updated by concurrently running goroutines. -func (cs *ConnStats) AvgRequestCalls() (write float64, read float64) { - return float64(cs.WriteCalls) / float64(cs.RequestCount), float64(cs.ReadCalls) / float64(cs.RequestCount) +func (ss *ServerStats) AvgRequestCalls() (write float64, read float64) { + return float64(ss.WriteCalls) / float64(ss.RequestCount), float64(ss.ReadCalls) / float64(ss.RequestCount) } type writerCounter struct { w io.Writer - cs *ConnStats + ss *ServerStats } type readerCounter struct { r io.Reader - cs *ConnStats + ss *ServerStats } -func newWriterCounter(w io.Writer, cs *ConnStats) io.Writer { +func newWriterCounter(w io.Writer, ss *ServerStats) io.Writer { return &writerCounter{ w: w, - cs: cs, + ss: ss, } } -func newReaderCounter(r io.Reader, cs *ConnStats) io.Reader { +func newReaderCounter(r io.Reader, ss *ServerStats) io.Reader { return &readerCounter{ r: r, - cs: cs, + ss: ss, } } func (w *writerCounter) Write(p []byte) (int, error) { n, err := w.w.Write(p) - w.cs.incWriteCalls() + w.ss.incWriteCalls() if err != nil { - w.cs.incWriteErrors() + w.ss.incWriteErrors() } - w.cs.addBytesWritten(uint64(n)) + w.ss.addBytesWritten(uint64(n)) return n, err } func (r *readerCounter) Read(p []byte) (int, error) { n, err := r.r.Read(p) - r.cs.incReadCalls() + r.ss.incReadCalls() if err != nil { - r.cs.incReadErrors() + r.ss.incReadErrors() } - r.cs.addBytesRead(uint64(n)) + r.ss.addBytesRead(uint64(n)) return n, err } diff --git a/stats_386.go b/stats_386.go index 9161044..091efac 100644 --- a/stats_386.go +++ b/stats_386.go @@ -4,103 +4,103 @@ import "sync" // Snapshot returns connection statistics' snapshot. // -// Use stats returned from ConnStats.Snapshot() on live Client and / or Server, +// Use stats returned from ServerStats.Snapshot() on live Client and / or Server, // since the original stats can be updated by concurrently running goroutines. -func (cs *ConnStats) Snapshot() *ConnStats { - cs.lock.Lock() - snapshot := *cs - cs.lock.Unlock() +func (ss *ServerStats) Snapshot() *ServerStats { + ss.lock.Lock() + snapshot := *ss + ss.lock.Unlock() snapshot.lock = sync.Mutex{} return &snapshot } // Reset resets all the stats counters. -func (cs *ConnStats) Reset() { - cs.lock.Lock() - cs.RequestCount = 0 - cs.RequestTime = 0 - cs.BytesWritten = 0 - cs.BytesRead = 0 - cs.WriteCalls = 0 - cs.WriteErrors = 0 - cs.ReadCalls = 0 - cs.ReadErrors = 0 - cs.DialCalls = 0 - cs.DialErrors = 0 - cs.AcceptCalls = 0 - cs.AcceptErrors = 0 - cs.lock.Unlock() +func (ss *ServerStats) Reset() { + ss.lock.Lock() + ss.RequestCount = 0 + ss.RequestTime = 0 + ss.BytesWritten = 0 + ss.BytesRead = 0 + ss.WriteCalls = 0 + ss.WriteErrors = 0 + ss.ReadCalls = 0 + ss.ReadErrors = 0 + ss.DialCalls = 0 + ss.DialErrors = 0 + ss.AcceptCalls = 0 + ss.AcceptErrors = 0 + ss.lock.Unlock() } -func (cs *ConnStats) incRPCCalls() { - cs.lock.Lock() - cs.RequestCount++ - cs.lock.Unlock() +func (ss *ServerStats) incRPCCalls() { + ss.lock.Lock() + ss.RequestCount++ + ss.lock.Unlock() } -func (cs *ConnStats) incRPCTime(dt uint64) { - cs.lock.Lock() - cs.RequestTime += dt - cs.lock.Unlock() +func (ss *ServerStats) incRPCTime(dt uint64) { + ss.lock.Lock() + ss.RequestTime += dt + ss.lock.Unlock() } -func (cs *ConnStats) addBytesWritten(n uint64) { - cs.lock.Lock() - cs.BytesWritten += n - cs.lock.Unlock() +func (ss *ServerStats) addBytesWritten(n uint64) { + ss.lock.Lock() + ss.BytesWritten += n + ss.lock.Unlock() } -func (cs *ConnStats) addBytesRead(n uint64) { - cs.lock.Lock() - cs.BytesRead += n - cs.lock.Unlock() +func (ss *ServerStats) addBytesRead(n uint64) { + ss.lock.Lock() + ss.BytesRead += n + ss.lock.Unlock() } -func (cs *ConnStats) incReadCalls() { - cs.lock.Lock() - cs.ReadCalls++ - cs.lock.Unlock() +func (ss *ServerStats) incReadCalls() { + ss.lock.Lock() + ss.ReadCalls++ + ss.lock.Unlock() } -func (cs *ConnStats) incReadErrors() { - cs.lock.Lock() - cs.ReadErrors++ - cs.lock.Unlock() +func (ss *ServerStats) incReadErrors() { + ss.lock.Lock() + ss.ReadErrors++ + ss.lock.Unlock() } -func (cs *ConnStats) incWriteCalls() { - cs.lock.Lock() - cs.WriteCalls++ - cs.lock.Unlock() +func (ss *ServerStats) incWriteCalls() { + ss.lock.Lock() + ss.WriteCalls++ + ss.lock.Unlock() } -func (cs *ConnStats) incWriteErrors() { - cs.lock.Lock() - cs.WriteErrors++ - cs.lock.Unlock() +func (ss *ServerStats) incWriteErrors() { + ss.lock.Lock() + ss.WriteErrors++ + ss.lock.Unlock() } -func (cs *ConnStats) incDialCalls() { - cs.lock.Lock() - cs.DialCalls++ - cs.lock.Unlock() +func (ss *ServerStats) incDialCalls() { + ss.lock.Lock() + ss.DialCalls++ + ss.lock.Unlock() } -func (cs *ConnStats) incDialErrors() { - cs.lock.Lock() - cs.DialErrors++ - cs.lock.Unlock() +func (ss *ServerStats) incDialErrors() { + ss.lock.Lock() + ss.DialErrors++ + ss.lock.Unlock() } -func (cs *ConnStats) incAcceptCalls() { - cs.lock.Lock() - cs.AcceptCalls++ - cs.lock.Unlock() +func (ss *ServerStats) incAcceptCalls() { + ss.lock.Lock() + ss.AcceptCalls++ + ss.lock.Unlock() } -func (cs *ConnStats) incAcceptErrors() { - cs.lock.Lock() - cs.AcceptErrors++ - cs.lock.Unlock() +func (ss *ServerStats) incAcceptErrors() { + ss.lock.Lock() + ss.AcceptErrors++ + ss.lock.Unlock() } diff --git a/stats_generic.go b/stats_generic.go index 5fffdb6..dd9cba6 100644 --- a/stats_generic.go +++ b/stats_generic.go @@ -4,85 +4,85 @@ import "sync/atomic" // Snapshot returns connection statistics' snapshot. // -// Use stats returned from ConnStats.Snapshot() on live Client and / or Server, +// Use stats returned from ServerStats.Snapshot() on live Client and / or Server, // since the original stats can be updated by concurrently running goroutines. -func (cs *ConnStats) Snapshot() *ConnStats { - return &ConnStats{ - RequestCount: atomic.LoadUint64(&cs.RequestCount), - RequestTime: atomic.LoadUint64(&cs.RequestTime), - BytesWritten: atomic.LoadUint64(&cs.BytesWritten), - BytesRead: atomic.LoadUint64(&cs.BytesRead), - ReadCalls: atomic.LoadUint64(&cs.ReadCalls), - ReadErrors: atomic.LoadUint64(&cs.ReadErrors), - WriteCalls: atomic.LoadUint64(&cs.WriteCalls), - WriteErrors: atomic.LoadUint64(&cs.WriteErrors), - DialCalls: atomic.LoadUint64(&cs.DialCalls), - DialErrors: atomic.LoadUint64(&cs.DialErrors), - AcceptCalls: atomic.LoadUint64(&cs.AcceptCalls), - AcceptErrors: atomic.LoadUint64(&cs.AcceptErrors), +func (ss *ServerStats) Snapshot() *ServerStats { + return &ServerStats{ + RequestCount: atomic.LoadUint64(&ss.RequestCount), + RequestTime: atomic.LoadUint64(&ss.RequestTime), + BytesWritten: atomic.LoadUint64(&ss.BytesWritten), + BytesRead: atomic.LoadUint64(&ss.BytesRead), + ReadCalls: atomic.LoadUint64(&ss.ReadCalls), + ReadErrors: atomic.LoadUint64(&ss.ReadErrors), + WriteCalls: atomic.LoadUint64(&ss.WriteCalls), + WriteErrors: atomic.LoadUint64(&ss.WriteErrors), + DialCalls: atomic.LoadUint64(&ss.DialCalls), + DialErrors: atomic.LoadUint64(&ss.DialErrors), + AcceptCalls: atomic.LoadUint64(&ss.AcceptCalls), + AcceptErrors: atomic.LoadUint64(&ss.AcceptErrors), } } // Reset resets all the stats counters. -func (cs *ConnStats) Reset() { - atomic.StoreUint64(&cs.RequestCount, 0) - atomic.StoreUint64(&cs.RequestTime, 0) - atomic.StoreUint64(&cs.BytesWritten, 0) - atomic.StoreUint64(&cs.BytesRead, 0) - atomic.StoreUint64(&cs.WriteCalls, 0) - atomic.StoreUint64(&cs.WriteErrors, 0) - atomic.StoreUint64(&cs.ReadCalls, 0) - atomic.StoreUint64(&cs.ReadErrors, 0) - atomic.StoreUint64(&cs.DialCalls, 0) - atomic.StoreUint64(&cs.DialErrors, 0) - atomic.StoreUint64(&cs.AcceptCalls, 0) - atomic.StoreUint64(&cs.AcceptErrors, 0) +func (ss *ServerStats) Reset() { + atomic.StoreUint64(&ss.RequestCount, 0) + atomic.StoreUint64(&ss.RequestTime, 0) + atomic.StoreUint64(&ss.BytesWritten, 0) + atomic.StoreUint64(&ss.BytesRead, 0) + atomic.StoreUint64(&ss.WriteCalls, 0) + atomic.StoreUint64(&ss.WriteErrors, 0) + atomic.StoreUint64(&ss.ReadCalls, 0) + atomic.StoreUint64(&ss.ReadErrors, 0) + atomic.StoreUint64(&ss.DialCalls, 0) + atomic.StoreUint64(&ss.DialErrors, 0) + atomic.StoreUint64(&ss.AcceptCalls, 0) + atomic.StoreUint64(&ss.AcceptErrors, 0) } -func (cs *ConnStats) incRPCCalls() { - atomic.AddUint64(&cs.RequestCount, 1) +func (ss *ServerStats) incRPCCalls() { + atomic.AddUint64(&ss.RequestCount, 1) } -func (cs *ConnStats) incRPCTime(dt uint64) { - atomic.AddUint64(&cs.RequestTime, dt) +func (ss *ServerStats) incRPCTime(dt uint64) { + atomic.AddUint64(&ss.RequestTime, dt) } -func (cs *ConnStats) addBytesWritten(n uint64) { - atomic.AddUint64(&cs.BytesWritten, n) +func (ss *ServerStats) addBytesWritten(n uint64) { + atomic.AddUint64(&ss.BytesWritten, n) } -func (cs *ConnStats) addBytesRead(n uint64) { - atomic.AddUint64(&cs.BytesRead, n) +func (ss *ServerStats) addBytesRead(n uint64) { + atomic.AddUint64(&ss.BytesRead, n) } -func (cs *ConnStats) incReadCalls() { - atomic.AddUint64(&cs.ReadCalls, 1) +func (ss *ServerStats) incReadCalls() { + atomic.AddUint64(&ss.ReadCalls, 1) } -func (cs *ConnStats) incReadErrors() { - atomic.AddUint64(&cs.ReadErrors, 1) +func (ss *ServerStats) incReadErrors() { + atomic.AddUint64(&ss.ReadErrors, 1) } -func (cs *ConnStats) incWriteCalls() { - atomic.AddUint64(&cs.WriteCalls, 1) +func (ss *ServerStats) incWriteCalls() { + atomic.AddUint64(&ss.WriteCalls, 1) } -func (cs *ConnStats) incWriteErrors() { - atomic.AddUint64(&cs.WriteErrors, 1) +func (ss *ServerStats) incWriteErrors() { + atomic.AddUint64(&ss.WriteErrors, 1) } -func (cs *ConnStats) incDialCalls() { - atomic.AddUint64(&cs.DialCalls, 1) +func (ss *ServerStats) incDialCalls() { + atomic.AddUint64(&ss.DialCalls, 1) } -func (cs *ConnStats) incDialErrors() { - atomic.AddUint64(&cs.DialErrors, 1) +func (ss *ServerStats) incDialErrors() { + atomic.AddUint64(&ss.DialErrors, 1) } -func (cs *ConnStats) incAcceptCalls() { - atomic.AddUint64(&cs.AcceptCalls, 1) +func (ss *ServerStats) incAcceptCalls() { + atomic.AddUint64(&ss.AcceptCalls, 1) } -func (cs *ConnStats) incAcceptErrors() { - atomic.AddUint64(&cs.AcceptErrors, 1) +func (ss *ServerStats) incAcceptErrors() { + atomic.AddUint64(&ss.AcceptErrors, 1) } diff --git a/tcp/client_handlers.go b/tcp/client_handlers.go deleted file mode 100644 index 9f37b95..0000000 --- a/tcp/client_handlers.go +++ /dev/null @@ -1,22 +0,0 @@ -package tcp - -import ( - "io" - "net" - "time" - - "git.loafle.net/commons_go/server" -) - -type ClientHandlers struct { - server.ClientHandlers -} - -func (ch *ClientHandlers) Dial() (conn io.ReadWriteCloser, err error) { - dialer := &net.Dialer{ - Timeout: ch.RequestTimeout * time.Second, - KeepAlive: ch.KeepAlivePeriod * time.Second, - } - - return dialer.Dial("tcp", ch.Addr) -} diff --git a/tcp/server_handlers.go b/tcp/server_handlers.go deleted file mode 100644 index 31ff142..0000000 --- a/tcp/server_handlers.go +++ /dev/null @@ -1,16 +0,0 @@ -package tcp - -import ( - "net" - - "git.loafle.net/commons_go/server" -) - -type ServerHandlers struct { - server.ServerHandlers -} - -func (sh *ServerHandlers) Listen() (net.Listener, error) { - - return net.Listen("tcp", sh.Addr) -} diff --git a/tcp_tls/client_handlers.go b/tcp_tls/client_handlers.go deleted file mode 100644 index 83f768d..0000000 --- a/tcp_tls/client_handlers.go +++ /dev/null @@ -1,25 +0,0 @@ -package tcp_tls - -import ( - "crypto/tls" - "io" - "net" - "time" - - "git.loafle.net/commons_go/server" -) - -type ClientHandlers struct { - server.ClientHandlers - - tlsConfig *tls.Config -} - -func (ch *ClientHandlers) Dial() (conn io.ReadWriteCloser, err error) { - dialer := &net.Dialer{ - Timeout: ch.RequestTimeout * time.Second, - KeepAlive: ch.KeepAlivePeriod * time.Second, - } - - return tls.DialWithDialer(dialer, "tcp", ch.Addr, ch.tlsConfig) -} diff --git a/tcp_tls/server_handlers.go b/tcp_tls/server_handlers.go deleted file mode 100644 index a34c163..0000000 --- a/tcp_tls/server_handlers.go +++ /dev/null @@ -1,19 +0,0 @@ -package tcp_tls - -import ( - "crypto/tls" - "net" - - "git.loafle.net/commons_go/server" -) - -type ServerHandlers struct { - server.ServerHandlers - - tlsConfig *tls.Config -} - -func (sh *ServerHandlers) Listen() (net.Listener, error) { - - return tls.Listen("tcp", sh.Addr, sh.tlsConfig) -}