From e34e7030c2b4687f141849a95d94d8d5be63e7b8 Mon Sep 17 00:00:00 2001 From: crusader Date: Fri, 29 Jun 2018 17:53:39 +0900 Subject: [PATCH] ing --- socket/compression-handler.go | 58 ----------------------------------- socket/conn-json.go | 2 +- socket/conn-prepared.go | 8 ++--- socket/conn.go | 58 +++++------------------------------ socket/readwrite-handler.go | 22 +++++++++++++ socket/server-handler.go | 5 --- socket/web/server.go | 15 +++++---- socket/web/upgrade.go | 5 +-- 8 files changed, 41 insertions(+), 132 deletions(-) delete mode 100644 socket/compression-handler.go diff --git a/socket/compression-handler.go b/socket/compression-handler.go deleted file mode 100644 index 3267d8b..0000000 --- a/socket/compression-handler.go +++ /dev/null @@ -1,58 +0,0 @@ -package socket - -import ( - "errors" - "sync/atomic" - - "git.loafle.net/commons/server-go" -) - -type CompressionHandler interface { - IsEnableCompression() bool - GetCompressionLevel() int - GetCompressionThreshold() int -} - -type CompressionHandlers struct { - EnableCompression bool `json:"enableCompression,omitempty"` - CompressionLevel int `json:"compressionLevel,omitempty"` - CompressionThreshold int `json:"compressionThreshold,omitempty"` - - validated atomic.Value -} - -func (ch *CompressionHandlers) IsEnableCompression() bool { - return ch.EnableCompression -} -func (ch *CompressionHandlers) GetCompressionLevel() int { - return ch.CompressionLevel -} -func (ch *CompressionHandlers) GetCompressionThreshold() int { - return ch.CompressionThreshold -} - -func (ch *CompressionHandlers) Clone() *CompressionHandlers { - return &CompressionHandlers{ - EnableCompression: ch.EnableCompression, - CompressionLevel: ch.CompressionLevel, - CompressionThreshold: ch.CompressionThreshold, - validated: ch.validated, - } -} - -func (ch *CompressionHandlers) Validate() error { - if nil != ch.validated.Load() { - return nil - } - ch.validated.Store(true) - - if !IsValidCompressionLevel(ch.CompressionLevel) { - return errors.New("Socket: invalid compression level") - } - - if ch.CompressionThreshold <= 0 { - ch.CompressionThreshold = server.DefaultCompressionThreshold - } - - return nil -} diff --git a/socket/conn-json.go b/socket/conn-json.go index e89edd1..5683eda 100644 --- a/socket/conn-json.go +++ b/socket/conn-json.go @@ -19,7 +19,7 @@ func WriteJSON(c *SocketConn, v interface{}) error { // See the documentation for encoding/json Marshal for details about the // conversion of Go values to JSON. func (c *SocketConn) WriteJSON(v interface{}) error { - w, err := c.NextWriter(TextMessage, true) + w, err := c.NextWriter(TextMessage) if err != nil { return err } diff --git a/socket/conn-prepared.go b/socket/conn-prepared.go index 6cf1fab..30d2dff 100644 --- a/socket/conn-prepared.go +++ b/socket/conn-prepared.go @@ -22,10 +22,9 @@ type PreparedMessage struct { // prepareKey defines a unique set of options to cache prepared frames in PreparedMessage. type prepareKey struct { - isServer bool - compress bool - compressionLevel int - compressionThreshold int + isServer bool + compress bool + compressionLevel int } // preparedFrame contains data in wire representation. @@ -79,7 +78,6 @@ func (pm *PreparedMessage) frame(key prepareKey) (int, []byte, error) { mu: mu, isServer: key.isServer, compressionLevel: key.compressionLevel, - compressionThreshold: key.compressionThreshold, enableWriteCompression: true, writeBuf: make([]byte, defaultWriteBufferSize+maxFrameHeaderSize), } diff --git a/socket/conn.go b/socket/conn.go index b60e535..4ff8aa7 100644 --- a/socket/conn.go +++ b/socket/conn.go @@ -126,7 +126,7 @@ type Conn interface { EnableWriteCompression(enable bool) LocalAddr() net.Addr NextReader() (messageType int, r io.Reader, err error) - NextWriter(messageType int, useCompress bool) (io.WriteCloser, error) + NextWriter(messageType int) (io.WriteCloser, error) PingHandler() func(appData string) error PongHandler() func(appData string) error ReadJSON(v interface{}) error @@ -134,7 +134,6 @@ type Conn interface { RemoteAddr() net.Addr SetCloseHandler(h func(code int, text string) error) SetCompressionLevel(level int) error - SetCompressionThreshold(threshold int) SetPingHandler(h func(appData string) error) SetPongHandler(h func(appData string) error) SetReadDeadline(t time.Time) error @@ -167,7 +166,6 @@ type SocketConn struct { enableWriteCompression bool compressionLevel int - compressionThreshold int newCompressionWriter func(io.WriteCloser, int) io.WriteCloser // Read fields @@ -257,7 +255,6 @@ func newConnBRW(conn net.Conn, isServer bool, readBufferSize, writeBufferSize in writeBuf: writeBuf, enableWriteCompression: true, compressionLevel: defaultCompressionLevel, - compressionThreshold: defaultCompressionThreshold, } c.SetCloseHandler(nil) c.SetPingHandler(nil) @@ -445,7 +442,7 @@ func (c *SocketConn) prepWrite(messageType int) error { // // There can be at most one open writer on a connection. NextWriter closes the // previous writer if the application has not already done so. -func (c *SocketConn) NextWriter(messageType int, useCompress bool) (io.WriteCloser, error) { +func (c *SocketConn) NextWriter(messageType int) (io.WriteCloser, error) { if err := c.prepWrite(messageType); err != nil { return nil, err } @@ -456,7 +453,7 @@ func (c *SocketConn) NextWriter(messageType int, useCompress bool) (io.WriteClos pos: maxFrameHeaderSize, } c.writer = mw - if useCompress && c.newCompressionWriter != nil && c.enableWriteCompression && isData(messageType) { + if c.newCompressionWriter != nil && c.enableWriteCompression && isData(messageType) { w := c.newCompressionWriter(c.writer, c.compressionLevel) mw.compress = true c.writer = w @@ -667,10 +664,9 @@ func (w *messageWriter) Close() error { // WritePreparedMessage writes prepared message into connection. func (c *SocketConn) WritePreparedMessage(pm *PreparedMessage) error { frameType, frameData, err := pm.frame(prepareKey{ - isServer: c.isServer, - compress: c.newCompressionWriter != nil && c.enableWriteCompression && isData(pm.messageType), - compressionLevel: c.compressionLevel, - compressionThreshold: c.compressionThreshold, + isServer: c.isServer, + compress: c.newCompressionWriter != nil && c.enableWriteCompression && isData(pm.messageType), + compressionLevel: c.compressionLevel, }) if err != nil { return err @@ -704,43 +700,7 @@ func (c *SocketConn) WriteMessage(messageType int, data []byte) error { return mw.flushFrame(true, data) } - w, err := c.NextWriter(messageType, false) - if err != nil { - return err - } - if _, err = w.Write(data); err != nil { - return err - } - return w.Close() -} - -// WriteMessage is a helper method for getting a writer using NextWriter, -// writing the message and closing the writer. -func (c *SocketConn) WriteCompress(messageType int, data []byte) error { - - if c.isServer && (c.newCompressionWriter == nil || !c.enableWriteCompression || len(data) <= c.compressionThreshold) { - // Fast path with no allocations and single frame. - - if err := c.prepWrite(messageType); err != nil { - return err - } - mw := messageWriter{c: c, frameType: messageType, pos: maxFrameHeaderSize} - n := copy(c.writeBuf[mw.pos:], data) - mw.pos += n - data = data[n:] - return mw.flushFrame(true, data) - } - - var w io.WriteCloser - var err error - length := len(data) - switch { - case length > c.compressionThreshold: - w, err = c.NextWriter(messageType, true) - default: - w, err = c.NextWriter(messageType, false) - } - + w, err := c.NextWriter(messageType) if err != nil { return err } @@ -1138,10 +1098,6 @@ func (c *SocketConn) SetCompressionLevel(level int) error { return nil } -func (c *SocketConn) SetCompressionThreshold(threshold int) { - c.compressionThreshold = threshold -} - // FormatCloseMessage formats closeCode and text as a WebSocket close message. func FormatCloseMessage(closeCode int, text string) []byte { buf := make([]byte, 2+len(text)) diff --git a/socket/readwrite-handler.go b/socket/readwrite-handler.go index 64d1c2e..275d525 100644 --- a/socket/readwrite-handler.go +++ b/socket/readwrite-handler.go @@ -1,6 +1,7 @@ package socket import ( + "errors" "sync/atomic" "time" @@ -12,6 +13,9 @@ type ReadWriteHandler interface { GetPongTimeout() time.Duration GetPingTimeout() time.Duration GetPingPeriod() time.Duration + + IsEnableCompression() bool + GetCompressionLevel() int } type ReadWriteHandlers struct { @@ -21,6 +25,9 @@ type ReadWriteHandlers struct { PingTimeout time.Duration `json:"pingTimeout,omitempty"` PingPeriod time.Duration `json:"pingPeriod,omitempty"` + EnableCompression bool `json:"enableCompression,omitempty"` + CompressionLevel int `json:"compressionLevel,omitempty"` + validated atomic.Value } @@ -34,12 +41,21 @@ func (rwh *ReadWriteHandlers) GetPingPeriod() time.Duration { return rwh.PingPeriod } +func (rwh *ReadWriteHandlers) IsEnableCompression() bool { + return rwh.EnableCompression +} +func (rwh *ReadWriteHandlers) GetCompressionLevel() int { + return rwh.CompressionLevel +} + func (rwh *ReadWriteHandlers) Clone() *ReadWriteHandlers { return &ReadWriteHandlers{ ReadWriteHandlers: *rwh.ReadWriteHandlers.Clone(), PongTimeout: rwh.PongTimeout, PingTimeout: rwh.PingTimeout, PingPeriod: rwh.PingPeriod, + EnableCompression: rwh.EnableCompression, + CompressionLevel: rwh.CompressionLevel, validated: rwh.validated, } } @@ -71,5 +87,11 @@ func (rwh *ReadWriteHandlers) Validate() error { rwh.PingPeriod = rwh.PingPeriod * time.Second } + if rwh.EnableCompression { + if !IsValidCompressionLevel(rwh.CompressionLevel) { + return errors.New("Socket: invalid compression level") + } + } + return nil } diff --git a/socket/server-handler.go b/socket/server-handler.go index 7e3358f..64d2a74 100644 --- a/socket/server-handler.go +++ b/socket/server-handler.go @@ -9,13 +9,11 @@ import ( type ServerHandler interface { server.ServerHandler ReadWriteHandler - CompressionHandler } type ServerHandlers struct { server.ServerHandlers ReadWriteHandlers - CompressionHandlers validated atomic.Value } @@ -32,9 +30,6 @@ func (sh *ServerHandlers) Validate() error { if err := sh.ReadWriteHandlers.Validate(); nil != err { return err } - if err := sh.CompressionHandlers.Validate(); nil != err { - return err - } return nil } diff --git a/socket/web/server.go b/socket/web/server.go index bfa66ea..5ba3bb5 100644 --- a/socket/web/server.go +++ b/socket/web/server.go @@ -57,14 +57,13 @@ func (s *Server) ListenAndServe() error { } s.upgrader = &Upgrader{ - HandshakeTimeout: s.ServerHandler.GetHandshakeTimeout(), - ReadBufferSize: s.ServerHandler.GetReadBufferSize(), - WriteBufferSize: s.ServerHandler.GetWriteBufferSize(), - CheckOrigin: s.ServerHandler.(ServerHandler).CheckOrigin, - Error: s.onError, - EnableCompression: s.ServerHandler.IsEnableCompression(), - CompressionLevel: s.ServerHandler.GetCompressionLevel(), - CompressionThreshold: s.ServerHandler.GetCompressionThreshold(), + HandshakeTimeout: s.ServerHandler.GetHandshakeTimeout(), + ReadBufferSize: s.ServerHandler.GetReadBufferSize(), + WriteBufferSize: s.ServerHandler.GetWriteBufferSize(), + CheckOrigin: s.ServerHandler.(ServerHandler).CheckOrigin, + Error: s.onError, + EnableCompression: s.ServerHandler.IsEnableCompression(), + CompressionLevel: s.ServerHandler.GetCompressionLevel(), } if err = s.ServerHandler.Init(s.ctx); nil != err { diff --git a/socket/web/upgrade.go b/socket/web/upgrade.go index 9ff6a81..53a611a 100644 --- a/socket/web/upgrade.go +++ b/socket/web/upgrade.go @@ -58,9 +58,7 @@ type Upgrader struct { // guarantee that compression will be supported. Currently only "no context // takeover" modes are supported. EnableCompression bool - - CompressionLevel int - CompressionThreshold int + CompressionLevel int } func (u *Upgrader) returnError(ctx *fasthttp.RequestCtx, status int, reason string) (*socket.SocketConn, error) { @@ -199,7 +197,6 @@ func (u *Upgrader) Upgrade(ctx *fasthttp.RequestCtx, responseHeader *fasthttp.Re c.SetSubprotocol(subprotocol) if compress { c.SetCompressionLevel(u.CompressionLevel) - c.SetCompressionThreshold(u.CompressionThreshold) c.SetNewCompressionWriter(socket.CompressNoContextTakeover) c.SetNewDecompressionReader(socket.DecompressNoContextTakeover) }