package server

import (
	"fmt"
	"io"
	"net"
	"sync"
	"sync/atomic"
	"time"

	"git.loafle.net/commons_go/logging"
)

// Server is the lifecycle API of a connection server whose listening,
// handshake and per-connection behavior are supplied by a ServerHandler.
type Server interface {
	// Start opens the listener and begins accepting connections in the
	// background. It returns the error reported by the handler's Listen.
	Start() error
	// Stop signals shutdown, waits for the accept loop and the
	// per-connection supervisors to finish, then calls the handler's
	// OnStopped.
	Stop()
	// Serve calls Start and then blocks until the server goroutines exit.
	Serve() error
	// GetServerHandler returns the handler given to NewServer.
	GetServerHandler() ServerHandler
}

// NewServer returns a Server that drives the given handler. The handler is
// not checked here; Start panics if it is nil.
func NewServer(sh ServerHandler) Server {
	s := &server{
		serverHandler: sh,
	}

	return s
}

// server is the default Server implementation.
type server struct {
	serverHandler ServerHandler

	listener net.Listener

	// Stats collects accept counters updated by the accept loop.
	Stats ConnStats

	// stopChan is non-nil while the server is running; closing it tells
	// the accept loop and every connection supervisor to shut down.
	stopChan chan struct{}
	// stopWg tracks the accept loop plus one supervisor per connection.
	stopWg sync.WaitGroup
}

// GetServerHandler returns the handler this server was created with.
func (s *server) GetServerHandler() ServerHandler {
	return s.serverHandler
}

// Start validates the handler, opens the listener through it and launches the
// accept loop in a new goroutine.
//
// It panics if no handler was supplied or if the server is already running.
// If the handler's Listen fails, that error is returned and the server is
// left in the stopped state so that Start may be called again.
func (s *server) Start() error {
	if nil == s.serverHandler {
		panic("Server: server handler must be specified.")
	}
	s.serverHandler.Validate()

	if s.stopChan != nil {
		panic("Server: server is already running. Stop it before starting it again")
	}
	s.stopChan = make(chan struct{})

	var err error
	if s.listener, err = s.serverHandler.Listen(); nil != err {
		// Nothing was started: drop the stop channel so the server is not
		// mistaken for a running one by a later Start or Stop.
		s.stopChan = nil
		return err
	}

	s.stopWg.Add(1)
	go runServer(s)

	return nil
}

// Stop closes the stop channel, waits for all tracked goroutines to finish,
// marks the server as stopped and finally notifies the handler via OnStopped.
//
// It panics if the server has not been started.
func (s *server) Stop() {
	if s.stopChan == nil {
		panic("Server: server must be started before stopping it")
	}
	close(s.stopChan)
	s.stopWg.Wait()
	s.stopChan = nil

	s.serverHandler.OnStopped()
}

// Serve starts the server and blocks until the accept loop and all connection
// supervisors have exited. A Start error is returned immediately.
func (s *server) Serve() error {
	if err := s.Start(); err != nil {
		return err
	}

	s.stopWg.Wait()

	return nil
}

// runServer is the accept loop. Each iteration runs one accept call in a
// helper goroutine so that the loop can react to the stop signal while the
// accept is still pending.
func runServer(s *server) {
	defer s.stopWg.Done()

	// conn, clientAddr and err are written by the accept goroutine and read
	// here only after acceptChan has been closed.
	var conn io.ReadWriteCloser
	var clientAddr string
	var err error
	// stopping is set before the listener is closed so the accept goroutine
	// does not log the error caused by the shutdown itself.
	var stopping atomic.Value

	for {
		acceptChan := make(chan struct{})

		go func() {
			if conn, clientAddr, err = s.serverHandler.accept(s.listener); err != nil {
				if stopping.Load() == nil {
					logging.Logger.Error(fmt.Sprintf("Server: [%s]. Cannot accept new connection: [%s]", s.serverHandler.GetAddr(), err))
				}
			}
			close(acceptChan)
		}()

		select {
		case <-s.stopChan:
			stopping.Store(true)
			s.listener.Close()
			// Join the in-flight accept before returning.
			<-acceptChan
			// The accept may have succeeded just as the stop signal
			// arrived; close that connection instead of leaking it.
			if err == nil && conn != nil {
				conn.Close()
			}
			return
		case <-acceptChan:
			s.Stats.incAcceptCalls()
		}

		if err != nil {
			s.Stats.incAcceptErrors()
			// Back off for a second before retrying, unless stopped.
			select {
			case <-s.stopChan:
				return
			case <-time.After(time.Second):
			}
			continue
		}

		s.stopWg.Add(1)
		go handleServerConnection(s, conn, clientAddr)
	}
}

// handleServerConnection supervises a single accepted connection: it runs the
// handler's handshake, starts the handler's Handle in its own goroutine and
// then waits for either server shutdown or a signal on the client stop
// channel, closing the connection in both cases.
func handleServerConnection(s *server, conn io.ReadWriteCloser, clientAddr string) {
	defer s.stopWg.Done()

	if err := s.serverHandler.OnHandshake(clientAddr, conn); nil != err {
		logging.Logger.Error(fmt.Sprintf("Server: [%s]. handshake error: [%s]", clientAddr, err))
		conn.Close()
		return
	}

	logging.Logger.Debug(fmt.Sprintf("Server: Client[%s] is connected.", clientAddr))

	clientStopChan := make(chan struct{})
	// Handle runs outside stopWg; this supervisor does not wait for it.
	go s.serverHandler.Handle(clientAddr, conn, clientStopChan)

	// NOTE(review): presumably Handle signals completion by sending on or
	// closing clientStopChan. If it closes the channel, a concurrent server
	// stop would close it a second time — confirm against the handler.
	select {
	case <-s.stopChan:
		close(clientStopChan)
		conn.Close()
		return
	case <-clientStopChan:
		conn.Close()
		logging.Logger.Debug(fmt.Sprintf("Server: Client[%s] is disconnected.", clientAddr))
		return
	}
}