This commit is contained in:
crusader 2017-11-30 12:15:05 +09:00
parent 58bfd4b982
commit eb528c5a43
6 changed files with 257 additions and 93 deletions

View File

@ -3,11 +3,13 @@ package client
import ( import (
"io" "io"
"net" "net"
"net/http"
"sync" "sync"
"time" "time"
"git.loafle.net/commons_go/logging"
"git.loafle.net/commons_go/websocket_fasthttp/websocket" "git.loafle.net/commons_go/websocket_fasthttp/websocket"
"github.com/valyala/fasthttp"
) )
type Socket interface { type Socket interface {
@ -129,28 +131,58 @@ type Socket interface {
// compression levels. // compression levels.
SetCompressionLevel(level int) error SetCompressionLevel(level int) error
// SetHeaders sets response headers
SetHeaders(h *fasthttp.ResponseHeader)
// Header returns header by key // Header returns header by key
Header(key string) (value string) Header(key string) (value string)
// Headers returns the RequestHeader struct // Headers returns the RequestHeader struct
Headers() *fasthttp.ResponseHeader Headers() http.Header
} }
func newSocket(socketHandler SocketHandler, socketCTX SocketContext, conn *websocket.Conn, id string) Socket { func NewSocket(sb SocketBuilder, parentContext cuc.Context) (Socket, error) {
if nil == sb {
logging.Logger().Panic("Client Socket: SocketBuilder must be specified")
}
sb.Validate()
sc := sb.SocketContext(parentContext)
if nil == sc {
logging.Logger().Panic("Client Socket: SocketContext must be specified")
}
sh := sb.SocketHandler()
if nil == sh {
logging.Logger().Panic("Client Socket: SocketHandler must be specified")
}
sh.Validate()
d := &websocket.Dialer{}
d.NetDial = sb.Dial
d.Proxy = sb.UseProxy
d.TLSClientConfig = sb.GetTLSConfig()
d.HandshakeTimeout = sb.GetHandshakeTimeout()
d.ReadBufferSize = sb.GetReadBufferSize()
d.WriteBufferSize = sb.GetWriteBufferSize()
d.Subprotocols = sb.GetSubProtocols()
d.EnableCompression = sb.IsEnableCompression()
d.Jar = sb.GetRequestCookie()
url := sb.GetURL()
reqHeader := sb.GetRequestHeader()
conn, res, err := d.Dial(url.String(), reqHeader)
if nil != err {
return nil, err
}
sh.OnConnect(sc, res)
s := retainSocket() s := retainSocket()
s.Conn = conn s.Conn = conn
s.sh = socketHandler s.ctx = sc
s.ctx = socketCTX s.sh = sh
s.resHeader = res.Header
s.SetReadLimit(socketHandler.GetMaxMessageSize()) return s, nil
if 0 < socketHandler.GetReadTimeout() {
s.SetReadDeadline(time.Now().Add(socketHandler.GetReadTimeout() * time.Second))
}
return s
} }
type fasthttpWebSocket struct { type fasthttpWebSocket struct {
@ -159,7 +191,7 @@ type fasthttpWebSocket struct {
ctx SocketContext ctx SocketContext
sh SocketHandler sh SocketHandler
resHeaders *fasthttp.ResponseHeader resHeader http.Header
} }
func (s *fasthttpWebSocket) Context() SocketContext { func (s *fasthttpWebSocket) Context() SocketContext {
@ -182,23 +214,20 @@ func (s *fasthttpWebSocket) WriteMessage(messageType int, data []byte) error {
return s.Conn.WriteMessage(messageType, data) return s.Conn.WriteMessage(messageType, data)
} }
func (s *fasthttpWebSocket) SetHeaders(h *fasthttp.ResponseHeader) {
s.resHeaders = h
}
func (s *fasthttpWebSocket) Header(key string) (value string) { func (s *fasthttpWebSocket) Header(key string) (value string) {
if nil == s.resHeaders { if nil == s.resHeader {
return "" return ""
} }
return string(s.resHeaders.Peek(key)) return s.resHeader.Get(key)
} }
func (s *fasthttpWebSocket) Headers() *fasthttp.ResponseHeader { func (s *fasthttpWebSocket) Headers() http.Header {
return s.resHeaders return s.resHeader
} }
func (s *fasthttpWebSocket) Close() error { func (s *fasthttpWebSocket) Close() error {
err := s.Conn.Close() err := s.Conn.Close()
s.sh.OnDisconnect(s)
releaseSocket(s) releaseSocket(s)
return err return err
} }

37
client/socket_builder.go Normal file
View File

@ -0,0 +1,37 @@
package client
import (
"crypto/tls"
"net"
"net/http"
"net/url"
"time"
cuc "git.loafle.net/commons_go/util/context"
)
type SocketBuilder interface {
SocketContext(parent cuc.Context) SocketContext
SocketHandler() SocketHandler
GetURL() *url.URL
GetRequestCookie() http.CookieJar
GetRequestHeader() http.Header
GetSubProtocols() []string
IsEnableCompression() bool
UseProxy(req *http.Request) (*url.URL, error)
GetHandshakeTimeout() time.Duration
Dial(network, addr string) (net.Conn, error)
GetTLSConfig() *tls.Config
GetReadBufferSize() int
GetWriteBufferSize() int
// Validate is check handler value
// If you override ths method, must call
//
// func (sh *SocketHandlers) Validate() {
// sh.SocketHandlers.Validate()
// ...
// }
Validate()
}

101
client/socket_builders.go Normal file
View File

@ -0,0 +1,101 @@
package client
import (
"crypto/tls"
"net"
"net/http"
"net/url"
"time"
"git.loafle.net/commons_go/logging"
cuc "git.loafle.net/commons_go/util/context"
cwf "git.loafle.net/commons_go/websocket_fasthttp"
)
type SocketBuilders struct {
URL *url.URL
RequestCookie http.CookieJar
RequestHeader http.Header
SubProtocols []string
EnableCompression bool
HandshakeTimeout time.Duration
TLSConfig *tls.Config
ReadBufferSize int
WriteBufferSize int
}
func (sb *SocketBuilders) SocketContext(parent cuc.Context) SocketContext {
return newSocketContext(parent)
}
func (sb *SocketBuilders) SocketHandler() SocketHandler {
return NewSocketHandler()
}
func (sb *SocketBuilders) UseProxy(req *http.Request) (*url.URL, error) {
return http.ProxyFromEnvironment(req)
}
func (sb *SocketBuilders) Dial(network, addr string) (net.Conn, error) {
var deadline time.Time
if 0 != sb.HandshakeTimeout {
deadline = time.Now().Add(sb.HandshakeTimeout)
}
netDialer := &net.Dialer{Deadline: deadline}
return netDialer.Dial(network, addr)
}
func (sb *SocketBuilders) GetHandshakeTimeout() time.Duration {
return sb.HandshakeTimeout
}
func (sb *SocketBuilders) GetURL() *url.URL {
return sb.URL
}
func (sb *SocketBuilders) GetRequestCookie() http.CookieJar {
return sb.RequestCookie
}
func (sb *SocketBuilders) GetRequestHeader() http.Header {
return sb.RequestHeader
}
func (sb *SocketBuilders) GetSubProtocols() []string {
return sb.SubProtocols
}
func (sb *SocketBuilders) IsEnableCompression() bool {
return sb.EnableCompression
}
func (sb *SocketBuilders) GetTLSConfig() *tls.Config {
return sb.TLSConfig
}
func (sb *SocketBuilders) GetReadBufferSize() int {
return sb.ReadBufferSize
}
func (sb *SocketBuilders) GetWriteBufferSize() int {
return sb.WriteBufferSize
}
func (sb *SocketBuilders) Validate() {
if nil == sb.URL {
logging.Logger().Panic("Client Socket: URL must be specified")
}
if 0 >= sb.HandshakeTimeout {
sb.HandshakeTimeout = cwf.DefaultHandshakeTimeout
}
if 0 >= sb.ReadBufferSize {
sb.ReadBufferSize = cwf.DefaultReadBufferSize
}
if 0 >= sb.WriteBufferSize {
sb.WriteBufferSize = cwf.DefaultWriteBufferSize
}
}

View File

@ -1,29 +1,20 @@
package client package client
import ( import (
"crypto/tls"
"net"
"net/http" "net/http"
"net/url"
"time" "time"
cuc "git.loafle.net/commons_go/util/context"
) )
type SocketHandler interface { type SocketHandler interface {
SocketContext(parent cuc.Context) SocketContext OnConnect(socketContext SocketContext, res *http.Response)
OnDisconnect(soc Socket)
GetURL() *url.URL GetMaxMessageSize() int64
GetRequestCookie() http.CookieJar GetWriteTimeout() time.Duration
GetRequestHeader() http.Header GetReadTimeout() time.Duration
GetSubProtocols() []string GetPongTimeout() time.Duration
EnableCompression() bool GetPingTimeout() time.Duration
UseProxy(req *http.Request) (*url.URL, error) GetPingPeriod() time.Duration
GetHandshakeTimeout() time.Duration
Dial(network, addr string) (net.Conn, error)
GetTLSConfig() *tls.Config
GetReadBufferSize() int
GetWriteBufferSize() int
// Validate is check handler value // Validate is check handler value
// If you override ths method, must call // If you override ths method, must call

View File

@ -1,75 +1,81 @@
package client package client
import ( import (
"crypto/tls"
"net"
"net/http" "net/http"
"net/url"
"time" "time"
cuc "git.loafle.net/commons_go/util/context" cwf "git.loafle.net/commons_go/websocket_fasthttp"
) )
type SocketHandlers struct { type SocketHandlers struct {
URL *url.URL // MaxMessageSize is the maximum size for a message read from the peer. If a
RequestCookie http.CookieJar // message exceeds the limit, the connection sends a close frame to the peer
RequestHeader http.Header // and returns ErrReadLimit to the application.
SubProtocols []string MaxMessageSize int64
EnableCompression bool // WriteTimeout is the write deadline on the underlying network
HandshakeTimeout time.Duration // connection. After a write has timed out, the websocket state is corrupt and
TLSConfig *tls.Config // all future writes will return an error. A zero value for t means writes will
ReadBufferSize int // not time out.
WriteBufferSize int WriteTimeout time.Duration
// ReadTimeout is the read deadline on the underlying network connection.
// After a read has timed out, the websocket connection state is corrupt and
// all future reads will return an error. A zero value for t means reads will
// not time out.
ReadTimeout time.Duration
PongTimeout time.Duration
PingTimeout time.Duration
PingPeriod time.Duration
} }
func (sh *SocketHandlers) SocketContext(parent cuc.Context) SocketContext { func (sh *SocketHandlers) OnConnect(socketContext SocketContext, res *http.Response) {
return newSocketContext(parent) // no op
} }
func (sh *SocketHandlers) GetURL() *url.URL { func (sh *SocketHandlers) OnDisconnect(soc Socket) {
return sh.URL // no op
} }
func (sh *SocketHandlers) GetRequestCookie() http.CookieJar { func (sh *SocketHandlers) GetMaxMessageSize() int64 {
return sh.RequestCookie return sh.MaxMessageSize
} }
func (sh *SocketHandlers) GetWriteTimeout() time.Duration {
func (sh *SocketHandlers) GetRequestHeader() http.Header { return sh.WriteTimeout
return sh.RequestHeader
} }
func (sh *SocketHandlers) GetReadTimeout() time.Duration {
func (sh *SocketHandlers) GetSubProtocols() []string { return sh.ReadTimeout
return sh.SubProtocols
} }
func (sh *SocketHandlers) GetPongTimeout() time.Duration {
func (sh *SocketHandlers) EnableCompression() bool { return sh.PongTimeout
return sh.EnableCompression
} }
func (sh *SocketHandlers) GetPingTimeout() time.Duration {
func (sh *SocketHandlers) UseProxy(req *http.Request) (*url.URL, error) { return sh.PingTimeout
return nil, nil
} }
func (sh *SocketHandlers) GetPingPeriod() time.Duration {
func (sh *SocketHandlers) GetHandshakeTimeout() time.Duration { return sh.PingPeriod
return sh.HandshakeTimeout
}
func (sh *SocketHandlers) Dial(network, addr string) (net.Conn, error) {
return nil, nil
}
func (sh *SocketHandlers) GetTLSConfig() *tls.Config {
return sh.TLSConfig
}
func (sh *SocketHandlers) GetReadBufferSize() int {
return sh.ReadBufferSize
}
func (sh *SocketHandlers) GetWriteBufferSize() int {
return sh.WriteBufferSize
} }
func (sh *SocketHandlers) Validate() { func (sh *SocketHandlers) Validate() {
if sh.MaxMessageSize <= 0 {
sh.MaxMessageSize = cwf.DefaultMaxMessageSize
}
if sh.WriteTimeout <= 0 {
sh.WriteTimeout = cwf.DefaultWriteTimeout
}
if sh.ReadTimeout <= 0 {
sh.ReadTimeout = cwf.DefaultReadTimeout
}
if sh.PongTimeout <= 0 {
sh.PongTimeout = cwf.DefaultPongTimeout
}
if sh.PingTimeout <= 0 {
sh.PingTimeout = cwf.DefaultPingTimeout
}
if sh.PingPeriod <= 0 {
sh.PingPeriod = cwf.DefaultPingPeriod
}
}
func NewSocketHandler() SocketHandler {
return &SocketHandlers{}
} }

View File

@ -10,9 +10,9 @@ const (
// DefaultHandshakeTimeout is default value of websocket handshake Timeout // DefaultHandshakeTimeout is default value of websocket handshake Timeout
DefaultHandshakeTimeout = 0 DefaultHandshakeTimeout = 0
// DefaultReadBufferSize is default value of Read Buffer Size // DefaultReadBufferSize is default value of Read Buffer Size
DefaultReadBufferSize = 4096 DefaultReadBufferSize = 0
// DefaultWriteBufferSize is default value of Write Buffer Size // DefaultWriteBufferSize is default value of Write Buffer Size
DefaultWriteBufferSize = 4096 DefaultWriteBufferSize = 0
// DefaultReadTimeout is default value of read timeout // DefaultReadTimeout is default value of read timeout
DefaultReadTimeout = 0 DefaultReadTimeout = 0
// DefaultWriteTimeout is default value of write timeout // DefaultWriteTimeout is default value of write timeout