server-go/socket/net/client.go

203 lines
3.8 KiB
Go
Raw Normal View History

2018-04-04 04:01:26 +00:00
package net
2018-04-03 08:55:48 +00:00
import (
"crypto/tls"
"fmt"
"net"
2018-04-04 05:31:10 +00:00
"sync"
2018-04-03 08:55:48 +00:00
"time"
2018-04-04 05:31:10 +00:00
2018-04-04 05:47:10 +00:00
"git.loafle.net/commons/logging-go"
"git.loafle.net/commons/server-go"
2018-04-05 15:15:29 +00:00
"git.loafle.net/commons/server-go/socket"
2018-04-03 08:55:48 +00:00
)
type Client struct {
2018-04-04 13:50:34 +00:00
server.ClientConnHandlers
2018-04-05 15:15:29 +00:00
socket.ReadWriteHandlers
2018-04-04 13:28:35 +00:00
2018-04-03 08:55:48 +00:00
Name string
2018-04-04 13:28:35 +00:00
Network string
Address string
LocalAddress net.Addr
stopChan chan struct{}
stopWg sync.WaitGroup
2018-04-04 05:31:10 +00:00
readChan chan []byte
writeChan chan []byte
2018-04-04 13:28:35 +00:00
disconnectedChan chan struct{}
2018-04-05 15:15:29 +00:00
reconnectedChan chan socket.Conn
2018-04-04 13:28:35 +00:00
2018-04-05 15:15:29 +00:00
crw socket.ClientReadWriter
2018-04-04 05:31:10 +00:00
}
func (c *Client) Connect() (readChan <-chan []byte, writeChan chan<- []byte, err error) {
var (
2018-04-05 15:15:29 +00:00
conn socket.Conn
2018-04-04 05:31:10 +00:00
)
if c.stopChan != nil {
return nil, nil, fmt.Errorf(c.clientMessage("already running. Stop it before starting it again"))
}
err = c.Validate()
if nil != err {
return nil, nil, err
}
conn, err = c.connect()
if nil != err {
return nil, nil, err
}
c.readChan = make(chan []byte, 256)
c.writeChan = make(chan []byte, 256)
2018-04-04 13:28:35 +00:00
c.disconnectedChan = make(chan struct{})
2018-04-05 15:15:29 +00:00
c.reconnectedChan = make(chan socket.Conn)
2018-04-04 05:31:10 +00:00
c.stopChan = make(chan struct{})
2018-04-04 13:28:35 +00:00
2018-04-04 13:50:34 +00:00
c.crw.ReadwriteHandler = c
c.crw.ReadChan = c.readChan
c.crw.WriteChan = c.writeChan
c.crw.ClientStopChan = c.stopChan
c.crw.ClientStopWg = &c.stopWg
c.crw.DisconnectedChan = c.disconnectedChan
c.crw.ReconnectedChan = c.reconnectedChan
2018-04-04 13:28:35 +00:00
2018-04-04 14:10:09 +00:00
c.stopWg.Add(2)
go c.handleReconnect()
2018-04-04 13:50:34 +00:00
go c.crw.HandleConnection(conn)
2018-04-04 05:31:10 +00:00
return c.readChan, c.writeChan, nil
}
// Disconnect stops a running client: it closes the stop channel, waits for
// the goroutines tracked by stopWg to finish and marks the client as stopped
// so Connect may be called again.
//
// It returns an error when the client has not been started.
func (c *Client) Disconnect() error {
	if c.stopChan == nil {
		// The message is passed as an argument, not as the format string, so
		// a '%' in the client name is reported verbatim.
		return fmt.Errorf("%s", c.clientMessage("must be started before stopping it"))
	}

	close(c.stopChan)
	c.stopWg.Wait()
	c.stopChan = nil

	return nil
}
// clientMessage prefixes msg with the client's name, producing
// "Client[<Name>]: <msg>".
func (c *Client) clientMessage(msg string) string {
	const layout = "Client[%s]: %s"

	labelled := fmt.Sprintf(layout, c.Name, msg)
	return labelled
}
2018-04-04 14:10:09 +00:00
func (c *Client) handleReconnect() {
defer func() {
c.stopWg.Done()
}()
RC_LOOP:
for {
select {
case <-c.disconnectedChan:
case <-c.stopChan:
return
}
if 0 >= c.ReconnectTryTime {
c.reconnectedChan <- nil
continue RC_LOOP
}
2018-04-04 14:21:40 +00:00
logging.Logger().Debugf("connection lost")
2018-04-04 14:10:09 +00:00
for indexI := 0; indexI < c.ReconnectTryTime; indexI++ {
2018-04-04 14:21:40 +00:00
logging.Logger().Debugf("trying reconnect[%d]", indexI)
2018-04-04 14:10:09 +00:00
conn, err := c.connect()
if nil == err {
2018-04-04 14:21:40 +00:00
logging.Logger().Debugf("reconnected")
2018-04-04 14:10:09 +00:00
c.reconnectedChan <- conn
continue RC_LOOP
}
time.Sleep(c.ReconnectInterval)
}
2018-04-04 14:21:40 +00:00
logging.Logger().Debugf("reconnecting has been failed")
2018-04-04 14:10:09 +00:00
}
}
2018-04-05 15:15:29 +00:00
func (c *Client) connect() (socket.Conn, error) {
2018-04-04 17:08:07 +00:00
netConn, err := c.dial()
2018-04-04 05:31:10 +00:00
if nil != err {
return nil, err
}
2018-04-05 15:15:29 +00:00
conn := socket.NewConn(netConn, false, c.ReadBufferSize, c.WriteBufferSize)
2018-04-04 05:31:10 +00:00
conn.SetCloseHandler(func(code int, text string) error {
logging.Logger().Debugf("close")
return nil
})
return conn, nil
}
2018-04-04 17:08:07 +00:00
func (c *Client) dial() (net.Conn, error) {
2018-04-03 08:55:48 +00:00
if err := c.Validate(); nil != err {
return nil, err
}
var deadline time.Time
if 0 != c.HandshakeTimeout {
deadline = time.Now().Add(c.HandshakeTimeout)
}
d := &net.Dialer{
KeepAlive: c.KeepAlive,
Deadline: deadline,
LocalAddr: c.LocalAddress,
}
conn, err := d.Dial(c.Network, c.Address)
if nil != err {
return nil, err
}
if nil != c.TLSConfig {
cfg := c.TLSConfig.Clone()
tlsConn := tls.Client(conn, cfg)
if err := tlsConn.Handshake(); err != nil {
tlsConn.Close()
return nil, err
}
if !cfg.InsecureSkipVerify {
if err := tlsConn.VerifyHostname(cfg.ServerName); err != nil {
return nil, err
}
}
conn = tlsConn
}
return conn, nil
}
func (c *Client) Validate() error {
2018-04-04 13:56:14 +00:00
if err := c.ClientConnHandlers.Validate(); nil != err {
2018-04-04 13:28:35 +00:00
return err
}
if err := c.ReadWriteHandlers.Validate(); nil != err {
return err
}
2018-04-03 08:55:48 +00:00
if "" == c.Name {
c.Name = "Client"
}
if "" == c.Network {
return fmt.Errorf("Client: Network is not valid")
}
if "" == c.Address {
return fmt.Errorf("Client: Address is not valid")
}
return nil
}