// server/server_handlers.go
//
// 163 lines
// 3.8 KiB
// Go
// Raw Normal View History
//
// 2017-10-26 11:14:00 +00:00

package server
import (
"errors"
"io"
"net"
"time"
)
const (
	// DefaultConcurrency is the number of rpc calls the server processes
	// concurrently when no explicit limit is configured.
	DefaultConcurrency = 8 << 10

	// DefaultRequestTimeout is the timeout applied to a client request
	// when no explicit timeout is configured.
	DefaultRequestTimeout = time.Second * 20

	// DefaultPendingMessages is the default number of pending messages
	// handled by Client and Server.
	DefaultPendingMessages = 32 << 10

	// DefaultFlushDelay is the default delay between message flushes on
	// Client and Server. The value is negative, which the FlushDelay
	// option documents as sending immediately without buffering.
	DefaultFlushDelay = -1

	// DefaultBufferSize is the default size, in bytes, of Client and
	// Server buffers.
	DefaultBufferSize = 64 << 10

	// DefaultKeepAlivePeriod is the default KeepAlivePeriod of a
	// connection. It is zero, and accept only configures keep-alive for
	// a positive period.
	DefaultKeepAlivePeriod = 0
)
// ServerHandlers bundles a server's listen address and per-connection
// tuning options, together with placeholder Listen and Handle methods.
//
// Validate replaces unset (zero or non-positive) options with the
// package-level defaults.
type ServerHandlers struct {
	// Address to listen to for incoming connections.
	//
	// The address format depends on the underlying transport provided
	// by Server.Listener. The following transports are provided
	// out of the box:
	//   * TCP - see NewTCPServer() and NewTCPClient().
	//   * TLS (aka SSL) - see NewTLSServer() and NewTLSClient().
	//   * Unix sockets - see NewUnixServer() and NewUnixClient().
	//
	// By default TCP transport is used.
	Addr string

	// The maximum number of concurrent rpc calls the server may process.
	//
	// Validate reads this field and replaces non-positive values with
	// DefaultConcurrency.
	Concurrency int

	// The maximum delay between response flushes to clients.
	//
	// Negative values cause messages to be sent to the client
	// immediately, without buffering. This minimizes rpc latency at
	// the cost of higher CPU and network usage.
	//
	// A zero value is replaced by DefaultFlushDelay in Validate.
	FlushDelay time.Duration

	// The maximum number of pending responses in the queue.
	// Default is DefaultPendingMessages.
	PendingResponses int

	// Size of send buffer per each underlying connection in bytes.
	// Default is DefaultBufferSize.
	SendBufferSize int

	// Size of recv buffer per each underlying connection in bytes.
	// Default is DefaultBufferSize.
	RecvBufferSize int

	// Keep-alive period for accepted connections.
	//
	// accept configures keep-alive only when this value is positive.
	// setupKeepalive multiplies it by time.Second, so it is expressed
	// as a count of seconds (for example 30), not as an ordinary
	// time.Duration such as 30 * time.Second.
	//
	// Default is DefaultKeepAlivePeriod.
	KeepAlivePeriod time.Duration
}
// Listen is a placeholder implementation: it opens nothing and always
// returns a nil net.Listener together with a "not implement" error.
//
// NOTE(review): presumably concrete handlers are expected to supply their
// own Listen in place of this stub — confirm against the callers.
func (sh *ServerHandlers) Listen() (net.Listener, error) {
return nil, errors.New("Server: Handler method[Listen] of Server is not implement")
}
// Handle is a no-op placeholder: it ignores remoteAddr, rwc and stopChan
// and returns immediately. In particular it neither reads from nor closes
// rwc.
func (sh *ServerHandlers) Handle(remoteAddr string, rwc io.ReadWriteCloser, stopChan chan struct{}) {
}
// GetAddr returns the Addr field, i.e. the address configured for
// incoming connections, exactly as stored.
func (sh *ServerHandlers) GetAddr() string {
return sh.Addr
}
// GetPendingResponses returns the PendingResponses field as currently
// stored. The DefaultPendingMessages fallback is applied only by Validate,
// so the result may be zero or negative if Validate has not been called.
func (sh *ServerHandlers) GetPendingResponses() int {
return sh.PendingResponses
}
// func (sh *ServerHandlers) {
// }
// func (sh *ServerHandlers) {
// }
// func (sh *ServerHandlers) {
// }
// func (sh *ServerHandlers) {
// }
// func (sh *ServerHandlers) {
// }
// func (sh *ServerHandlers) {
// }
// func (sh *ServerHandlers) {
// }
// func (sh *ServerHandlers) {
// }
// accept waits for the next connection on l and returns it together with
// the string form of its remote address.
//
// An error from l.Accept is returned unchanged. When sh.KeepAlivePeriod is
// positive, keep-alive is configured on the new connection through
// setupKeepalive; if that step fails the connection is closed and the
// setup error is returned instead of the connection.
func (sh *ServerHandlers) accept(l net.Listener) (io.ReadWriteCloser, string, error) {
	conn, err := l.Accept()
	if err != nil {
		return nil, "", err
	}

	if sh.KeepAlivePeriod > 0 {
		if kaErr := setupKeepalive(conn, sh.KeepAlivePeriod); kaErr != nil {
			// The connection is unusable for the caller; release it
			// before reporting the failure.
			conn.Close()
			return nil, "", kaErr
		}
	}

	remoteAddr := conn.RemoteAddr().String()
	return conn, remoteAddr, nil
}
// Validate fills in defaults for options that were left unset, modifying
// sh in place:
//
//   - Concurrency, PendingResponses, SendBufferSize and RecvBufferSize
//     fall back to their defaults when they are zero or negative.
//   - FlushDelay falls back to DefaultFlushDelay only when it is exactly
//     zero; negative values are kept.
//   - KeepAlivePeriod falls back to DefaultKeepAlivePeriod when it is zero
//     or negative.
func (sh *ServerHandlers) Validate() {
	// positiveOr keeps v when it is positive and substitutes def otherwise.
	positiveOr := func(v, def int) int {
		if v > 0 {
			return v
		}
		return def
	}

	sh.Concurrency = positiveOr(sh.Concurrency, DefaultConcurrency)
	sh.PendingResponses = positiveOr(sh.PendingResponses, DefaultPendingMessages)
	sh.SendBufferSize = positiveOr(sh.SendBufferSize, DefaultBufferSize)
	sh.RecvBufferSize = positiveOr(sh.RecvBufferSize, DefaultBufferSize)

	if sh.FlushDelay == 0 {
		sh.FlushDelay = DefaultFlushDelay
	}
	if sh.KeepAlivePeriod <= 0 {
		sh.KeepAlivePeriod = DefaultKeepAlivePeriod
	}
}
// setupKeepalive enables keep-alive on conn and sets its keep-alive
// period.
//
// conn must be a *net.TCPConn; any other connection (including nil) is
// rejected with an error. Errors from SetKeepAlive and SetKeepAlivePeriod
// are returned unchanged.
//
// keepAlivePeriod is multiplied by time.Second before it is applied, so it
// is interpreted as a number of seconds rather than as an ordinary
// time.Duration.
func setupKeepalive(conn net.Conn, keepAlivePeriod time.Duration) error {
	tcp, isTCP := conn.(*net.TCPConn)
	if !isTCP {
		return errors.New("Server: Keepalive is valid when connection is net.TCPConn")
	}

	if err := tcp.SetKeepAlive(true); err != nil {
		return err
	}

	// NOTE(review): passing a real duration such as 30 * time.Second here
	// would overflow the multiplication — confirm that every caller passes
	// a bare seconds count.
	return tcp.SetKeepAlivePeriod(keepAlivePeriod * time.Second)
}