package server import ( "fmt" "net" "sync" "sync/atomic" "time" "git.loafle.net/commons_go/logging" ) type Server interface { Start() error Stop() Serve() error } func New(sh ServerHandler) Server { s := &server{ sh: sh, } return s } type server struct { sh ServerHandler listener net.Listener Stats ServerStats stopChan chan struct{} stopWg sync.WaitGroup } func (s *server) Start() error { if nil == s.sh { panic("Server: server handler must be specified.") } s.sh.Validate() if s.stopChan != nil { panic("Server: server is already running. Stop it before starting it again") } var err error if s.listener, err = s.sh.Listen(); nil != err { return err } s.stopChan = make(chan struct{}) s.sh.OnStart() s.stopWg.Add(1) go handleServer(s) return nil } func (s *server) Stop() { if s.stopChan == nil { panic("Server: server must be started before stopping it") } close(s.stopChan) s.stopWg.Wait() s.stopChan = nil s.sh.OnStop() } func (s *server) Serve() error { if err := s.Start(); err != nil { return err } s.stopWg.Wait() return nil } func handleServer(s *server) { defer s.stopWg.Done() var conn net.Conn var err error var stopping atomic.Value for { acceptChan := make(chan struct{}) go func() { if conn, err = s.listener.Accept(); err != nil { if stopping.Load() == nil { logging.Logger.Error(fmt.Sprintf("Server: Cannot accept new connection: [%s]", err)) } } else { conn, err = s.sh.OnAccept(conn) } close(acceptChan) }() select { case <-s.stopChan: stopping.Store(true) s.listener.Close() <-acceptChan return case <-acceptChan: s.Stats.incAcceptCalls() } if err != nil { s.Stats.incAcceptErrors() select { case <-s.stopChan: return case <-time.After(time.Second): } continue } s.stopWg.Add(1) go handleConnection(s, conn) } } func handleConnection(s *server, conn net.Conn) { defer s.stopWg.Done() logging.Logger.Debug(fmt.Sprintf("Server: Client[%s] is connected.", conn.RemoteAddr())) clientStopChan := make(chan struct{}) handleDoneCnah := make(chan struct{}) go s.sh.Handle(conn, clientStopChan, handleDoneCnah) select { case <-s.stopChan: close(clientStopChan) conn.Close() <-handleDoneCnah case <-handleDoneCnah: close(clientStopChan) conn.Close() logging.Logger.Debug(fmt.Sprintf("Server: Client[%s] is disconnected.", conn.RemoteAddr())) } }