diff --git a/constants.go b/constants.go index aaaf0a8..5a0d09c 100644 --- a/constants.go +++ b/constants.go @@ -23,4 +23,11 @@ const ( // DefaultKeepAlivePeriod is the default time for KeepAlivePeriod of connection. DefaultKeepAlivePeriod = 0 + + // DefaultMaxMessageSize is default size for a message read from the peer + DefaultMaxMessageSize = 4096 + // DefaultReadTimeout is default value of read timeout + DefaultReadTimeout = 0 + // DefaultWriteTimeout is default value of write timeout + DefaultWriteTimeout = 0 ) diff --git a/server.go b/server.go index f52cbe5..7e97450 100644 --- a/server.go +++ b/server.go @@ -72,11 +72,15 @@ func (s *server) Stop() { s.stopWg.Wait() s.stopChan = nil - s.sh.OnStop() + s.sh.OnStop(s.ctx) logging.Logger().Info(fmt.Sprintf("Server[%s] is stopped", s.sh.GetName())) } +func (s *server) Context() ServerContext { + return s.ctx +} + func handleServer(s *server) { defer s.stopWg.Done() @@ -118,35 +122,44 @@ func handleServer(s *server) { continue } + var err error + var socketID string + socketHandler := s.sh.GetSocketHandler() + + if socketID = socketHandler.Handshake(s.ctx, conn); "" == socketID { + logging.Logger().Error(fmt.Sprintf("Server: Handshake err addr[%s] %v", conn.RemoteAddr(), conn)) + conn.Close() + continue + } + socketCTX := socketHandler.SocketContext(s.ctx) + soc := newSocket(socketHandler, socketCTX, conn, socketID) + logging.Logger().Debug(fmt.Sprintf("Server: Client[%s] is connected.", soc.RemoteAddr())) + + socketHandler.addSocket(soc) + socketHandler.OnConnect(soc) + s.stopWg.Add(1) - go handleConnection(s, conn) + go handleConnection(s, soc, socketHandler) } } -func handleConnection(s *server, conn net.Conn) { +func handleConnection(s *server, soc Socket, socketHandler SocketHandler) { defer s.stopWg.Done() - var err error - if conn, err = s.sh.OnConnect(conn); nil != err { - logging.Logger().Error(fmt.Sprintf("Server: connecting[%s] failed %v", conn.RemoteAddr(), err)) - return - } - - logging.Logger().Debug(fmt.Sprintf("Server: Client[%s] is connected.", conn.RemoteAddr())) - clientStopChan := make(chan struct{}) - handleDoneChan := make(chan struct{}) + handleDoneChan := make(chan error, 1) - go s.sh.Handle(conn, clientStopChan, handleDoneChan) + go socketHandler.Handle(soc, clientStopChan, handleDoneChan) select { case <-s.stopChan: close(clientStopChan) - conn.Close() <-handleDoneChan case <-handleDoneChan: close(clientStopChan) - conn.Close() - logging.Logger().Debug(fmt.Sprintf("Server: Client[%s] is disconnected.", conn.RemoteAddr())) + socketHandler.OnDisconnect(soc) + logging.Logger().Debug(fmt.Sprintf("Server: Client[%s] is disconnected.", soc.RemoteAddr())) + socketHandler.removeSocket(soc) + soc.Close() } } diff --git a/server_handler.go b/server_handler.go index 7df1395..d040fac 100644 --- a/server_handler.go +++ b/server_handler.go @@ -22,6 +22,7 @@ type ServerHandler interface { OnStop(serverCTX ServerContext) GetName() string + GetSocketHandler() SocketHandler Validate() } diff --git a/server_handlers.go b/server_handlers.go index 4b47c5c..0a29a5e 100644 --- a/server_handlers.go +++ b/server_handlers.go @@ -2,7 +2,10 @@ package server import ( "errors" + "fmt" "net" + + "git.loafle.net/commons_go/logging" ) func NewServerHandler() ServerHandler { @@ -10,36 +13,51 @@ func NewServerHandler() ServerHandler { return sh } +type ServerHandlers struct { + // Server name for sending in response headers. + // + // Default server name is used if left blank. + Name string + + SocketHandler SocketHandler +} + func (sh *ServerHandlers) ServerContext() ServerContext { return newServerContext() } func (sh *ServerHandlers) Init(serverCTX ServerContext) error { - // no op + if err := sh.SocketHandler.Init(serverCTX); nil != err { + return err + } + + return nil } func (sh *ServerHandlers) Listen(serverCTX ServerContext) (net.Listener, error) { return nil, errors.New("Server: Handler method[Listen] of Server is not implement") } -func (sh *ServerHandlers) OnStart(serverCTX ServerContext) { - // no op -} - func (sh *ServerHandlers) OnError(serverCTX ServerContext, conn net.Conn, status int, reason error) { - + logging.Logger().Error(fmt.Sprintf("Server: error status: %d, reason: %v, [%v]", status, reason, conn)) } // OnStop invoked when server is stopped // If you override ths method, must call func (sh *ServerHandlers) OnStop(serverCTX ServerContext) { - // no op + sh.SocketHandler.Destroy() } func (sh *ServerHandlers) GetName() string { return sh.Name } -func (sh *ServerHandlers) Validate() { - +func (sh *ServerHandlers) GetSocketHandler() SocketHandler { + return sh.SocketHandler +} + +func (sh *ServerHandlers) Validate() { + if nil == sh.SocketHandler { + panic("Server: SocketHandler must be specified") + } } diff --git a/socket.go b/socket.go index 07c9111..d8294fd 100644 --- a/socket.go +++ b/socket.go @@ -1,11 +1,9 @@ package server import ( - "io" "net" "sync" "time" - ) type Socket interface { @@ -21,7 +19,7 @@ func newSocket(socketHandler SocketHandler, socketCTX SocketContext, conn net.Co s.Conn = conn s.sh = socketHandler s.id = id - s.SetReadLimit(socketHandler.GetMaxMessageSize()) + if 0 < socketHandler.GetReadTimeout() { s.SetReadDeadline(time.Now().Add(socketHandler.GetReadTimeout() * time.Second)) } @@ -49,7 +47,6 @@ func (s *netSocket) ID() string { return s.id } - func (s *netSocket) Close() error { err := s.Conn.Close() releaseSocket(s) diff --git a/socket_handlers.go b/socket_handlers.go index 2216dfc..ddfe153 100644 --- a/socket_handlers.go +++ b/socket_handlers.go @@ -31,7 +31,7 @@ func (sh *SocketHandlers) Init(serverCTX ServerContext) error { } func (sh *SocketHandlers) Handshake(serverCTX ServerContext, conn net.Conn) (id string) { - return "", nil + return "" } func (sh *SocketHandlers) SocketContext(serverCTX ServerContext) SocketContext {