package net import ( "fmt" "net" "sync/atomic" "time" "git.loafle.net/commons/logging-go" "git.loafle.net/commons/server-go" ) type Server interface { server.Server } func NewServer(serverHandler ServerHandler) Server { s := &netServer{} s.ServerHandler = serverHandler return s } type netServer struct { server.Servers } func (s *netServer) ListenAndServe() error { if s.StopChan != nil { return fmt.Errorf(s.ServerMessage("already running. Stop it before starting it again")) } var ( err error listener net.Listener ) if nil == s.ServerHandler { return fmt.Errorf("Server: server handler must be specified") } if err = s.ServerHandler.Validate(); nil != err { return err } s.ServerCtx = s.ServerHandler.ServerCtx() if nil == s.ServerCtx { return fmt.Errorf(s.ServerMessage("ServerCtx is nil")) } if err = s.ServerHandler.Init(s.ServerCtx); nil != err { return err } if listener, err = s.ServerHandler.Listener(s.ServerCtx); nil != err { return err } s.StopChan = make(chan struct{}) s.StopWg.Add(1) return s.handleServer(listener) } func (s *netServer) handleServer(listener net.Listener) error { var ( stopping atomic.Value netConn net.Conn err error ) defer func() { if nil != listener { listener.Close() } s.ServerHandler.OnStop(s.ServerCtx) logging.Logger().Infof(s.ServerMessage("Stopped")) s.StopWg.Done() }() if err = s.ServerHandler.OnStart(s.ServerCtx); nil != err { return err } logging.Logger().Infof(s.ServerMessage("Started")) for { acceptChan := make(chan struct{}) go func() { if netConn, err = listener.Accept(); err != nil { if nil == stopping.Load() { logging.Logger().Errorf(s.ServerMessage(fmt.Sprintf("%v", err))) } } close(acceptChan) }() select { case <-s.StopChan: stopping.Store(true) listener.Close() <-acceptChan listener = nil return nil case <-acceptChan: } if nil != err { select { case <-s.StopChan: return nil case <-time.After(time.Second): } continue } if 0 < s.ServerHandler.GetConcurrency() { sz := s.ConnectionSize() if sz >= s.ServerHandler.GetConcurrency() { logging.Logger().Warnf(s.ServerMessage(fmt.Sprintf("max connections size %d, refuse", sz))) netConn.Close() continue } } servlet := s.ServerHandler.(ServerHandler).Servlet() if nil == servlet { logging.Logger().Errorf(s.ServerMessage("Servlet is nil")) continue } servletCtx := servlet.ServletCtx(s.ServerCtx) if nil == servletCtx { logging.Logger().Errorf(s.ServerMessage("ServletCtx is nil")) continue } if err := servlet.Handshake(servletCtx, netConn); nil != err { logging.Logger().Infof(s.ServerMessage(fmt.Sprintf("Handshaking of Client[%s] has been failed %v", netConn.RemoteAddr(), err))) continue } conn := server.NewConn(netConn, true, s.ServerHandler.GetReadBufferSize(), s.ServerHandler.GetWriteBufferSize()) s.StopWg.Add(1) go s.HandleConnection(servlet, servletCtx, conn) } }