websocket_fasthttp/server.go

194 lines
4.3 KiB
Go
Raw Normal View History

2017-11-07 10:03:40 +00:00
package websocket_fasthttp
import (
"fmt"
"net"
"net/http"
"sync"
"git.loafle.net/commons_go/logging"
"git.loafle.net/commons_go/websocket_fasthttp/websocket"
"github.com/valyala/fasthttp"
)
type Server interface {
Start() error
Stop()
2017-11-27 10:05:59 +00:00
2017-11-27 11:19:19 +00:00
Context() ServerContext
2017-11-07 10:03:40 +00:00
}
2017-11-27 14:30:18 +00:00
func New(sh ServerHandler) Server {
2017-11-07 10:03:40 +00:00
s := &server{
sh: sh,
}
2017-11-27 10:05:59 +00:00
2017-11-07 10:03:40 +00:00
return s
}
type server struct {
2017-11-27 11:19:19 +00:00
ctx ServerContext
2017-11-27 10:05:59 +00:00
2017-11-07 10:03:40 +00:00
sh ServerHandler
httpServer *fasthttp.Server
upgrader *websocket.Upgrader
listener net.Listener
stopChan chan struct{}
stopWg sync.WaitGroup
}
func (s *server) Start() error {
if nil == s.sh {
2017-11-10 08:33:40 +00:00
logging.Logger().Panic("Server: server handler must be specified.")
2017-11-07 10:03:40 +00:00
}
s.sh.Validate()
if s.stopChan != nil {
2017-12-01 13:13:15 +00:00
return fmt.Errorf("Server: server is already running. Stop it before starting it again")
2017-11-07 10:03:40 +00:00
}
s.httpServer = &fasthttp.Server{
Handler: s.handleRequest,
Name: s.sh.GetName(),
Concurrency: s.sh.GetConcurrency(),
ReadBufferSize: s.sh.GetReadBufferSize(),
WriteBufferSize: s.sh.GetWriteBufferSize(),
ReadTimeout: s.sh.GetReadTimeout(),
WriteTimeout: s.sh.GetWriteTimeout(),
}
s.upgrader = &websocket.Upgrader{
HandshakeTimeout: s.sh.GetHandshakeTimeout(),
ReadBufferSize: s.sh.GetReadBufferSize(),
WriteBufferSize: s.sh.GetWriteBufferSize(),
CheckOrigin: s.sh.CheckOrigin,
Error: s.handleError,
EnableCompression: s.sh.IsEnableCompression(),
}
var err error
2017-11-27 14:30:18 +00:00
s.ctx = s.sh.ServerContext()
2017-11-27 10:05:59 +00:00
2017-11-27 11:19:19 +00:00
if err = s.sh.Init(s.ctx); nil != err {
2017-12-01 13:13:15 +00:00
return fmt.Errorf("Server: Initialization of server has been failed %v", err)
2017-11-27 10:05:59 +00:00
}
2017-11-07 10:03:40 +00:00
var listener net.Listener
2017-11-27 11:19:19 +00:00
if listener, err = s.sh.Listen(s.ctx); nil != err {
2017-11-07 10:03:40 +00:00
return err
}
s.listener = newGracefulListener(listener, s.sh.GetMaxStopWaitTime())
2017-11-10 09:45:48 +00:00
s.stopChan = make(chan struct{})
2017-11-07 10:03:40 +00:00
s.stopWg.Add(1)
go handleServer(s)
return nil
}
func (s *server) Stop() {
if s.stopChan == nil {
2017-12-01 13:13:15 +00:00
logging.Logger().Warn("Server: server must be started before stopping it")
return
2017-11-07 10:03:40 +00:00
}
close(s.stopChan)
s.stopWg.Wait()
s.stopChan = nil
2017-11-09 11:06:05 +00:00
2017-11-27 11:19:19 +00:00
s.sh.OnStop(s.ctx)
2017-11-09 11:06:05 +00:00
logging.Logger().Info(fmt.Sprintf("Server[%s] is stopped", s.sh.GetName()))
2017-11-07 10:03:40 +00:00
}
2017-11-27 11:19:19 +00:00
func (s *server) Context() ServerContext {
return s.ctx
2017-11-27 10:05:59 +00:00
}
2017-11-07 10:03:40 +00:00
func handleServer(s *server) {
go func() {
defer s.stopWg.Done()
if err := s.httpServer.Serve(s.listener); nil != err {
2017-11-09 06:17:01 +00:00
logging.Logger().Error(fmt.Sprintf("Server: Server err - %v", err))
2017-11-07 10:03:40 +00:00
}
}()
2017-11-10 09:45:48 +00:00
logging.Logger().Info(fmt.Sprintf("Server[%s] is started", s.sh.GetName()))
2017-11-27 11:19:19 +00:00
s.sh.OnStart(s.ctx)
2017-11-10 09:45:48 +00:00
2017-11-07 10:03:40 +00:00
select {
case <-s.stopChan:
s.listener.Close()
return
}
}
func (s *server) handleRequest(ctx *fasthttp.RequestCtx) {
path := string(ctx.Path())
var socketHandler SocketHandler
var err error
if socketHandler, err = s.sh.GetSocketHandler(path); nil != err {
s.handleError(ctx, fasthttp.StatusNotFound, err)
return
}
var responseHeader *fasthttp.ResponseHeader
2017-11-10 08:33:40 +00:00
var socketID string
2017-12-01 08:02:47 +00:00
socketCTX := socketHandler.SocketContext(s.ctx)
if socketID, responseHeader = socketHandler.Handshake(socketCTX, ctx); "" == socketID {
2017-11-07 10:03:40 +00:00
s.handleError(ctx, http.StatusNotAcceptable, fmt.Errorf("Server: Handshake err"))
return
}
s.upgrader.Upgrade(ctx, responseHeader, func(conn *websocket.Conn, err error) {
if err != nil {
s.handleError(ctx, fasthttp.StatusInternalServerError, err)
return
}
2017-11-08 05:30:56 +00:00
2017-11-27 14:30:18 +00:00
soc := newSocket(socketHandler, socketCTX, conn, socketID)
2017-11-27 15:42:27 +00:00
logging.Logger().Debug(fmt.Sprintf("Server: Client[%s] is connected.", soc.RemoteAddr()))
socketHandler.addSocket(soc)
socketHandler.OnConnect(soc)
2017-11-08 10:15:09 +00:00
2017-11-07 10:03:40 +00:00
s.stopWg.Add(1)
2017-11-08 10:17:56 +00:00
handleConnection(s, soc, socketHandler)
2017-11-07 10:03:40 +00:00
})
}
func (s *server) handleError(ctx *fasthttp.RequestCtx, status int, reason error) {
ctx.Response.Header.Set("Sec-Websocket-Version", "13")
ctx.Error(http.StatusText(status), status)
2017-11-27 11:19:19 +00:00
s.sh.OnError(s.ctx, ctx, status, reason)
2017-11-07 10:03:40 +00:00
}
2017-11-10 16:49:00 +00:00
func handleConnection(s *server, soc Socket, socketHandler SocketHandler) {
2017-11-07 10:03:40 +00:00
defer s.stopWg.Done()
clientStopChan := make(chan struct{})
2017-11-28 09:41:00 +00:00
handleDoneChan := make(chan error, 1)
2017-11-07 10:03:40 +00:00
2017-11-08 10:17:56 +00:00
go socketHandler.Handle(soc, clientStopChan, handleDoneChan)
2017-11-07 10:03:40 +00:00
select {
case <-s.stopChan:
close(clientStopChan)
<-handleDoneChan
case <-handleDoneChan:
close(clientStopChan)
2017-11-09 09:47:30 +00:00
socketHandler.OnDisconnect(soc)
2017-11-14 05:21:09 +00:00
logging.Logger().Debug(fmt.Sprintf("Server: Client[%s] is disconnected.", soc.RemoteAddr()))
socketHandler.removeSocket(soc)
2017-11-08 10:15:09 +00:00
soc.Close()
2017-11-07 10:03:40 +00:00
}
}