diff --git a/socket/client.go b/client/connector.go similarity index 60% rename from socket/client.go rename to client/connector.go index 69f7edd..ee365e4 100644 --- a/socket/client.go +++ b/client/connector.go @@ -1,6 +1,8 @@ -package socket +package client -type Client interface { +type Connector interface { Connect() (readChan <-chan []byte, writeChan chan<- []byte, err error) Disconnect() error + + Validate() error } diff --git a/socket/net/client.go b/socket/net/client/connector.go similarity index 73% rename from socket/net/client.go rename to socket/net/client/connector.go index 5973475..049e53f 100644 --- a/socket/net/client.go +++ b/socket/net/client/connector.go @@ -1,4 +1,4 @@ -package net +package client import ( "crypto/tls" @@ -8,11 +8,12 @@ import ( "time" "git.loafle.net/commons/logging-go" + "git.loafle.net/commons/server-go/client" "git.loafle.net/commons/server-go/socket" ) -type Client struct { - socket.Client +type Connectors struct { + client.Connector socket.ClientConnHandlers socket.ReadWriteHandlers @@ -35,13 +36,13 @@ type Client struct { crw socket.ClientReadWriter } -func (c *Client) Connect() (readChan <-chan []byte, writeChan chan<- []byte, err error) { +func (c *Connectors) Connect() (readChan <-chan []byte, writeChan chan<- []byte, err error) { var ( conn socket.Conn ) if c.stopChan != nil { - return nil, nil, fmt.Errorf(c.clientMessage("already running. Stop it before starting it again")) + return nil, nil, fmt.Errorf("%s already connected", c.logHeader()) } err = c.Validate() @@ -75,9 +76,9 @@ func (c *Client) Connect() (readChan <-chan []byte, writeChan chan<- []byte, err return c.readChan, c.writeChan, nil } -func (c *Client) Disconnect() error { +func (c *Connectors) Disconnect() error { if c.stopChan == nil { - return fmt.Errorf(c.clientMessage("must be started before stopping it")) + return fmt.Errorf("%s must be connected before disconnection it", c.logHeader()) } close(c.stopChan) c.stopWg.Wait() @@ -87,11 +88,11 @@ func (c *Client) Disconnect() error { return nil } -func (c *Client) clientMessage(msg string) string { - return fmt.Sprintf("Client[%s]: %s", c.Name, msg) +func (c *Connectors) logHeader() string { + return fmt.Sprintf("Connector[%s]: ", c.Name) } -func (c *Client) handleReconnect() { +func (c *Connectors) handleReconnect() { defer func() { c.stopWg.Done() }() @@ -109,10 +110,10 @@ RC_LOOP: continue RC_LOOP } - logging.Logger().Debugf("connection lost") + logging.Logger().Debugf("%s connection lost", c.logHeader()) for indexI := 0; indexI < c.ReconnectTryTime; indexI++ { - logging.Logger().Debugf("trying reconnect[%d]", indexI) + logging.Logger().Debugf("%s trying reconnect[%d]", c.logHeader(), indexI) conn, err := c.connect() if nil == err { @@ -122,11 +123,11 @@ RC_LOOP: } time.Sleep(c.ReconnectInterval) } - logging.Logger().Debugf("reconnecting has been failed") + logging.Logger().Debugf("%s reconnecting has been failed", c.logHeader()) } } -func (c *Client) connect() (socket.Conn, error) { +func (c *Connectors) connect() (socket.Conn, error) { netConn, err := c.dial() if nil != err { return nil, err @@ -134,13 +135,13 @@ func (c *Client) connect() (socket.Conn, error) { conn := socket.NewConn(netConn, false, c.ReadBufferSize, c.WriteBufferSize) conn.SetCloseHandler(func(code int, text string) error { - logging.Logger().Debugf("close") + logging.Logger().Debugf("%s close", c.logHeader()) return nil }) return conn, nil } -func (c *Client) dial() (net.Conn, error) { +func (c *Connectors) dial() (net.Conn, error) { if err := c.Validate(); nil != err { return nil, err } @@ -179,7 +180,7 @@ func (c *Client) dial() (net.Conn, error) { return conn, nil } -func (c *Client) Validate() error { +func (c *Connectors) Validate() error { if err := c.ClientConnHandlers.Validate(); nil != err { return err } @@ -188,15 +189,15 @@ func (c *Client) Validate() error { } if "" == c.Name { - c.Name = "Client" + c.Name = "Connector" } if "" == c.Network { - return fmt.Errorf("Client: Network is not valid") + return fmt.Errorf("%s Network is not valid", c.logHeader()) } if "" == c.Address { - return fmt.Errorf("Client: Address is not valid") + return fmt.Errorf("%s Address is not valid", c.logHeader()) } return nil diff --git a/socket/web/client.go b/socket/web/client/connector.go similarity index 87% rename from socket/web/client.go rename to socket/web/client/connector.go index 9b52b69..c34e935 100644 --- a/socket/web/client.go +++ b/socket/web/client/connector.go @@ -1,4 +1,4 @@ -package web +package client import ( "bufio" @@ -17,13 +17,15 @@ import ( "time" "git.loafle.net/commons/logging-go" + "git.loafle.net/commons/server-go/client" "git.loafle.net/commons/server-go/socket" + "git.loafle.net/commons/server-go/socket/web" ) var errMalformedURL = errors.New("malformed ws or wss URL") -type Client struct { - socket.Client +type Connectors struct { + client.Connector socket.ClientConnHandlers socket.ReadWriteHandlers @@ -64,14 +66,14 @@ type Client struct { crw socket.ClientReadWriter } -func (c *Client) Connect() (readChan <-chan []byte, writeChan chan<- []byte, err error) { +func (c *Connectors) Connect() (readChan <-chan []byte, writeChan chan<- []byte, err error) { var ( conn socket.Conn res *http.Response ) if c.stopChan != nil { - return nil, nil, fmt.Errorf(c.clientMessage("already running. Stop it before starting it again")) + return nil, nil, fmt.Errorf("%s already connected", c.logHeader()) } err = c.Validate() @@ -109,9 +111,9 @@ func (c *Client) Connect() (readChan <-chan []byte, writeChan chan<- []byte, err return c.readChan, c.writeChan, nil } -func (c *Client) Disconnect() error { +func (c *Connectors) Disconnect() error { if c.stopChan == nil { - return fmt.Errorf(c.clientMessage("must be started before stopping it")) + return fmt.Errorf("%s must be connected before disconnection it", c.logHeader()) } close(c.stopChan) c.stopWg.Wait() @@ -121,11 +123,11 @@ func (c *Client) Disconnect() error { return nil } -func (c *Client) clientMessage(msg string) string { - return fmt.Sprintf("Client[%s]: %s", c.Name, msg) +func (c *Connectors) logHeader() string { + return fmt.Sprintf("Connector[%s]:", c.Name) } -func (c *Client) handleReconnect() { +func (c *Connectors) handleReconnect() { defer func() { c.stopWg.Done() }() @@ -143,10 +145,10 @@ RC_LOOP: continue RC_LOOP } - logging.Logger().Debugf("connection lost") + logging.Logger().Debugf("%s connection lost", c.logHeader()) for indexI := 0; indexI < c.ReconnectTryTime; indexI++ { - logging.Logger().Debugf("trying reconnect[%d]", indexI) + logging.Logger().Debugf("%s trying reconnect[%d]", c.logHeader(), indexI) conn, res, err := c.connect() if nil == err { @@ -155,30 +157,30 @@ RC_LOOP: resH(res) } - logging.Logger().Debugf("reconnected") + logging.Logger().Debugf("%s reconnected", c.logHeader()) c.reconnectedChan <- conn continue RC_LOOP } time.Sleep(c.ReconnectInterval) } - logging.Logger().Debugf("reconnecting has been failed") + logging.Logger().Debugf("%s reconnecting has been failed", c.logHeader()) } } -func (c *Client) connect() (socket.Conn, *http.Response, error) { +func (c *Connectors) connect() (socket.Conn, *http.Response, error) { conn, res, err := c.dial() if nil != err { return nil, nil, err } conn.SetCloseHandler(func(code int, text string) error { - logging.Logger().Debugf("close") + logging.Logger().Debugf("%s close", c.logHeader()) return nil }) return conn, res, nil } -func (c *Client) dial() (socket.Conn, *http.Response, error) { +func (c *Connectors) dial() (socket.Conn, *http.Response, error) { var ( err error challengeKey string @@ -189,7 +191,7 @@ func (c *Client) dial() (socket.Conn, *http.Response, error) { return nil, nil, err } - challengeKey, err = generateChallengeKey() + challengeKey, err = web.GenerateChallengeKey() if err != nil { return nil, nil, err } @@ -234,7 +236,7 @@ func (c *Client) dial() (socket.Conn, *http.Response, error) { k == "Sec-Websocket-Version" || k == "Sec-Websocket-Extensions" || (k == "Sec-Websocket-Protocol" && len(c.Subprotocols) > 0): - return nil, nil, errors.New("websocket: duplicate header not allowed: " + k) + return nil, nil, fmt.Errorf("%s duplicate header not allowed: %s", c.logHeader(), k) default: req.Header[k] = vs } @@ -358,7 +360,7 @@ func (c *Client) dial() (socket.Conn, *http.Response, error) { if resp.StatusCode != 101 || !strings.EqualFold(resp.Header.Get("Upgrade"), "websocket") || !strings.EqualFold(resp.Header.Get("Connection"), "upgrade") || - resp.Header.Get("Sec-Websocket-Accept") != computeAcceptKey(challengeKey) { + resp.Header.Get("Sec-Websocket-Accept") != web.ComputeAcceptKey(challengeKey) { // Before closing the network connection on return from this // function, slurp up some of the response to aid application // debugging. @@ -368,7 +370,7 @@ func (c *Client) dial() (socket.Conn, *http.Response, error) { return nil, resp, socket.ErrBadHandshake } - for _, ext := range httpParseExtensions(resp.Header) { + for _, ext := range web.HttpParseExtensions(resp.Header) { if ext[""] != "permessage-deflate" { continue } @@ -391,7 +393,7 @@ func (c *Client) dial() (socket.Conn, *http.Response, error) { return conn, resp, nil } -func (c *Client) Validate() error { +func (c *Connectors) Validate() error { if err := c.ClientConnHandlers.Validate(); nil != err { return err } @@ -400,11 +402,11 @@ func (c *Client) Validate() error { } if "" == c.Name { - c.Name = "Client" + c.Name = "Connector" } if "" == c.URL { - return fmt.Errorf("Client: URL is not valid") + return fmt.Errorf("%s URL is not valid", c.logHeader()) } u, err := parseURL(c.URL) diff --git a/socket/web/upgrade.go b/socket/web/upgrade.go index 7c8cd68..37a86a1 100644 --- a/socket/web/upgrade.go +++ b/socket/web/upgrade.go @@ -168,7 +168,7 @@ func (u *Upgrader) Upgrade(ctx *fasthttp.RequestCtx, responseHeader *fasthttp.Re ctx.SetStatusCode(fasthttp.StatusSwitchingProtocols) ctx.Response.Header.Set("Upgrade", "websocket") ctx.Response.Header.Set("Connection", "Upgrade") - ctx.Response.Header.Set("Sec-Websocket-Accept", computeAcceptKey(challengeKey)) + ctx.Response.Header.Set("Sec-Websocket-Accept", ComputeAcceptKey(challengeKey)) if subprotocol != "" { ctx.Response.Header.Set("Sec-Websocket-Protocol", subprotocol) } diff --git a/socket/web/util.go b/socket/web/util.go index d0f511a..7614107 100644 --- a/socket/web/util.go +++ b/socket/web/util.go @@ -17,14 +17,14 @@ import ( var keyGUID = []byte("258EAFA5-E914-47DA-95CA-C5AB0DC85B11") -func computeAcceptKey(challengeKey string) string { +func ComputeAcceptKey(challengeKey string) string { h := sha1.New() h.Write([]byte(challengeKey)) h.Write(keyGUID) return base64.StdEncoding.EncodeToString(h.Sum(nil)) } -func generateChallengeKey() (string, error) { +func GenerateChallengeKey() (string, error) { p := make([]byte, 16) if _, err := io.ReadFull(rand.Reader, p); err != nil { return "", err @@ -213,7 +213,7 @@ headers: } // parseExtensiosn parses WebSocket extensions from a header. -func httpParseExtensions(header http.Header) []map[string]string { +func HttpParseExtensions(header http.Header) []map[string]string { // From RFC 6455: //