// Package net implements a listener-based socket server that accepts
// connections and hands each one to a servlet supplied by a ServerHandler.
package net

import (
	"context"
	"fmt"
	"net"
	"sync"
	"sync/atomic"
	"time"

	olog "git.loafle.net/overflow/log-go"
	"git.loafle.net/overflow/server-go"
	"git.loafle.net/overflow/server-go/socket"
)

// Server accepts connections from the listener provided by its ServerHandler
// and dispatches every accepted connection to a servlet.
type Server struct {
	// ServerHandler supplies configuration and lifecycle callbacks. It must be
	// set before ListenAndServe is called.
	ServerHandler ServerHandler

	ctx      server.ServerCtx
	stopChan chan struct{}  // non-nil while the server is considered running
	stopWg   sync.WaitGroup // counts the accept loop and every handled connection

	srw socket.ServerReadWriter
}

// ListenAndServe validates and initialises the handler, obtains a listener
// from it and then runs the accept loop until Shutdown is called.
//
// It returns an error when the server is already running, when no handler is
// configured, when the handler's Validate, Init or Listener call fails, when
// the handler returns a nil ServerCtx, or when OnStart fails.
func (s *Server) ListenAndServe() error {
	if s.stopChan != nil {
		return fmt.Errorf("%s already running. Stop it before starting it again", s.logHeader())
	}

	var (
		err      error
		listener net.Listener
	)

	if nil == s.ServerHandler {
		return fmt.Errorf("%s server handler must be specified", s.logHeader())
	}

	if err = s.ServerHandler.Validate(); nil != err {
		return err
	}

	s.ctx = s.ServerHandler.ServerCtx()
	if nil == s.ctx {
		return fmt.Errorf("%s ServerCtx is nil", s.logHeader())
	}

	if err = s.ServerHandler.Init(s.ctx); nil != err {
		return err
	}

	if listener, err = s.ServerHandler.Listener(s.ctx); nil != err {
		return err
	}

	s.stopChan = make(chan struct{})

	s.srw.ReadwriteHandler = s.ServerHandler
	s.srw.ServerStopChan = s.stopChan
	s.srw.ServerStopWg = &s.stopWg

	// Accounts for the accept loop itself; released by handleServer's defer.
	s.stopWg.Add(1)
	return s.handleServer(listener)
}

// Shutdown signals the accept loop to stop by closing the stop channel, waits
// on the server's WaitGroup, and then calls Destroy on the handler.
//
// ctx is currently not consulted: Shutdown blocks until the WaitGroup drains.
// It returns an error when the server has not been started.
func (s *Server) Shutdown(ctx context.Context) error {
	if s.stopChan == nil {
		return fmt.Errorf("%s must be started before stopping it", s.logHeader())
	}

	close(s.stopChan)
	s.stopWg.Wait()

	s.ServerHandler.Destroy(s.ctx)

	s.stopChan = nil

	return nil
}

// logHeader returns the prefix used for log lines and error messages. It is
// safe to call before a handler is configured, which matters because the
// nil-handler error paths in ListenAndServe and Shutdown use it.
func (s *Server) logHeader() string {
	if nil == s.ServerHandler {
		return "Server[<nil>]:"
	}
	return fmt.Sprintf("Server[%s]:", s.ServerHandler.GetName())
}

// handleServer runs the accept loop on listener until the stop channel is
// closed. On exit it closes the listener (if still owned), calls OnStop on the
// handler and releases the WaitGroup slot taken by ListenAndServe.
//
// It returns the error from OnStart, or nil when stopped via the stop channel.
func (s *Server) handleServer(listener net.Listener) error {
	var (
		stopping atomic.Value
		netConn  net.Conn
		err      error
	)

	defer func() {
		if nil != listener {
			listener.Close()
		}

		s.ServerHandler.OnStop(s.ctx)

		olog.Logger().Infof("%s Stopped", s.logHeader())

		s.stopWg.Done()
	}()

	if err = s.ServerHandler.OnStart(s.ctx); nil != err {
		return err
	}

	olog.Logger().Infof("%s Started", s.logHeader())

	for {
		// Accept runs in its own goroutine so the loop can react to the stop
		// channel while Accept is blocked. netConn and err are only read after
		// acceptChan is closed.
		acceptChan := make(chan struct{})
		go func() {
			if netConn, err = listener.Accept(); err != nil {
				// Accept errors caused by closing the listener during
				// shutdown are expected and not logged.
				if nil == stopping.Load() {
					olog.Logger().Errorf("%s %v", s.logHeader(), err)
				}
			}
			close(acceptChan)
		}()

		select {
		case <-s.stopChan:
			stopping.Store(true)
			listener.Close()
			<-acceptChan
			// Accept may have succeeded just before the listener was closed;
			// that connection will never be served, so close it here.
			if nil == err && nil != netConn {
				netConn.Close()
			}
			// Prevent the deferred cleanup from closing the listener again.
			listener = nil
			return nil
		case <-acceptChan:
		}

		if nil != err {
			// Back off for a second after a failed Accept, unless stopped.
			select {
			case <-s.stopChan:
				return nil
			case <-time.After(time.Second):
			}
			continue
		}

		if 0 < s.ServerHandler.GetConcurrency() {
			sz := s.srw.ConnectionSize()
			if sz >= s.ServerHandler.GetConcurrency() {
				olog.Logger().Warnf("%s max connections size %d, refuse", s.logHeader(), sz)
				netConn.Close()
				continue
			}
		}

		servlet := s.ServerHandler.Servlet(s.ctx, netConn)
		if nil == servlet {
			olog.Logger().Errorf("%s Servlet is nil", s.logHeader())
			// Nobody will serve this connection; release it.
			netConn.Close()
			continue
		}

		servletCtx := servlet.ServletCtx(s.ctx)
		if nil == servletCtx {
			olog.Logger().Errorf("%s ServletCtx is nil", s.logHeader())
			netConn.Close()
			continue
		}

		if err := servlet.Handshake(servletCtx, netConn); nil != err {
			olog.Logger().Infof("%s Handshaking of Client[%s] has been failed %v", s.logHeader(), netConn.RemoteAddr(), err)
			netConn.Close()
			continue
		}

		conn := socket.NewConn(netConn, true, s.ServerHandler.GetReadBufferSize(), s.ServerHandler.GetWriteBufferSize())

		// Registered before the goroutine starts so Shutdown's Wait cannot
		// miss it. NOTE(review): presumably HandleConnection calls Done via
		// ServerStopWg; confirm in the socket package.
		s.stopWg.Add(1)
		go s.srw.HandleConnection(servlet, servletCtx, conn)
	}
}