From 944de8ecf5e7f7abdd39b9ddf7bd45d251e89159 Mon Sep 17 00:00:00 2001 From: crusader Date: Fri, 10 Nov 2017 17:33:40 +0900 Subject: [PATCH] ing --- server.go | 17 +++++++++-------- server_handler.go | 2 +- server_handlers.go | 29 +++++++++++++++++++++++++---- socket.go | 8 ++++---- socket_handler.go | 19 +++++++++++++++++-- socket_handlers.go | 26 +++++++++++++++++++++++--- 6 files changed, 79 insertions(+), 22 deletions(-) diff --git a/server.go b/server.go index 1db0afc..af0ac25 100644 --- a/server.go +++ b/server.go @@ -37,12 +37,12 @@ type server struct { func (s *server) Start() error { if nil == s.sh { - panic("Server: server handler must be specified.") + logging.Logger().Panic("Server: server handler must be specified.") } s.sh.Validate() if s.stopChan != nil { - panic("Server: server is already running. Stop it before starting it again") + logging.Logger().Panic("Server: server is already running. Stop it before starting it again") } s.httpServer = &fasthttp.Server{ @@ -74,9 +74,11 @@ func (s *server) Start() error { s.stopChan = make(chan struct{}) - logging.Logger().Info(fmt.Sprintf("Server[%s] is started", s.sh.GetName())) + if err = s.sh.OnStart(); nil != err { + logging.Logger().Panic(fmt.Sprintf("Server: server cannot start %v", err)) + } - s.sh.OnStart() + logging.Logger().Info(fmt.Sprintf("Server[%s] is started", s.sh.GetName())) s.stopWg.Add(1) go handleServer(s) @@ -86,7 +88,7 @@ func (s *server) Start() error { func (s *server) Stop() { if s.stopChan == nil { - panic("Server: server must be started before stopping it") + logging.Logger().Panic("Server: server must be started before stopping it") } close(s.stopChan) s.stopWg.Wait() @@ -125,8 +127,8 @@ func (s *server) handleRequest(ctx *fasthttp.RequestCtx) { return } var responseHeader *fasthttp.ResponseHeader - var socketID interface{} - if socketID, responseHeader = socketHandler.Handshake(ctx); nil == socketID { + var socketID string + if socketID, responseHeader = socketHandler.Handshake(ctx); "" == socketID { s.handleError(ctx, http.StatusNotAcceptable, fmt.Errorf("Server: Handshake err")) return } @@ -142,7 +144,6 @@ func (s *server) handleRequest(ctx *fasthttp.RequestCtx) { s.stopWg.Add(1) handleConnection(s, soc, socketHandler) }) - } func (s *server) handleError(ctx *fasthttp.RequestCtx, status int, reason error) { diff --git a/server_handler.go b/server_handler.go index a478a3e..de80858 100644 --- a/server_handler.go +++ b/server_handler.go @@ -12,7 +12,7 @@ type ServerHandler interface { CheckOrigin(ctx *fasthttp.RequestCtx) bool OnError(ctx *fasthttp.RequestCtx, status int, reason error) - OnStart() + OnStart() error OnStop() RegisterSocketHandler(path string, handler SocketHandler) diff --git a/server_handlers.go b/server_handlers.go index 94bd1d3..6b0a6e3 100644 --- a/server_handlers.go +++ b/server_handlers.go @@ -6,6 +6,8 @@ import ( "net" "time" + "git.loafle.net/commons_go/logging" + "github.com/valyala/fasthttp" ) @@ -62,15 +64,31 @@ func (sh *ServerHandlers) CheckOrigin(ctx *fasthttp.RequestCtx) bool { } func (sh *ServerHandlers) OnError(ctx *fasthttp.RequestCtx, status int, reason error) { - // no op + logging.Logger().Error(fmt.Sprintf("Server: error status: %d, reason: %v, [%v]", status, reason, ctx)) } -func (sh *ServerHandlers) OnStart() { - // no op +// OnStart invoked when server is stated +// If you override ths method, must call +func (sh *ServerHandlers) OnStart() error { + if nil != sh.socketHandlers { + for _, socketHandler := range sh.socketHandlers { + if err := socketHandler.Init(); nil != err { + return err + } + } + } + + return nil } +// OnStop invoked when server is stopped +// If you override ths method, must call func (sh *ServerHandlers) OnStop() { - // no op + if nil != sh.socketHandlers { + for _, socketHandler := range sh.socketHandlers { + socketHandler.Destroy() + } + } } func (sh *ServerHandlers) RegisterSocketHandler(path string, handler SocketHandler) { @@ -100,12 +118,15 @@ func (sh *ServerHandlers) GetName() string { func (sh *ServerHandlers) GetConcurrency() int { return sh.Concurrency } + func (sh *ServerHandlers) GetMaxStopWaitTime() time.Duration { return sh.MaxStopWaitTime } + func (sh *ServerHandlers) GetHandshakeTimeout() time.Duration { return sh.HandshakeTimeout } + func (sh *ServerHandlers) GetReadBufferSize() int { return sh.ReadBufferSize } diff --git a/socket.go b/socket.go index 06ea3ca..39280c6 100644 --- a/socket.go +++ b/socket.go @@ -9,7 +9,7 @@ import ( "git.loafle.net/commons_go/websocket_fasthttp/websocket" ) -func newSocket(id interface{}, conn *websocket.Conn, sh SocketHandler) *Socket { +func newSocket(id string, conn *websocket.Conn, sh SocketHandler) *Socket { s := retainSocket() s.Conn = conn s.sh = sh @@ -26,13 +26,13 @@ type Socket struct { *websocket.Conn sh SocketHandler - id interface{} + id string attributes map[interface{}]interface{} sc *SocketConn } -func (s *Socket) ID() interface{} { +func (s *Socket) ID() string { return s.id } @@ -163,7 +163,7 @@ func retainSocket() *Socket { func releaseSocket(s *Socket) { s.sh = nil s.sc = nil - s.id = nil + s.id = "" socketPool.Put(s) } diff --git a/socket_handler.go b/socket_handler.go index b628c8b..d1b5cee 100644 --- a/socket_handler.go +++ b/socket_handler.go @@ -7,12 +7,25 @@ import ( ) type SocketHandler interface { + // Init invoked when server is stated + // If you override ths method, must call + Init() error // Handshake do handshake client and server - // id is identity of client socket. if id is nil, disallow connection - Handshake(ctx *fasthttp.RequestCtx) (id interface{}, extensionsHeader *fasthttp.ResponseHeader) + // id is identity of client socket. if id is "", disallow connection + Handshake(ctx *fasthttp.RequestCtx) (id string, extensionsHeader *fasthttp.ResponseHeader) + // OnConnect invoked when client is connected + // If you override ths method, must call OnConnect(soc *Socket) Handle(soc *Socket, stopChan <-chan struct{}, doneChan chan<- struct{}) + // OnDisconnect invoked when client is disconnected + // If you override ths method, must call OnDisconnect(soc *Socket) + // Destroy invoked when server is stopped + // If you override ths method, must call + Destroy() + + GetSocket(id string) *Socket + GetSockets() map[string]*Socket GetMaxMessageSize() int64 GetWriteTimeout() time.Duration @@ -21,5 +34,7 @@ type SocketHandler interface { GetPingTimeout() time.Duration GetPingPeriod() time.Duration + // Validate is check handler value + // If you override ths method, must call Validate() } diff --git a/socket_handlers.go b/socket_handlers.go index 8c54112..0cc662e 100644 --- a/socket_handlers.go +++ b/socket_handlers.go @@ -25,13 +25,22 @@ type SocketHandlers struct { PongTimeout time.Duration PingTimeout time.Duration PingPeriod time.Duration + + sockets map[string]*Socket } -func (sh *SocketHandlers) Handshake(ctx *fasthttp.RequestCtx) (id interface{}, extensionsHeader *fasthttp.ResponseHeader) { - return nil, nil +func (sh *SocketHandlers) Init() error { + sh.sockets = make(map[string]*Socket) + + return nil } + +func (sh *SocketHandlers) Handshake(ctx *fasthttp.RequestCtx) (id string, extensionsHeader *fasthttp.ResponseHeader) { + return "", nil +} + func (sh *SocketHandlers) OnConnect(soc *Socket) { - // no op + sh.sockets[soc.ID()] = soc } func (sh *SocketHandlers) Handle(soc *Socket, stopChan <-chan struct{}, doneChan chan<- struct{}) { @@ -39,9 +48,20 @@ func (sh *SocketHandlers) Handle(soc *Socket, stopChan <-chan struct{}, doneChan } func (sh *SocketHandlers) OnDisconnect(soc *Socket) { + delete(sh.sockets, soc.ID()) +} + +func (sh *SocketHandlers) Destroy() { // no op } +func (sh *SocketHandlers) GetSocket(id string) *Socket { + return sh.sockets[id] +} +func (sh *SocketHandlers) GetSockets() map[string]*Socket { + return sh.sockets +} + func (sh *SocketHandlers) GetMaxMessageSize() int64 { return sh.MaxMessageSize }