This commit is contained in:
crusader 2017-11-10 17:33:40 +09:00
parent e939a0c90c
commit 944de8ecf5
6 changed files with 79 additions and 22 deletions

View File

@ -37,12 +37,12 @@ type server struct {
func (s *server) Start() error { func (s *server) Start() error {
if nil == s.sh { if nil == s.sh {
panic("Server: server handler must be specified.") logging.Logger().Panic("Server: server handler must be specified.")
} }
s.sh.Validate() s.sh.Validate()
if s.stopChan != nil { if s.stopChan != nil {
panic("Server: server is already running. Stop it before starting it again") logging.Logger().Panic("Server: server is already running. Stop it before starting it again")
} }
s.httpServer = &fasthttp.Server{ s.httpServer = &fasthttp.Server{
@ -74,9 +74,11 @@ func (s *server) Start() error {
s.stopChan = make(chan struct{}) s.stopChan = make(chan struct{})
logging.Logger().Info(fmt.Sprintf("Server[%s] is started", s.sh.GetName())) if err = s.sh.OnStart(); nil != err {
logging.Logger().Panic(fmt.Sprintf("Server: server cannot start %v", err))
}
s.sh.OnStart() logging.Logger().Info(fmt.Sprintf("Server[%s] is started", s.sh.GetName()))
s.stopWg.Add(1) s.stopWg.Add(1)
go handleServer(s) go handleServer(s)
@ -86,7 +88,7 @@ func (s *server) Start() error {
func (s *server) Stop() { func (s *server) Stop() {
if s.stopChan == nil { if s.stopChan == nil {
panic("Server: server must be started before stopping it") logging.Logger().Panic("Server: server must be started before stopping it")
} }
close(s.stopChan) close(s.stopChan)
s.stopWg.Wait() s.stopWg.Wait()
@ -125,8 +127,8 @@ func (s *server) handleRequest(ctx *fasthttp.RequestCtx) {
return return
} }
var responseHeader *fasthttp.ResponseHeader var responseHeader *fasthttp.ResponseHeader
var socketID interface{} var socketID string
if socketID, responseHeader = socketHandler.Handshake(ctx); nil == socketID { if socketID, responseHeader = socketHandler.Handshake(ctx); "" == socketID {
s.handleError(ctx, http.StatusNotAcceptable, fmt.Errorf("Server: Handshake err")) s.handleError(ctx, http.StatusNotAcceptable, fmt.Errorf("Server: Handshake err"))
return return
} }
@ -142,7 +144,6 @@ func (s *server) handleRequest(ctx *fasthttp.RequestCtx) {
s.stopWg.Add(1) s.stopWg.Add(1)
handleConnection(s, soc, socketHandler) handleConnection(s, soc, socketHandler)
}) })
} }
func (s *server) handleError(ctx *fasthttp.RequestCtx, status int, reason error) { func (s *server) handleError(ctx *fasthttp.RequestCtx, status int, reason error) {

View File

@ -12,7 +12,7 @@ type ServerHandler interface {
CheckOrigin(ctx *fasthttp.RequestCtx) bool CheckOrigin(ctx *fasthttp.RequestCtx) bool
OnError(ctx *fasthttp.RequestCtx, status int, reason error) OnError(ctx *fasthttp.RequestCtx, status int, reason error)
OnStart() OnStart() error
OnStop() OnStop()
RegisterSocketHandler(path string, handler SocketHandler) RegisterSocketHandler(path string, handler SocketHandler)

View File

@ -6,6 +6,8 @@ import (
"net" "net"
"time" "time"
"git.loafle.net/commons_go/logging"
"github.com/valyala/fasthttp" "github.com/valyala/fasthttp"
) )
@ -62,15 +64,31 @@ func (sh *ServerHandlers) CheckOrigin(ctx *fasthttp.RequestCtx) bool {
} }
func (sh *ServerHandlers) OnError(ctx *fasthttp.RequestCtx, status int, reason error) { func (sh *ServerHandlers) OnError(ctx *fasthttp.RequestCtx, status int, reason error) {
// no op logging.Logger().Error(fmt.Sprintf("Server: error status: %d, reason: %v, [%v]", status, reason, ctx))
} }
func (sh *ServerHandlers) OnStart() { // OnStart invoked when server is stated
// no op // If you override ths method, must call
func (sh *ServerHandlers) OnStart() error {
if nil != sh.socketHandlers {
for _, socketHandler := range sh.socketHandlers {
if err := socketHandler.Init(); nil != err {
return err
}
}
} }
return nil
}
// OnStop invoked when server is stopped
// If you override ths method, must call
func (sh *ServerHandlers) OnStop() { func (sh *ServerHandlers) OnStop() {
// no op if nil != sh.socketHandlers {
for _, socketHandler := range sh.socketHandlers {
socketHandler.Destroy()
}
}
} }
func (sh *ServerHandlers) RegisterSocketHandler(path string, handler SocketHandler) { func (sh *ServerHandlers) RegisterSocketHandler(path string, handler SocketHandler) {
@ -100,12 +118,15 @@ func (sh *ServerHandlers) GetName() string {
func (sh *ServerHandlers) GetConcurrency() int { func (sh *ServerHandlers) GetConcurrency() int {
return sh.Concurrency return sh.Concurrency
} }
func (sh *ServerHandlers) GetMaxStopWaitTime() time.Duration { func (sh *ServerHandlers) GetMaxStopWaitTime() time.Duration {
return sh.MaxStopWaitTime return sh.MaxStopWaitTime
} }
func (sh *ServerHandlers) GetHandshakeTimeout() time.Duration { func (sh *ServerHandlers) GetHandshakeTimeout() time.Duration {
return sh.HandshakeTimeout return sh.HandshakeTimeout
} }
func (sh *ServerHandlers) GetReadBufferSize() int { func (sh *ServerHandlers) GetReadBufferSize() int {
return sh.ReadBufferSize return sh.ReadBufferSize
} }

View File

@ -9,7 +9,7 @@ import (
"git.loafle.net/commons_go/websocket_fasthttp/websocket" "git.loafle.net/commons_go/websocket_fasthttp/websocket"
) )
func newSocket(id interface{}, conn *websocket.Conn, sh SocketHandler) *Socket { func newSocket(id string, conn *websocket.Conn, sh SocketHandler) *Socket {
s := retainSocket() s := retainSocket()
s.Conn = conn s.Conn = conn
s.sh = sh s.sh = sh
@ -26,13 +26,13 @@ type Socket struct {
*websocket.Conn *websocket.Conn
sh SocketHandler sh SocketHandler
id interface{} id string
attributes map[interface{}]interface{} attributes map[interface{}]interface{}
sc *SocketConn sc *SocketConn
} }
func (s *Socket) ID() interface{} { func (s *Socket) ID() string {
return s.id return s.id
} }
@ -163,7 +163,7 @@ func retainSocket() *Socket {
func releaseSocket(s *Socket) { func releaseSocket(s *Socket) {
s.sh = nil s.sh = nil
s.sc = nil s.sc = nil
s.id = nil s.id = ""
socketPool.Put(s) socketPool.Put(s)
} }

View File

@ -7,12 +7,25 @@ import (
) )
type SocketHandler interface { type SocketHandler interface {
// Init invoked when server is stated
// If you override ths method, must call
Init() error
// Handshake do handshake client and server // Handshake do handshake client and server
// id is identity of client socket. if id is nil, disallow connection // id is identity of client socket. if id is "", disallow connection
Handshake(ctx *fasthttp.RequestCtx) (id interface{}, extensionsHeader *fasthttp.ResponseHeader) Handshake(ctx *fasthttp.RequestCtx) (id string, extensionsHeader *fasthttp.ResponseHeader)
// OnConnect invoked when client is connected
// If you override ths method, must call
OnConnect(soc *Socket) OnConnect(soc *Socket)
Handle(soc *Socket, stopChan <-chan struct{}, doneChan chan<- struct{}) Handle(soc *Socket, stopChan <-chan struct{}, doneChan chan<- struct{})
// OnDisconnect invoked when client is disconnected
// If you override ths method, must call
OnDisconnect(soc *Socket) OnDisconnect(soc *Socket)
// Destroy invoked when server is stopped
// If you override ths method, must call
Destroy()
GetSocket(id string) *Socket
GetSockets() map[string]*Socket
GetMaxMessageSize() int64 GetMaxMessageSize() int64
GetWriteTimeout() time.Duration GetWriteTimeout() time.Duration
@ -21,5 +34,7 @@ type SocketHandler interface {
GetPingTimeout() time.Duration GetPingTimeout() time.Duration
GetPingPeriod() time.Duration GetPingPeriod() time.Duration
// Validate is check handler value
// If you override ths method, must call
Validate() Validate()
} }

View File

@ -25,13 +25,22 @@ type SocketHandlers struct {
PongTimeout time.Duration PongTimeout time.Duration
PingTimeout time.Duration PingTimeout time.Duration
PingPeriod time.Duration PingPeriod time.Duration
sockets map[string]*Socket
} }
func (sh *SocketHandlers) Handshake(ctx *fasthttp.RequestCtx) (id interface{}, extensionsHeader *fasthttp.ResponseHeader) { func (sh *SocketHandlers) Init() error {
return nil, nil sh.sockets = make(map[string]*Socket)
return nil
} }
func (sh *SocketHandlers) Handshake(ctx *fasthttp.RequestCtx) (id string, extensionsHeader *fasthttp.ResponseHeader) {
return "", nil
}
func (sh *SocketHandlers) OnConnect(soc *Socket) { func (sh *SocketHandlers) OnConnect(soc *Socket) {
// no op sh.sockets[soc.ID()] = soc
} }
func (sh *SocketHandlers) Handle(soc *Socket, stopChan <-chan struct{}, doneChan chan<- struct{}) { func (sh *SocketHandlers) Handle(soc *Socket, stopChan <-chan struct{}, doneChan chan<- struct{}) {
@ -39,9 +48,20 @@ func (sh *SocketHandlers) Handle(soc *Socket, stopChan <-chan struct{}, doneChan
} }
func (sh *SocketHandlers) OnDisconnect(soc *Socket) { func (sh *SocketHandlers) OnDisconnect(soc *Socket) {
delete(sh.sockets, soc.ID())
}
func (sh *SocketHandlers) Destroy() {
// no op // no op
} }
func (sh *SocketHandlers) GetSocket(id string) *Socket {
return sh.sockets[id]
}
func (sh *SocketHandlers) GetSockets() map[string]*Socket {
return sh.sockets
}
func (sh *SocketHandlers) GetMaxMessageSize() int64 { func (sh *SocketHandlers) GetMaxMessageSize() int64 {
return sh.MaxMessageSize return sh.MaxMessageSize
} }