This commit is contained in:
crusader 2017-11-29 12:46:40 +09:00
parent ce8a4781f8
commit f7435b7fa0
6 changed files with 66 additions and 30 deletions

View File

@ -23,4 +23,11 @@ const (
// DefaultKeepAlivePeriod is the default time for KeepAlivePeriod of connection. // DefaultKeepAlivePeriod is the default time for KeepAlivePeriod of connection.
DefaultKeepAlivePeriod = 0 DefaultKeepAlivePeriod = 0
// DefaultMaxMessageSize is default size for a message read from the peer
DefaultMaxMessageSize = 4096
// DefaultReadTimeout is default value of read timeout
DefaultReadTimeout = 0
// DefaultWriteTimeout is default value of write timeout
DefaultWriteTimeout = 0
) )

View File

@ -72,11 +72,15 @@ func (s *server) Stop() {
s.stopWg.Wait() s.stopWg.Wait()
s.stopChan = nil s.stopChan = nil
s.sh.OnStop() s.sh.OnStop(s.ctx)
logging.Logger().Info(fmt.Sprintf("Server[%s] is stopped", s.sh.GetName())) logging.Logger().Info(fmt.Sprintf("Server[%s] is stopped", s.sh.GetName()))
} }
func (s *server) Context() ServerContext {
return s.ctx
}
func handleServer(s *server) { func handleServer(s *server) {
defer s.stopWg.Done() defer s.stopWg.Done()
@ -118,35 +122,44 @@ func handleServer(s *server) {
continue continue
} }
var err error
var socketID string
socketHandler := s.sh.GetSocketHandler()
if socketID = socketHandler.Handshake(s.ctx, conn); "" == socketID {
logging.Logger().Error(fmt.Sprintf("Server: Handshake err addr[%s] %v", conn.RemoteAddr(), conn))
conn.Close()
continue
}
socketCTX := socketHandler.SocketContext(s.ctx)
soc := newSocket(socketHandler, socketCTX, conn, socketID)
logging.Logger().Debug(fmt.Sprintf("Server: Client[%s] is connected.", soc.RemoteAddr()))
socketHandler.addSocket(soc)
socketHandler.OnConnect(soc)
s.stopWg.Add(1) s.stopWg.Add(1)
go handleConnection(s, conn) go handleConnection(s, soc, socketHandler)
} }
} }
func handleConnection(s *server, conn net.Conn) { func handleConnection(s *server, soc Socket, socketHandler SocketHandler) {
defer s.stopWg.Done() defer s.stopWg.Done()
var err error
if conn, err = s.sh.OnConnect(conn); nil != err {
logging.Logger().Error(fmt.Sprintf("Server: connecting[%s] failed %v", conn.RemoteAddr(), err))
return
}
logging.Logger().Debug(fmt.Sprintf("Server: Client[%s] is connected.", conn.RemoteAddr()))
clientStopChan := make(chan struct{}) clientStopChan := make(chan struct{})
handleDoneChan := make(chan struct{}) handleDoneChan := make(chan error, 1)
go s.sh.Handle(conn, clientStopChan, handleDoneChan) go socketHandler.Handle(soc, clientStopChan, handleDoneChan)
select { select {
case <-s.stopChan: case <-s.stopChan:
close(clientStopChan) close(clientStopChan)
conn.Close()
<-handleDoneChan <-handleDoneChan
case <-handleDoneChan: case <-handleDoneChan:
close(clientStopChan) close(clientStopChan)
conn.Close() socketHandler.OnDisconnect(soc)
logging.Logger().Debug(fmt.Sprintf("Server: Client[%s] is disconnected.", conn.RemoteAddr())) logging.Logger().Debug(fmt.Sprintf("Server: Client[%s] is disconnected.", soc.RemoteAddr()))
socketHandler.removeSocket(soc)
soc.Close()
} }
} }

View File

@ -22,6 +22,7 @@ type ServerHandler interface {
OnStop(serverCTX ServerContext) OnStop(serverCTX ServerContext)
GetName() string GetName() string
GetSocketHandler() SocketHandler
Validate() Validate()
} }

View File

@ -2,7 +2,10 @@ package server
import ( import (
"errors" "errors"
"fmt"
"net" "net"
"git.loafle.net/commons_go/logging"
) )
func NewServerHandler() ServerHandler { func NewServerHandler() ServerHandler {
@ -10,36 +13,51 @@ func NewServerHandler() ServerHandler {
return sh return sh
} }
type ServerHandlers struct {
// Server name for sending in response headers.
//
// Default server name is used if left blank.
Name string
SocketHandler SocketHandler
}
func (sh *ServerHandlers) ServerContext() ServerContext { func (sh *ServerHandlers) ServerContext() ServerContext {
return newServerContext() return newServerContext()
} }
func (sh *ServerHandlers) Init(serverCTX ServerContext) error { func (sh *ServerHandlers) Init(serverCTX ServerContext) error {
// no op if err := sh.SocketHandler.Init(serverCTX); nil != err {
return err
}
return nil
} }
func (sh *ServerHandlers) Listen(serverCTX ServerContext) (net.Listener, error) { func (sh *ServerHandlers) Listen(serverCTX ServerContext) (net.Listener, error) {
return nil, errors.New("Server: Handler method[Listen] of Server is not implement") return nil, errors.New("Server: Handler method[Listen] of Server is not implement")
} }
func (sh *ServerHandlers) OnStart(serverCTX ServerContext) {
// no op
}
func (sh *ServerHandlers) OnError(serverCTX ServerContext, conn net.Conn, status int, reason error) { func (sh *ServerHandlers) OnError(serverCTX ServerContext, conn net.Conn, status int, reason error) {
logging.Logger().Error(fmt.Sprintf("Server: error status: %d, reason: %v, [%v]", status, reason, conn))
} }
// OnStop invoked when server is stopped // OnStop invoked when server is stopped
// If you override ths method, must call // If you override ths method, must call
func (sh *ServerHandlers) OnStop(serverCTX ServerContext) { func (sh *ServerHandlers) OnStop(serverCTX ServerContext) {
// no op sh.SocketHandler.Destroy()
} }
func (sh *ServerHandlers) GetName() string { func (sh *ServerHandlers) GetName() string {
return sh.Name return sh.Name
} }
func (sh *ServerHandlers) Validate() { func (sh *ServerHandlers) GetSocketHandler() SocketHandler {
return sh.SocketHandler
}
func (sh *ServerHandlers) Validate() {
if nil == sh.SocketHandler {
panic("Server: SocketHandler must be specified")
}
} }

View File

@ -1,11 +1,9 @@
package server package server
import ( import (
"io"
"net" "net"
"sync" "sync"
"time" "time"
) )
type Socket interface { type Socket interface {
@ -21,7 +19,7 @@ func newSocket(socketHandler SocketHandler, socketCTX SocketContext, conn net.Co
s.Conn = conn s.Conn = conn
s.sh = socketHandler s.sh = socketHandler
s.id = id s.id = id
s.SetReadLimit(socketHandler.GetMaxMessageSize())
if 0 < socketHandler.GetReadTimeout() { if 0 < socketHandler.GetReadTimeout() {
s.SetReadDeadline(time.Now().Add(socketHandler.GetReadTimeout() * time.Second)) s.SetReadDeadline(time.Now().Add(socketHandler.GetReadTimeout() * time.Second))
} }
@ -49,7 +47,6 @@ func (s *netSocket) ID() string {
return s.id return s.id
} }
func (s *netSocket) Close() error { func (s *netSocket) Close() error {
err := s.Conn.Close() err := s.Conn.Close()
releaseSocket(s) releaseSocket(s)

View File

@ -31,7 +31,7 @@ func (sh *SocketHandlers) Init(serverCTX ServerContext) error {
} }
func (sh *SocketHandlers) Handshake(serverCTX ServerContext, conn net.Conn) (id string) { func (sh *SocketHandlers) Handshake(serverCTX ServerContext, conn net.Conn) (id string) {
return "", nil return ""
} }
func (sh *SocketHandlers) SocketContext(serverCTX ServerContext) SocketContext { func (sh *SocketHandlers) SocketContext(serverCTX ServerContext) SocketContext {