package websocket import ( "context" "fmt" "net" "net/http" "sync" "time" "git.loafle.net/commons/logging-go" "git.loafle.net/commons/server-go" "git.loafle.net/commons/server-go/internal" "github.com/valyala/fasthttp" ) type Server struct { ServerHandler ServerHandler ctx server.ServerCtx hs *fasthttp.Server upgrader *Upgrader connections sync.Map stopChan chan struct{} stopWg sync.WaitGroup } func (s *Server) ListenAndServe() error { var ( err error listener net.Listener ) if nil == s.ServerHandler { return fmt.Errorf("Server: server handler must be specified") } s.ServerHandler.Validate() if s.stopChan != nil { return fmt.Errorf(s.serverMessage("already running. Stop it before starting it again")) } s.ctx = s.ServerHandler.ServerCtx() if nil == s.ctx { return fmt.Errorf(s.serverMessage("ServerCtx is nil")) } s.hs = &fasthttp.Server{ Handler: s.httpHandler, Name: s.ServerHandler.GetName(), Concurrency: s.ServerHandler.GetConcurrency(), ReadBufferSize: s.ServerHandler.GetReadBufferSize(), WriteBufferSize: s.ServerHandler.GetWriteBufferSize(), ReadTimeout: s.ServerHandler.GetReadTimeout(), WriteTimeout: s.ServerHandler.GetWriteTimeout(), } s.upgrader = &Upgrader{ HandshakeTimeout: s.ServerHandler.GetHandshakeTimeout(), ReadBufferSize: s.ServerHandler.GetReadBufferSize(), WriteBufferSize: s.ServerHandler.GetWriteBufferSize(), CheckOrigin: s.ServerHandler.CheckOrigin, Error: s.onError, EnableCompression: s.ServerHandler.IsEnableCompression(), } if err = s.ServerHandler.Init(s.ctx); nil != err { return err } if listener, err = s.ServerHandler.Listener(s.ctx); nil != err { return err } s.stopChan = make(chan struct{}) s.stopWg.Add(1) return s.handleServer(listener) } func (s *Server) Shutdown(ctx context.Context) error { if s.stopChan == nil { return fmt.Errorf(s.serverMessage("server must be started before stopping it")) } close(s.stopChan) s.stopWg.Wait() s.ServerHandler.Destroy(s.ctx) s.stopChan = nil return nil } func (s *Server) ConnectionSize() int { var sz int s.connections.Range(func(k, v interface{}) bool { sz++ return true }) return sz } func (s *Server) serverMessage(msg string) string { return fmt.Sprintf("Server[%s]: %s", s.ServerHandler.GetName(), msg) } func (s *Server) handleServer(listener net.Listener) error { var ( err error ) errChan := make(chan error) defer func() { if nil != listener { listener.Close() } s.stopWg.Done() }() go func() { if err := s.hs.Serve(listener); nil != err { errChan <- err return } close(errChan) }() select { case err, _ := <-errChan: if nil != err { return err } } defer func() { s.ServerHandler.OnStop(s.ctx) logging.Logger().Infof(s.serverMessage("Stopped")) }() if err = s.ServerHandler.OnStart(s.ctx); nil != err { return err } logging.Logger().Infof(s.serverMessage("Started")) select { case <-s.stopChan: listener.Close() listener = nil } return nil } func (s *Server) httpHandler(ctx *fasthttp.RequestCtx) { path := string(ctx.Path()) var ( servlet Servlet err error ) if 0 < s.ServerHandler.GetConcurrency() { sz := s.ConnectionSize() if sz >= s.ServerHandler.GetConcurrency() { logging.Logger().Warnf(s.serverMessage(fmt.Sprintf("max connections size %d, refuse", sz))) s.onError(ctx, fasthttp.StatusServiceUnavailable, err) return } } if servlet = s.ServerHandler.Servlet(path); nil == servlet { s.onError(ctx, fasthttp.StatusInternalServerError, err) return } var responseHeader *fasthttp.ResponseHeader servletCtx := servlet.ServletCtx(s.ctx) if responseHeader, err = servlet.Handshake(servletCtx, ctx); nil != err { s.onError(ctx, http.StatusNotAcceptable, fmt.Errorf("Handshake err: %v", err)) return } s.upgrader.Upgrade(ctx, responseHeader, func(conn *internal.Conn, err error) { if err != nil { s.onError(ctx, fasthttp.StatusInternalServerError, err) return } s.stopWg.Add(1) go s.handleConnection(servlet, servletCtx, conn) }) } func (s *Server) handleConnection(servlet Servlet, servletCtx server.ServletCtx, conn *internal.Conn) { addr := conn.RemoteAddr() defer func() { s.connections.Delete(conn) conn.Close() servlet.OnDisconnect(servletCtx) logging.Logger().Infof(s.serverMessage(fmt.Sprintf("Client[%s] has been disconnected", addr))) s.stopWg.Done() }() logging.Logger().Infof(s.serverMessage(fmt.Sprintf("Client[%s] has been connected", addr))) s.connections.Store(conn, true) servlet.OnConnect(servletCtx, conn) servletStopChan := make(chan struct{}) doneChan := make(chan struct{}) readChan := make(chan []byte) writeChan := make(chan []byte) go servlet.Handle(servletCtx, doneChan, servletStopChan, readChan, writeChan) go handleRead(s, conn, readChan) go handleWrite(s, conn, writeChan) select { case <-doneChan: close(servletStopChan) case <-s.stopChan: close(servletStopChan) <-doneChan } } func (s *Server) onError(ctx *fasthttp.RequestCtx, status int, reason error) { s.ServerHandler.OnError(s.ctx, ctx, status, reason) } func handleRead(s *Server, conn *internal.Conn, readChan chan []byte) { conn.SetReadLimit(s.ServerHandler.GetMaxMessageSize()) conn.SetReadDeadline(time.Now().Add(s.ServerHandler.GetReadTimeout())) conn.SetPongHandler(func(string) error { conn.SetReadDeadline(time.Now().Add(s.ServerHandler.GetPongTimeout())) return nil }) for { _, message, err := conn.ReadMessage() if err != nil { if internal.IsUnexpectedCloseError(err, internal.CloseGoingAway, internal.CloseAbnormalClosure) { logging.Logger().Debugf(s.serverMessage(fmt.Sprintf("Read error %v", err))) } break } readChan <- message } } func handleWrite(s *Server, conn *internal.Conn, writeChan chan []byte) { ticker := time.NewTicker(s.ServerHandler.GetPingPeriod()) defer func() { ticker.Stop() }() for { select { case message, ok := <-writeChan: conn.SetWriteDeadline(time.Now().Add(s.ServerHandler.GetWriteTimeout())) if !ok { conn.WriteMessage(internal.CloseMessage, []byte{}) return } w, err := conn.NextWriter(internal.TextMessage) if err != nil { return } w.Write(message) if err := w.Close(); nil != err { return } case <-ticker.C: conn.SetWriteDeadline(time.Now().Add(s.ServerHandler.GetPingTimeout())) if err := conn.WriteMessage(internal.PingMessage, nil); nil != err { return } } } }