package websocket_fasthttp import ( "io" "net" "sync" "time" "git.loafle.net/commons_go/websocket_fasthttp/websocket" "github.com/valyala/fasthttp" ) type Socket interface { // ID returns the identity of the client. ID() string // // WaitRequest wait request of client. // WaitRequest() (*SocketConn, error) // Subprotocol returns the negotiated protocol for the connection. Subprotocol() string // Close closes the underlying network connection without sending or waiting for a close frame. Close() error // LocalAddr returns the local network address. LocalAddr() net.Addr // RemoteAddr returns the remote network address. RemoteAddr() net.Addr // WriteControl writes a control message with the given deadline. The allowed // message types are CloseMessage, PingMessage and PongMessage. WriteControl(messageType int, data []byte, deadline time.Time) error // NextWriter returns a writer for the next message to send. The writer's Close // method flushes the complete message to the network. // // There can be at most one open writer on a connection. NextWriter closes the // previous writer if the application has not already done so. NextWriter(messageType int) (io.WriteCloser, error) // WritePreparedMessage writes prepared message into connection. WritePreparedMessage(pm *websocket.PreparedMessage) error // WriteMessage is a helper method for getting a writer using NextWriter, // writing the message and closing the writer. WriteMessage(messageType int, data []byte) error // SetWriteDeadline sets the write deadline on the underlying network // connection. After a write has timed out, the websocket state is corrupt and // all future writes will return an error. A zero value for t means writes will // not time out. SetWriteDeadline(t time.Time) error // NextReader returns the next data message received from the peer. The // returned messageType is either TextMessage or BinaryMessage. // // There can be at most one open reader on a connection. NextReader discards // the previous message if the application has not already consumed it. // // Applications must break out of the application's read loop when this method // returns a non-nil error value. Errors returned from this method are // permanent. Once this method returns a non-nil error, all subsequent calls to // this method return the same error. NextReader() (messageType int, r io.Reader, err error) // ReadMessage is a helper method for getting a reader using NextReader and // reading from that reader to a buffer. ReadMessage() (messageType int, p []byte, err error) // SetReadDeadline sets the read deadline on the underlying network connection. // After a read has timed out, the websocket connection state is corrupt and // all future reads will return an error. A zero value for t means reads will // not time out. SetReadDeadline(t time.Time) error // SetReadLimit sets the maximum size for a message read from the peer. If a // message exceeds the limit, the connection sends a close frame to the peer // and returns ErrReadLimit to the application. SetReadLimit(limit int64) // CloseHandler returns the current close handler CloseHandler() func(code int, text string) error // SetCloseHandler sets the handler for close messages received from the peer. // The code argument to h is the received close code or CloseNoStatusReceived // if the close message is empty. The default close handler sends a close frame // back to the peer. // // The application must read the connection to process close messages as // described in the section on Control Frames above. // // The connection read methods return a CloseError when a close frame is // received. Most applications should handle close messages as part of their // normal error handling. Applications should only set a close handler when the // application must perform some action before sending a close frame back to // the peer. SetCloseHandler(h func(code int, text string) error) // PingHandler returns the current ping handler PingHandler() func(appData string) error // SetPingHandler sets the handler for ping messages received from the peer. // The appData argument to h is the PING frame application data. The default // ping handler sends a pong to the peer. // // The application must read the connection to process ping messages as // described in the section on Control Frames above. SetPingHandler(h func(appData string) error) // PongHandler returns the current pong handler PongHandler() func(appData string) error // SetPongHandler sets the handler for pong messages received from the peer. // The appData argument to h is the PONG frame application data. The default // pong handler does nothing. // // The application must read the connection to process ping messages as // described in the section on Control Frames above. SetPongHandler(h func(appData string) error) // UnderlyingConn returns the internal net.Conn. This can be used to further // modifications to connection specific flags. UnderlyingConn() net.Conn // EnableWriteCompression enables and disables write compression of // subsequent text and binary messages. This function is a noop if // compression was not negotiated with the peer. EnableWriteCompression(enable bool) // SetCompressionLevel sets the flate compression level for subsequent text and // binary messages. This function is a noop if compression was not negotiated // with the peer. See the compress/flate package for a description of // compression levels. SetCompressionLevel(level int) error // SetHeaders sets request headers SetHeaders(h *fasthttp.RequestHeader) // Header returns header by key Header(key string) (value string) // Headers returns the RequestHeader struct Headers() *fasthttp.RequestHeader Context() SocketContext } func newSocket(socketHandler SocketHandler, socketCTX SocketContext, conn *websocket.Conn, id string) Socket { s := retainSocket() s.Conn = conn s.sh = socketHandler s.id = id s.SetReadLimit(socketHandler.GetMaxMessageSize()) if 0 < socketHandler.GetReadTimeout() { s.SetReadDeadline(time.Now().Add(socketHandler.GetReadTimeout() * time.Second)) } s.ctx = socketCTX return s } type fasthttpWebSocket struct { *websocket.Conn ctx SocketContext sh SocketHandler id string // sc *SocketConn } func (s *fasthttpWebSocket) Context() SocketContext { return s.ctx } func (s *fasthttpWebSocket) ID() string { return s.id } // func (s *fasthttpWebSocket) WaitRequest() (*SocketConn, error) { // if nil != s.sc { // releaseSocketConn(s.sc) // s.sc = nil // } // var mt int // var err error // var r io.Reader // if mt, r, err = s.NextReader(); nil != err { // return nil, err // } // s.sc = retainSocketConn() // s.sc.s = s // s.sc.MessageType = mt // s.sc.r = r // return s.sc, nil // } func (s *fasthttpWebSocket) NextWriter(messageType int) (io.WriteCloser, error) { if 0 < s.sh.GetWriteTimeout() { s.SetWriteDeadline(time.Now().Add(s.sh.GetWriteTimeout() * time.Second)) } return s.Conn.NextWriter(messageType) } func (s *fasthttpWebSocket) WriteMessage(messageType int, data []byte) error { if 0 < s.sh.GetWriteTimeout() { s.SetWriteDeadline(time.Now().Add(s.sh.GetWriteTimeout() * time.Second)) } return s.Conn.WriteMessage(messageType, data) } func (s *fasthttpWebSocket) Close() error { err := s.Conn.Close() releaseSocket(s) return err } // type SocketConn struct { // net.Conn // s *fasthttpWebSocket // MessageType int // r io.Reader // wc io.WriteCloser // } // func (sc *SocketConn) Read(b []byte) (n int, err error) { // return sc.r.Read(b) // } // func (sc *SocketConn) Write(b []byte) (n int, err error) { // if nil == sc.wc { // var err error // if sc.wc, err = sc.s.NextWriter(sc.MessageType); nil != err { // return 0, err // } // } // return sc.wc.Write(b) // } // func (sc *SocketConn) Close() error { // var err error // if sc.wc != nil { // err = sc.wc.Close() // } // sc.s.sc = nil // releaseSocketConn(sc) // return err // } // func (sc *SocketConn) LocalAddr() net.Addr { // return sc.s.LocalAddr() // } // func (sc *SocketConn) RemoteAddr() net.Addr { // return sc.s.RemoteAddr() // } // func (sc *SocketConn) SetDeadline(t time.Time) error { // if err := sc.s.SetReadDeadline(t); nil != err { // return err // } // if err := sc.s.SetWriteDeadline(t); nil != err { // return err // } // return nil // } // func (sc *SocketConn) SetReadDeadline(t time.Time) error { // return sc.s.SetReadDeadline(t) // } // func (sc *SocketConn) SetWriteDeadline(t time.Time) error { // return sc.s.SetWriteDeadline(t) // } var socketPool sync.Pool func retainSocket() *fasthttpWebSocket { v := socketPool.Get() if v == nil { return &fasthttpWebSocket{} } return v.(*fasthttpWebSocket) } func releaseSocket(s *fasthttpWebSocket) { s.sh = nil s.ctx = nil s.id = "" socketPool.Put(s) } // var socketConnPool sync.Pool // func retainSocketConn() *SocketConn { // v := socketConnPool.Get() // if v == nil { // return &SocketConn{} // } // return v.(*SocketConn) // } // func releaseSocketConn(sc *SocketConn) { // sc.s = nil // sc.r = nil // sc.wc = nil // socketConnPool.Put(sc) // }