commit ed93163eb6ead427ecf625a51789793674a01d34 Author: crusader Date: Thu Oct 26 20:14:00 2017 +0900 ing diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..a5cf485 --- /dev/null +++ b/.gitignore @@ -0,0 +1,70 @@ +# Created by .ignore support plugin (hsz.mobi) +### JetBrains template +# Covers JetBrains IDEs: IntelliJ, RubyMine, PhpStorm, AppCode, PyCharm, CLion, Android Studio and Webstorm +# Reference: https://intellij-support.jetbrains.com/hc/en-us/articles/206544839 + +# User-specific stuff: +.idea/**/workspace.xml +.idea/**/tasks.xml +.idea/dictionaries + +# Sensitive or high-churn files: +.idea/**/dataSources/ +.idea/**/dataSources.ids +.idea/**/dataSources.xml +.idea/**/dataSources.local.xml +.idea/**/sqlDataSources.xml +.idea/**/dynamic.xml +.idea/**/uiDesigner.xml + +# Gradle: +.idea/**/gradle.xml +.idea/**/libraries + +# Mongo Explorer plugin: +.idea/**/mongoSettings.xml + +## File-based project format: +*.iws + +## Plugin-specific files: + +# IntelliJ +/out/ + +# mpeltonen/sbt-idea plugin +.idea_modules/ + +# JIRA plugin +atlassian-ide-plugin.xml + +# Crashlytics plugin (for Android Studio and IntelliJ) +com_crashlytics_export_strings.xml +crashlytics.properties +crashlytics-build.properties +fabric.properties +### Go template +# Binaries for programs and plugins +*.exe +*.dll +*.so +*.dylib + +# Test binary, build with `go test -c` +*.test + +# Output of the go coverage tool, specifically when used with LiteIDE +*.out + +# Project-local glide cache, RE: https://github.com/Masterminds/glide/issues/736 +.glide/ +.idea/ +*.iml + +vendor/ +glide.lock +.DS_Store +dist/ +debug + + diff --git a/.vscode/launch.json b/.vscode/launch.json new file mode 100644 index 0000000..2ca2b1d --- /dev/null +++ b/.vscode/launch.json @@ -0,0 +1,32 @@ +{ + "version": "0.2.0", + "configurations": [ + { + "name": "Debug", + "type": "go", + "request": "launch", + "mode": "debug", + "remotePath": "", + "port": 2345, + "host": "127.0.0.1", + "program": "${workspaceRoot}/main.go", + "env": {}, + "args": [], + "showLog": true + }, + { + "name": "File Debug", + "type": "go", + "request": "launch", + "mode": "debug", + "remotePath": "", + "port": 2345, + "host": "127.0.0.1", + "program": "${fileDirname}", + "env": {}, + "args": [], + "showLog": true + } + + ] +} \ No newline at end of file diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000..20af2f6 --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,3 @@ +// Place your settings in this file to overwrite default and user settings. +{ +} \ No newline at end of file diff --git a/glide.yaml b/glide.yaml new file mode 100644 index 0000000..2098be0 --- /dev/null +++ b/glide.yaml @@ -0,0 +1,3 @@ +package: git.loafle.net/commons_go/server +import: +- package: git.loafle.net/commons_go/logging diff --git a/server.go b/server.go new file mode 100644 index 0000000..f95c5ce --- /dev/null +++ b/server.go @@ -0,0 +1,124 @@ +package server + +import ( + "fmt" + "io" + "net" + "sync" + "sync/atomic" + "time" + + "git.loafle.net/commons_go/logging" +) + +type Server interface { + Start() error + Stop() + Serve() error +} + +type server struct { + serverHandler ServerHandler + + listener net.Listener + + Stats ServerStats + + stopChan chan struct{} + stopWg sync.WaitGroup +} + +func (s *server) Start() error { + if nil != s.serverHandler { + panic("Server: server handler must be specified.") + } + s.serverHandler.Validate() + + if s.stopChan != nil { + panic("Server: server is already running. Stop it before starting it again") + } + s.stopChan = make(chan struct{}) + var err error + if s.listener, err = s.serverHandler.Listen(); nil != err { + return err + } + + s.stopWg.Add(1) + go runServer(s) + return nil +} + +func (s *server) Stop() { + if s.stopChan == nil { + panic("Server: server must be started before stopping it") + } + close(s.stopChan) + s.stopWg.Wait() + s.stopChan = nil +} + +func (s *server) Serve() error { + if err := s.Start(); err != nil { + return err + } + s.stopWg.Wait() + return nil +} + +func runServer(s *server) { + defer s.stopWg.Done() + + var conn io.ReadWriteCloser + var clientAddr string + var err error + var stopping atomic.Value + + for { + acceptChan := make(chan struct{}) + go func() { + if conn, clientAddr, err = s.serverHandler.accept(s.listener); err != nil { + if stopping.Load() == nil { + logging.Logger.Error(fmt.Sprintf("Server: [%s]. Cannot accept new connection: [%s]", s.serverHandler.GetAddr(), err)) + } + } + close(acceptChan) + }() + + select { + case <-s.stopChan: + stopping.Store(true) + s.listener.Close() + <-acceptChan + return + case <-acceptChan: + s.Stats.incAcceptCalls() + } + + if err != nil { + s.Stats.incAcceptErrors() + select { + case <-s.stopChan: + return + case <-time.After(time.Second): + } + continue + } + + s.stopWg.Add(1) + go handleConnection(s, conn, clientAddr) + } +} + +func handleConnection(s *server, conn io.ReadWriteCloser, clientAddr string) { + defer s.stopWg.Done() + + stopChan := make(chan struct{}) + go s.serverHandler.Handle(clientAddr, conn, stopChan) + + select { + case <-s.stopChan: + close(stopChan) + conn.Close() + return + } +} diff --git a/server_handler.go b/server_handler.go new file mode 100644 index 0000000..1517cc1 --- /dev/null +++ b/server_handler.go @@ -0,0 +1,18 @@ +package server + +import ( + "io" + "net" +) + +type ServerHandler interface { + Listen() (net.Listener, error) + Handle(remoteAddr string, rwc io.ReadWriteCloser, stopChan chan struct{}) + + GetAddr() string + GetPendingResponses() int + + Validate() + + accept(l net.Listener) (io.ReadWriteCloser, string, error) +} diff --git a/server_handlers.go b/server_handlers.go new file mode 100644 index 0000000..63dc037 --- /dev/null +++ b/server_handlers.go @@ -0,0 +1,162 @@ +package server + +import ( + "errors" + "io" + "net" + "time" +) + +const ( + // DefaultConcurrency is the default number of concurrent rpc calls + // the server can process. + DefaultConcurrency = 8 * 1024 + + // DefaultRequestTimeout is the default timeout for client request. + DefaultRequestTimeout = 20 * time.Second + + // DefaultPendingMessages is the default number of pending messages + // handled by Client and Server. + DefaultPendingMessages = 32 * 1024 + + // DefaultFlushDelay is the default delay between message flushes + // on Client and Server. + DefaultFlushDelay = -1 + + // DefaultBufferSize is the default size for Client and Server buffers. + DefaultBufferSize = 64 * 1024 + + // DefaultKeepAlivePeriod is the default time for KeepAlivePeriod of connection. + DefaultKeepAlivePeriod = 0 +) + +type ServerHandlers struct { + // Address to listen to for incoming connections. + // + // The address format depends on the underlying transport provided + // by Server.Listener. The following transports are provided + // out of the box: + // * TCP - see NewTCPServer() and NewTCPClient(). + // * TLS (aka SSL) - see NewTLSServer() and NewTLSClient(). + // * Unix sockets - see NewUnixServer() and NewUnixClient(). + // + // By default TCP transport is used. + Addr string + + // The maximum delay between response flushes to clients. + // + // Negative values lead to immediate requests' sending to the client + // without their buffering. This minimizes rpc latency at the cost + // of higher CPU and network usage. + // + // Default is DefaultFlushDelay. + FlushDelay time.Duration + + // The maximum number of pending responses in the queue. + // Default is DefaultPendingMessages. + PendingResponses int + + // Size of send buffer per each underlying connection in bytes. + // Default is DefaultBufferSize. + SendBufferSize int + + // Size of recv buffer per each underlying connection in bytes. + // Default is DefaultBufferSize. + RecvBufferSize int + + KeepAlivePeriod time.Duration +} + +func (sh *ServerHandlers) Listen() (net.Listener, error) { + return nil, errors.New("Server: Handler method[Listen] of Server is not implement") +} + +func (sh *ServerHandlers) Handle(remoteAddr string, rwc io.ReadWriteCloser, stopChan chan struct{}) { + +} + +func (sh *ServerHandlers) GetAddr() string { + return sh.Addr +} + +func (sh *ServerHandlers) GetPendingResponses() int { + return sh.PendingResponses +} + +// func (sh *ServerHandlers) { + +// } +// func (sh *ServerHandlers) { + +// } +// func (sh *ServerHandlers) { + +// } +// func (sh *ServerHandlers) { + +// } +// func (sh *ServerHandlers) { + +// } +// func (sh *ServerHandlers) { + +// } +// func (sh *ServerHandlers) { + +// } +// func (sh *ServerHandlers) { + +// } + +func (sh *ServerHandlers) accept(l net.Listener) (io.ReadWriteCloser, string, error) { + conn, err := l.Accept() + if nil != err { + return nil, "", err + } + + if 0 < sh.KeepAlivePeriod { + if err = setupKeepalive(conn, sh.KeepAlivePeriod); nil != err { + conn.Close() + return nil, "", err + } + } + + return conn, conn.RemoteAddr().String(), nil +} + +func (sh *ServerHandlers) Validate() { + if sh.Concurrency <= 0 { + sh.Concurrency = DefaultConcurrency + } + if sh.FlushDelay == 0 { + sh.FlushDelay = DefaultFlushDelay + } + if sh.PendingResponses <= 0 { + sh.PendingResponses = DefaultPendingMessages + } + if sh.SendBufferSize <= 0 { + sh.SendBufferSize = DefaultBufferSize + } + if sh.RecvBufferSize <= 0 { + sh.RecvBufferSize = DefaultBufferSize + } + if sh.KeepAlivePeriod <= 0 { + sh.KeepAlivePeriod = DefaultKeepAlivePeriod + } + +} + +func setupKeepalive(conn net.Conn, keepAlivePeriod time.Duration) error { + tcpConn, ok := conn.(*net.TCPConn) + if !ok { + return errors.New("Server: Keepalive is valid when connection is net.TCPConn") + } + + if err := tcpConn.SetKeepAlive(true); err != nil { + return err + } + if err := tcpConn.SetKeepAlivePeriod(keepAlivePeriod * time.Second); err != nil { + return err + } + return nil +} diff --git a/stats.go b/stats.go new file mode 100644 index 0000000..c0ce5a4 --- /dev/null +++ b/stats.go @@ -0,0 +1,120 @@ +package server + +import ( + "io" + "sync" + "time" +) + +type ServerStats struct { + // The number of request performed. + RequestCount uint64 + + // The total aggregate time for all request in milliseconds. + // + // This time can be used for calculating the average response time + // per request: + // avgRequesttime = RequestTime / RequestCount + RequestTime uint64 + + // The number of bytes written to the underlying connections. + BytesWritten uint64 + + // The number of bytes read from the underlying connections. + BytesRead uint64 + + // The number of Read() calls. + ReadCalls uint64 + + // The number of Read() errors. + ReadErrors uint64 + + // The number of Write() calls. + WriteCalls uint64 + + // The number of Write() errors. + WriteErrors uint64 + + // The number of Dial() calls. + DialCalls uint64 + + // The number of Dial() errors. + DialErrors uint64 + + // The number of Accept() calls. + AcceptCalls uint64 + + // The number of Accept() errors. + AcceptErrors uint64 + + // lock is for 386 builds. See https://github.com/valyala/gorpc/issues/5 . + lock sync.Mutex +} + +// AvgRequestTime returns the average Request execution time. +// +// Use stats returned from ServerStats.Snapshot() on live Client and / or Server, +// since the original stats can be updated by concurrently running goroutines. +func (ss *ServerStats) AvgRequestTime() time.Duration { + return time.Duration(float64(ss.RequestTime)/float64(ss.RequestCount)) * time.Millisecond +} + +// AvgRequestBytes returns the average bytes sent / received per Request. +// +// Use stats returned from ServerStats.Snapshot() on live Client and / or Server, +// since the original stats can be updated by concurrently running goroutines. +func (ss *ServerStats) AvgRequestBytes() (send float64, recv float64) { + return float64(ss.BytesWritten) / float64(ss.RequestCount), float64(ss.BytesRead) / float64(ss.RequestCount) +} + +// AvgRequestCalls returns the average number of write() / read() syscalls per Request. +// +// Use stats returned from ServerStats.Snapshot() on live Client and / or Server, +// since the original stats can be updated by concurrently running goroutines. +func (ss *ServerStats) AvgRequestCalls() (write float64, read float64) { + return float64(ss.WriteCalls) / float64(ss.RequestCount), float64(ss.ReadCalls) / float64(ss.RequestCount) +} + +type writerCounter struct { + w io.Writer + ss *ServerStats +} + +type readerCounter struct { + r io.Reader + ss *ServerStats +} + +func newWriterCounter(w io.Writer, ss *ServerStats) io.Writer { + return &writerCounter{ + w: w, + ss: ss, + } +} + +func newReaderCounter(r io.Reader, ss *ServerStats) io.Reader { + return &readerCounter{ + r: r, + ss: ss, + } +} + +func (w *writerCounter) Write(p []byte) (int, error) { + n, err := w.w.Write(p) + w.ss.incWriteCalls() + if err != nil { + w.ss.incWriteErrors() + } + w.ss.addBytesWritten(uint64(n)) + return n, err +} + +func (r *readerCounter) Read(p []byte) (int, error) { + n, err := r.r.Read(p) + r.ss.incReadCalls() + if err != nil { + r.ss.incReadErrors() + } + r.ss.addBytesRead(uint64(n)) + return n, err +} diff --git a/stats_386.go b/stats_386.go new file mode 100644 index 0000000..091efac --- /dev/null +++ b/stats_386.go @@ -0,0 +1,106 @@ +package server + +import "sync" + +// Snapshot returns connection statistics' snapshot. +// +// Use stats returned from ServerStats.Snapshot() on live Client and / or Server, +// since the original stats can be updated by concurrently running goroutines. +func (ss *ServerStats) Snapshot() *ServerStats { + ss.lock.Lock() + snapshot := *ss + ss.lock.Unlock() + + snapshot.lock = sync.Mutex{} + return &snapshot +} + +// Reset resets all the stats counters. +func (ss *ServerStats) Reset() { + ss.lock.Lock() + ss.RequestCount = 0 + ss.RequestTime = 0 + ss.BytesWritten = 0 + ss.BytesRead = 0 + ss.WriteCalls = 0 + ss.WriteErrors = 0 + ss.ReadCalls = 0 + ss.ReadErrors = 0 + ss.DialCalls = 0 + ss.DialErrors = 0 + ss.AcceptCalls = 0 + ss.AcceptErrors = 0 + ss.lock.Unlock() +} + +func (ss *ServerStats) incRPCCalls() { + ss.lock.Lock() + ss.RequestCount++ + ss.lock.Unlock() +} + +func (ss *ServerStats) incRPCTime(dt uint64) { + ss.lock.Lock() + ss.RequestTime += dt + ss.lock.Unlock() +} + +func (ss *ServerStats) addBytesWritten(n uint64) { + ss.lock.Lock() + ss.BytesWritten += n + ss.lock.Unlock() +} + +func (ss *ServerStats) addBytesRead(n uint64) { + ss.lock.Lock() + ss.BytesRead += n + ss.lock.Unlock() +} + +func (ss *ServerStats) incReadCalls() { + ss.lock.Lock() + ss.ReadCalls++ + ss.lock.Unlock() +} + +func (ss *ServerStats) incReadErrors() { + ss.lock.Lock() + ss.ReadErrors++ + ss.lock.Unlock() +} + +func (ss *ServerStats) incWriteCalls() { + ss.lock.Lock() + ss.WriteCalls++ + ss.lock.Unlock() +} + +func (ss *ServerStats) incWriteErrors() { + ss.lock.Lock() + ss.WriteErrors++ + ss.lock.Unlock() +} + +func (ss *ServerStats) incDialCalls() { + ss.lock.Lock() + ss.DialCalls++ + ss.lock.Unlock() +} + +func (ss *ServerStats) incDialErrors() { + ss.lock.Lock() + ss.DialErrors++ + ss.lock.Unlock() +} + +func (ss *ServerStats) incAcceptCalls() { + ss.lock.Lock() + ss.AcceptCalls++ + ss.lock.Unlock() +} + +func (ss *ServerStats) incAcceptErrors() { + ss.lock.Lock() + ss.AcceptErrors++ + ss.lock.Unlock() +} diff --git a/stats_generic.go b/stats_generic.go new file mode 100644 index 0000000..dd9cba6 --- /dev/null +++ b/stats_generic.go @@ -0,0 +1,88 @@ +package server + +import "sync/atomic" + +// Snapshot returns connection statistics' snapshot. +// +// Use stats returned from ServerStats.Snapshot() on live Client and / or Server, +// since the original stats can be updated by concurrently running goroutines. +func (ss *ServerStats) Snapshot() *ServerStats { + return &ServerStats{ + RequestCount: atomic.LoadUint64(&ss.RequestCount), + RequestTime: atomic.LoadUint64(&ss.RequestTime), + BytesWritten: atomic.LoadUint64(&ss.BytesWritten), + BytesRead: atomic.LoadUint64(&ss.BytesRead), + ReadCalls: atomic.LoadUint64(&ss.ReadCalls), + ReadErrors: atomic.LoadUint64(&ss.ReadErrors), + WriteCalls: atomic.LoadUint64(&ss.WriteCalls), + WriteErrors: atomic.LoadUint64(&ss.WriteErrors), + DialCalls: atomic.LoadUint64(&ss.DialCalls), + DialErrors: atomic.LoadUint64(&ss.DialErrors), + AcceptCalls: atomic.LoadUint64(&ss.AcceptCalls), + AcceptErrors: atomic.LoadUint64(&ss.AcceptErrors), + } +} + +// Reset resets all the stats counters. +func (ss *ServerStats) Reset() { + atomic.StoreUint64(&ss.RequestCount, 0) + atomic.StoreUint64(&ss.RequestTime, 0) + atomic.StoreUint64(&ss.BytesWritten, 0) + atomic.StoreUint64(&ss.BytesRead, 0) + atomic.StoreUint64(&ss.WriteCalls, 0) + atomic.StoreUint64(&ss.WriteErrors, 0) + atomic.StoreUint64(&ss.ReadCalls, 0) + atomic.StoreUint64(&ss.ReadErrors, 0) + atomic.StoreUint64(&ss.DialCalls, 0) + atomic.StoreUint64(&ss.DialErrors, 0) + atomic.StoreUint64(&ss.AcceptCalls, 0) + atomic.StoreUint64(&ss.AcceptErrors, 0) +} + +func (ss *ServerStats) incRPCCalls() { + atomic.AddUint64(&ss.RequestCount, 1) +} + +func (ss *ServerStats) incRPCTime(dt uint64) { + atomic.AddUint64(&ss.RequestTime, dt) +} + +func (ss *ServerStats) addBytesWritten(n uint64) { + atomic.AddUint64(&ss.BytesWritten, n) +} + +func (ss *ServerStats) addBytesRead(n uint64) { + atomic.AddUint64(&ss.BytesRead, n) +} + +func (ss *ServerStats) incReadCalls() { + atomic.AddUint64(&ss.ReadCalls, 1) +} + +func (ss *ServerStats) incReadErrors() { + atomic.AddUint64(&ss.ReadErrors, 1) +} + +func (ss *ServerStats) incWriteCalls() { + atomic.AddUint64(&ss.WriteCalls, 1) +} + +func (ss *ServerStats) incWriteErrors() { + atomic.AddUint64(&ss.WriteErrors, 1) +} + +func (ss *ServerStats) incDialCalls() { + atomic.AddUint64(&ss.DialCalls, 1) +} + +func (ss *ServerStats) incDialErrors() { + atomic.AddUint64(&ss.DialErrors, 1) +} + +func (ss *ServerStats) incAcceptCalls() { + atomic.AddUint64(&ss.AcceptCalls, 1) +} + +func (ss *ServerStats) incAcceptErrors() { + atomic.AddUint64(&ss.AcceptErrors, 1) +}