From 6440d89d7244d3c0b68f77d334ca31e8bfc4707b Mon Sep 17 00:00:00 2001 From: crusader Date: Wed, 29 Nov 2017 21:24:48 +0900 Subject: [PATCH] ing --- client/socket.go | 221 ++++++++++++++++++++++++++++++++++++++ client/socket_context.go | 20 ++++ client/socket_handler.go | 21 ++++ client/socket_handlers.go | 66 ++++++++++++ socket.go | 99 ----------------- 5 files changed, 328 insertions(+), 99 deletions(-) create mode 100644 client/socket.go create mode 100644 client/socket_context.go create mode 100644 client/socket_handler.go create mode 100644 client/socket_handlers.go diff --git a/client/socket.go b/client/socket.go new file mode 100644 index 0000000..71ec183 --- /dev/null +++ b/client/socket.go @@ -0,0 +1,221 @@ +package client + +import ( + "io" + "net" + "sync" + "time" + + "git.loafle.net/commons_go/websocket_fasthttp/websocket" + "github.com/valyala/fasthttp" +) + +type Socket interface { + // Subprotocol returns the negotiated protocol for the connection. + Subprotocol() string + + // Close closes the underlying network connection without sending or waiting for a close frame. + Close() error + + // LocalAddr returns the local network address. + LocalAddr() net.Addr + + // RemoteAddr returns the remote network address. + RemoteAddr() net.Addr + + // WriteControl writes a control message with the given deadline. The allowed + // message types are CloseMessage, PingMessage and PongMessage. + WriteControl(messageType int, data []byte, deadline time.Time) error + + // NextWriter returns a writer for the next message to send. The writer's Close + // method flushes the complete message to the network. + // + // There can be at most one open writer on a connection. NextWriter closes the + // previous writer if the application has not already done so. + NextWriter(messageType int) (io.WriteCloser, error) + + // WritePreparedMessage writes prepared message into connection. + WritePreparedMessage(pm *websocket.PreparedMessage) error + + // WriteMessage is a helper method for getting a writer using NextWriter, + // writing the message and closing the writer. + WriteMessage(messageType int, data []byte) error + + // SetWriteDeadline sets the write deadline on the underlying network + // connection. After a write has timed out, the websocket state is corrupt and + // all future writes will return an error. A zero value for t means writes will + // not time out. + SetWriteDeadline(t time.Time) error + + // NextReader returns the next data message received from the peer. The + // returned messageType is either TextMessage or BinaryMessage. + // + // There can be at most one open reader on a connection. NextReader discards + // the previous message if the application has not already consumed it. + // + // Applications must break out of the application's read loop when this method + // returns a non-nil error value. Errors returned from this method are + // permanent. Once this method returns a non-nil error, all subsequent calls to + // this method return the same error. + NextReader() (messageType int, r io.Reader, err error) + + // ReadMessage is a helper method for getting a reader using NextReader and + // reading from that reader to a buffer. + ReadMessage() (messageType int, p []byte, err error) + + // SetReadDeadline sets the read deadline on the underlying network connection. + // After a read has timed out, the websocket connection state is corrupt and + // all future reads will return an error. A zero value for t means reads will + // not time out. + SetReadDeadline(t time.Time) error + + // SetReadLimit sets the maximum size for a message read from the peer. If a + // message exceeds the limit, the connection sends a close frame to the peer + // and returns ErrReadLimit to the application. + SetReadLimit(limit int64) + + // CloseHandler returns the current close handler + CloseHandler() func(code int, text string) error + + // SetCloseHandler sets the handler for close messages received from the peer. + // The code argument to h is the received close code or CloseNoStatusReceived + // if the close message is empty. The default close handler sends a close frame + // back to the peer. + // + // The application must read the connection to process close messages as + // described in the section on Control Frames above. + // + // The connection read methods return a CloseError when a close frame is + // received. Most applications should handle close messages as part of their + // normal error handling. Applications should only set a close handler when the + // application must perform some action before sending a close frame back to + // the peer. + SetCloseHandler(h func(code int, text string) error) + + // PingHandler returns the current ping handler + PingHandler() func(appData string) error + + // SetPingHandler sets the handler for ping messages received from the peer. + // The appData argument to h is the PING frame application data. The default + // ping handler sends a pong to the peer. + // + // The application must read the connection to process ping messages as + // described in the section on Control Frames above. + SetPingHandler(h func(appData string) error) + + // PongHandler returns the current pong handler + PongHandler() func(appData string) error + + // SetPongHandler sets the handler for pong messages received from the peer. + // The appData argument to h is the PONG frame application data. The default + // pong handler does nothing. + // + // The application must read the connection to process ping messages as + // described in the section on Control Frames above. + SetPongHandler(h func(appData string) error) + + // UnderlyingConn returns the internal net.Conn. This can be used to further + // modifications to connection specific flags. + UnderlyingConn() net.Conn + + // EnableWriteCompression enables and disables write compression of + // subsequent text and binary messages. This function is a noop if + // compression was not negotiated with the peer. + EnableWriteCompression(enable bool) + + // SetCompressionLevel sets the flate compression level for subsequent text and + // binary messages. This function is a noop if compression was not negotiated + // with the peer. See the compress/flate package for a description of + // compression levels. + SetCompressionLevel(level int) error + + // SetHeaders sets response headers + SetHeaders(h *fasthttp.ResponseHeader) + + // Header returns header by key + Header(key string) (value string) + + // Headers returns the RequestHeader struct + Headers() *fasthttp.ResponseHeader +} + +func newSocket(socketHandler SocketHandler, socketCTX SocketContext, conn *websocket.Conn, id string) Socket { + s := retainSocket() + s.Conn = conn + s.sh = socketHandler + s.ctx = socketCTX + + s.SetReadLimit(socketHandler.GetMaxMessageSize()) + if 0 < socketHandler.GetReadTimeout() { + s.SetReadDeadline(time.Now().Add(socketHandler.GetReadTimeout() * time.Second)) + } + + return s +} + +type fasthttpWebSocket struct { + *websocket.Conn + + ctx SocketContext + sh SocketHandler + + resHeaders *fasthttp.ResponseHeader +} + +func (s *fasthttpWebSocket) Context() SocketContext { + return s.ctx +} + +func (s *fasthttpWebSocket) NextWriter(messageType int) (io.WriteCloser, error) { + if 0 < s.sh.GetWriteTimeout() { + s.SetWriteDeadline(time.Now().Add(s.sh.GetWriteTimeout() * time.Second)) + } + + return s.Conn.NextWriter(messageType) +} + +func (s *fasthttpWebSocket) WriteMessage(messageType int, data []byte) error { + if 0 < s.sh.GetWriteTimeout() { + s.SetWriteDeadline(time.Now().Add(s.sh.GetWriteTimeout() * time.Second)) + } + + return s.Conn.WriteMessage(messageType, data) +} + +func (s *fasthttpWebSocket) SetHeaders(h *fasthttp.ResponseHeader) { + s.resHeaders = h +} + +func (s *fasthttpWebSocket) Header(key string) (value string) { + if nil == s.resHeaders { + return "" + } + return string(s.resHeaders.Peek(key)) +} + +func (s *fasthttpWebSocket) Headers() *fasthttp.ResponseHeader { + return s.resHeaders +} + +func (s *fasthttpWebSocket) Close() error { + err := s.Conn.Close() + releaseSocket(s) + return err +} + +var socketPool sync.Pool + +func retainSocket() *fasthttpWebSocket { + v := socketPool.Get() + if v == nil { + return &fasthttpWebSocket{} + } + return v.(*fasthttpWebSocket) +} + +func releaseSocket(s *fasthttpWebSocket) { + s.sh = nil + s.ctx = nil + + socketPool.Put(s) +} diff --git a/client/socket_context.go b/client/socket_context.go new file mode 100644 index 0000000..4f48d3d --- /dev/null +++ b/client/socket_context.go @@ -0,0 +1,20 @@ +package client + +import ( + cuc "git.loafle.net/commons_go/util/context" +) + +type SocketContext interface { + cuc.Context +} + +func newSocketContext() SocketContext { + sCTX := &socketContext{} + sCTX.Context = cuc.NewContext(nil) + + return sCTX +} + +type socketContext struct { + cuc.Context +} diff --git a/client/socket_handler.go b/client/socket_handler.go new file mode 100644 index 0000000..ffe04e2 --- /dev/null +++ b/client/socket_handler.go @@ -0,0 +1,21 @@ +package client + +import "time" + +type SocketHandler interface { + GetMaxMessageSize() int64 + GetWriteTimeout() time.Duration + GetReadTimeout() time.Duration + GetPongTimeout() time.Duration + GetPingTimeout() time.Duration + GetPingPeriod() time.Duration + + // Validate is check handler value + // If you override ths method, must call + // + // func (sh *SocketHandlers) Validate() { + // sh.SocketHandlers.Validate() + // ... + // } + Validate() +} diff --git a/client/socket_handlers.go b/client/socket_handlers.go new file mode 100644 index 0000000..2692986 --- /dev/null +++ b/client/socket_handlers.go @@ -0,0 +1,66 @@ +package client + +import ( + "time" +) + +type SocketHandlers struct { + // MaxMessageSize is the maximum size for a message read from the peer. If a + // message exceeds the limit, the connection sends a close frame to the peer + // and returns ErrReadLimit to the application. + MaxMessageSize int64 + // WriteTimeout is the write deadline on the underlying network + // connection. After a write has timed out, the websocket state is corrupt and + // all future writes will return an error. A zero value for t means writes will + // not time out. + WriteTimeout time.Duration + // ReadTimeout is the read deadline on the underlying network connection. + // After a read has timed out, the websocket connection state is corrupt and + // all future reads will return an error. A zero value for t means reads will + // not time out. + ReadTimeout time.Duration + + PongTimeout time.Duration + PingTimeout time.Duration + PingPeriod time.Duration +} + +func (sh *SocketHandlers) GetMaxMessageSize() int64 { + return sh.MaxMessageSize +} +func (sh *SocketHandlers) GetWriteTimeout() time.Duration { + return sh.WriteTimeout +} +func (sh *SocketHandlers) GetReadTimeout() time.Duration { + return sh.ReadTimeout +} +func (sh *SocketHandlers) GetPongTimeout() time.Duration { + return sh.PongTimeout +} +func (sh *SocketHandlers) GetPingTimeout() time.Duration { + return sh.PingTimeout +} +func (sh *SocketHandlers) GetPingPeriod() time.Duration { + return sh.PingPeriod +} + +func (sh *SocketHandlers) Validate() { + if sh.MaxMessageSize <= 0 { + sh.MaxMessageSize = DefaultMaxMessageSize + } + if sh.WriteTimeout <= 0 { + sh.WriteTimeout = DefaultWriteTimeout + } + if sh.ReadTimeout <= 0 { + sh.ReadTimeout = DefaultReadTimeout + } + if sh.PongTimeout <= 0 { + sh.PongTimeout = DefaultPongTimeout + } + if sh.PingTimeout <= 0 { + sh.PingTimeout = DefaultPingTimeout + } + if sh.PingPeriod <= 0 { + sh.PingPeriod = DefaultPingPeriod + } +} diff --git a/socket.go b/socket.go index 624723d..965b4b8 100644 --- a/socket.go +++ b/socket.go @@ -14,9 +14,6 @@ type Socket interface { // ID returns the identity of the client. ID() string - // // WaitRequest wait request of client. - // WaitRequest() (*SocketConn, error) - // Subprotocol returns the negotiated protocol for the connection. Subprotocol() string @@ -180,28 +177,6 @@ func (s *fasthttpWebSocket) ID() string { return s.id } -// func (s *fasthttpWebSocket) WaitRequest() (*SocketConn, error) { -// if nil != s.sc { -// releaseSocketConn(s.sc) -// s.sc = nil -// } - -// var mt int -// var err error -// var r io.Reader - -// if mt, r, err = s.NextReader(); nil != err { -// return nil, err -// } - -// s.sc = retainSocketConn() -// s.sc.s = s -// s.sc.MessageType = mt -// s.sc.r = r - -// return s.sc, nil -// } - func (s *fasthttpWebSocket) NextWriter(messageType int) (io.WriteCloser, error) { if 0 < s.sh.GetWriteTimeout() { s.SetWriteDeadline(time.Now().Add(s.sh.GetWriteTimeout() * time.Second)) @@ -224,62 +199,6 @@ func (s *fasthttpWebSocket) Close() error { return err } -// type SocketConn struct { -// net.Conn - -// s *fasthttpWebSocket - -// MessageType int -// r io.Reader -// wc io.WriteCloser -// } - -// func (sc *SocketConn) Read(b []byte) (n int, err error) { -// return sc.r.Read(b) -// } - -// func (sc *SocketConn) Write(b []byte) (n int, err error) { -// if nil == sc.wc { -// var err error -// if sc.wc, err = sc.s.NextWriter(sc.MessageType); nil != err { -// return 0, err -// } -// } -// return sc.wc.Write(b) -// } - -// func (sc *SocketConn) Close() error { -// var err error -// if sc.wc != nil { -// err = sc.wc.Close() -// } -// sc.s.sc = nil -// releaseSocketConn(sc) -// return err -// } - -// func (sc *SocketConn) LocalAddr() net.Addr { -// return sc.s.LocalAddr() -// } -// func (sc *SocketConn) RemoteAddr() net.Addr { -// return sc.s.RemoteAddr() -// } -// func (sc *SocketConn) SetDeadline(t time.Time) error { -// if err := sc.s.SetReadDeadline(t); nil != err { -// return err -// } -// if err := sc.s.SetWriteDeadline(t); nil != err { -// return err -// } -// return nil -// } -// func (sc *SocketConn) SetReadDeadline(t time.Time) error { -// return sc.s.SetReadDeadline(t) -// } -// func (sc *SocketConn) SetWriteDeadline(t time.Time) error { -// return sc.s.SetWriteDeadline(t) -// } - var socketPool sync.Pool func retainSocket() *fasthttpWebSocket { @@ -297,21 +216,3 @@ func releaseSocket(s *fasthttpWebSocket) { socketPool.Put(s) } - -// var socketConnPool sync.Pool - -// func retainSocketConn() *SocketConn { -// v := socketConnPool.Get() -// if v == nil { -// return &SocketConn{} -// } -// return v.(*SocketConn) -// } - -// func releaseSocketConn(sc *SocketConn) { -// sc.s = nil -// sc.r = nil -// sc.wc = nil - -// socketConnPool.Put(sc) -// }