package websocket_fasthttp import ( "io" "net" "sync" "time" "git.loafle.net/commons_go/websocket_fasthttp/websocket" "github.com/valyala/fasthttp" ) type Socket interface { // ID returns the identity of the client. ID() string // WaitRequest wait request of client. WaitRequest() (*SocketConn, error) // Subprotocol returns the negotiated protocol for the connection. Subprotocol() string // Close closes the underlying network connection without sending or waiting for a close frame. Close() error // LocalAddr returns the local network address. LocalAddr() net.Addr // RemoteAddr returns the remote network address. RemoteAddr() net.Addr // WriteControl writes a control message with the given deadline. The allowed // message types are CloseMessage, PingMessage and PongMessage. WriteControl(messageType int, data []byte, deadline time.Time) error // NextWriter returns a writer for the next message to send. The writer's Close // method flushes the complete message to the network. // // There can be at most one open writer on a connection. NextWriter closes the // previous writer if the application has not already done so. NextWriter(messageType int) (io.WriteCloser, error) // WritePreparedMessage writes prepared message into connection. WritePreparedMessage(pm *websocket.PreparedMessage) error // WriteMessage is a helper method for getting a writer using NextWriter, // writing the message and closing the writer. WriteMessage(messageType int, data []byte) error // SetWriteDeadline sets the write deadline on the underlying network // connection. After a write has timed out, the websocket state is corrupt and // all future writes will return an error. A zero value for t means writes will // not time out. SetWriteDeadline(t time.Time) error // NextReader returns the next data message received from the peer. The // returned messageType is either TextMessage or BinaryMessage. // // There can be at most one open reader on a connection. NextReader discards // the previous message if the application has not already consumed it. // // Applications must break out of the application's read loop when this method // returns a non-nil error value. Errors returned from this method are // permanent. Once this method returns a non-nil error, all subsequent calls to // this method return the same error. NextReader() (messageType int, r io.Reader, err error) // ReadMessage is a helper method for getting a reader using NextReader and // reading from that reader to a buffer. ReadMessage() (messageType int, p []byte, err error) // SetReadDeadline sets the read deadline on the underlying network connection. // After a read has timed out, the websocket connection state is corrupt and // all future reads will return an error. A zero value for t means reads will // not time out. SetReadDeadline(t time.Time) error // SetReadLimit sets the maximum size for a message read from the peer. If a // message exceeds the limit, the connection sends a close frame to the peer // and returns ErrReadLimit to the application. SetReadLimit(limit int64) // CloseHandler returns the current close handler CloseHandler() func(code int, text string) error // SetCloseHandler sets the handler for close messages received from the peer. // The code argument to h is the received close code or CloseNoStatusReceived // if the close message is empty. The default close handler sends a close frame // back to the peer. // // The application must read the connection to process close messages as // described in the section on Control Frames above. // // The connection read methods return a CloseError when a close frame is // received. Most applications should handle close messages as part of their // normal error handling. Applications should only set a close handler when the // application must perform some action before sending a close frame back to // the peer. SetCloseHandler(h func(code int, text string) error) // PingHandler returns the current ping handler PingHandler() func(appData string) error // SetPingHandler sets the handler for ping messages received from the peer. // The appData argument to h is the PING frame application data. The default // ping handler sends a pong to the peer. // // The application must read the connection to process ping messages as // described in the section on Control Frames above. SetPingHandler(h func(appData string) error) // PongHandler returns the current pong handler PongHandler() func(appData string) error // SetPongHandler sets the handler for pong messages received from the peer. // The appData argument to h is the PONG frame application data. The default // pong handler does nothing. // // The application must read the connection to process ping messages as // described in the section on Control Frames above. SetPongHandler(h func(appData string) error) // UnderlyingConn returns the internal net.Conn. This can be used to further // modifications to connection specific flags. UnderlyingConn() net.Conn // EnableWriteCompression enables and disables write compression of // subsequent text and binary messages. This function is a noop if // compression was not negotiated with the peer. EnableWriteCompression(enable bool) // SetCompressionLevel sets the flate compression level for subsequent text and // binary messages. This function is a noop if compression was not negotiated // with the peer. See the compress/flate package for a description of // compression levels. SetCompressionLevel(level int) error // SetHeaders sets request headers SetHeaders(h *fasthttp.RequestHeader) // Header returns header by key Header(key string) (value string) // Headers returns the RequestHeader struct Headers() *fasthttp.RequestHeader Context() SocketContext } func newSocket(socketHandler SocketHandler, socketCTX SocketContext, conn *websocket.Conn, id string) Socket { s := retainSocket() s.Conn = conn s.sh = socketHandler s.id = id s.SetReadLimit(socketHandler.GetMaxMessageSize()) if 0 < socketHandler.GetReadTimeout() { s.SetReadDeadline(time.Now().Add(socketHandler.GetReadTimeout() * time.Second)) } s.ctx = socketCTX return s } type fasthttpSocket struct { *websocket.Conn ctx SocketContext sh SocketHandler id string sc *SocketConn } func (s *fasthttpSocket) Context() SocketContext { return s.ctx } func (s *fasthttpSocket) ID() string { return s.id } func (s *fasthttpSocket) WaitRequest() (*SocketConn, error) { if nil != s.sc { releaseSocketConn(s.sc) s.sc = nil } var mt int var err error var r io.Reader if mt, r, err = s.NextReader(); nil != err { return nil, err } s.sc = retainSocketConn() s.sc.s = s s.sc.MessageType = mt s.sc.r = r return s.sc, nil } func (s *fasthttpSocket) NextWriter(messageType int) (io.WriteCloser, error) { if 0 < s.sh.GetWriteTimeout() { s.SetWriteDeadline(time.Now().Add(s.sh.GetWriteTimeout() * time.Second)) } return s.Conn.NextWriter(messageType) } func (s *fasthttpSocket) WriteMessage(messageType int, data []byte) error { if 0 < s.sh.GetWriteTimeout() { s.SetWriteDeadline(time.Now().Add(s.sh.GetWriteTimeout() * time.Second)) } return s.Conn.WriteMessage(messageType, data) } func (s *fasthttpSocket) Close() error { err := s.Conn.Close() releaseSocket(s) return err } type SocketConn struct { net.Conn s *fasthttpSocket MessageType int r io.Reader wc io.WriteCloser } func (sc *SocketConn) Read(b []byte) (n int, err error) { return sc.r.Read(b) } func (sc *SocketConn) Write(b []byte) (n int, err error) { if nil == sc.wc { var err error if sc.wc, err = sc.s.NextWriter(sc.MessageType); nil != err { return 0, err } } return sc.wc.Write(b) } func (sc *SocketConn) Close() error { var err error if sc.wc != nil { err = sc.wc.Close() } sc.s.sc = nil releaseSocketConn(sc) return err } func (sc *SocketConn) LocalAddr() net.Addr { return sc.s.LocalAddr() } func (sc *SocketConn) RemoteAddr() net.Addr { return sc.s.RemoteAddr() } func (sc *SocketConn) SetDeadline(t time.Time) error { if err := sc.s.SetReadDeadline(t); nil != err { return err } if err := sc.s.SetWriteDeadline(t); nil != err { return err } return nil } func (sc *SocketConn) SetReadDeadline(t time.Time) error { return sc.s.SetReadDeadline(t) } func (sc *SocketConn) SetWriteDeadline(t time.Time) error { return sc.s.SetWriteDeadline(t) } var socketPool sync.Pool func retainSocket() *fasthttpSocket { v := socketPool.Get() if v == nil { return &fasthttpSocket{} } return v.(*fasthttpSocket) } func releaseSocket(s *fasthttpSocket) { s.sh = nil s.sc = nil s.id = "" socketPool.Put(s) } var socketConnPool sync.Pool func retainSocketConn() *SocketConn { v := socketConnPool.Get() if v == nil { return &SocketConn{} } return v.(*SocketConn) } func releaseSocketConn(sc *SocketConn) { sc.s = nil sc.r = nil sc.wc = nil socketConnPool.Put(sc) }