package client import ( "fmt" "io" "net" "net/http" "sync" "time" "git.loafle.net/commons_go/logging" cuc "git.loafle.net/commons_go/util/context" "git.loafle.net/commons_go/websocket_fasthttp/websocket" ) type Socket interface { Context() SocketContext // Subprotocol returns the negotiated protocol for the connection. Subprotocol() string // Close closes the underlying network connection without sending or waiting for a close frame. Close() error // LocalAddr returns the local network address. LocalAddr() net.Addr // RemoteAddr returns the remote network address. RemoteAddr() net.Addr // WriteControl writes a control message with the given deadline. The allowed // message types are CloseMessage, PingMessage and PongMessage. WriteControl(messageType int, data []byte, deadline time.Time) error // NextWriter returns a writer for the next message to send. The writer's Close // method flushes the complete message to the network. // // There can be at most one open writer on a connection. NextWriter closes the // previous writer if the application has not already done so. NextWriter(messageType int) (io.WriteCloser, error) // WritePreparedMessage writes prepared message into connection. WritePreparedMessage(pm *websocket.PreparedMessage) error // WriteMessage is a helper method for getting a writer using NextWriter, // writing the message and closing the writer. WriteMessage(messageType int, data []byte) error // SetWriteDeadline sets the write deadline on the underlying network // connection. After a write has timed out, the websocket state is corrupt and // all future writes will return an error. A zero value for t means writes will // not time out. SetWriteDeadline(t time.Time) error // NextReader returns the next data message received from the peer. The // returned messageType is either TextMessage or BinaryMessage. // // There can be at most one open reader on a connection. NextReader discards // the previous message if the application has not already consumed it. // // Applications must break out of the application's read loop when this method // returns a non-nil error value. Errors returned from this method are // permanent. Once this method returns a non-nil error, all subsequent calls to // this method return the same error. NextReader() (messageType int, r io.Reader, err error) // ReadMessage is a helper method for getting a reader using NextReader and // reading from that reader to a buffer. ReadMessage() (messageType int, p []byte, err error) // SetReadDeadline sets the read deadline on the underlying network connection. // After a read has timed out, the websocket connection state is corrupt and // all future reads will return an error. A zero value for t means reads will // not time out. SetReadDeadline(t time.Time) error // SetReadLimit sets the maximum size for a message read from the peer. If a // message exceeds the limit, the connection sends a close frame to the peer // and returns ErrReadLimit to the application. SetReadLimit(limit int64) // CloseHandler returns the current close handler CloseHandler() func(code int, text string) error // SetCloseHandler sets the handler for close messages received from the peer. // The code argument to h is the received close code or CloseNoStatusReceived // if the close message is empty. The default close handler sends a close frame // back to the peer. // // The application must read the connection to process close messages as // described in the section on Control Frames above. // // The connection read methods return a CloseError when a close frame is // received. Most applications should handle close messages as part of their // normal error handling. Applications should only set a close handler when the // application must perform some action before sending a close frame back to // the peer. SetCloseHandler(h func(code int, text string) error) // PingHandler returns the current ping handler PingHandler() func(appData string) error // SetPingHandler sets the handler for ping messages received from the peer. // The appData argument to h is the PING frame application data. The default // ping handler sends a pong to the peer. // // The application must read the connection to process ping messages as // described in the section on Control Frames above. SetPingHandler(h func(appData string) error) // PongHandler returns the current pong handler PongHandler() func(appData string) error // SetPongHandler sets the handler for pong messages received from the peer. // The appData argument to h is the PONG frame application data. The default // pong handler does nothing. // // The application must read the connection to process ping messages as // described in the section on Control Frames above. SetPongHandler(h func(appData string) error) // UnderlyingConn returns the internal net.Conn. This can be used to further // modifications to connection specific flags. UnderlyingConn() net.Conn // EnableWriteCompression enables and disables write compression of // subsequent text and binary messages. This function is a noop if // compression was not negotiated with the peer. EnableWriteCompression(enable bool) // SetCompressionLevel sets the flate compression level for subsequent text and // binary messages. This function is a noop if compression was not negotiated // with the peer. See the compress/flate package for a description of // compression levels. SetCompressionLevel(level int) error // Header returns header by key Header(key string) (value string) // Headers returns the RequestHeader struct Headers() http.Header } func NewSocket(sb SocketBuilder, parentContext cuc.Context) (Socket, error) { if nil == sb { logging.Logger().Panicf("Client Socket: SocketBuilder must be specified") } sb.Validate() sc := sb.SocketContext(parentContext) if nil == sc { logging.Logger().Panicf("Client Socket: SocketContext must be specified") } sh := sb.GetSocketHandler() d := &websocket.Dialer{} d.NetDial = sb.Dial d.Proxy = sb.UseProxy d.TLSClientConfig = sb.GetTLSConfig() d.HandshakeTimeout = sb.GetHandshakeTimeout() d.ReadBufferSize = sb.GetReadBufferSize() d.WriteBufferSize = sb.GetWriteBufferSize() d.Subprotocols = sb.GetSubProtocols() d.EnableCompression = sb.IsEnableCompression() d.Jar = sb.GetRequestCookie() url := sb.GetURL() if "" == url { return nil, fmt.Errorf("Client Socket: URL of server must be specified") } reqHeader := sb.GetRequestHeader() conn, res, err := d.Dial(url, reqHeader) if nil != err { return nil, err } sh.OnConnect(sc, res) s := retainSocket() s.Conn = conn s.ctx = sc s.sh = sh s.resHeader = res.Header return s, nil } type fasthttpWebSocket struct { *websocket.Conn ctx SocketContext sh SocketHandler resHeader http.Header } func (s *fasthttpWebSocket) Context() SocketContext { return s.ctx } func (s *fasthttpWebSocket) NextWriter(messageType int) (io.WriteCloser, error) { if 0 < s.sh.GetWriteTimeout() { s.SetWriteDeadline(time.Now().Add(s.sh.GetWriteTimeout() * time.Second)) } return s.Conn.NextWriter(messageType) } func (s *fasthttpWebSocket) WriteMessage(messageType int, data []byte) error { if 0 < s.sh.GetWriteTimeout() { s.SetWriteDeadline(time.Now().Add(s.sh.GetWriteTimeout() * time.Second)) } return s.Conn.WriteMessage(messageType, data) } func (s *fasthttpWebSocket) Header(key string) (value string) { if nil == s.resHeader { return "" } return s.resHeader.Get(key) } func (s *fasthttpWebSocket) Headers() http.Header { return s.resHeader } func (s *fasthttpWebSocket) Close() error { err := s.Conn.Close() s.sh.OnDisconnect(s) releaseSocket(s) return err } var socketPool sync.Pool func retainSocket() *fasthttpWebSocket { v := socketPool.Get() if v == nil { return &fasthttpWebSocket{} } return v.(*fasthttpWebSocket) } func releaseSocket(s *fasthttpWebSocket) { s.sh = nil s.ctx = nil socketPool.Put(s) }