diff --git a/server.go b/server.go index f95c5ce..88e3bde 100644 --- a/server.go +++ b/server.go @@ -22,7 +22,7 @@ type server struct { listener net.Listener - Stats ServerStats + Stats ConnStats stopChan chan struct{} stopWg sync.WaitGroup @@ -55,6 +55,7 @@ func (s *server) Stop() { close(s.stopChan) s.stopWg.Wait() s.stopChan = nil + s.serverHandler.Stopped() } func (s *server) Serve() error { diff --git a/server_handler.go b/server_handler.go index 1517cc1..62f2a4a 100644 --- a/server_handler.go +++ b/server_handler.go @@ -8,9 +8,9 @@ import ( type ServerHandler interface { Listen() (net.Listener, error) Handle(remoteAddr string, rwc io.ReadWriteCloser, stopChan chan struct{}) + Stopped() GetAddr() string - GetPendingResponses() int Validate() diff --git a/server_handlers.go b/server_handlers.go index 63dc037..b302fb2 100644 --- a/server_handlers.go +++ b/server_handlers.go @@ -43,27 +43,6 @@ type ServerHandlers struct { // By default TCP transport is used. Addr string - // The maximum delay between response flushes to clients. - // - // Negative values lead to immediate requests' sending to the client - // without their buffering. This minimizes rpc latency at the cost - // of higher CPU and network usage. - // - // Default is DefaultFlushDelay. - FlushDelay time.Duration - - // The maximum number of pending responses in the queue. - // Default is DefaultPendingMessages. - PendingResponses int - - // Size of send buffer per each underlying connection in bytes. - // Default is DefaultBufferSize. - SendBufferSize int - - // Size of recv buffer per each underlying connection in bytes. - // Default is DefaultBufferSize. - RecvBufferSize int - KeepAlivePeriod time.Duration } @@ -75,12 +54,12 @@ func (sh *ServerHandlers) Handle(remoteAddr string, rwc io.ReadWriteCloser, stop } -func (sh *ServerHandlers) GetAddr() string { - return sh.Addr +func (sh *ServerHandlers) Stopped() { + } -func (sh *ServerHandlers) GetPendingResponses() int { - return sh.PendingResponses +func (sh *ServerHandlers) GetAddr() string { + return sh.Addr } // func (sh *ServerHandlers) { @@ -125,21 +104,6 @@ func (sh *ServerHandlers) accept(l net.Listener) (io.ReadWriteCloser, string, er } func (sh *ServerHandlers) Validate() { - if sh.Concurrency <= 0 { - sh.Concurrency = DefaultConcurrency - } - if sh.FlushDelay == 0 { - sh.FlushDelay = DefaultFlushDelay - } - if sh.PendingResponses <= 0 { - sh.PendingResponses = DefaultPendingMessages - } - if sh.SendBufferSize <= 0 { - sh.SendBufferSize = DefaultBufferSize - } - if sh.RecvBufferSize <= 0 { - sh.RecvBufferSize = DefaultBufferSize - } if sh.KeepAlivePeriod <= 0 { sh.KeepAlivePeriod = DefaultKeepAlivePeriod } diff --git a/stats.go b/stats.go index c0ce5a4..78d231f 100644 --- a/stats.go +++ b/stats.go @@ -6,7 +6,7 @@ import ( "time" ) -type ServerStats struct { +type ConnStats struct { // The number of request performed. RequestCount uint64 @@ -53,68 +53,68 @@ type ServerStats struct { // AvgRequestTime returns the average Request execution time. // -// Use stats returned from ServerStats.Snapshot() on live Client and / or Server, +// Use stats returned from ConnStats.Snapshot() on live Client and / or Server, // since the original stats can be updated by concurrently running goroutines. -func (ss *ServerStats) AvgRequestTime() time.Duration { - return time.Duration(float64(ss.RequestTime)/float64(ss.RequestCount)) * time.Millisecond +func (cs *ConnStats) AvgRequestTime() time.Duration { + return time.Duration(float64(cs.RequestTime)/float64(cs.RequestCount)) * time.Millisecond } // AvgRequestBytes returns the average bytes sent / received per Request. // -// Use stats returned from ServerStats.Snapshot() on live Client and / or Server, +// Use stats returned from ConnStats.Snapshot() on live Client and / or Server, // since the original stats can be updated by concurrently running goroutines. -func (ss *ServerStats) AvgRequestBytes() (send float64, recv float64) { - return float64(ss.BytesWritten) / float64(ss.RequestCount), float64(ss.BytesRead) / float64(ss.RequestCount) +func (cs *ConnStats) AvgRequestBytes() (send float64, recv float64) { + return float64(cs.BytesWritten) / float64(cs.RequestCount), float64(cs.BytesRead) / float64(cs.RequestCount) } // AvgRequestCalls returns the average number of write() / read() syscalls per Request. // -// Use stats returned from ServerStats.Snapshot() on live Client and / or Server, +// Use stats returned from ConnStats.Snapshot() on live Client and / or Server, // since the original stats can be updated by concurrently running goroutines. -func (ss *ServerStats) AvgRequestCalls() (write float64, read float64) { - return float64(ss.WriteCalls) / float64(ss.RequestCount), float64(ss.ReadCalls) / float64(ss.RequestCount) +func (cs *ConnStats) AvgRequestCalls() (write float64, read float64) { + return float64(cs.WriteCalls) / float64(cs.RequestCount), float64(cs.ReadCalls) / float64(cs.RequestCount) } type writerCounter struct { w io.Writer - ss *ServerStats + cs *ConnStats } type readerCounter struct { r io.Reader - ss *ServerStats + cs *ConnStats } -func newWriterCounter(w io.Writer, ss *ServerStats) io.Writer { +func newWriterCounter(w io.Writer, cs *ConnStats) io.Writer { return &writerCounter{ w: w, - ss: ss, + cs: cs, } } -func newReaderCounter(r io.Reader, ss *ServerStats) io.Reader { +func newReaderCounter(r io.Reader, cs *ConnStats) io.Reader { return &readerCounter{ r: r, - ss: ss, + cs: cs, } } func (w *writerCounter) Write(p []byte) (int, error) { n, err := w.w.Write(p) - w.ss.incWriteCalls() + w.cs.incWriteCalls() if err != nil { - w.ss.incWriteErrors() + w.cs.incWriteErrors() } - w.ss.addBytesWritten(uint64(n)) + w.cs.addBytesWritten(uint64(n)) return n, err } func (r *readerCounter) Read(p []byte) (int, error) { n, err := r.r.Read(p) - r.ss.incReadCalls() + r.cs.incReadCalls() if err != nil { - r.ss.incReadErrors() + r.cs.incReadErrors() } - r.ss.addBytesRead(uint64(n)) + r.cs.addBytesRead(uint64(n)) return n, err } diff --git a/stats_386.go b/stats_386.go index 091efac..9161044 100644 --- a/stats_386.go +++ b/stats_386.go @@ -4,103 +4,103 @@ import "sync" // Snapshot returns connection statistics' snapshot. // -// Use stats returned from ServerStats.Snapshot() on live Client and / or Server, +// Use stats returned from ConnStats.Snapshot() on live Client and / or Server, // since the original stats can be updated by concurrently running goroutines. -func (ss *ServerStats) Snapshot() *ServerStats { - ss.lock.Lock() - snapshot := *ss - ss.lock.Unlock() +func (cs *ConnStats) Snapshot() *ConnStats { + cs.lock.Lock() + snapshot := *cs + cs.lock.Unlock() snapshot.lock = sync.Mutex{} return &snapshot } // Reset resets all the stats counters. -func (ss *ServerStats) Reset() { - ss.lock.Lock() - ss.RequestCount = 0 - ss.RequestTime = 0 - ss.BytesWritten = 0 - ss.BytesRead = 0 - ss.WriteCalls = 0 - ss.WriteErrors = 0 - ss.ReadCalls = 0 - ss.ReadErrors = 0 - ss.DialCalls = 0 - ss.DialErrors = 0 - ss.AcceptCalls = 0 - ss.AcceptErrors = 0 - ss.lock.Unlock() +func (cs *ConnStats) Reset() { + cs.lock.Lock() + cs.RequestCount = 0 + cs.RequestTime = 0 + cs.BytesWritten = 0 + cs.BytesRead = 0 + cs.WriteCalls = 0 + cs.WriteErrors = 0 + cs.ReadCalls = 0 + cs.ReadErrors = 0 + cs.DialCalls = 0 + cs.DialErrors = 0 + cs.AcceptCalls = 0 + cs.AcceptErrors = 0 + cs.lock.Unlock() } -func (ss *ServerStats) incRPCCalls() { - ss.lock.Lock() - ss.RequestCount++ - ss.lock.Unlock() +func (cs *ConnStats) incRPCCalls() { + cs.lock.Lock() + cs.RequestCount++ + cs.lock.Unlock() } -func (ss *ServerStats) incRPCTime(dt uint64) { - ss.lock.Lock() - ss.RequestTime += dt - ss.lock.Unlock() +func (cs *ConnStats) incRPCTime(dt uint64) { + cs.lock.Lock() + cs.RequestTime += dt + cs.lock.Unlock() } -func (ss *ServerStats) addBytesWritten(n uint64) { - ss.lock.Lock() - ss.BytesWritten += n - ss.lock.Unlock() +func (cs *ConnStats) addBytesWritten(n uint64) { + cs.lock.Lock() + cs.BytesWritten += n + cs.lock.Unlock() } -func (ss *ServerStats) addBytesRead(n uint64) { - ss.lock.Lock() - ss.BytesRead += n - ss.lock.Unlock() +func (cs *ConnStats) addBytesRead(n uint64) { + cs.lock.Lock() + cs.BytesRead += n + cs.lock.Unlock() } -func (ss *ServerStats) incReadCalls() { - ss.lock.Lock() - ss.ReadCalls++ - ss.lock.Unlock() +func (cs *ConnStats) incReadCalls() { + cs.lock.Lock() + cs.ReadCalls++ + cs.lock.Unlock() } -func (ss *ServerStats) incReadErrors() { - ss.lock.Lock() - ss.ReadErrors++ - ss.lock.Unlock() +func (cs *ConnStats) incReadErrors() { + cs.lock.Lock() + cs.ReadErrors++ + cs.lock.Unlock() } -func (ss *ServerStats) incWriteCalls() { - ss.lock.Lock() - ss.WriteCalls++ - ss.lock.Unlock() +func (cs *ConnStats) incWriteCalls() { + cs.lock.Lock() + cs.WriteCalls++ + cs.lock.Unlock() } -func (ss *ServerStats) incWriteErrors() { - ss.lock.Lock() - ss.WriteErrors++ - ss.lock.Unlock() +func (cs *ConnStats) incWriteErrors() { + cs.lock.Lock() + cs.WriteErrors++ + cs.lock.Unlock() } -func (ss *ServerStats) incDialCalls() { - ss.lock.Lock() - ss.DialCalls++ - ss.lock.Unlock() +func (cs *ConnStats) incDialCalls() { + cs.lock.Lock() + cs.DialCalls++ + cs.lock.Unlock() } -func (ss *ServerStats) incDialErrors() { - ss.lock.Lock() - ss.DialErrors++ - ss.lock.Unlock() +func (cs *ConnStats) incDialErrors() { + cs.lock.Lock() + cs.DialErrors++ + cs.lock.Unlock() } -func (ss *ServerStats) incAcceptCalls() { - ss.lock.Lock() - ss.AcceptCalls++ - ss.lock.Unlock() +func (cs *ConnStats) incAcceptCalls() { + cs.lock.Lock() + cs.AcceptCalls++ + cs.lock.Unlock() } -func (ss *ServerStats) incAcceptErrors() { - ss.lock.Lock() - ss.AcceptErrors++ - ss.lock.Unlock() +func (cs *ConnStats) incAcceptErrors() { + cs.lock.Lock() + cs.AcceptErrors++ + cs.lock.Unlock() } diff --git a/stats_generic.go b/stats_generic.go index dd9cba6..5fffdb6 100644 --- a/stats_generic.go +++ b/stats_generic.go @@ -4,85 +4,85 @@ import "sync/atomic" // Snapshot returns connection statistics' snapshot. // -// Use stats returned from ServerStats.Snapshot() on live Client and / or Server, +// Use stats returned from ConnStats.Snapshot() on live Client and / or Server, // since the original stats can be updated by concurrently running goroutines. -func (ss *ServerStats) Snapshot() *ServerStats { - return &ServerStats{ - RequestCount: atomic.LoadUint64(&ss.RequestCount), - RequestTime: atomic.LoadUint64(&ss.RequestTime), - BytesWritten: atomic.LoadUint64(&ss.BytesWritten), - BytesRead: atomic.LoadUint64(&ss.BytesRead), - ReadCalls: atomic.LoadUint64(&ss.ReadCalls), - ReadErrors: atomic.LoadUint64(&ss.ReadErrors), - WriteCalls: atomic.LoadUint64(&ss.WriteCalls), - WriteErrors: atomic.LoadUint64(&ss.WriteErrors), - DialCalls: atomic.LoadUint64(&ss.DialCalls), - DialErrors: atomic.LoadUint64(&ss.DialErrors), - AcceptCalls: atomic.LoadUint64(&ss.AcceptCalls), - AcceptErrors: atomic.LoadUint64(&ss.AcceptErrors), +func (cs *ConnStats) Snapshot() *ConnStats { + return &ConnStats{ + RequestCount: atomic.LoadUint64(&cs.RequestCount), + RequestTime: atomic.LoadUint64(&cs.RequestTime), + BytesWritten: atomic.LoadUint64(&cs.BytesWritten), + BytesRead: atomic.LoadUint64(&cs.BytesRead), + ReadCalls: atomic.LoadUint64(&cs.ReadCalls), + ReadErrors: atomic.LoadUint64(&cs.ReadErrors), + WriteCalls: atomic.LoadUint64(&cs.WriteCalls), + WriteErrors: atomic.LoadUint64(&cs.WriteErrors), + DialCalls: atomic.LoadUint64(&cs.DialCalls), + DialErrors: atomic.LoadUint64(&cs.DialErrors), + AcceptCalls: atomic.LoadUint64(&cs.AcceptCalls), + AcceptErrors: atomic.LoadUint64(&cs.AcceptErrors), } } // Reset resets all the stats counters. -func (ss *ServerStats) Reset() { - atomic.StoreUint64(&ss.RequestCount, 0) - atomic.StoreUint64(&ss.RequestTime, 0) - atomic.StoreUint64(&ss.BytesWritten, 0) - atomic.StoreUint64(&ss.BytesRead, 0) - atomic.StoreUint64(&ss.WriteCalls, 0) - atomic.StoreUint64(&ss.WriteErrors, 0) - atomic.StoreUint64(&ss.ReadCalls, 0) - atomic.StoreUint64(&ss.ReadErrors, 0) - atomic.StoreUint64(&ss.DialCalls, 0) - atomic.StoreUint64(&ss.DialErrors, 0) - atomic.StoreUint64(&ss.AcceptCalls, 0) - atomic.StoreUint64(&ss.AcceptErrors, 0) +func (cs *ConnStats) Reset() { + atomic.StoreUint64(&cs.RequestCount, 0) + atomic.StoreUint64(&cs.RequestTime, 0) + atomic.StoreUint64(&cs.BytesWritten, 0) + atomic.StoreUint64(&cs.BytesRead, 0) + atomic.StoreUint64(&cs.WriteCalls, 0) + atomic.StoreUint64(&cs.WriteErrors, 0) + atomic.StoreUint64(&cs.ReadCalls, 0) + atomic.StoreUint64(&cs.ReadErrors, 0) + atomic.StoreUint64(&cs.DialCalls, 0) + atomic.StoreUint64(&cs.DialErrors, 0) + atomic.StoreUint64(&cs.AcceptCalls, 0) + atomic.StoreUint64(&cs.AcceptErrors, 0) } -func (ss *ServerStats) incRPCCalls() { - atomic.AddUint64(&ss.RequestCount, 1) +func (cs *ConnStats) incRPCCalls() { + atomic.AddUint64(&cs.RequestCount, 1) } -func (ss *ServerStats) incRPCTime(dt uint64) { - atomic.AddUint64(&ss.RequestTime, dt) +func (cs *ConnStats) incRPCTime(dt uint64) { + atomic.AddUint64(&cs.RequestTime, dt) } -func (ss *ServerStats) addBytesWritten(n uint64) { - atomic.AddUint64(&ss.BytesWritten, n) +func (cs *ConnStats) addBytesWritten(n uint64) { + atomic.AddUint64(&cs.BytesWritten, n) } -func (ss *ServerStats) addBytesRead(n uint64) { - atomic.AddUint64(&ss.BytesRead, n) +func (cs *ConnStats) addBytesRead(n uint64) { + atomic.AddUint64(&cs.BytesRead, n) } -func (ss *ServerStats) incReadCalls() { - atomic.AddUint64(&ss.ReadCalls, 1) +func (cs *ConnStats) incReadCalls() { + atomic.AddUint64(&cs.ReadCalls, 1) } -func (ss *ServerStats) incReadErrors() { - atomic.AddUint64(&ss.ReadErrors, 1) +func (cs *ConnStats) incReadErrors() { + atomic.AddUint64(&cs.ReadErrors, 1) } -func (ss *ServerStats) incWriteCalls() { - atomic.AddUint64(&ss.WriteCalls, 1) +func (cs *ConnStats) incWriteCalls() { + atomic.AddUint64(&cs.WriteCalls, 1) } -func (ss *ServerStats) incWriteErrors() { - atomic.AddUint64(&ss.WriteErrors, 1) +func (cs *ConnStats) incWriteErrors() { + atomic.AddUint64(&cs.WriteErrors, 1) } -func (ss *ServerStats) incDialCalls() { - atomic.AddUint64(&ss.DialCalls, 1) +func (cs *ConnStats) incDialCalls() { + atomic.AddUint64(&cs.DialCalls, 1) } -func (ss *ServerStats) incDialErrors() { - atomic.AddUint64(&ss.DialErrors, 1) +func (cs *ConnStats) incDialErrors() { + atomic.AddUint64(&cs.DialErrors, 1) } -func (ss *ServerStats) incAcceptCalls() { - atomic.AddUint64(&ss.AcceptCalls, 1) +func (cs *ConnStats) incAcceptCalls() { + atomic.AddUint64(&cs.AcceptCalls, 1) } -func (ss *ServerStats) incAcceptErrors() { - atomic.AddUint64(&ss.AcceptErrors, 1) +func (cs *ConnStats) incAcceptErrors() { + atomic.AddUint64(&cs.AcceptErrors, 1) }