This commit is contained in:
crusader 2017-11-28 18:30:12 +09:00
parent f653cbf551
commit 28723e5290

188
socket.go
View File

@ -14,8 +14,8 @@ type Socket interface {
// ID returns the identity of the client. // ID returns the identity of the client.
ID() string ID() string
// WaitRequest wait request of client. // // WaitRequest wait request of client.
WaitRequest() (*SocketConn, error) // WaitRequest() (*SocketConn, error)
// Subprotocol returns the negotiated protocol for the connection. // Subprotocol returns the negotiated protocol for the connection.
Subprotocol() string Subprotocol() string
@ -161,7 +161,7 @@ func newSocket(socketHandler SocketHandler, socketCTX SocketContext, conn *webso
return s return s
} }
type fasthttpSocket struct { type fasthttpWebSocket struct {
*websocket.Conn *websocket.Conn
ctx SocketContext ctx SocketContext
@ -169,40 +169,40 @@ type fasthttpSocket struct {
id string id string
sc *SocketConn // sc *SocketConn
} }
func (s *fasthttpSocket) Context() SocketContext { func (s *fasthttpWebSocket) Context() SocketContext {
return s.ctx return s.ctx
} }
func (s *fasthttpSocket) ID() string { func (s *fasthttpWebSocket) ID() string {
return s.id return s.id
} }
func (s *fasthttpSocket) WaitRequest() (*SocketConn, error) { // func (s *fasthttpWebSocket) WaitRequest() (*SocketConn, error) {
if nil != s.sc { // if nil != s.sc {
releaseSocketConn(s.sc) // releaseSocketConn(s.sc)
s.sc = nil // s.sc = nil
} // }
var mt int // var mt int
var err error // var err error
var r io.Reader // var r io.Reader
if mt, r, err = s.NextReader(); nil != err { // if mt, r, err = s.NextReader(); nil != err {
return nil, err // return nil, err
} // }
s.sc = retainSocketConn() // s.sc = retainSocketConn()
s.sc.s = s // s.sc.s = s
s.sc.MessageType = mt // s.sc.MessageType = mt
s.sc.r = r // s.sc.r = r
return s.sc, nil // return s.sc, nil
} // }
func (s *fasthttpSocket) NextWriter(messageType int) (io.WriteCloser, error) { func (s *fasthttpWebSocket) NextWriter(messageType int) (io.WriteCloser, error) {
if 0 < s.sh.GetWriteTimeout() { if 0 < s.sh.GetWriteTimeout() {
s.SetWriteDeadline(time.Now().Add(s.sh.GetWriteTimeout() * time.Second)) s.SetWriteDeadline(time.Now().Add(s.sh.GetWriteTimeout() * time.Second))
} }
@ -210,7 +210,7 @@ func (s *fasthttpSocket) NextWriter(messageType int) (io.WriteCloser, error) {
return s.Conn.NextWriter(messageType) return s.Conn.NextWriter(messageType)
} }
func (s *fasthttpSocket) WriteMessage(messageType int, data []byte) error { func (s *fasthttpWebSocket) WriteMessage(messageType int, data []byte) error {
if 0 < s.sh.GetWriteTimeout() { if 0 < s.sh.GetWriteTimeout() {
s.SetWriteDeadline(time.Now().Add(s.sh.GetWriteTimeout() * time.Second)) s.SetWriteDeadline(time.Now().Add(s.sh.GetWriteTimeout() * time.Second))
} }
@ -218,100 +218,100 @@ func (s *fasthttpSocket) WriteMessage(messageType int, data []byte) error {
return s.Conn.WriteMessage(messageType, data) return s.Conn.WriteMessage(messageType, data)
} }
func (s *fasthttpSocket) Close() error { func (s *fasthttpWebSocket) Close() error {
err := s.Conn.Close() err := s.Conn.Close()
releaseSocket(s) releaseSocket(s)
return err return err
} }
type SocketConn struct { // type SocketConn struct {
net.Conn // net.Conn
s *fasthttpSocket // s *fasthttpWebSocket
MessageType int // MessageType int
r io.Reader // r io.Reader
wc io.WriteCloser // wc io.WriteCloser
} // }
func (sc *SocketConn) Read(b []byte) (n int, err error) { // func (sc *SocketConn) Read(b []byte) (n int, err error) {
return sc.r.Read(b) // return sc.r.Read(b)
} // }
func (sc *SocketConn) Write(b []byte) (n int, err error) { // func (sc *SocketConn) Write(b []byte) (n int, err error) {
if nil == sc.wc { // if nil == sc.wc {
var err error // var err error
if sc.wc, err = sc.s.NextWriter(sc.MessageType); nil != err { // if sc.wc, err = sc.s.NextWriter(sc.MessageType); nil != err {
return 0, err // return 0, err
} // }
} // }
return sc.wc.Write(b) // return sc.wc.Write(b)
} // }
func (sc *SocketConn) Close() error { // func (sc *SocketConn) Close() error {
var err error // var err error
if sc.wc != nil { // if sc.wc != nil {
err = sc.wc.Close() // err = sc.wc.Close()
} // }
sc.s.sc = nil // sc.s.sc = nil
releaseSocketConn(sc) // releaseSocketConn(sc)
return err // return err
} // }
func (sc *SocketConn) LocalAddr() net.Addr { // func (sc *SocketConn) LocalAddr() net.Addr {
return sc.s.LocalAddr() // return sc.s.LocalAddr()
} // }
func (sc *SocketConn) RemoteAddr() net.Addr { // func (sc *SocketConn) RemoteAddr() net.Addr {
return sc.s.RemoteAddr() // return sc.s.RemoteAddr()
} // }
func (sc *SocketConn) SetDeadline(t time.Time) error { // func (sc *SocketConn) SetDeadline(t time.Time) error {
if err := sc.s.SetReadDeadline(t); nil != err { // if err := sc.s.SetReadDeadline(t); nil != err {
return err // return err
} // }
if err := sc.s.SetWriteDeadline(t); nil != err { // if err := sc.s.SetWriteDeadline(t); nil != err {
return err // return err
} // }
return nil // return nil
} // }
func (sc *SocketConn) SetReadDeadline(t time.Time) error { // func (sc *SocketConn) SetReadDeadline(t time.Time) error {
return sc.s.SetReadDeadline(t) // return sc.s.SetReadDeadline(t)
} // }
func (sc *SocketConn) SetWriteDeadline(t time.Time) error { // func (sc *SocketConn) SetWriteDeadline(t time.Time) error {
return sc.s.SetWriteDeadline(t) // return sc.s.SetWriteDeadline(t)
} // }
var socketPool sync.Pool var socketPool sync.Pool
func retainSocket() *fasthttpSocket { func retainSocket() *fasthttpWebSocket {
v := socketPool.Get() v := socketPool.Get()
if v == nil { if v == nil {
return &fasthttpSocket{} return &fasthttpWebSocket{}
} }
return v.(*fasthttpSocket) return v.(*fasthttpWebSocket)
} }
func releaseSocket(s *fasthttpSocket) { func releaseSocket(s *fasthttpWebSocket) {
s.sh = nil s.sh = nil
s.sc = nil s.ctx = nil
s.id = "" s.id = ""
socketPool.Put(s) socketPool.Put(s)
} }
var socketConnPool sync.Pool // var socketConnPool sync.Pool
func retainSocketConn() *SocketConn { // func retainSocketConn() *SocketConn {
v := socketConnPool.Get() // v := socketConnPool.Get()
if v == nil { // if v == nil {
return &SocketConn{} // return &SocketConn{}
} // }
return v.(*SocketConn) // return v.(*SocketConn)
} // }
func releaseSocketConn(sc *SocketConn) { // func releaseSocketConn(sc *SocketConn) {
sc.s = nil // sc.s = nil
sc.r = nil // sc.r = nil
sc.wc = nil // sc.wc = nil
socketConnPool.Put(sc) // socketConnPool.Put(sc)
} // }