This commit is contained in:
crusader 2017-11-27 20:19:19 +09:00
parent dff228d0db
commit 667ba44013
8 changed files with 87 additions and 51 deletions

View File

@ -7,7 +7,6 @@ import (
"sync" "sync"
"git.loafle.net/commons_go/logging" "git.loafle.net/commons_go/logging"
cuc "git.loafle.net/commons_go/util/context"
"git.loafle.net/commons_go/websocket_fasthttp/websocket" "git.loafle.net/commons_go/websocket_fasthttp/websocket"
"github.com/valyala/fasthttp" "github.com/valyala/fasthttp"
@ -17,19 +16,20 @@ type Server interface {
Start() error Start() error
Stop() Stop()
Attributes() cuc.Attributes Context() ServerContext
} }
func New(sh ServerHandler) Server { func New(sh ServerHandler) Server {
s := &server{ s := &server{
sh: sh, sh: sh,
} }
s.ctx = newServerContext()
return s return s
} }
type server struct { type server struct {
attributes cuc.Attributes ctx ServerContext
sh ServerHandler sh ServerHandler
@ -72,18 +72,17 @@ func (s *server) Start() error {
var err error var err error
if err = s.sh.Init(); nil != err { if err = s.sh.Init(s.ctx); nil != err {
logging.Logger().Panic(fmt.Sprintf("Server: Initialization of server has been failed %v", err)) logging.Logger().Panic(fmt.Sprintf("Server: Initialization of server has been failed %v", err))
} }
var listener net.Listener var listener net.Listener
if listener, err = s.sh.Listen(); nil != err { if listener, err = s.sh.Listen(s.ctx); nil != err {
return err return err
} }
s.listener = newGracefulListener(listener, s.sh.GetMaxStopWaitTime()) s.listener = newGracefulListener(listener, s.sh.GetMaxStopWaitTime())
s.stopChan = make(chan struct{}) s.stopChan = make(chan struct{})
s.attributes = cuc.NewAttributes(nil)
s.stopWg.Add(1) s.stopWg.Add(1)
go handleServer(s) go handleServer(s)
@ -99,15 +98,13 @@ func (s *server) Stop() {
s.stopWg.Wait() s.stopWg.Wait()
s.stopChan = nil s.stopChan = nil
s.attributes.Destroy() s.sh.OnStop(s.ctx)
s.sh.OnStop()
logging.Logger().Info(fmt.Sprintf("Server[%s] is stopped", s.sh.GetName())) logging.Logger().Info(fmt.Sprintf("Server[%s] is stopped", s.sh.GetName()))
} }
func (s *server) Attributes() cuc.Attributes { func (s *server) Context() ServerContext {
return s.attributes return s.ctx
} }
func handleServer(s *server) { func handleServer(s *server) {
@ -121,7 +118,7 @@ func handleServer(s *server) {
}() }()
logging.Logger().Info(fmt.Sprintf("Server[%s] is started", s.sh.GetName())) logging.Logger().Info(fmt.Sprintf("Server[%s] is started", s.sh.GetName()))
s.sh.OnStart() s.sh.OnStart(s.ctx)
select { select {
case <-s.stopChan: case <-s.stopChan:
@ -142,7 +139,7 @@ func (s *server) handleRequest(ctx *fasthttp.RequestCtx) {
} }
var responseHeader *fasthttp.ResponseHeader var responseHeader *fasthttp.ResponseHeader
var socketID string var socketID string
if socketID, responseHeader = socketHandler.Handshake(ctx); "" == socketID { if socketID, responseHeader = socketHandler.Handshake(s.ctx, ctx); "" == socketID {
s.handleError(ctx, http.StatusNotAcceptable, fmt.Errorf("Server: Handshake err")) s.handleError(ctx, http.StatusNotAcceptable, fmt.Errorf("Server: Handshake err"))
return return
} }
@ -153,7 +150,7 @@ func (s *server) handleRequest(ctx *fasthttp.RequestCtx) {
return return
} }
soc := newSocket(socketID, conn, socketHandler) soc := newSocket(s.ctx, socketID, conn, socketHandler)
s.stopWg.Add(1) s.stopWg.Add(1)
handleConnection(s, soc, socketHandler) handleConnection(s, soc, socketHandler)
@ -164,7 +161,7 @@ func (s *server) handleError(ctx *fasthttp.RequestCtx, status int, reason error)
ctx.Response.Header.Set("Sec-Websocket-Version", "13") ctx.Response.Header.Set("Sec-Websocket-Version", "13")
ctx.Error(http.StatusText(status), status) ctx.Error(http.StatusText(status), status)
s.sh.OnError(ctx, status, reason) s.sh.OnError(s.ctx, ctx, status, reason)
} }
func handleConnection(s *server, soc Socket, socketHandler SocketHandler) { func handleConnection(s *server, soc Socket, socketHandler SocketHandler) {

20
server_context.go Normal file
View File

@ -0,0 +1,20 @@
package websocket_fasthttp
import (
cuc "git.loafle.net/commons_go/util/context"
)
type ServerContext interface {
cuc.Context
}
type serverContext struct {
cuc.Context
}
func newServerContext() ServerContext {
sCTX := &serverContext{}
sCTX.Context = cuc.NewContext(nil)
return sCTX
}

View File

@ -18,8 +18,8 @@ type ServerHandler interface {
// ... // ...
// return nil // return nil
// } // }
Init() error Init(serverCTX ServerContext) error
Listen() (net.Listener, error) Listen(serverCTX ServerContext) (net.Listener, error)
// OnStart invoked when server is started // OnStart invoked when server is started
// If you override ths method, must call // If you override ths method, must call
// //
@ -28,10 +28,10 @@ type ServerHandler interface {
// ... // ...
// return nil // return nil
// } // }
OnStart() OnStart(serverCTX ServerContext)
CheckOrigin(ctx *fasthttp.RequestCtx) bool CheckOrigin(ctx *fasthttp.RequestCtx) bool
OnError(ctx *fasthttp.RequestCtx, status int, reason error) OnError(serverCTX ServerContext, ctx *fasthttp.RequestCtx, status int, reason error)
// OnStop invoked when server is stopped // OnStop invoked when server is stopped
// If you override ths method, must call // If you override ths method, must call
// //
@ -39,7 +39,7 @@ type ServerHandler interface {
// ... // ...
// sh.ServerHandler.OnStop() // sh.ServerHandler.OnStop()
// } // }
OnStop() OnStop(serverCTX ServerContext)
RegisterSocketHandler(path string, handler SocketHandler) RegisterSocketHandler(path string, handler SocketHandler)
GetSocketHandler(path string) (SocketHandler, error) GetSocketHandler(path string) (SocketHandler, error)

View File

@ -55,10 +55,10 @@ type ServerHandlers struct {
socketHandlers map[string]SocketHandler socketHandlers map[string]SocketHandler
} }
func (sh *ServerHandlers) Init() error { func (sh *ServerHandlers) Init(serverCTX ServerContext) error {
if nil != sh.socketHandlers { if nil != sh.socketHandlers {
for _, socketHandler := range sh.socketHandlers { for _, socketHandler := range sh.socketHandlers {
if err := socketHandler.Init(); nil != err { if err := socketHandler.Init(serverCTX); nil != err {
return err return err
} }
} }
@ -67,13 +67,13 @@ func (sh *ServerHandlers) Init() error {
return nil return nil
} }
func (sh *ServerHandlers) Listen() (net.Listener, error) { func (sh *ServerHandlers) Listen(serverCTX ServerContext) (net.Listener, error) {
return nil, errors.New("Server: Handler method[Listen] of Server is not implement") return nil, errors.New("Server: Handler method[Listen] of Server is not implement")
} }
// OnStart invoked when server is stated // OnStart invoked when server is stated
// If you override ths method, must call // If you override ths method, must call
func (sh *ServerHandlers) OnStart() { func (sh *ServerHandlers) OnStart(serverCTX ServerContext) {
// no op // no op
} }
@ -81,13 +81,13 @@ func (sh *ServerHandlers) CheckOrigin(ctx *fasthttp.RequestCtx) bool {
return true return true
} }
func (sh *ServerHandlers) OnError(ctx *fasthttp.RequestCtx, status int, reason error) { func (sh *ServerHandlers) OnError(serverCTX ServerContext, ctx *fasthttp.RequestCtx, status int, reason error) {
logging.Logger().Error(fmt.Sprintf("Server: error status: %d, reason: %v, [%v]", status, reason, ctx)) logging.Logger().Error(fmt.Sprintf("Server: error status: %d, reason: %v, [%v]", status, reason, ctx))
} }
// OnStop invoked when server is stopped // OnStop invoked when server is stopped
// If you override ths method, must call // If you override ths method, must call
func (sh *ServerHandlers) OnStop() { func (sh *ServerHandlers) OnStop(serverCTX ServerContext) {
if nil != sh.socketHandlers { if nil != sh.socketHandlers {
for _, socketHandler := range sh.socketHandlers { for _, socketHandler := range sh.socketHandlers {
socketHandler.Destroy() socketHandler.Destroy()

View File

@ -13,10 +13,7 @@ import (
type Socket interface { type Socket interface {
// ID returns the identity of the client. // ID returns the identity of the client.
ID() string ID() string
// GetAttribute returns a attribute for the key.
GetAttribute(key interface{}) interface{}
// SetAttribute store a attribute for the key.
SetAttribute(key interface{}, value interface{})
// WaitRequest wait request of client. // WaitRequest wait request of client.
WaitRequest() (*SocketConn, error) WaitRequest() (*SocketConn, error)
@ -146,9 +143,11 @@ type Socket interface {
// Headers returns the RequestHeader struct // Headers returns the RequestHeader struct
Headers() *fasthttp.RequestHeader Headers() *fasthttp.RequestHeader
Context() SocketContext
} }
func newSocket(id string, conn *websocket.Conn, sh SocketHandler) Socket { func newSocket(serverCTX ServerContext, id string, conn *websocket.Conn, sh SocketHandler) Socket {
s := retainSocket() s := retainSocket()
s.Conn = conn s.Conn = conn
s.sh = sh s.sh = sh
@ -157,38 +156,30 @@ func newSocket(id string, conn *websocket.Conn, sh SocketHandler) Socket {
if 0 < sh.GetReadTimeout() { if 0 < sh.GetReadTimeout() {
s.SetReadDeadline(time.Now().Add(sh.GetReadTimeout() * time.Second)) s.SetReadDeadline(time.Now().Add(sh.GetReadTimeout() * time.Second))
} }
s.ctx = newSocketContext(serverCTX)
return s return s
} }
type fasthttpSocket struct { type fasthttpSocket struct {
*websocket.Conn *websocket.Conn
ctx SocketContext
sh SocketHandler sh SocketHandler
id string id string
attributes map[interface{}]interface{}
sc *SocketConn sc *SocketConn
} }
func (s *fasthttpSocket) Context() SocketContext {
return s.ctx
}
func (s *fasthttpSocket) ID() string { func (s *fasthttpSocket) ID() string {
return s.id return s.id
} }
func (s *fasthttpSocket) GetAttribute(key interface{}) interface{} {
if nil == s.attributes {
return nil
}
return s.attributes[key]
}
func (s *fasthttpSocket) SetAttribute(key interface{}, value interface{}) {
if nil == s.attributes {
s.attributes = make(map[interface{}]interface{})
}
s.attributes[key] = value
}
func (s *fasthttpSocket) WaitRequest() (*SocketConn, error) { func (s *fasthttpSocket) WaitRequest() (*SocketConn, error) {
if nil != s.sc { if nil != s.sc {
releaseSocketConn(s.sc) releaseSocketConn(s.sc)

28
socket_context.go Normal file
View File

@ -0,0 +1,28 @@
package websocket_fasthttp
import (
cuc "git.loafle.net/commons_go/util/context"
)
type SocketContext interface {
cuc.Context
ServerContext() ServerContext
}
func newSocketContext(serverCTX ServerContext) SocketContext {
sCTX := &socketContext{}
sCTX.Context = cuc.NewContext(serverCTX)
sCTX.serverCTX = serverCTX
return sCTX
}
type socketContext struct {
cuc.Context
serverCTX ServerContext
}
func (sc *socketContext) ServerContext() ServerContext {
return sc.serverCTX
}

View File

@ -18,10 +18,10 @@ type SocketHandler interface {
// ... // ...
// return nil // return nil
// } // }
Init() error Init(serverCTX ServerContext) error
// Handshake do handshake client and server // Handshake do handshake client and server
// id is identity of client socket. if id is "", disallow connection // id is identity of client socket. if id is "", disallow connection
Handshake(ctx *fasthttp.RequestCtx) (id string, extensionsHeader *fasthttp.ResponseHeader) Handshake(serverCTX ServerContext, ctx *fasthttp.RequestCtx) (id string, extensionsHeader *fasthttp.ResponseHeader)
// OnConnect invoked when client is connected // OnConnect invoked when client is connected
// If you override ths method, must call // If you override ths method, must call
// //

View File

@ -29,7 +29,7 @@ type SocketHandlers struct {
sockets map[string]Socket sockets map[string]Socket
} }
func (sh *SocketHandlers) Init() error { func (sh *SocketHandlers) Init(serverCTX ServerContext) error {
sh.sockets = make(map[string]Socket) sh.sockets = make(map[string]Socket)
return nil return nil