// Package server implements a listener-based server whose lifecycle hooks and
// per-connection behavior are supplied by a ServerHandler.
package server

import (
	"fmt"
	"net"
	"sync"
	"sync/atomic"
	"time"

	"git.loafle.net/commons_go/logging"
)

// New returns a Server that delegates to sh. The returned server is not
// started; call Start to begin accepting connections.
func New(sh ServerHandler) Server {
	s := &server{
		sh: sh,
	}

	return s
}

// Server is the lifecycle interface of a server created by New.
type Server interface {
	// Start initializes the handler, opens the listener and launches the
	// accept loop in a background goroutine.
	Start() error
	// Stop signals the accept loop and all connection goroutines to stop and
	// blocks until they have returned.
	Stop()
	// Context returns the ServerContext obtained from the handler in Start.
	Context() ServerContext
}

// server is the default Server implementation.
type server struct {
	// ctx is the value returned by sh.ServerContext() during Start.
	ctx ServerContext
	// sh supplies lifecycle callbacks, the listener and the socket handler.
	sh ServerHandler

	// listener is the value returned by sh.Listen during Start.
	listener net.Listener

	// Stats is updated by the accept loop (accept calls and accept errors).
	Stats ServerStats

	// stopChan is non-nil while the server is running and is closed by Stop.
	stopChan chan struct{}
	// stopWg tracks the accept loop and every per-connection goroutine.
	stopWg sync.WaitGroup
}

// Start validates and initializes the handler, opens the listener via the
// handler and starts the accept loop in a new goroutine.
//
// It panics if no handler was supplied or if the server is already running.
// An initialization error is reported through the logger's Panic method. An
// error from the handler's Listen is returned to the caller, in which case the
// server is left in the not-running state.
func (s *server) Start() error {
	if nil == s.sh {
		panic("Server: server handler must be specified.")
	}
	s.sh.Validate()

	if s.stopChan != nil {
		panic("Server: server is already running. Stop it before starting it again")
	}

	var err error
	s.ctx = s.sh.ServerContext()
	if err = s.sh.Init(s.ctx); nil != err {
		logging.Logger().Panic(fmt.Sprintf("Server: Initialization of server has been failed %v", err))
	}

	if s.listener, err = s.sh.Listen(s.ctx); nil != err {
		return err
	}

	// stopChan doubles as the "running" flag, so it is created only after the
	// listener has been opened successfully.
	s.stopChan = make(chan struct{})

	s.stopWg.Add(1)
	go handleServer(s)

	return nil
}

// Stop closes the stop channel, waits for the accept loop and all connection
// goroutines to finish, then invokes the handler's OnStop callback.
//
// It panics if the server has not been started.
func (s *server) Stop() {
	if s.stopChan == nil {
		panic("Server: server must be started before stopping it")
	}
	close(s.stopChan)
	s.stopWg.Wait()
	// Reset so that Start may be called again.
	s.stopChan = nil

	s.sh.OnStop(s.ctx)

	logging.Logger().Info(fmt.Sprintf("Server[%s] is stopped", s.sh.GetName()))
}

// Context returns the ServerContext obtained from the handler in Start. It is
// the zero value until Start has been called.
func (s *server) Context() ServerContext {
	return s.ctx
}

// handleServer is the accept loop. It runs until s.stopChan is closed and
// always closes the listener before returning.
//
// Each iteration performs Accept in a helper goroutine so that the loop can
// react to the stop signal while Accept is blocked. After an accept error the
// loop waits one second before retrying. Every accepted connection goes
// through the socket handler's Handshake; on success it is wrapped in a
// socket, registered with the handler and served by handleConnection in its
// own goroutine.
func handleServer(s *server) {
	defer s.stopWg.Done()

	logging.Logger().Info(fmt.Sprintf("Server[%s] is started", s.sh.GetName()))

	s.sh.OnStart(s.ctx)

	// conn and err are written by the accept goroutine and read by this loop
	// only after acceptChan has been closed, which orders the accesses.
	var conn net.Conn
	var err error
	// stopping is set once shutdown begins so that the accept error caused by
	// closing the listener is not logged.
	var stopping atomic.Value

	for {
		acceptChan := make(chan struct{})
		go func() {
			if conn, err = s.listener.Accept(); err != nil {
				if stopping.Load() == nil {
					logging.Logger().Error(fmt.Sprintf("Server: Cannot accept new connection: [%s]", err))
				}
			}
			close(acceptChan)
		}()

		select {
		case <-s.stopChan:
			stopping.Store(true)
			// Closing the listener unblocks the pending Accept. The close
			// error is ignored: shutdown is best-effort here.
			s.listener.Close()
			<-acceptChan
			// Accept may have succeeded just before the listener was closed;
			// that connection will never be served, so close it instead of
			// leaking it.
			if err == nil && conn != nil {
				conn.Close()
			}
			return
		case <-acceptChan:
			s.Stats.incAcceptCalls()
		}

		if err != nil {
			s.Stats.incAcceptErrors()
			// Back off for a second before retrying, unless asked to stop.
			select {
			case <-s.stopChan:
				// No Accept is pending at this point, but the listener is
				// still open and must be released before returning.
				s.listener.Close()
				return
			case <-time.After(time.Second):
			}
			continue
		}

		var socketID string
		socketHandler := s.sh.GetSocketHandler()

		// An empty socket ID from Handshake is treated as a rejected
		// connection.
		if socketID = socketHandler.Handshake(s.ctx, conn); "" == socketID {
			logging.Logger().Error(fmt.Sprintf("Server: Handshake err addr[%s] %v", conn.RemoteAddr(), conn))
			conn.Close()
			continue
		}

		socketCTX := socketHandler.SocketContext(s.ctx)
		soc := newSocket(socketHandler, socketCTX, conn, socketID)
		logging.Logger().Debug(fmt.Sprintf("Server: Client[%s] is connected.", soc.RemoteAddr()))

		socketHandler.addSocket(soc)
		socketHandler.OnConnect(soc)

		s.stopWg.Add(1)
		go handleConnection(s, soc, socketHandler)
	}
}

// handleConnection runs the socket handler's Handle for soc in a separate
// goroutine and waits until either the server is stopped or Handle reports
// completion on handleDoneChan.
//
// When Handle finishes on its own, the handler is notified via OnDisconnect,
// the socket is removed from the handler and closed. When the server is
// stopped, Handle is signalled through clientStopChan and awaited.
func handleConnection(s *server, soc Socket, socketHandler SocketHandler) {
	defer s.stopWg.Done()

	clientStopChan := make(chan struct{})
	// Buffered so that Handle can report completion without blocking.
	handleDoneChan := make(chan error, 1)
	go socketHandler.Handle(soc, clientStopChan, handleDoneChan)

	select {
	case <-s.stopChan:
		close(clientStopChan)
		<-handleDoneChan
		// NOTE(review): this branch does not call OnDisconnect, removeSocket
		// or soc.Close(); presumably Handle releases the socket when
		// clientStopChan is closed - confirm against the SocketHandler
		// implementation.
	case <-handleDoneChan:
		close(clientStopChan)
		socketHandler.OnDisconnect(soc)
		logging.Logger().Debug(fmt.Sprintf("Server: Client[%s] is disconnected.", soc.RemoteAddr()))
		socketHandler.removeSocket(soc)
		soc.Close()
	}
}