package client

import (
	"crypto/tls"
	"fmt"
	"net"
	"sync"
	"time"

	"git.loafle.net/commons/logging-go"
	"git.loafle.net/commons/server-go/client"
	"git.loafle.net/commons/server-go/socket"
)

// Connectors is a socket client connector. It dials Network/Address, hands
// the resulting connection to a socket.ClientReadWriter, and exposes the
// traffic to the caller through the pair of channels returned by Connect.
type Connectors struct {
	client.Connectors
	socket.ClientConnHandlers
	socket.ReadWriteHandlers

	// Network and Address are passed verbatim to net.Dialer.Dial.
	Network string `json:"network"`
	Address string `json:"address"`
	// LocalAddress is used as net.Dialer.LocalAddr; nil lets the dialer choose.
	LocalAddress net.Addr

	// stopChan is non-nil while connected; Disconnect closes it to signal
	// shutdown and then waits on stopWg.
	stopChan chan struct{}
	stopWg   sync.WaitGroup

	readChan         chan []byte
	writeChan        chan []byte
	disconnectedChan chan struct{}
	reconnectedChan  chan socket.Conn

	crw socket.ClientReadWriter
}

// Connect dials the remote endpoint and starts the background goroutines.
//
// On success it returns the channel on which received messages are delivered
// and the channel on which outgoing messages are submitted (both buffered,
// capacity 256). It returns an error if the connector is already connected or
// if dialing fails; in that case no goroutine is started.
func (c *Connectors) Connect() (readChan <-chan []byte, writeChan chan<- []byte, err error) {
	if c.stopChan != nil {
		return nil, nil, fmt.Errorf("%s already connected", c.logHeader())
	}

	conn, connErr := c.connect()
	if connErr != nil {
		return nil, nil, connErr
	}

	c.readChan = make(chan []byte, 256)
	c.writeChan = make(chan []byte, 256)
	c.disconnectedChan = make(chan struct{})
	c.reconnectedChan = make(chan socket.Conn)
	c.stopChan = make(chan struct{})

	// Wire the read/writer to this connector's channels and lifecycle.
	c.crw.ReadwriteHandler = c
	c.crw.ReadChan = c.readChan
	c.crw.WriteChan = c.writeChan
	c.crw.ClientStopChan = c.stopChan
	c.crw.ClientStopWg = &c.stopWg
	c.crw.DisconnectedChan = c.disconnectedChan
	c.crw.ReconnectedChan = c.reconnectedChan

	// Two goroutines are accounted for: handleReconnect calls Done itself;
	// HandleConnection is given &c.stopWg and is presumably responsible for
	// the second Done (verify in socket.ClientReadWriter).
	c.stopWg.Add(2)
	go c.handleReconnect()
	go c.crw.HandleConnection(conn)

	return c.readChan, c.writeChan, nil
}

// Disconnect signals the background goroutines to stop and blocks until they
// have finished. It returns an error if the connector is not connected.
func (c *Connectors) Disconnect() error {
	if c.stopChan == nil {
		return fmt.Errorf("%s must be connected before disconnection it", c.logHeader())
	}

	close(c.stopChan)
	c.stopWg.Wait()
	c.stopChan = nil

	return nil
}

// logHeader returns the prefix used for this connector's log and error messages.
func (c *Connectors) logHeader() string {
	return fmt.Sprintf("Connector[%s]: ", c.Name)
}

// handleReconnect runs until stopChan is closed. Each time a disconnect is
// signalled on disconnectedChan it tries to re-establish the connection, up
// to GetReconnectTryTime attempts spaced GetReconnectInterval apart, and
// publishes the new connection on reconnectedChan. When reconnecting is
// disabled (try count <= 0) it publishes nil instead.
//
// Every blocking step also watches stopChan so that Disconnect, which waits
// on stopWg, is not held up by a pending retry delay or by a send that no
// one is receiving any more.
func (c *Connectors) handleReconnect() {
	defer c.stopWg.Done()

reconnectLoop:
	for {
		select {
		case <-c.disconnectedChan:
		case <-c.stopChan:
			return
		}

		if c.GetReconnectTryTime() <= 0 {
			select {
			case c.reconnectedChan <- nil:
			case <-c.stopChan:
				return
			}
			continue reconnectLoop
		}

		logging.Logger().Debugf("%s connection lost", c.logHeader())

		for attempt := 0; attempt < c.GetReconnectTryTime(); attempt++ {
			logging.Logger().Debugf("%s trying reconnect[%d]", c.logHeader(), attempt)

			conn, err := c.connect()
			if err == nil {
				logging.Logger().Debugf("reconnected")
				select {
				case c.reconnectedChan <- conn:
				case <-c.stopChan:
					// Nobody will take ownership of the fresh connection.
					// Close it if the type exposes Close (socket.Conn is
					// declared elsewhere, so this is checked dynamically).
					if closer, ok := interface{}(conn).(interface{ Close() error }); ok {
						closer.Close()
					}
					return
				}
				continue reconnectLoop
			}

			// Wait before the next attempt, but give up immediately on stop.
			timer := time.NewTimer(c.GetReconnectInterval())
			select {
			case <-timer.C:
			case <-c.stopChan:
				timer.Stop()
				return
			}
		}

		// NOTE(review): unlike the reconnect-disabled path above, nothing is
		// published on reconnectedChan when every attempt fails. Confirm that
		// the read/writer does not wait on it indefinitely in that case.
		logging.Logger().Debugf("%s reconnecting has been failed", c.logHeader())
	}
}

// connect dials the remote endpoint and wraps the resulting net.Conn in a
// socket.Conn whose close handler only logs the event.
func (c *Connectors) connect() (socket.Conn, error) {
	netConn, err := c.dial()
	if err != nil {
		return nil, err
	}

	conn := socket.NewConn(netConn, false, c.GetReadBufferSize(), c.GetWriteBufferSize())
	conn.SetCloseHandler(func(code int, text string) error {
		logging.Logger().Debugf("%s close", c.logHeader())
		return nil
	})

	return conn, nil
}

// dial opens the network connection to Address and, when a TLS configuration
// is present, performs the TLS handshake on top of it.
//
// A non-zero GetHandshakeTimeout bounds both the dial and the TLS handshake.
// On any failure after the socket has been opened, the socket is closed
// before the error is returned.
func (c *Connectors) dial() (net.Conn, error) {
	var deadline time.Time
	if timeout := c.GetHandshakeTimeout(); timeout != 0 {
		deadline = time.Now().Add(timeout)
	}

	dialer := &net.Dialer{
		KeepAlive: c.GetKeepAlive(),
		Deadline:  deadline,
		LocalAddr: c.LocalAddress,
	}

	conn, err := dialer.Dial(c.Network, c.Address)
	if err != nil {
		return nil, err
	}

	tlsConfig := c.GetTLSConfig()
	if tlsConfig == nil {
		return conn, nil
	}

	cfg := tlsConfig.Clone()
	tlsConn := tls.Client(conn, cfg)

	// Dialer.Deadline only covers the dial itself; apply the same deadline
	// to the handshake I/O so a stalled peer cannot block forever.
	if !deadline.IsZero() {
		if err := tlsConn.SetDeadline(deadline); err != nil {
			tlsConn.Close()
			return nil, err
		}
	}

	if err := tlsConn.Handshake(); err != nil {
		tlsConn.Close()
		return nil, err
	}

	if !cfg.InsecureSkipVerify {
		if err := tlsConn.VerifyHostname(cfg.ServerName); err != nil {
			// Close the connection here too; otherwise the socket leaks.
			tlsConn.Close()
			return nil, err
		}
	}

	// Clear the handshake deadline so it does not affect later reads/writes.
	if !deadline.IsZero() {
		if err := tlsConn.SetDeadline(time.Time{}); err != nil {
			tlsConn.Close()
			return nil, err
		}
	}

	return tlsConn, nil
}

// Clone returns a new connector carrying copies of the embedded configuration
// and the same Network, Address and LocalAddress. Connection state (channels,
// wait group, read/writer) is not copied, so the clone starts disconnected.
func (c *Connectors) Clone() client.Connector {
	return &Connectors{
		Connectors:         *c.Connectors.Clone(),
		ClientConnHandlers: *c.ClientConnHandlers.Clone(),
		ReadWriteHandlers:  *c.ReadWriteHandlers.Clone(),
		Network:            c.Network,
		Address:            c.Address,
		LocalAddress:       c.LocalAddress,
	}
}

// Validate checks the embedded configurations and then requires Network and
// Address to be non-empty. It returns the first error encountered.
func (c *Connectors) Validate() error {
	if err := c.Connectors.Validate(); err != nil {
		return err
	}
	if err := c.ClientConnHandlers.Validate(); err != nil {
		return err
	}
	if err := c.ReadWriteHandlers.Validate(); err != nil {
		return err
	}

	if c.Network == "" {
		return fmt.Errorf("%s Network is not valid", c.logHeader())
	}
	if c.Address == "" {
		return fmt.Errorf("%s Address is not valid", c.logHeader())
	}

	return nil
}